#[cfg(test)]
mod embedded_engine_tests {
use std::sync::Arc;
use std::time::Duration;
use crate::api::EmbeddedEngine;
use crate::storage::FileStateMachine;
use crate::storage::FileStorageEngine;
async fn create_test_storage_and_sm() -> (
Arc<FileStorageEngine>,
Arc<FileStateMachine>,
tempfile::TempDir,
) {
unsafe {
std::env::remove_var("CONFIG_PATH");
}
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let storage_path = temp_dir.path().join("storage");
let sm_path = temp_dir.path().join("sm");
std::fs::create_dir_all(&storage_path).unwrap();
std::fs::create_dir_all(&sm_path).unwrap();
let storage =
Arc::new(FileStorageEngine::new(storage_path).expect("Failed to create storage"));
let sm =
Arc::new(FileStateMachine::new(sm_path).await.expect("Failed to create state machine"));
(storage, sm, temp_dir)
}
#[tokio::test]
async fn test_wait_ready_single_node_success() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
let result = engine.wait_ready(Duration::from_secs(5)).await;
assert!(
result.is_ok(),
"Leader election should succeed in single-node mode"
);
let leader_info = result.unwrap();
assert_eq!(
leader_info.leader_id, 1,
"Single node should elect itself as leader"
);
assert!(leader_info.term > 0, "Term should be positive");
engine.stop().await.expect("Failed to stop engine");
}
#[tokio::test]
async fn test_wait_ready_timeout() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
let very_short_timeout = Duration::from_nanos(1);
let _ = engine.wait_ready(very_short_timeout).await;
engine.stop().await.expect("Failed to stop engine");
}
#[tokio::test]
async fn test_leader_change_notifier_basic() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
let mut leader_rx = engine.leader_change_notifier();
engine
.wait_ready(Duration::from_secs(5))
.await
.expect("Leader should be elected");
tokio::time::timeout(Duration::from_secs(1), leader_rx.changed())
.await
.expect("Should receive leader notification within timeout")
.expect("Should receive change event");
let current_leader = *leader_rx.borrow();
assert!(current_leader.is_some(), "Leader should be elected");
if let Some(info) = current_leader {
assert_eq!(info.leader_id, 1);
assert!(info.term > 0);
}
engine.stop().await.expect("Failed to stop engine");
}
#[tokio::test]
async fn test_ready_and_wait_ready_sequence() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
let start = std::time::Instant::now();
let leader_info = engine
.wait_ready(Duration::from_secs(5))
.await
.expect("Leader should be elected");
let duration = start.elapsed();
assert!(
duration < Duration::from_secs(2),
"Leader election should be fast in single-node"
);
assert_eq!(leader_info.leader_id, 1);
assert!(leader_info.term > 0);
engine.stop().await.expect("Failed to stop engine");
}
#[tokio::test]
async fn test_client_available_after_wait_ready() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
engine
.wait_ready(Duration::from_secs(5))
.await
.expect("Leader should be elected");
let client = engine.client();
let result = client.put(b"test_key".to_vec(), b"test_value".to_vec()).await;
assert!(
result.is_ok(),
"Put operation should succeed after leader election"
);
engine.stop().await.expect("Failed to stop engine");
}
#[tokio::test]
async fn test_multiple_leader_change_notifier_subscribers() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
let mut rx1 = engine.leader_change_notifier();
let mut rx2 = engine.leader_change_notifier();
engine
.wait_ready(Duration::from_secs(5))
.await
.expect("Leader should be elected");
tokio::time::timeout(Duration::from_secs(1), rx1.changed())
.await
.expect("Subscriber 1 should receive within timeout")
.expect("Subscriber 1 should receive change");
tokio::time::timeout(Duration::from_secs(1), rx2.changed())
.await
.expect("Subscriber 2 should receive within timeout")
.expect("Subscriber 2 should receive change");
let leader1 = *rx1.borrow();
let leader2 = *rx2.borrow();
assert_eq!(leader1, leader2, "Both subscribers should see same leader");
engine.stop().await.expect("Failed to stop engine");
}
#[tokio::test]
async fn test_engine_stop_cleans_up() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
let stop_result = engine.stop().await;
assert!(stop_result.is_ok(), "Stop should succeed");
}
#[tokio::test]
async fn test_wait_ready_race_condition_already_elected() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
let first_result = engine.wait_ready(Duration::from_secs(5)).await;
assert!(first_result.is_ok(), "First wait_ready should succeed");
let first_info = first_result.unwrap();
let second_start = std::time::Instant::now();
let second_result = engine.wait_ready(Duration::from_secs(5)).await;
let second_duration = second_start.elapsed();
assert!(second_result.is_ok(), "Second wait_ready should succeed");
let second_info = second_result.unwrap();
assert!(
second_duration < Duration::from_millis(100),
"wait_ready should return immediately when leader already elected, took {second_duration:?}"
);
assert_eq!(first_info.leader_id, second_info.leader_id);
assert_eq!(first_info.term, second_info.term);
engine.stop().await.expect("Failed to stop engine");
}
#[tokio::test]
async fn test_wait_ready_multiple_calls_concurrent() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = Arc::new(
EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine"),
);
engine
.wait_ready(Duration::from_secs(5))
.await
.expect("Initial leader election should succeed");
let mut handles = vec![];
for _ in 0..10 {
let engine_clone = engine.clone();
let handle = tokio::spawn(async move {
let start = std::time::Instant::now();
let result = engine_clone.wait_ready(Duration::from_secs(5)).await;
let duration = start.elapsed();
(result, duration)
});
handles.push(handle);
}
for handle in handles {
let (result, duration) = handle.await.expect("Task should not panic");
assert!(result.is_ok(), "wait_ready should succeed");
assert!(
duration < Duration::from_millis(100),
"Should return immediately, took {duration:?}"
);
}
Arc::try_unwrap(engine)
.ok()
.expect("Arc should have single owner")
.stop()
.await
.expect("Failed to stop engine");
}
#[tokio::test]
async fn test_wait_ready_check_current_value_first() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
engine
.wait_ready(Duration::from_secs(5))
.await
.expect("Leader should be elected");
let leader_rx = engine.leader_change_notifier();
let current_leader = *leader_rx.borrow();
assert!(current_leader.is_some(), "Current leader should be set");
let start = std::time::Instant::now();
let result = engine.wait_ready(Duration::from_secs(5)).await;
let duration = start.elapsed();
assert!(result.is_ok(), "Should succeed");
assert!(
duration < Duration::from_millis(50),
"Should check current value first and return immediately, took {duration:?}"
);
engine.stop().await.expect("Failed to stop engine");
}
#[cfg(feature = "rocksdb")]
mod config_validation_tests {
use serial_test::serial;
use super::*;
#[tokio::test]
#[cfg(debug_assertions)]
#[serial(tmp_db)]
async fn test_start_without_config_path_env_allows_in_debug() {
unsafe {
std::env::remove_var("CONFIG_PATH");
}
let _ = std::fs::remove_dir_all("/tmp/db");
let result = EmbeddedEngine::start().await;
assert!(
result.is_ok(),
"start() should allow default /tmp/db in debug mode without CONFIG_PATH. Error: {:?}",
result.as_ref().err()
);
if let Ok(engine) = result {
engine.stop().await.ok();
}
let _ = std::fs::remove_dir_all("/tmp/db");
}
#[tokio::test]
#[cfg(not(debug_assertions))]
#[serial]
async fn test_start_without_config_path_env_rejects_in_release() {
unsafe {
std::env::remove_var("CONFIG_PATH");
}
let result = EmbeddedEngine::start().await;
assert!(
result.is_err(),
"start() should reject default /tmp/db in release mode without CONFIG_PATH"
);
if let Err(e) = result {
let err_msg = format!("{:?}", e);
assert!(err_msg.contains("/tmp/db") || err_msg.contains("db_root_dir"));
}
}
#[tokio::test]
async fn test_start_with_valid_config() {
let _temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let config_path = _temp_dir.path().join("test_config.toml");
let data_dir = _temp_dir.path().join("data");
let config_content = format!(
r#"
[cluster]
node_id = 1
db_root_dir = "{}"
[cluster.rpc]
listen_addr = "127.0.0.1:0"
[raft]
heartbeat_interval_ms = 500
election_timeout_min_ms = 1500
election_timeout_max_ms = 3000
"#,
data_dir.display()
);
std::fs::write(&config_path, config_content).expect("Failed to write config");
let result = EmbeddedEngine::start_with(config_path.to_str().unwrap()).await;
assert!(
result.is_ok(),
"start_with() should succeed with valid config"
);
if let Ok(engine) = result {
engine.stop().await.ok();
}
}
#[tokio::test]
async fn test_start_with_nonexistent_config() {
let result = EmbeddedEngine::start_with("/nonexistent/config.toml").await;
assert!(
result.is_err(),
"start_with() should fail with nonexistent config"
);
}
#[tokio::test]
#[cfg(debug_assertions)]
#[serial(tmp_db)]
async fn test_start_with_tmp_db_allows_in_debug() {
let _temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let config_path = _temp_dir.path().join("test_config.toml");
let _ = std::fs::remove_dir_all("/tmp/db");
let config_content = r#"
[cluster]
node_id = 1
db_root_dir = "/tmp/db"
[cluster.rpc]
listen_addr = "127.0.0.1:0"
"#;
std::fs::write(&config_path, config_content).expect("Failed to write config");
let result = EmbeddedEngine::start_with(config_path.to_str().unwrap()).await;
assert!(
result.is_ok(),
"start_with() should allow /tmp/db in debug mode"
);
if let Ok(engine) = result {
engine.stop().await.ok();
}
let _ = std::fs::remove_dir_all("/tmp/db");
}
#[tokio::test]
#[cfg(not(debug_assertions))]
#[serial]
async fn test_start_with_tmp_db_rejects_in_release() {
let _temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let config_path = _temp_dir.path().join("test_config.toml");
let config_content = r#"
[cluster]
node_id = 1
db_root_dir = "/tmp/db"
[cluster.rpc]
listen_addr = "127.0.0.1:0"
"#;
std::fs::write(&config_path, config_content).expect("Failed to write config");
let result = EmbeddedEngine::start_with(config_path.to_str().unwrap()).await;
assert!(
result.is_err(),
"start_with() should reject /tmp/db in release mode"
);
if let Err(e) = result {
let err_msg = format!("{:?}", e);
assert!(err_msg.contains("/tmp/db") || err_msg.contains("db_root_dir"));
}
}
#[tokio::test]
async fn test_drop_without_stop_warning() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
drop(engine);
}
}
#[cfg(feature = "watch")]
mod watch_tests {
use serial_test::serial;
use super::*;
#[tokio::test]
#[serial]
async fn test_watch_registers_successfully() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
engine
.wait_ready(Duration::from_secs(5))
.await
.expect("Leader should be elected");
let result = engine.client().watch(b"test_key");
assert!(
result.is_ok(),
"watch() should succeed when feature is enabled"
);
if let Ok(mut handle) = result {
tokio::time::sleep(Duration::from_millis(100)).await;
let client = engine.client();
client
.put(b"test_key".to_vec(), b"value1".to_vec())
.await
.expect("Put should succeed");
let event =
tokio::time::timeout(Duration::from_secs(2), handle.receiver_mut().recv())
.await
.expect("Should receive event within timeout");
assert!(event.is_some(), "Should receive watch event");
}
engine.stop().await.expect("Failed to stop engine");
}
}
#[cfg(feature = "watch")]
mod watch_tempdir_tests {
use super::*;
#[tokio::test]
async fn test_watch_with_tempdir_dropped() {
let (storage, sm) = {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let storage_path = temp_dir.path().join("storage");
let sm_path = temp_dir.path().join("sm");
std::fs::create_dir_all(&storage_path).unwrap();
std::fs::create_dir_all(&sm_path).unwrap();
let storage = Arc::new(
FileStorageEngine::new(storage_path).expect("Failed to create storage"),
);
let sm = Arc::new(
FileStateMachine::new(sm_path).await.expect("Failed to create state machine"),
);
(storage, sm)
};
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
let _ = engine.wait_ready(Duration::from_secs(5)).await;
let watch_result = engine.client().watch(b"test_key");
tokio::time::sleep(Duration::from_millis(500)).await;
let put_result = engine.client().put(b"test_key".to_vec(), b"value1".to_vec()).await;
assert!(
put_result.is_err(),
"PUT should fail after storage path is removed: engine must crash on HardState I/O failure"
);
if let Ok(mut handle) = watch_result {
let event_result =
tokio::time::timeout(Duration::from_secs(1), handle.receiver_mut().recv())
.await;
assert!(
event_result.is_err() || event_result.unwrap().is_none(),
"Watch should not receive events after engine crash"
);
}
let _ = engine.stop().await;
}
#[tokio::test]
async fn test_watch_with_tempdir_alive() {
println!("\n=== Testing Watch with TempDir ALIVE ===");
let _temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let storage_path = _temp_dir.path().join("storage");
let sm_path = _temp_dir.path().join("sm");
std::fs::create_dir_all(&storage_path).unwrap();
std::fs::create_dir_all(&sm_path).unwrap();
let storage =
Arc::new(FileStorageEngine::new(storage_path).expect("Failed to create storage"));
let sm = Arc::new(
FileStateMachine::new(sm_path).await.expect("Failed to create state machine"),
);
println!("Created storage and SM, TempDir kept alive");
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
engine
.wait_ready(Duration::from_secs(5))
.await
.expect("Leader should be elected");
println!("Engine started and leader elected");
let result = engine.client().watch(b"test_key");
println!("Watch registration result: {:?}", result.is_ok());
assert!(result.is_ok(), "watch() should succeed");
if let Ok(mut handle) = result {
tokio::time::sleep(Duration::from_millis(100)).await;
println!("Performing PUT operation...");
let client = engine.client();
client
.put(b"test_key".to_vec(), b"value1".to_vec())
.await
.expect("Put should succeed");
println!("PUT succeeded, waiting for watch event...");
let event_result =
tokio::time::timeout(Duration::from_secs(2), handle.receiver_mut().recv())
.await;
match event_result {
Ok(Some(_)) => {
println!("✅ Watch event RECEIVED (expected!)");
}
Ok(None) => {
println!("❌ Watch channel closed (unexpected)");
panic!("Watch channel should not be closed");
}
Err(_) => {
println!("❌ Watch event TIMEOUT (unexpected)");
panic!("Watch event should have been received");
}
}
}
engine.stop().await.expect("Failed to stop engine");
}
}
#[tokio::test]
async fn test_is_leader_single_node() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
engine
.wait_ready(Duration::from_secs(5))
.await
.expect("Leader should be elected");
assert!(
engine.is_leader(),
"Single node should be the leader after election"
);
let info = engine.leader_info();
assert!(info.is_some(), "Leader info should be available");
assert_eq!(info.unwrap().leader_id, 1, "Leader ID should be 1");
engine.stop().await.expect("Failed to stop engine");
}
#[tokio::test]
async fn test_leader_info_before_election() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
let initial_is_leader = engine.is_leader();
let initial_info = engine.leader_info();
assert!(
initial_is_leader == initial_info.is_some(),
"is_leader() and leader_info() should be consistent"
);
engine
.wait_ready(Duration::from_secs(5))
.await
.expect("Leader should be elected");
assert!(engine.is_leader(), "Should be leader after wait_ready");
assert!(
engine.leader_info().is_some(),
"Leader info must be available after wait_ready"
);
engine.stop().await.expect("Failed to stop engine");
}
#[tokio::test]
async fn test_is_leader_concurrent_access() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = Arc::new(
EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine"),
);
engine
.wait_ready(Duration::from_secs(5))
.await
.expect("Leader should be elected");
let results = Arc::new(std::sync::Mutex::new(Vec::new()));
let mut handles = vec![];
for _ in 0..10 {
let engine_clone = Arc::clone(&engine);
let results_clone = Arc::clone(&results);
let handle = tokio::spawn(async move {
for _ in 0..100 {
let is_leader = engine_clone.is_leader();
let info = engine_clone.leader_info();
results_clone.lock().unwrap().push((is_leader, info));
assert_eq!(
is_leader,
info.is_some(),
"is_leader() and leader_info() should be consistent"
);
}
});
handles.push(handle);
}
for handle in handles {
handle.await.expect("Task should not panic");
}
{
let results = results.lock().unwrap();
let first_info = results[0].1;
for (is_leader, info) in results.iter() {
assert!(*is_leader, "All calls should see this node as leader");
assert_eq!(
*info, first_info,
"All concurrent calls should return same LeaderInfo"
);
}
}
let engine = Arc::try_unwrap(engine).unwrap_or_else(|arc| {
panic!(
"Failed to unwrap Arc, remaining references: {}",
Arc::strong_count(&arc)
)
});
engine.stop().await.expect("Failed to stop engine");
}
#[tokio::test]
async fn test_leader_info_consistency() {
let (storage, sm, _temp_dir) = create_test_storage_and_sm().await;
let engine = EmbeddedEngine::start_custom(storage, sm, None)
.await
.expect("Failed to start engine");
engine
.wait_ready(Duration::from_secs(5))
.await
.expect("Leader should be elected");
let info1 = engine.leader_info();
let info2 = engine.leader_info();
let info3 = engine.leader_info();
assert_eq!(info1, info2, "leader_info() should be consistent");
assert_eq!(info2, info3, "leader_info() should be consistent");
if let Some(info) = info1 {
assert_eq!(info.leader_id, 1);
assert!(info.term > 0);
} else {
panic!("Leader info should be available");
}
engine.stop().await.expect("Failed to stop engine");
}
}