use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use d_engine_server::EmbeddedEngine;
use tempfile::TempDir;
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::common::get_available_ports;
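/// Spin up a single-node EmbeddedEngine backed by a fresh temp dir.
/// The TempDir is returned alongside the engine so the data directory
/// stays alive for the duration of the test.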
async fn create_test_engine(test_name: &str) -> (EmbeddedEngine, TempDir) {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let db_path = temp_dir.path().join(test_name);
let config_path = temp_dir.path().join("d-engine.toml");
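// Reserve a free port, then release the probe listener so the engine can bind it.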
let mut port_guard = get_available_ports(1).await;
port_guard.release_listeners();
let port = port_guard.as_slice()[0];
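// Minimal single-node config: request batching and the state-machine lease enabled.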
let config_content = format!(
r#"
[cluster]
listen_address = "127.0.0.1:{}"
db_root_dir = "{}"
single_node = true
[raft.batching]
max_batch_size = 100
[raft.state_machine.lease]
enabled = true
"#,
port,
db_path.display()
);
std::fs::write(&config_path, config_content).expect("Failed to write config");
let engine = EmbeddedEngine::start_with(config_path.to_str().unwrap())
.await
.expect("Failed to start engine");
engine.wait_ready(Duration::from_secs(5)).await.expect("Engine not ready");
(engine, temp_dir)
}
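/// Select-fairness regression test: flood the engine with concurrent writes
/// from several tasks and verify that linearizable reads are not starved
/// (bounded p50/p99 latency) while write throughput stays healthy.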
#[tokio::test]
async fn test_select_fairness_drain_no_starvation() {
let (engine, _temp_dir) = create_test_engine("select_fairness").await;
println!("Engine started, starting fairness test");
let write_count = Arc::new(AtomicU64::new(0));
const NUM_WRITERS: usize = 8;
let mut writer_handles = Vec::new();
let mut semaphores = Vec::new();
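// Spawn NUM_WRITERS background writers. Each writer fans puts out as
// concurrent tasks, bounded to 32 in flight by its own semaphore.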
for writer_id in 0..NUM_WRITERS {
let client = engine.client();
let write_count_clone = write_count.clone();
let sem = Arc::new(Semaphore::new(32));
semaphores.push(sem.clone());
let handle = tokio::spawn(async move {
let mut i = 0u64;
loop {
for _ in 0..10 {
let key = format!("key_{writer_id}_{i}");
let value = format!("value_{i}");
let client_clone = client.clone();
let write_count_clone = write_count_clone.clone();
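// Back-pressure: wait for a permit before spawning another concurrent put.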
let permit = sem.clone().acquire_owned().await.unwrap();
tokio::spawn(async move {
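// Hold the permit until the put completes so the semaphore tracks in-flight writes.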
let _permit = permit;
if client_clone.put(key.as_bytes(), value.as_bytes()).await.is_ok() {
write_count_clone.fetch_add(1, Ordering::Relaxed);
}
});
i += 1;
}
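// Yield after each batch of 10 puts so the runtime can schedule other tasks.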
tokio::task::yield_now().await;
}
});
writer_handles.push(handle);
}
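// Monitor task: after a short warm-up, sample linearizable read latency
// every 50 ms while the writers are running.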
let read_latencies = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let read_latencies_clone = read_latencies.clone();
let monitor_client = engine.client();
let monitor_task = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(500)).await;
for i in 0..50 {
let start = Instant::now();
let _result = monitor_client.get_linearizable(b"monitor_key").await;
let latency = start.elapsed();
read_latencies_clone.lock().await.push(latency);
tokio::time::sleep(Duration::from_millis(50)).await;
if i % 10 == 0 {
println!("Monitor iteration {i}, latency: {latency:?}");
}
}
});
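// Let the mixed read/write workload run for a few seconds, then stop the writers.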
tokio::time::sleep(Duration::from_secs(3)).await;
for handle in &writer_handles {
handle.abort();
}
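// Drain each writer's semaphore: acquiring all 32 permits only succeeds once
// every in-flight put task has released its permit.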
for sem in &semaphores {
let _ = sem.acquire_many(32).await;
}
monitor_task.await.expect("Monitor task failed");
let total_writes = write_count.load(Ordering::Relaxed);
println!("Total writes submitted: {total_writes}");
let latencies = read_latencies.lock().await;
println!("Read samples: {}", latencies.len());
let mut sorted_latencies = latencies.clone();
sorted_latencies.sort();
assert!(
sorted_latencies.len() >= 10,
"Too few latency samples collected: {} (expected ≥ 10). \
Monitor task may have failed or reads timed out.",
sorted_latencies.len()
);
let p50_idx = sorted_latencies.len() / 2;
let p95_idx = (sorted_latencies.len() * 95) / 100;
let p99_idx = (sorted_latencies.len() * 99) / 100;
let p50 = sorted_latencies[p50_idx];
let p95 = sorted_latencies[p95_idx];
let p99 = sorted_latencies[p99_idx];
println!("Read latency p50: {p50:?}");
println!("Read latency p95: {p95:?}");
println!("Read latency p99: {p99:?}");
let is_ci = std::env::var("CI").is_ok();
let (p99_threshold, p50_threshold) = if is_ci {
(Duration::from_millis(100), Duration::from_millis(50))
} else {
(Duration::from_millis(50), Duration::from_millis(10))
};
assert!(
p99 < p99_threshold,
"P99 latency too high: {p99:?} (threshold: {p99_threshold:?}, CI: {is_ci})"
);
assert!(
p50 < p50_threshold,
"P50 latency too high: {p50:?} (threshold: {p50_threshold:?}, CI: {is_ci})"
);
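// Sanity check on throughput: a very low write count suggests the write path
// was blocked rather than merely contended.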
assert!(
total_writes > 1000,
"Write throughput too low: {total_writes} (system may be blocked)"
);
println!("✅ Select fairness test passed: no starvation detected");
}