use std::sync::Arc;
use std::time::Duration;

use d_engine_server::{EmbeddedEngine, RocksDBStateMachine, RocksDBStorageEngine};
use tracing::info;
use tracing_test::traced_test;

use crate::common::{get_available_ports, node_config};
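/// Scales a single-node cluster to three nodes at runtime: boots one node as
/// leader, writes data, joins two Learners, then verifies that leadership is
/// stable and the data replicates to all three nodes.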
#[tokio::test]
#[traced_test]
async fn test_scale_single_to_cluster() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = tempfile::tempdir()?;
let db_root_dir = temp_dir.path().join("db");
let log_dir = temp_dir.path().join("logs");
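// Reserve three ports, then release the listeners so the engines can bind
// them (a small rebind race is accepted here).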
let mut port_guard = get_available_ports(3).await;
port_guard.release_listeners();
let ports = port_guard.as_slice();
info!("Phase 1: Starting single-node mode");
let node1_config = format!(
r#"
[cluster]
node_id = 1
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }}
]
db_root_dir = '{}'
log_dir = '{}'
[raft]
general_raft_timeout_duration_in_ms = 5000
"#,
ports[0],
ports[0],
db_root_dir.display(),
log_dir.display()
);
let node1_config_path = "/tmp/scale_test_node1.toml";
tokio::fs::write(node1_config_path, &node1_config).await?;
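// node_config parses the TOML content into a typed config; the test uses it
// to derive the per-node storage and state-machine directories created below.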
let config = node_config(&node1_config);
let storage_path = config.cluster.db_root_dir.join("node1/storage");
let sm_path = config.cluster.db_root_dir.join("node1/state_machine");
tokio::fs::create_dir_all(&storage_path).await?;
tokio::fs::create_dir_all(&sm_path).await?;
let storage1 = Arc::new(RocksDBStorageEngine::new(storage_path)?);
let sm1 = Arc::new(RocksDBStateMachine::new(sm_path)?);
let engine1 = EmbeddedEngine::start_custom(storage1, sm1, Some(node1_config_path)).await?;
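// With a single voter, the node should elect itself; wait_ready blocks until
// a leader is reported or the timeout elapses.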
let leader = engine1.wait_ready(Duration::from_secs(5)).await?;
info!(
"Node 1 became leader: {} (term {})",
leader.leader_id, leader.term
);
assert_eq!(leader.leader_id, 1, "Node 1 should be leader");
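// Write through the embedded client, allow a brief moment for the entry to
// apply, then read it back with a linearizable get.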
engine1.client().put(b"dev-key".to_vec(), b"dev-value".to_vec()).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
let val = engine1.client().get_linearizable(b"dev-key".to_vec()).await?;
assert_eq!(val.as_deref(), Some(b"dev-value".as_ref()));
info!("Data written to single-node cluster");
info!("Phase 2: Starting node 2 and node 3 to join cluster");
let node2_config = format!(
r#"
[cluster]
node_id = 2
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }},
{{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 4, status = 1 }}
]
db_root_dir = '{}'
log_dir = '{}'
[raft]
general_raft_timeout_duration_in_ms = 5000
"#,
ports[1],
ports[0],
ports[1],
db_root_dir.display(),
log_dir.display()
);
let node2_config_path = "/tmp/scale_test_node2.toml";
tokio::fs::write(node2_config_path, &node2_config).await?;
let config2 = node_config(&node2_config);
let storage_path2 = config2.cluster.db_root_dir.join("node2/storage");
let sm_path2 = config2.cluster.db_root_dir.join("node2/state_machine");
tokio::fs::create_dir_all(&storage_path2).await?;
tokio::fs::create_dir_all(&sm_path2).await?;
let storage2 = Arc::new(RocksDBStorageEngine::new(storage_path2)?);
let sm2 = Arc::new(RocksDBStateMachine::new(sm_path2)?);
let engine2 = EmbeddedEngine::start_custom(storage2, sm2, Some(node2_config_path)).await?;
info!("Node 2 started, joining as Learner");
tokio::time::sleep(Duration::from_secs(3)).await;
let node3_config = format!(
r#"
[cluster]
node_id = 3
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }},
{{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 1, status = 3 }},
{{ id = 3, name = 'n3', address = '127.0.0.1:{}', role = 4, status = 1 }}
]
db_root_dir = '{}'
log_dir = '{}'
[raft]
general_raft_timeout_duration_in_ms = 5000
"#,
ports[2],
ports[0],
ports[1],
ports[2],
db_root_dir.display(),
log_dir.display()
);
let node3_config_path = "/tmp/scale_test_node3.toml";
tokio::fs::write(node3_config_path, &node3_config).await?;
let config3 = node_config(&node3_config);
let storage_path3 = config3.cluster.db_root_dir.join("node3/storage");
let sm_path3 = config3.cluster.db_root_dir.join("node3/state_machine");
tokio::fs::create_dir_all(&storage_path3).await?;
tokio::fs::create_dir_all(&sm_path3).await?;
let storage3 = Arc::new(RocksDBStorageEngine::new(storage_path3)?);
let sm3 = Arc::new(RocksDBStateMachine::new(sm_path3)?);
let engine3 = EmbeddedEngine::start_custom(storage3, sm3, Some(node3_config_path)).await?;
info!("Node 3 started, joining as Learner");
tokio::time::sleep(Duration::from_secs(5)).await;
info!("Phase 3: Verifying cluster health");
let current_leader = engine1.wait_ready(Duration::from_secs(2)).await?;
assert_eq!(
current_leader.leader_id, 1,
"Node 1 should remain as leader"
);
info!("Verified: Node 1 is still leader after expansion");
let old_val = engine1.client().get_linearizable(b"dev-key".to_vec()).await?;
assert_eq!(
old_val.as_deref(),
Some(b"dev-value".as_ref()),
"Old data should be preserved"
);
info!("Verified: Old data preserved");
engine1.client().put(b"cluster-key".to_vec(), b"cluster-value".to_vec()).await?;
tokio::time::sleep(Duration::from_millis(500)).await;
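// get_eventual is an eventually-consistent read, so a successful read on each
// engine shows the entry reached every node's local state machine.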
for (i, engine) in [&engine1, &engine2, &engine3].iter().enumerate() {
let val = engine.client().get_eventual(b"cluster-key".to_vec()).await?;
assert_eq!(
val.as_deref(),
Some(b"cluster-value".as_ref()),
"Node {} should have replicated data",
i + 1
);
}
info!("Verified: All 3 nodes can read replicated data");
engine3.stop().await?;
engine2.stop().await?;
engine1.stop().await?;
Ok(())
}
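
/// Extends the scaling scenario with a leader crash: after growing to three
/// nodes, Node 1 (the leader) is stopped, a new leader must emerge from
/// nodes 2 and 3, and Node 1 later rejoins as a follower and re-syncs all data.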
#[tokio::test]
#[traced_test]
async fn test_leader_failover_after_dynamic_scaling() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = tempfile::tempdir()?;
let db_root_dir = temp_dir.path().join("db");
let log_dir = temp_dir.path().join("logs");
let mut port_guard = get_available_ports(3).await;
port_guard.release_listeners();
let ports = port_guard.as_slice();
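// Give this scenario its own suffixed data and log directories.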
let db_root = format!("{}_failover", db_root_dir.display());
let log_dir = format!("{}_failover", log_dir.display());
info!("Phase 1: Starting single-node cluster");
let node1_config = format!(
r#"
[cluster]
node_id = 1
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }}
]
db_root_dir = '{}'
log_dir = '{}'
[raft]
general_raft_timeout_duration_in_ms = 100
[raft.election]
election_timeout_min = 300
election_timeout_max = 600
"#,
ports[0], ports[0], db_root, log_dir
);
let node1_config_path = "/tmp/failover_test_node1.toml";
tokio::fs::write(node1_config_path, &node1_config).await?;
let config1 = node_config(&node1_config);
let node1_db_root = config1.cluster.db_root_dir.join("node1");
let storage_path1 = node1_db_root.join("storage");
let sm_path1 = node1_db_root.join("state_machine");
tokio::fs::create_dir_all(&storage_path1).await?;
tokio::fs::create_dir_all(&sm_path1).await?;
let storage1 = Arc::new(RocksDBStorageEngine::new(storage_path1)?);
let sm1 = Arc::new(RocksDBStateMachine::new(sm_path1)?);
let engine1 = EmbeddedEngine::start_custom(storage1, sm1, Some(node1_config_path)).await?;
let initial_leader = engine1.wait_ready(Duration::from_secs(5)).await?;
info!(
"Phase 1 Complete: Node {} elected as leader (term {})",
initial_leader.leader_id, initial_leader.term
);
assert_eq!(
initial_leader.leader_id, 1,
"Node 1 should be initial leader"
);
engine1.client().put(b"phase1-key".to_vec(), b"phase1-value".to_vec()).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
let val = engine1.client().get_linearizable(b"phase1-key".to_vec()).await?;
assert_eq!(val.as_deref(), Some(b"phase1-value".as_ref()));
info!("Phase 1: Baseline data written successfully");
info!("Phase 2: Expanding to 3-node cluster");
let node2_config = format!(
r#"
[cluster]
node_id = 2
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }},
{{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 4, status = 1 }}
]
db_root_dir = '{db_root}'
log_dir = '{log_dir}'
[raft]
general_raft_timeout_duration_in_ms = 5000
[raft.election]
election_timeout_min = 3000
election_timeout_max = 6000
"#,
ports[1], ports[0], ports[1]
);
let node2_config_path = "/tmp/failover_test_node2.toml";
tokio::fs::write(node2_config_path, &node2_config).await?;
let config2 = node_config(&node2_config);
let node2_db_root = config2.cluster.db_root_dir.join("node2");
let storage_path2 = node2_db_root.join("storage");
let sm_path2 = node2_db_root.join("state_machine");
tokio::fs::create_dir_all(&storage_path2).await?;
tokio::fs::create_dir_all(&sm_path2).await?;
let storage2 = Arc::new(RocksDBStorageEngine::new(storage_path2)?);
let sm2 = Arc::new(RocksDBStateMachine::new(sm_path2)?);
let engine2 = EmbeddedEngine::start_custom(storage2, sm2, Some(node2_config_path)).await?;
info!("Node 2 started as Learner");
tokio::time::sleep(Duration::from_secs(3)).await;
let node3_config = format!(
r#"
[cluster]
node_id = 3
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 3, status = 3 }},
{{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 1, status = 3 }},
{{ id = 3, name = 'n3', address = '127.0.0.1:{}', role = 4, status = 1 }}
]
db_root_dir = '{db_root}'
log_dir = '{log_dir}'
[raft]
general_raft_timeout_duration_in_ms = 5000
[raft.election]
election_timeout_min = 3000
election_timeout_max = 6000
"#,
ports[2], ports[0], ports[1], ports[2]
);
let node3_config_path = "/tmp/failover_test_node3.toml";
tokio::fs::write(node3_config_path, &node3_config).await?;
let config3 = node_config(&node3_config);
let node3_db_root = config3.cluster.db_root_dir.join("node3");
let storage_path3 = node3_db_root.join("storage");
let sm_path3 = node3_db_root.join("state_machine");
tokio::fs::create_dir_all(&storage_path3).await?;
tokio::fs::create_dir_all(&sm_path3).await?;
let storage3 = Arc::new(RocksDBStorageEngine::new(storage_path3)?);
let sm3 = Arc::new(RocksDBStateMachine::new(sm_path3)?);
let engine3 = EmbeddedEngine::start_custom(storage3, sm3, Some(node3_config_path)).await?;
info!("Node 3 started as Learner");
tokio::time::sleep(Duration::from_secs(15)).await;
let leader_before_failover = engine1.wait_ready(Duration::from_secs(2)).await?;
assert_eq!(
leader_before_failover.leader_id, 1,
"Node 1 should still be leader"
);
engine1.client().put(b"phase2-key".to_vec(), b"phase2-value".to_vec()).await?;
tokio::time::sleep(Duration::from_millis(500)).await;
for (i, engine) in [&engine1, &engine2, &engine3].iter().enumerate() {
let val = engine.client().get_eventual(b"phase2-key".to_vec()).await?;
assert_eq!(
val.as_deref(),
Some(b"phase2-value".as_ref()),
"Node {} should have phase2 data",
i + 1
);
}
info!("Phase 2 Complete: 3-node cluster operational, data replicated");
info!("Phase 3: Simulating leader crash (stopping Node 1)");
engine1.stop().await?;
info!("Node 1 stopped - cluster should detect leader failure and start election");
tokio::time::sleep(Duration::from_secs(8)).await;
info!("Phase 4: Verifying new leader election");
let new_leader = engine2.wait_ready(Duration::from_secs(5)).await?;
info!(
"New leader elected: Node {} (term {})",
new_leader.leader_id, new_leader.term
);
assert_ne!(
new_leader.leader_id, 1,
"New leader cannot be the crashed Node 1"
);
assert!(
new_leader.leader_id == 2 || new_leader.leader_id == 3,
"New leader must be Node 2 or Node 3"
);
assert!(
new_leader.term > initial_leader.term,
"New leader must have higher term than initial leader"
);
let new_leader_engine = if new_leader.leader_id == 2 {
&engine2
} else {
&engine3
};
info!("Phase 5: Verifying service continuity with new leader");
new_leader_engine
.client()
.put(b"phase3-key".to_vec(), b"phase3-value".to_vec())
.await?;
tokio::time::sleep(Duration::from_millis(500)).await;
for (i, engine) in [&engine2, &engine3].iter().enumerate() {
let node_id = if i == 0 { 2 } else { 3 };
let val1 = engine.client().get_eventual(b"phase1-key".to_vec()).await?;
assert_eq!(
val1.as_deref(),
Some(b"phase1-value".as_ref()),
"Node {node_id} should preserve phase1 data"
);
let val2 = engine.client().get_eventual(b"phase2-key".to_vec()).await?;
assert_eq!(
val2.as_deref(),
Some(b"phase2-value".as_ref()),
"Node {node_id} should preserve phase2 data"
);
let val3 = engine.client().get_eventual(b"phase3-key".to_vec()).await?;
assert_eq!(
val3.as_deref(),
Some(b"phase3-value".as_ref()),
"Node {node_id} should have replicated phase3 data"
);
}
info!("Phase 5 Complete: All data preserved, cluster operational with 2/3 nodes");
info!("Phase 6: Restarting Node 1 as follower and verifying linearizable read");
let node1_db_root = std::path::PathBuf::from(&db_root).join("node1");
let node1_storage_path = node1_db_root.join("storage");
let node1_sm_path = node1_db_root.join("state_machine");
let node1_storage = Arc::new(RocksDBStorageEngine::new(node1_storage_path)?);
let node1_state_machine = Arc::new(RocksDBStateMachine::new(node1_sm_path)?);
let node1_config_str = format!(
r#"
[cluster]
node_id = 1
listen_address = '127.0.0.1:{}'
initial_cluster = [
{{ id = 1, name = 'n1', address = '127.0.0.1:{}', role = 1, status = 3 }},
{{ id = 2, name = 'n2', address = '127.0.0.1:{}', role = 1, status = 3 }},
{{ id = 3, name = 'n3', address = '127.0.0.1:{}', role = 1, status = 3 }}
]
db_root_dir = '{}'
log_dir = '{log_dir}'
[raft]
general_raft_timeout_duration_in_ms = 5000
[raft.election]
election_timeout_min = 3000
election_timeout_max = 6000
"#,
ports[0], ports[0], ports[1], ports[2], db_root
);
let node1_config_path = "/tmp/d-engine-test-node1-phase6.toml".to_string();
tokio::fs::write(&node1_config_path, &node1_config_str).await?;
let engine1 =
EmbeddedEngine::start_custom(node1_storage, node1_state_machine, Some(&node1_config_path))
.await?;
info!("Node 1 restarted, waiting for leader recognition");
tokio::time::sleep(Duration::from_secs(2)).await;
let node1_leader = engine1.wait_ready(Duration::from_secs(5)).await?;
info!(
"Node 1 back in cluster: leader is {} (term {})",
node1_leader.leader_id, node1_leader.term
);
assert!(
node1_leader.leader_id == 2 || node1_leader.leader_id == 3,
"Node 1 should recognize current leader"
);
info!("Phase 6a: Waiting for Node 1 to sync data after rejoin");
// Poll Node 1 (up to ~10 s per key) until each earlier write has replicated.
let checks: [(&[u8], &[u8], &str); 3] = [
(b"phase1-key", b"phase1-value", "phase1"),
(b"phase2-key", b"phase2-value", "phase2"),
(b"phase3-key", b"phase3-value", "phase3"),
];
for (key, expected, label) in checks {
let mut val = None;
for _ in 0..20 {
val = engine1.client().get_eventual(key.to_vec()).await?;
if val.is_some() {
break;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
assert_eq!(
val.as_deref(),
Some(expected),
"Node 1 should have {label} data after sync"
);
}
info!("Phase 6a Complete: All data synced to Node 1 after rejoin");
info!("Phase 6b: Verifying 3-node cluster consistency");
new_leader_engine
.client()
.put(b"phase6-key".to_vec(), b"phase6-value".to_vec())
.await?;
tokio::time::sleep(Duration::from_millis(500)).await;
// Every node, including the restarted Node 1, must now serve the new key.
for (engine, node_id) in [(&engine1, 1), (&engine2, 2), (&engine3, 3)] {
let val = engine.client().get_eventual(b"phase6-key".to_vec()).await?;
assert_eq!(
val.as_deref(),
Some(b"phase6-value".as_ref()),
"Node {node_id} should have phase6 data"
);
}
info!("Phase 6b Complete: 3-node cluster fully synchronized");
info!("Phase 6 Complete: Node 1 rejoin and linearizable read validation passed");
engine3.stop().await?;
engine2.stop().await?;
// Also stop the restarted Node 1 so the test shuts down cleanly.
engine1.stop().await?;
info!("Test completed successfully - dynamic scaling with leader failover validated");
Ok(())
}