use d_engine_server::RocksDBUnifiedEngine;
use std::sync::Arc;
use std::time::Duration;
use d_engine_server::EmbeddedEngine;
use tracing::info;
use tracing_test::traced_test;
use crate::common::create_rejoin_node_config;
use crate::common::get_available_ports;
use crate::common::node_config;
/// Scales a single-node cluster up to three nodes and checks that data survives.
///
/// Phases:
/// 1. Start node 1 alone, wait until it reports itself as leader, then write and
///    read back `dev-key`.
/// 2. Start node 2 and then node 3. Each lists itself with `role = 4, status = 1`
///    in `initial_cluster` (this test logs that as "joining as Learner").
/// 3. Assert node 1 is still the leader, `dev-key` is still readable, and a new
///    `cluster-key` write is visible through eventual reads on all three nodes.
///
/// Every file the test creates (databases, logs, node config files) lives under a
/// per-test temporary directory, so concurrent runs on the same machine do not
/// overwrite each other's configuration.
#[tokio::test]
#[traced_test]
async fn test_scale_single_to_cluster() -> Result<(), Box<dyn std::error::Error>> {
    let temp_dir = tempfile::tempdir()?;
    let db_root_dir = temp_dir.path().join("db");
    let log_dir = temp_dir.path().join("logs");

    // Config files are written next to the databases instead of a fixed `/tmp/...`
    // location: fixed paths are shared between concurrent test runs and are never
    // cleaned up.
    let config_path_for = |node_id: u32| {
        temp_dir
            .path()
            .join(format!("scale_test_node{node_id}.toml"))
            .display()
            .to_string()
    };

    let mut port_guard = get_available_ports(3).await;
    // Release the reserved listeners before the engines try to bind the ports.
    port_guard.release_listeners();
    let ports = port_guard.as_slice();

    info!("Phase 1: Starting single-node mode");
    let node1_config = format!(
        r#"
[cluster]
node_id = 1
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }}
]
db_root_dir = '{}'
log_dir = '{}'
[raft]
general_raft_timeout_duration_in_ms = 5000
"#,
        ports[0],
        ports[0],
        db_root_dir.display(),
        log_dir.display()
    );
    let node1_config_path = config_path_for(1);
    tokio::fs::write(&node1_config_path, &node1_config).await?;
    let config = node_config(&node1_config);
    let db_path = config.cluster.db_root_dir.join("node1/db");
    tokio::fs::create_dir_all(&db_path).await?;
    let (storage1, sm1) = RocksDBUnifiedEngine::open(&db_path)?;
    let engine1 = EmbeddedEngine::start_custom(
        Arc::new(storage1),
        Arc::new(sm1),
        Some(node1_config_path.as_str()),
    )
    .await?;

    let leader = engine1.wait_ready(Duration::from_secs(5)).await?;
    info!(
        "Node 1 became leader: {} (term {})",
        leader.leader_id, leader.term
    );
    assert_eq!(leader.leader_id, 1, "Node 1 should be leader");

    // Baseline write that must still be readable after the cluster grows.
    engine1.client().put(b"dev-key".to_vec(), b"dev-value".to_vec()).await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    let val = engine1.client().get_linearizable(b"dev-key".to_vec()).await?;
    assert_eq!(val.as_deref(), Some(b"dev-value".as_ref()));
    info!("Data written to single-node cluster");

    info!("Phase 2: Starting node 2 and node 3 to join cluster");
    let node2_config = format!(
        r#"
[cluster]
node_id = 2
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }},
{{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 4, status = 1 }}
]
db_root_dir = '{}'
log_dir = '{}'
[raft]
general_raft_timeout_duration_in_ms = 5000
"#,
        ports[1],
        ports[0],
        ports[1],
        db_root_dir.display(),
        log_dir.display()
    );
    let node2_config_path = config_path_for(2);
    tokio::fs::write(&node2_config_path, &node2_config).await?;
    let config2 = node_config(&node2_config);
    let db_path2 = config2.cluster.db_root_dir.join("node2/db");
    tokio::fs::create_dir_all(&db_path2).await?;
    let (storage2, sm2) = RocksDBUnifiedEngine::open(&db_path2)?;
    let engine2 = EmbeddedEngine::start_custom(
        Arc::new(storage2),
        Arc::new(sm2),
        Some(node2_config_path.as_str()),
    )
    .await?;
    info!("Node 2 started, joining as Learner");
    // Give node 2 time to join before node 3 is started.
    tokio::time::sleep(Duration::from_secs(3)).await;

    let node3_config = format!(
        r#"
[cluster]
node_id = 3
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }},
{{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 1, status = 3 }},
{{ id = 3, name = 'n3', address = '127.0.0.1:{}', role = 4, status = 1 }}
]
db_root_dir = '{}'
log_dir = '{}'
[raft]
general_raft_timeout_duration_in_ms = 5000
"#,
        ports[2],
        ports[0],
        ports[1],
        ports[2],
        db_root_dir.display(),
        log_dir.display()
    );
    let node3_config_path = config_path_for(3);
    tokio::fs::write(&node3_config_path, &node3_config).await?;
    let config3 = node_config(&node3_config);
    let db_path3 = config3.cluster.db_root_dir.join("node3/db");
    tokio::fs::create_dir_all(&db_path3).await?;
    let (storage3, sm3) = RocksDBUnifiedEngine::open(&db_path3)?;
    let engine3 = EmbeddedEngine::start_custom(
        Arc::new(storage3),
        Arc::new(sm3),
        Some(node3_config_path.as_str()),
    )
    .await?;
    info!("Node 3 started, joining as Learner");
    tokio::time::sleep(Duration::from_secs(5)).await;

    info!("Phase 3: Verifying cluster health");
    let current_leader = engine1.wait_ready(Duration::from_secs(2)).await?;
    assert_eq!(
        current_leader.leader_id, 1,
        "Node 1 should remain as leader"
    );
    info!("Verified: Node 1 is still leader after expansion");

    let old_val = engine1.client().get_linearizable(b"dev-key".to_vec()).await?;
    assert_eq!(
        old_val.as_deref(),
        Some(b"dev-value".as_ref()),
        "Old data should be preserved"
    );
    info!("Verified: Old data preserved");

    // A write made after expansion must become readable on every node.
    engine1.client().put(b"cluster-key".to_vec(), b"cluster-value".to_vec()).await?;
    tokio::time::sleep(Duration::from_millis(500)).await;
    for (i, engine) in [&engine1, &engine2, &engine3].iter().enumerate() {
        let val = engine.client().get_eventual(b"cluster-key".to_vec()).await?;
        assert_eq!(
            val.as_deref(),
            Some(b"cluster-value".as_ref()),
            "Node {} should have replicated data",
            i + 1
        );
    }
    info!("Verified: All 3 nodes can read replicated data");

    // Shut down the later joiners first and the leader (node 1) last.
    engine3.stop().await?;
    engine2.stop().await?;
    engine1.stop().await?;
    Ok(())
}
/// Grows a single-node cluster to three nodes, stops the leader, and checks that
/// the remaining nodes elect a new leader and keep serving data.
///
/// Phases:
/// 1. Start node 1 alone, wait until it reports itself as leader, write `phase1-key`.
/// 2. Start node 2 and node 3 (each lists itself with `role = 4, status = 1`; this
///    test logs that as "started as Learner"), wait, then write `phase2-key` and
///    read it from all three nodes.
/// 3. Stop node 1 to simulate a leader crash.
/// 4. Assert that node 2 reports a leader that is node 2 or node 3, with a term
///    greater than the initial one.
/// 5. Write `phase3-key` through the new leader and check all three keys on the
///    two surviving nodes.
/// 6. Restart node 1 from its existing database with a rejoin config, wait for it
///    to catch up on all earlier keys, then write `phase6-key` and check it on
///    every node.
///
/// Every file the test creates (databases, logs, node config files) lives under a
/// per-test temporary directory, so concurrent runs on the same machine do not
/// overwrite each other's configuration.
#[tokio::test]
#[traced_test]
async fn test_leader_failover_after_dynamic_scaling() -> Result<(), Box<dyn std::error::Error>> {
    let temp_dir = tempfile::tempdir()?;
    let db_root_dir = temp_dir.path().join("db");
    let log_dir = temp_dir.path().join("logs");

    // Config files are written into the temp dir instead of a fixed `/tmp/...`
    // location: fixed paths are shared between concurrent test runs and are never
    // cleaned up.
    let config_path_for =
        |file_name: &str| temp_dir.path().join(file_name).display().to_string();

    let mut port_guard = get_available_ports(3).await;
    // Release the reserved listeners before the engines try to bind the ports.
    port_guard.release_listeners();
    let ports = port_guard.as_slice();

    let db_root = format!("{}_failover", db_root_dir.display());
    let log_dir = format!("{}_failover", log_dir.display());

    info!("Phase 1: Starting single-node cluster");
    let node1_config = format!(
        r#"
[cluster]
node_id = 1
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }}
]
db_root_dir = '{}'
log_dir = '{}'
[raft]
general_raft_timeout_duration_in_ms = 100
[raft.election]
election_timeout_min = 300
election_timeout_max = 600
"#,
        ports[0], ports[0], db_root, log_dir
    );
    let node1_config_path = config_path_for("failover_test_node1.toml");
    tokio::fs::write(&node1_config_path, &node1_config).await?;
    let config1 = node_config(&node1_config);
    let node1_db_root = config1.cluster.db_root_dir.join("node1");
    let db_path1 = node1_db_root.join("db");
    tokio::fs::create_dir_all(&db_path1).await?;
    let (storage1, sm1) = RocksDBUnifiedEngine::open(&db_path1)?;
    let engine1 = EmbeddedEngine::start_custom(
        Arc::new(storage1),
        Arc::new(sm1),
        Some(node1_config_path.as_str()),
    )
    .await?;

    let initial_leader = engine1.wait_ready(Duration::from_secs(5)).await?;
    info!(
        "Phase 1 Complete: Node {} elected as leader (term {})",
        initial_leader.leader_id, initial_leader.term
    );
    assert_eq!(
        initial_leader.leader_id, 1,
        "Node 1 should be initial leader"
    );

    engine1.client().put(b"phase1-key".to_vec(), b"phase1-value".to_vec()).await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    let val = engine1.client().get_linearizable(b"phase1-key".to_vec()).await?;
    assert_eq!(val.as_deref(), Some(b"phase1-value".as_ref()));
    info!("Phase 1: Baseline data written successfully");

    info!("Phase 2: Expanding to 3-node cluster");
    let node2_config = format!(
        r#"
[cluster]
node_id = 2
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }},
{{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 4, status = 1 }}
]
db_root_dir = '{db_root}'
log_dir = '{log_dir}'
[raft]
general_raft_timeout_duration_in_ms = 5000
[raft.election]
election_timeout_min = 3000
election_timeout_max = 6000
"#,
        ports[1], ports[0], ports[1]
    );
    let node2_config_path = config_path_for("failover_test_node2.toml");
    tokio::fs::write(&node2_config_path, &node2_config).await?;
    let config2 = node_config(&node2_config);
    let node2_db_root = config2.cluster.db_root_dir.join("node2");
    let db_path2 = node2_db_root.join("db");
    tokio::fs::create_dir_all(&db_path2).await?;
    let (storage2, sm2) = RocksDBUnifiedEngine::open(&db_path2)?;
    let engine2 = EmbeddedEngine::start_custom(
        Arc::new(storage2),
        Arc::new(sm2),
        Some(node2_config_path.as_str()),
    )
    .await?;
    info!("Node 2 started as Learner");
    // Give node 2 time to join before node 3 is started.
    tokio::time::sleep(Duration::from_secs(3)).await;

    let node3_config = format!(
        r#"
[cluster]
node_id = 3
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }},
{{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 1, status = 3 }},
{{ id = 3, name = 'n3', address = '127.0.0.1:{}', role = 4, status = 1 }}
]
db_root_dir = '{db_root}'
log_dir = '{log_dir}'
[raft]
general_raft_timeout_duration_in_ms = 5000
[raft.election]
election_timeout_min = 3000
election_timeout_max = 6000
"#,
        ports[2], ports[0], ports[1], ports[2]
    );
    let node3_config_path = config_path_for("failover_test_node3.toml");
    tokio::fs::write(&node3_config_path, &node3_config).await?;
    let config3 = node_config(&node3_config);
    let node3_db_root = config3.cluster.db_root_dir.join("node3");
    let db_path3 = node3_db_root.join("db");
    tokio::fs::create_dir_all(&db_path3).await?;
    let (storage3, sm3) = RocksDBUnifiedEngine::open(&db_path3)?;
    let engine3 = EmbeddedEngine::start_custom(
        Arc::new(storage3),
        Arc::new(sm3),
        Some(node3_config_path.as_str()),
    )
    .await?;
    info!("Node 3 started as Learner");
    // Long settle time before failover; the messages below expect both new nodes
    // to have been promoted to Voters by the time node 1 is stopped.
    tokio::time::sleep(Duration::from_secs(15)).await;

    let leader_before_failover = engine1.wait_ready(Duration::from_secs(2)).await?;
    assert_eq!(
        leader_before_failover.leader_id, 1,
        "Node 1 should still be leader"
    );

    engine1.client().put(b"phase2-key".to_vec(), b"phase2-value".to_vec()).await?;
    tokio::time::sleep(Duration::from_millis(500)).await;
    for (i, engine) in [&engine1, &engine2, &engine3].iter().enumerate() {
        let val = engine.client().get_eventual(b"phase2-key".to_vec()).await?;
        assert_eq!(
            val.as_deref(),
            Some(b"phase2-value".as_ref()),
            "Node {} should have phase2 data",
            i + 1
        );
    }
    info!("Phase 2 Complete: 3-node cluster operational, data replicated");

    println!("\n========== PRE-FAILOVER STATE CHECK ==========");
    println!("About to stop Node 1 (current leader)");
    println!("Node 2 and Node 3 should have been promoted to Voters by now");
    println!("==============================================\n");

    info!("Phase 3: Simulating leader crash (stopping Node 1)");
    engine1.stop().await?;
    info!("Node 1 stopped - cluster should detect leader failure and start election");
    tokio::time::sleep(Duration::from_secs(2)).await;
    println!("\n========== 2s AFTER NODE 1 STOPPED ==========");
    println!("Node 2 and Node 3 should detect leader failure");
    println!("If they are Voters with election timers, they should start election soon");
    println!("==============================================\n");
    // Nodes 2 and 3 are configured with election timeouts of 3000-6000 ms, so wait
    // past the upper bound before asking for the new leader.
    tokio::time::sleep(Duration::from_secs(6)).await;

    info!("Phase 4: Verifying new leader election");
    let new_leader = engine2.wait_ready(Duration::from_secs(5)).await?;
    info!(
        "New leader elected: Node {} (term {})",
        new_leader.leader_id, new_leader.term
    );
    assert_ne!(
        new_leader.leader_id, 1,
        "New leader cannot be the crashed Node 1"
    );
    assert!(
        new_leader.leader_id == 2 || new_leader.leader_id == 3,
        "New leader must be Node 2 or Node 3"
    );
    assert!(
        new_leader.term > initial_leader.term,
        "New leader must have higher term than initial leader"
    );
    let new_leader_engine = if new_leader.leader_id == 2 {
        &engine2
    } else {
        &engine3
    };

    info!("Phase 5: Verifying service continuity with new leader");
    new_leader_engine
        .client()
        .put(b"phase3-key".to_vec(), b"phase3-value".to_vec())
        .await?;
    tokio::time::sleep(Duration::from_millis(500)).await;
    for (node_id, engine) in [(2, &engine2), (3, &engine3)] {
        let val1 = engine.client().get_eventual(b"phase1-key".to_vec()).await?;
        assert_eq!(
            val1.as_deref(),
            Some(b"phase1-value".as_ref()),
            "Node {node_id} should preserve phase1 data"
        );
        let val2 = engine.client().get_eventual(b"phase2-key".to_vec()).await?;
        assert_eq!(
            val2.as_deref(),
            Some(b"phase2-value".as_ref()),
            "Node {node_id} should preserve phase2 data"
        );
        let val3 = engine.client().get_eventual(b"phase3-key".to_vec()).await?;
        assert_eq!(
            val3.as_deref(),
            Some(b"phase3-value".as_ref()),
            "Node {node_id} should have replicated phase3 data"
        );
    }
    info!("Phase 5 Complete: All data preserved, cluster operational with 2/3 nodes");

    // NOTE(review): the phase 6 log text mentions a linearizable read, but only
    // eventual reads are issued against node 1 below.
    info!("Phase 6: Restarting Node 1 as follower and verifying linearizable read");
    // Reopen node 1's existing database so it restarts with its previous state.
    let node1_db_root = std::path::PathBuf::from(&db_root).join("node1");
    let node1_db_path = node1_db_root.join("db");
    let (node1_storage, node1_state_machine) = RocksDBUnifiedEngine::open(&node1_db_path)?;
    let node1_config_str = create_rejoin_node_config(
        1,
        ports[0],
        &[(1, ports[0]), (2, ports[1]), (3, ports[2])],
        &db_root,
    );
    let node1_config_path = config_path_for("failover_test_node1_phase6.toml");
    tokio::fs::write(&node1_config_path, &node1_config_str).await?;
    let engine1 = EmbeddedEngine::start_custom(
        Arc::new(node1_storage),
        Arc::new(node1_state_machine),
        Some(node1_config_path.as_str()),
    )
    .await?;
    info!("Node 1 restarted, waiting for leader recognition");
    tokio::time::sleep(Duration::from_secs(2)).await;
    let node1_leader = engine1.wait_ready(Duration::from_secs(5)).await?;
    info!(
        "Node 1 back in cluster: leader is {} (term {})",
        node1_leader.leader_id, node1_leader.term
    );
    assert!(
        node1_leader.leader_id == 2 || node1_leader.leader_id == 3,
        "Node 1 should recognize current leader"
    );

    info!("Phase 6a: Waiting for Node 1 to sync data after rejoin");
    for phase in ["phase1", "phase2", "phase3"] {
        let key = format!("{phase}-key").into_bytes();
        let expected = format!("{phase}-value");
        // Poll for up to 20 attempts, 500 ms apart, until the key shows up on node 1.
        let mut synced = None;
        for _ in 0..20 {
            synced = engine1.client().get_eventual(key.clone()).await?;
            if synced.is_some() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
        assert_eq!(
            synced.as_deref(),
            Some(expected.as_bytes()),
            "Node 1 should have {phase} data after sync"
        );
    }
    info!("Phase 6a Complete: All data synced to Node 1 after rejoin");

    info!("Phase 6b: Verifying 3-node cluster consistency");
    new_leader_engine
        .client()
        .put(b"phase6-key".to_vec(), b"phase6-value".to_vec())
        .await?;
    tokio::time::sleep(Duration::from_millis(500)).await;
    // Check every node exactly once, whichever of node 2 / node 3 is the leader.
    for (node_id, engine) in [(1, &engine1), (2, &engine2), (3, &engine3)] {
        let val = engine.client().get_eventual(b"phase6-key".to_vec()).await?;
        assert_eq!(
            val.as_deref(),
            Some(b"phase6-value".as_ref()),
            "Node {node_id} should have phase6 data"
        );
    }
    info!("Phase 6b Complete: 3-node cluster fully synchronized");
    info!("Phase 6 Complete: Node 1 rejoin and linearizable read validation passed");

    // Stop every running engine, including the restarted node 1.
    engine1.stop().await?;
    engine3.stop().await?;
    engine2.stop().await?;
    info!("Test completed successfully - dynamic scaling with leader failover validated");
    Ok(())
}