use crate::common::{create_node_config, get_available_ports, node_config};
use d_engine_server::EmbeddedEngine;
use d_engine_server::RocksDBStateMachine;
use d_engine_server::RocksDBStorageEngine;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::Instant;
use tracing_test::traced_test;
async fn create_test_engine(test_name: &str) -> (EmbeddedEngine, TempDir) {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let db_path = temp_dir.path().join(test_name);
let config_path = temp_dir.path().join("d-engine.toml");
let mut port_guard = get_available_ports(1).await;
port_guard.release_listeners();
let port = port_guard.as_slice()[0];
let config_content = format!(
r#"
[cluster]
listen_address = "127.0.0.1:{}"
db_root_dir = "{}"
single_node = true
[raft.read_consistency]
state_machine_sync_timeout_ms = 2000
"#,
port,
db_path.display()
);
std::fs::write(&config_path, config_content).expect("Failed to write config");
let engine = EmbeddedEngine::start_with(config_path.to_str().unwrap())
.await
.expect("Failed to start engine");
engine.wait_ready(Duration::from_secs(5)).await.expect("Engine not ready");
(engine, temp_dir)
}
#[tokio::test]
#[traced_test]
async fn test_linearizable_read_consistency_with_writes() {
let (engine, _temp_dir) = create_test_engine("concurrent_writes").await;
let client = engine.client();
let mut last_seen_value = 0;
for i in 1..=10 {
client.put(b"counter", i.to_string().as_bytes()).await.expect("Write failed");
tokio::time::sleep(Duration::from_millis(100)).await;
let result = client.get_linearizable(b"counter").await.expect("Read failed");
let value_str = String::from_utf8(result.unwrap().to_vec()).expect("Invalid UTF-8");
let current_value: u32 = value_str.parse().expect("Invalid number");
assert!(
current_value >= last_seen_value,
"Read returned stale value: saw {current_value}, expected >= {last_seen_value}"
);
last_seen_value = current_value;
}
assert_eq!(
last_seen_value, 10,
"Final linearizable read should see last committed value"
);
println!("✅ Linearizable read consistency verified across 10 writes");
}
#[tokio::test]
#[traced_test]
async fn test_read_index_with_noop_tracking() {
let (engine, _temp_dir) = create_test_engine("fixed_index").await;
let client = engine.client();
client.put(b"test_key", b"test_value").await.expect("PUT failed");
tokio::time::sleep(Duration::from_millis(200)).await;
let result = client.get_linearizable(b"test_key").await.expect("Linearizable read failed");
assert_eq!(
result.as_deref(),
Some(b"test_value".as_ref()),
"Linearizable read should return correct value"
);
println!("✅ Linearizable read with noop tracking succeeded");
}
#[tokio::test]
#[traced_test]
async fn test_eventual_consistency_reads_unaffected() {
let (engine, _temp_dir) = create_test_engine("high_load").await;
let client = engine.client();
client.put(b"ec_test", b"v1").await.expect("PUT failed");
tokio::time::sleep(Duration::from_millis(200)).await;
let ec_start = Instant::now();
let ec_result = client.get_eventual(b"ec_test").await.expect("EC read failed");
let ec_latency = ec_start.elapsed();
let lin_start = Instant::now();
let lin_result = client.get_linearizable(b"ec_test").await.expect("Linearizable read failed");
let lin_latency = lin_start.elapsed();
assert_eq!(
ec_result.as_deref(),
Some(b"v1".as_ref()),
"EC read should return correct value"
);
assert_eq!(
lin_result.as_deref(),
Some(b"v1".as_ref()),
"Linearizable read should return correct value"
);
println!("✅ EC read: {ec_latency:?}, Linearizable read: {lin_latency:?}");
}
#[tokio::test]
#[traced_test]
async fn test_linearizable_read_sequential_writes() {
let (engine, _temp_dir) = create_test_engine("slow_apply").await;
let client = engine.client();
client.put(b"seq_key", b"v1").await.expect("First PUT failed");
tokio::time::sleep(Duration::from_millis(200)).await;
let read1 = client.get_linearizable(b"seq_key").await.expect("First read failed");
assert_eq!(
read1.as_deref(),
Some(b"v1".as_ref()),
"First read should return v1"
);
client.put(b"seq_key", b"v2").await.expect("Second PUT failed");
tokio::time::sleep(Duration::from_millis(200)).await;
let read2 = client.get_linearizable(b"seq_key").await.expect("Second read failed");
assert_eq!(
read2.as_deref(),
Some(b"v2".as_ref()),
"Second read should return v2"
);
println!("✅ Sequential writes with linearizable reads succeeded");
}
#[tokio::test]
#[traced_test]
#[cfg(feature = "rocksdb")]
async fn test_read_index_fixed_with_concurrent_writes_multi_node()
-> Result<(), Box<dyn std::error::Error>> {
let temp_dir = tempfile::tempdir()?;
let db_root_dir = temp_dir.path().join("db");
let log_dir = temp_dir.path().join("logs");
let mut port_guard = get_available_ports(3).await;
port_guard.release_listeners();
let ports = port_guard.as_slice();
let mut engines = Vec::new();
for i in 0..3 {
let node_id = (i + 1) as u64;
let base_config_str = create_node_config(
node_id,
ports[i],
ports,
db_root_dir.to_str().unwrap(),
log_dir.to_str().unwrap(),
)
.await;
let config_str = format!(
"{base_config_str}\n[raft.read_consistency]\nstate_machine_sync_timeout_ms = 5000\n",
);
let config = node_config(&config_str);
let node_db_root = config.cluster.db_root_dir.join(format!("node{node_id}"));
let storage_path = node_db_root.join("storage");
let sm_path = node_db_root.join("state_machine");
tokio::fs::create_dir_all(&storage_path).await?;
tokio::fs::create_dir_all(&sm_path).await?;
let storage = Arc::new(RocksDBStorageEngine::new(storage_path)?);
let state_machine = Arc::new(RocksDBStateMachine::new(sm_path)?);
let config_path = format!("/tmp/d-engine-test-linear-read-node{node_id}.toml");
tokio::fs::write(&config_path, &config_str).await?;
let engine =
EmbeddedEngine::start_custom(storage, state_machine, Some(&config_path)).await?;
engines.push(engine);
}
let leader_info = engines[0].wait_ready(Duration::from_secs(10)).await?;
let leader_idx = (leader_info.leader_id - 1) as usize;
let leader_client = engines[leader_idx].client();
leader_client.put(b"counter", b"v0").await?;
tokio::time::sleep(Duration::from_millis(200)).await;
let client_clone = leader_client.clone();
let read_task = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
client_clone.get_linearizable(b"counter").await
});
tokio::time::sleep(Duration::from_millis(10)).await;
for i in 1..=100 {
leader_client.put(b"counter", format!("v{i}").as_bytes()).await?;
}
let result = read_task.await?;
assert!(
result.is_ok(),
"Linearizable read should succeed with concurrent writes: {result:?}"
);
for engine in engines {
engine.stop().await?;
}
println!("✅ Multi-node read_index optimization verified");
Ok(())
}