use d_engine_server::EmbeddedEngine;
use std::time::Duration;
use tracing_test::traced_test;
/// Exercises the basic lifecycle of a single-node embedded engine:
/// start, wait for leadership, put / get / delete a key, then stop.
///
/// The engine is configured through `RAFT__CLUSTER__*` environment variables
/// that are set right before `EmbeddedEngine::start()` and removed right
/// after it returns (presumably `start()` is where they are read).
#[tokio::test]
async fn test_single_node_lifecycle() -> Result<(), Box<dyn std::error::Error>> {
    // `temp_dir` must stay alive for the whole test: dropping it deletes the directory.
    let temp_dir = tempfile::tempdir()?;
    let data_dir = temp_dir.path().join("db");

    // NOTE(review): environment variables are process-global. If the tests in
    // this file run in parallel threads they can observe each other's values;
    // confirm the runner isolates or serializes them.
    unsafe {
        std::env::set_var("RAFT__CLUSTER__NODE_ID", "1");
        std::env::set_var("RAFT__CLUSTER__LISTEN_ADDRESS", "127.0.0.1:9001");
        // `set_var` accepts any `AsRef<OsStr>`, so the path is passed directly
        // instead of going through a panicking UTF-8 conversion.
        std::env::set_var("RAFT__CLUSTER__DB_ROOT_DIR", &data_dir);
    }

    // Hold on to the result so the variables are removed even when startup
    // fails; propagating with `?` first would leak them into later tests.
    let start_result = EmbeddedEngine::start().await;
    unsafe {
        std::env::remove_var("RAFT__CLUSTER__NODE_ID");
        std::env::remove_var("RAFT__CLUSTER__LISTEN_ADDRESS");
        std::env::remove_var("RAFT__CLUSTER__DB_ROOT_DIR");
    }
    let engine = start_result?;

    let leader_info = engine.wait_ready(Duration::from_secs(5)).await?;
    assert_eq!(
        leader_info.leader_id, 1,
        "Single node should elect itself as leader"
    );
    assert!(leader_info.term >= 1, "Term should be at least 1");

    let client = engine.client();

    // Write a key and verify it can be read back.
    let put_result = client.put(b"test-key".to_vec(), b"test-value".to_vec()).await;
    assert!(
        put_result.is_ok(),
        "Put operation failed: {:?}",
        put_result.err()
    );
    tokio::time::sleep(Duration::from_millis(100)).await;
    let value = client.get_linearizable(b"test-key".to_vec()).await?;
    assert_eq!(
        value.as_deref(),
        Some(b"test-value".as_ref()),
        "Get after put should return the value"
    );

    // Delete the key and verify it is gone.
    client.delete(b"test-key".to_vec()).await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    let deleted = client.get_linearizable(b"test-key".to_vec()).await?;
    assert_eq!(deleted, None, "Get after delete should return None");

    engine.stop().await?;
    Ok(())
}
/// Verifies that, once a single-node engine reports ready, the value held by
/// `leader_change_notifier()` is already `Some(..)`.
///
/// The engine is configured through `RAFT__CLUSTER__*` environment variables
/// that are set right before `EmbeddedEngine::start()` and removed right
/// after it returns (presumably `start()` is where they are read).
#[tokio::test]
#[traced_test]
async fn test_leader_notification() -> Result<(), Box<dyn std::error::Error>> {
    // `temp_dir` must stay alive for the whole test: dropping it deletes the directory.
    let temp_dir = tempfile::tempdir()?;
    let data_dir = temp_dir.path().join("db");

    // NOTE(review): environment variables are process-global. If the tests in
    // this file run in parallel threads they can observe each other's values;
    // confirm the runner isolates or serializes them.
    unsafe {
        std::env::set_var("RAFT__CLUSTER__NODE_ID", "1");
        std::env::set_var("RAFT__CLUSTER__LISTEN_ADDRESS", "127.0.0.1:9002");
        // `set_var` accepts any `AsRef<OsStr>`, so the path is passed directly
        // instead of going through a panicking UTF-8 conversion.
        std::env::set_var("RAFT__CLUSTER__DB_ROOT_DIR", &data_dir);
    }

    // Hold on to the result so the variables are removed even when startup
    // fails; propagating with `?` first would leak them into later tests.
    let start_result = EmbeddedEngine::start().await;
    unsafe {
        std::env::remove_var("RAFT__CLUSTER__NODE_ID");
        std::env::remove_var("RAFT__CLUSTER__LISTEN_ADDRESS");
        std::env::remove_var("RAFT__CLUSTER__DB_ROOT_DIR");
    }
    let engine = start_result?;

    let leader_info = engine.wait_ready(Duration::from_secs(5)).await?;
    assert_eq!(
        leader_info.leader_id, 1,
        "Single node should elect itself as leader"
    );

    // Read the notifier's current value; it should already hold a leader
    // because `wait_ready` has returned successfully.
    let leader_rx = engine.leader_change_notifier();
    let leader = *leader_rx.borrow();
    assert!(leader.is_some(), "Leader should already be elected");

    engine.stop().await?;
    Ok(())
}
/// Verifies that a key written by one engine instance can be read by a second
/// instance started afterwards against the same data directory.
///
/// The `RAFT__CLUSTER__*` environment variables stay set across both
/// start/stop cycles and are removed when the test function exits, whether it
/// returns normally, returns early through `?`, or unwinds from a failed
/// assertion.
#[tokio::test]
#[traced_test]
async fn test_data_persistence() -> Result<(), Box<dyn std::error::Error>> {
    use std::time::Duration;
    use d_engine_server::EmbeddedEngine;

    /// Removes the engine configuration variables when dropped, so they do
    /// not leak into later tests on any exit path.
    struct EnvCleanup;

    impl Drop for EnvCleanup {
        fn drop(&mut self) {
            unsafe {
                std::env::remove_var("RAFT__CLUSTER__NODE_ID");
                std::env::remove_var("RAFT__CLUSTER__LISTEN_ADDRESS");
                std::env::remove_var("RAFT__CLUSTER__DB_ROOT_DIR");
            }
        }
    }

    // `temp_dir` must outlive both engine instances: dropping it deletes the directory.
    let temp_dir = tempfile::tempdir()?;
    let data_dir = temp_dir.path().join("db");

    // NOTE(review): environment variables are process-global. If the tests in
    // this file run in parallel threads they can observe each other's values;
    // confirm the runner isolates or serializes them.
    unsafe {
        std::env::set_var("RAFT__CLUSTER__NODE_ID", "1");
        std::env::set_var("RAFT__CLUSTER__LISTEN_ADDRESS", "127.0.0.1:9003");
        // `set_var` accepts any `AsRef<OsStr>`, so the path is passed directly
        // instead of going through a panicking UTF-8 conversion.
        std::env::set_var("RAFT__CLUSTER__DB_ROOT_DIR", &data_dir);
    }
    // Declared after `temp_dir`, so it is dropped (and the variables removed)
    // before the temporary directory is deleted.
    let _env_cleanup = EnvCleanup;

    // First run: write a key, then shut the engine down.
    {
        let engine = EmbeddedEngine::start().await?;
        engine.wait_ready(Duration::from_secs(5)).await?;
        engine.client().put(b"persist-key".to_vec(), b"persist-value".to_vec()).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
        engine.stop().await?;
    }

    // Short pause between shutdown and restart.
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Second run: same configuration, the key must still be readable.
    {
        let engine = EmbeddedEngine::start().await?;
        engine.wait_ready(Duration::from_secs(5)).await?;
        let value = engine.client().get_linearizable(b"persist-key".to_vec()).await?;
        assert_eq!(
            value.as_deref(),
            Some(b"persist-value".as_ref()),
            "Data should persist across restarts"
        );
        engine.stop().await?;
    }

    Ok(())
}