use std::time::Duration;
use bytes::Bytes;
use d_engine_server::EmbeddedEngine;
use tempfile::TempDir;
use tokio::time::Instant;
use tracing_test::traced_test;
use crate::common::get_available_ports;
/// Boot a single-node `EmbeddedEngine` whose read-batching `size_threshold`
/// is injected through a generated TOML config file.
///
/// Returns the running engine plus the `TempDir` holding its on-disk state;
/// the caller must keep the `TempDir` alive for the engine's lifetime.
async fn create_engine_with_batching(
    test_name: &str,
    size_threshold: usize,
) -> (EmbeddedEngine, TempDir) {
    let dir = tempfile::tempdir().expect("Failed to create temp dir");

    // Reserve a free port, then drop the probe listener so the engine can bind it.
    // NOTE(review): there is a small window where another process could grab the
    // port between release and engine start — acceptable for tests.
    let mut ports = get_available_ports(1).await;
    ports.release_listeners();
    let port = ports.as_slice()[0];

    let db_path = dir.path().join(test_name);
    let config = format!(
        r#"
[cluster]
listen_address = "127.0.0.1:{}"
db_root_dir = "{}"
single_node = true
[raft.read_consistency]
state_machine_sync_timeout_ms = 5000
[raft.read_consistency.read_batching]
size_threshold = {}
"#,
        port,
        db_path.display(),
        size_threshold,
    );

    let config_path = dir.path().join("d-engine.toml");
    std::fs::write(&config_path, config).expect("Failed to write config");

    let engine = EmbeddedEngine::start_with(config_path.to_str().unwrap())
        .await
        .expect("Failed to start engine");
    engine.wait_ready(Duration::from_secs(5)).await.expect("Engine not ready");

    (engine, dir)
}
/// Verify that read batching preserves linearizability: 100 concurrent
/// linearizable reads across two keys must each observe the value that was
/// committed before the reads started.
#[traced_test]
#[tokio::test]
async fn test_batching_preserves_linearizability() {
    let (engine, _temp_dir) = create_engine_with_batching("test_linearizability", 50).await;
    let client = engine.client();

    // Seed two keys so readers have committed state to observe.
    client
        .put(Bytes::from("key1"), Bytes::from("v1"))
        .await
        .expect("Failed to write key1");
    client
        .put(Bytes::from("key2"), Bytes::from("v2"))
        .await
        .expect("Failed to write key2");

    // Give the state machine a moment to apply the writes.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let mut handles = vec![];
    let base_client = engine.client();
    for i in 0..100 {
        let client = base_client.clone();
        // First half of the readers target key1, second half key2.
        let (key, expected) = if i < 50 {
            (Bytes::from("key1"), Bytes::from("v1"))
        } else {
            (Bytes::from("key2"), Bytes::from("v2"))
        };
        let handle = tokio::spawn(async move {
            let result = client
                .get_linearizable(&key)
                .await
                .expect("Read failed")
                .expect("Key should exist");
            assert_eq!(result, expected, "Should return correct value");
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.await.expect("Task failed");
    }

    // Shut down cleanly — previously missing, unlike the sibling tests.
    engine.stop().await.expect("Failed to stop engine");
}
/// Confirm that 100 concurrent linearizable reads of a single committed key
/// all see the latest value, and that the whole batch finishes quickly.
#[traced_test]
#[tokio::test]
async fn test_concurrent_write_and_read() {
    let (engine, _temp_dir) = create_engine_with_batching("test_concurrent_write", 50).await;

    // Seed one key for every reader to observe.
    let client = engine.client();
    client
        .put(Bytes::from("key1"), Bytes::from("v1"))
        .await
        .expect("Failed to write key1");
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Spawn 100 parallel readers and time the whole batch.
    let start = Instant::now();
    let base_client = engine.client();
    let handles: Vec<_> = (0..100)
        .map(|_| {
            let client = base_client.clone();
            tokio::spawn(async move {
                let result = client
                    .get_linearizable(b"key1")
                    .await
                    .expect("Read failed")
                    .expect("Key should exist");
                assert_eq!(
                    result,
                    Bytes::from("v1"),
                    "Should read latest committed value"
                );
            })
        })
        .collect();
    for handle in handles {
        handle.await.expect("Task failed");
    }
    let elapsed = start.elapsed();

    assert!(
        elapsed < Duration::from_millis(500),
        "Batched reads should complete quickly, took {elapsed:?}"
    );
    engine.stop().await.expect("Failed to stop engine");
}
/// A lone linearizable read must not wait for a batch to fill: the single-node
/// fast path (or batch timeout) should complete it well under 100 ms.
#[traced_test]
#[tokio::test]
async fn test_single_request_timeout_trigger() {
    let (engine, _temp_dir) = create_engine_with_batching("test_timeout", 50).await;

    let client = engine.client();
    client
        .put(Bytes::from("lonely_key"), Bytes::from("lonely_value"))
        .await
        .expect("Failed to write");

    // Let the write apply before measuring the read.
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Issue a single read and measure how long it blocks.
    let start = Instant::now();
    let result = client
        .get_linearizable(b"lonely_key")
        .await
        .expect("Read failed")
        .expect("Key should exist");
    let elapsed = start.elapsed();

    assert_eq!(result, Bytes::from("lonely_value"));
    assert!(
        elapsed < Duration::from_millis(100),
        "Should not hang, took {elapsed:?}"
    );
    println!("Single request completed in {elapsed:?} (single-node fast path verified)");

    engine.stop().await.expect("Failed to stop engine");
}
/// Verify that concurrent reads complete promptly under a size-threshold
/// batching config (threshold 100, so these 20 reads rely on timeout/fast-path
/// flushing rather than filling the batch).
#[traced_test]
#[tokio::test]
async fn test_size_threshold_immediate_flush() {
    let (engine, _temp_dir) = create_engine_with_batching("test_performance", 100).await;
    let client = engine.client();

    // Seed 50 keys; only the first 20 are read below.
    for i in 0..50 {
        client
            .put(Bytes::from(format!("key{i}")), Bytes::from(format!("v{i}")))
            .await
            .expect("Failed to write");
    }
    tokio::time::sleep(Duration::from_millis(100)).await;

    let start = Instant::now();
    let mut handles = vec![];
    let base_client = engine.client();
    for i in 0..20 {
        let client = base_client.clone();
        let key = format!("key{i}");
        let handle = tokio::spawn(async move {
            client.get_linearizable(key.as_bytes()).await.expect("Read failed")
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.await.expect("Task failed");
    }
    let elapsed = start.elapsed();

    assert!(
        elapsed < Duration::from_millis(50),
        "Size threshold should trigger immediate flush, took {elapsed:?}"
    );
    // Fixed: the loop issues 20 reads, not 50 as the old message claimed.
    println!("20 batched reads completed in {elapsed:?} (size threshold verified)");

    // Shut down cleanly — previously missing, unlike the sibling tests.
    engine.stop().await.expect("Failed to stop engine");
}
#[traced_test]
#[tokio::test]
async fn test_batching_throughput_improvement() {
if std::env::var("CI").is_ok() {
println!("Skipping benchmark (set CI=1 to run)");
return;
}
const NUM_REQUESTS: usize = 1000;
let (node1, _temp_dir1) = create_engine_with_batching("test_throughput_off", 10000).await;
let client1 = node1.client();
client1
.put(Bytes::from("bench_key"), Bytes::from("bench_value"))
.await
.expect("Failed to write");
tokio::time::sleep(Duration::from_millis(100)).await;
let start1 = Instant::now();
let mut handles1 = vec![];
let base_client1 = node1.client();
for _ in 0..NUM_REQUESTS {
let client = base_client1.clone();
let handle = tokio::spawn(async move { client.get_linearizable(b"bench_key").await });
handles1.push(handle);
}
for handle in handles1 {
let _ = handle.await;
}
let elapsed1 = start1.elapsed();
let throughput1 = NUM_REQUESTS as f64 / elapsed1.as_secs_f64();
node1.stop().await.expect("Failed to stop node1");
let (node2, _temp_dir2) = create_engine_with_batching("test_throughput_on", 50).await;
let client2 = node2.client();
client2
.put(Bytes::from("bench_key"), Bytes::from("bench_value"))
.await
.expect("Failed to write");
tokio::time::sleep(Duration::from_millis(100)).await;
let start2 = Instant::now();
let mut handles2 = vec![];
let base_client2 = node2.client();
for _ in 0..NUM_REQUESTS {
let client = base_client2.clone();
let handle = tokio::spawn(async move { client.get_linearizable(b"bench_key").await });
handles2.push(handle);
}
for handle in handles2 {
let _ = handle.await;
}
let elapsed2 = start2.elapsed();
let throughput2 = NUM_REQUESTS as f64 / elapsed2.as_secs_f64();
node2.stop().await.expect("Failed to stop node2");
println!("\n=== Throughput Regression Test ===");
println!("Baseline (batching OFF): {throughput1:.0} ops/sec (took {elapsed1:?})");
println!("Batching ON: {throughput2:.0} ops/sec (took {elapsed2:?})");
println!("Performance change: {:.2}x", throughput2 / throughput1);
assert!(
throughput2 >= throughput1,
"Batching should not degrade performance: {throughput2:.0} ops/sec < {throughput1:.0} ops/sec"
);
println!("\n✓ No performance regression detected");
if throughput2 > throughput1 * 1.5 {
println!(
" Bonus: {:.2}x improvement observed",
throughput2 / throughput1
);
}
}