use std::time::Duration;
use bytes::Bytes;
use d_engine_core::ClientApi;
use d_engine_server::EmbeddedEngine;
use tempfile::TempDir;
use crate::common::get_available_ports;
/// Starts a single-node [`EmbeddedEngine`] on top of a fresh temporary directory.
///
/// Writes `d-engine.toml` into the temporary directory, pointing the engine at
/// `127.0.0.1:<port>` (port taken from `get_available_ports`) and at
/// `<temp_dir>/<test_name>` as `db_root_dir`, with `single_node = true` and the
/// state-machine lease enabled. The engine is started from that file and the
/// function waits up to five seconds for `wait_ready` to succeed.
///
/// Returns the engine together with the [`TempDir`] guard; callers keep the
/// guard alive for the duration of the test.
///
/// # Panics
///
/// Panics if the temporary directory or config file cannot be created, if the
/// config path is not valid UTF-8, if the engine fails to start, or if it is
/// not ready within five seconds.
async fn create_test_engine(test_name: &str) -> (EmbeddedEngine, TempDir) {
    let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
    let db_path = temp_dir.path().join(test_name);
    let config_path = temp_dir.path().join("d-engine.toml");

    // The listeners held by the guard are released before the engine starts,
    // presumably so the engine can bind the reserved port itself.
    let mut port_guard = get_available_ports(1).await;
    port_guard.release_listeners();
    let port = port_guard.as_slice()[0];

    // NOTE(review): the path is interpolated into a TOML basic (double-quoted)
    // string, where backslashes are escape characters; confirm this is fine
    // for every platform the tests run on.
    let config_content = format!(
        r#"
[cluster]
listen_address = "127.0.0.1:{}"
db_root_dir = "{}"
single_node = true
[raft.state_machine.lease]
enabled = true
"#,
        port,
        db_path.display()
    );
    std::fs::write(&config_path, config_content).expect("Failed to write config");

    // State the invariant instead of a bare `unwrap()` so a failure is diagnosable.
    let config_file = config_path
        .to_str()
        .expect("Config path is not valid UTF-8");
    let engine = EmbeddedEngine::start_with(config_file)
        .await
        .expect("Failed to start engine");
    engine.wait_ready(Duration::from_secs(5)).await.expect("Engine not ready");
    (engine, temp_dir)
}
/// A single PUT issued through the embedded client must return `Ok`.
#[tokio::test]
async fn test_local_client_put() {
    let (engine, _temp_dir) = create_test_engine("put").await;

    // Issue the write directly on the engine's client handle.
    let result = engine.client().put(b"test_key", b"test_value").await;

    assert!(result.is_ok(), "PUT should succeed: {result:?}");
    println!("✅ EmbeddedClient PUT operation succeeded");
}
/// A value stored with PUT is returned by a subsequent eventual GET.
#[tokio::test]
async fn test_local_client_get() {
    let (engine, _temp_dir) = create_test_engine("get").await;
    let client = engine.client();
    let (key, expected) = (b"get_test_key", b"get_test_value");

    client.put(key, expected).await.expect("PUT failed");

    // Pause before the eventual read (presumably to let the write be applied).
    tokio::time::sleep(Duration::from_millis(200)).await;

    match client.get_eventual(key).await.expect("GET failed") {
        None => panic!("Value should exist"),
        Some(stored) => assert_eq!(stored, Bytes::from_static(expected), "Value mismatch"),
    }
    println!("✅ EmbeddedClient GET operation succeeded");
}
/// Reading a key that was never written yields `Ok(None)` rather than an error.
#[tokio::test]
async fn test_local_client_get_not_found() {
    let (engine, _temp_dir) = create_test_engine("not_found").await;

    let lookup = engine
        .client()
        .get_eventual(b"non_existent_key")
        .await
        .expect("GET should not error");

    assert!(lookup.is_none(), "Non-existent key should return None");
    println!("✅ EmbeddedClient GET not found handled correctly");
}
/// A key is visible after PUT and gone after DELETE.
#[tokio::test]
async fn test_local_client_delete() {
    let (engine, _temp_dir) = create_test_engine("delete").await;
    let client = engine.client();
    let key = b"delete_test_key";
    // Pause used before each eventual read.
    let settle = Duration::from_millis(100);

    // Phase 1: write and confirm presence.
    client.put(key, b"delete_test_value").await.expect("PUT failed");
    tokio::time::sleep(settle).await;
    let before = client.get_eventual(key).await.expect("First GET failed");
    assert!(before.is_some(), "Value should exist before delete");

    // Phase 2: delete and confirm absence.
    client.delete(key).await.expect("DELETE failed");
    tokio::time::sleep(settle).await;
    let after = client.get_eventual(key).await.expect("Second GET failed");
    assert!(after.is_none(), "Value should not exist after delete");

    println!("✅ EmbeddedClient DELETE operation succeeded");
}
/// Writes five keys one after another, reads them back, deletes them, and
/// checks that they are gone.
#[tokio::test]
async fn test_local_client_sequential_ops() {
    let (engine, _temp_dir) = create_test_engine("sequential").await;
    let client = engine.client();
    let settle = Duration::from_millis(200);

    // The same five key names are reused by every phase below.
    let keys: Vec<String> = (0..5).map(|i| format!("key_{i}")).collect();

    for (i, key) in keys.iter().enumerate() {
        let value = format!("value_{i}");
        client.put(key.as_bytes(), value.as_bytes()).await.expect("PUT failed");
    }
    tokio::time::sleep(settle).await;

    for (i, key) in keys.iter().enumerate() {
        let result = client.get_eventual(key.as_bytes()).await.expect("GET failed");
        assert_eq!(
            result.unwrap(),
            Bytes::from(format!("value_{i}")),
            "Value mismatch for key_{i}"
        );
    }

    for key in &keys {
        client.delete(key.as_bytes()).await.expect("DELETE failed");
    }
    tokio::time::sleep(settle).await;

    for (i, key) in keys.iter().enumerate() {
        let result = client.get_eventual(key.as_bytes()).await.expect("GET failed");
        assert!(result.is_none(), "key_{i} should be deleted");
    }

    println!("✅ EmbeddedClient sequential operations succeeded");
}
#[tokio::test]
async fn test_local_client_concurrent_ops() {
let (engine, _temp_dir) = create_test_engine("concurrent").await;
let client1 = engine.client();
let client2 = engine.client();
let client3 = engine.client();
let handle1 = {
let client = client1.clone();
tokio::spawn(async move {
for i in 0..10 {
let key = format!("concurrent_key_{i}");
let value = format!("value_from_client1_{i}");
client.put(key.as_bytes(), value.as_bytes()).await.expect("Client1 PUT failed");
}
})
};
let handle2 = {
let client = client2.clone();
tokio::spawn(async move {
for i in 10..20 {
let key = format!("concurrent_key_{i}");
let value = format!("value_from_client2_{i}");
client.put(key.as_bytes(), value.as_bytes()).await.expect("Client2 PUT failed");
}
})
};
let handle3 = {
let client = client3.clone();
tokio::spawn(async move {
for i in 20..30 {
let key = format!("concurrent_key_{i}");
let value = format!("value_from_client3_{i}");
client.put(key.as_bytes(), value.as_bytes()).await.expect("Client3 PUT failed");
}
})
};
let (r1, r2, r3) = tokio::join!(handle1, handle2, handle3);
assert!(
r1.is_ok() && r2.is_ok() && r3.is_ok(),
"All concurrent operations should succeed"
);
drop(engine);
drop(_temp_dir);
println!("✅ EmbeddedClient concurrent operations succeeded");
}
/// A 512 KiB value survives a PUT / eventual GET round trip unchanged.
#[tokio::test]
async fn test_local_client_large_value() {
    // Label matches the test, like the other tests in this file.
    let (engine, _temp_dir) = create_test_engine("large_value").await;
    let client = engine.client();
    let key = b"large_value_key";
    let large_value = vec![b'X'; 512 * 1024];

    client.put(key, &large_value).await.expect("Large value PUT failed");
    tokio::time::sleep(Duration::from_millis(200)).await;

    let stored = client
        .get_eventual(key)
        .await
        .expect("Large value GET failed")
        .expect("Large value should exist");

    assert_eq!(stored.len(), large_value.len(), "Large value size mismatch");
    // Check the bytes too, not just the length. A plain `assert!` is used so a
    // failure does not print both 512 KiB buffers.
    assert!(
        stored[..] == large_value[..],
        "Large value content mismatch"
    );
    println!("✅ EmbeddedClient large value handling succeeded");
}
/// PUT accepts an empty key and an empty value, and the empty value can be
/// read back.
#[tokio::test]
async fn test_local_client_empty_key_value() {
    let (engine, _temp_dir) = create_test_engine("empty").await;
    let client = engine.client();

    let empty_key_put = client.put(b"", b"some_value").await;
    assert!(empty_key_put.is_ok(), "Empty key PUT should succeed");

    let empty_value_put = client.put(b"key_with_empty_value", b"").await;
    assert!(empty_value_put.is_ok(), "Empty value PUT should succeed");

    tokio::time::sleep(Duration::from_millis(100)).await;

    let stored = client.get_eventual(b"key_with_empty_value").await.expect("GET failed");
    assert_eq!(
        stored.unwrap(),
        Bytes::new(),
        "Empty value should be retrievable"
    );
    println!("✅ EmbeddedClient empty key/value handling succeeded");
}
/// A second PUT to the same key replaces the previously stored value.
#[tokio::test]
async fn test_local_client_update() {
    let (engine, _temp_dir) = create_test_engine("overwrite").await;
    let client = engine.client();
    let key = b"update_key";
    let (original, replacement) = (b"original_value", b"updated_value");
    let settle = Duration::from_millis(100);

    // First write: the original value is visible.
    client.put(key, original).await.expect("Initial PUT failed");
    tokio::time::sleep(settle).await;
    let first_read = client.get_eventual(key).await.expect("First GET failed");
    assert_eq!(first_read.unwrap(), Bytes::from_static(original));

    // Second write: the replacement value takes over.
    client.put(key, replacement).await.expect("Update PUT failed");
    tokio::time::sleep(settle).await;
    let second_read = client.get_eventual(key).await.expect("Second GET failed");
    assert_eq!(
        second_read.unwrap(),
        Bytes::from_static(replacement),
        "Value should be updated"
    );

    println!("✅ EmbeddedClient update operation succeeded");
}
/// The client's `client_id()` and `timeout()` accessors return positive values.
#[tokio::test]
async fn test_local_client_getters() {
    // Label matches the test, like the other tests in this file.
    let (engine, _temp_dir) = create_test_engine("getters").await;
    let client = engine.client();

    let client_id = client.client_id();
    assert!(client_id > 0, "Client ID should be positive");

    let timeout = client.timeout();
    assert!(timeout.as_millis() > 0, "Timeout should be positive");

    println!(
        "✅ EmbeddedClient getters work correctly (client_id={}, timeout={}ms)",
        client_id,
        timeout.as_millis()
    );
}
/// A cloned client and its original each see data written by the other.
#[tokio::test]
async fn test_local_client_clone() {
    let (engine, _temp_dir) = create_test_engine("clone").await;
    let original = engine.client();
    let cloned = original.clone();

    original.put(b"key1", b"value1").await.expect("Client1 PUT failed");
    cloned.put(b"key2", b"value2").await.expect("Client2 PUT failed");
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Each handle reads the key written through the other handle.
    let seen_by_original = original.get_eventual(b"key2").await.expect("Client1 GET failed");
    let seen_by_clone = cloned.get_eventual(b"key1").await.expect("Client2 GET failed");

    assert!(
        seen_by_original.is_some() && seen_by_clone.is_some(),
        "Both clients should see all data"
    );
    println!("✅ EmbeddedClient clone works correctly");
}
/// A linearizable GET issued right after a PUT returns the written value
/// without any intervening sleep.
#[tokio::test]
async fn test_linearizable_read_after_write_no_sleep() {
    // Label matches the test, like the other tests in this file.
    let (engine, _temp_dir) = create_test_engine("linearizable_read_after_write").await;
    let client = engine.client();
    let key = b"linearizable_key";
    let value = b"linearizable_value";

    client.put(key, value).await.expect("PUT should succeed - log committed");

    // Deliberately no sleep between the write and the read.
    match client.get_linearizable(key).await.expect("get_linearizable should not error") {
        Some(stored) => assert_eq!(
            stored,
            Bytes::from_static(value),
            "Value must match what we wrote"
        ),
        None => panic!(
            "Linearizable read MUST return value immediately after PUT completes (no sleep needed)"
        ),
    }
    println!("✅ Linearizable read correctly waits for state machine to catch up");
}
/// After each of three successive PUTs to one key, a linearizable GET returns
/// the value just written.
#[tokio::test]
async fn test_linearizable_read_sees_latest_value() {
    // Label matches the test, like the other tests in this file.
    let (engine, _temp_dir) = create_test_engine("linearizable_latest").await;
    let client = engine.client();
    let key = b"seq_key";

    // Comparing against `Some(..)` instead of unwrapping keeps the assertion
    // message in the output when the key is unexpectedly missing.
    client.put(key, b"v1").await.expect("PUT v1 failed");
    let after_v1 = client.get_linearizable(key).await.expect("GET after v1 failed");
    assert_eq!(after_v1, Some(Bytes::from_static(b"v1")), "Should read v1");

    client.put(key, b"v2").await.expect("PUT v2 failed");
    let after_v2 = client.get_linearizable(key).await.expect("GET after v2 failed");
    assert_eq!(
        after_v2,
        Some(Bytes::from_static(b"v2")),
        "Should read v2 (NOT v1 - no stale reads)"
    );

    client.put(key, b"v3").await.expect("PUT v3 failed");
    let after_v3 = client.get_linearizable(key).await.expect("GET after v3 failed");
    assert_eq!(
        after_v3,
        Some(Bytes::from_static(b"v3")),
        "Should read v3 (latest value)"
    );

    println!("✅ Linearizable reads always see latest committed value");
}
/// One hundred concurrent tasks each PUT their own key and immediately read it
/// back with a linearizable GET; every task must see its own write.
#[tokio::test]
async fn test_concurrent_write_and_linearizable_read() {
    // Label matches the test, like the other tests in this file.
    let (engine, _temp_dir) = create_test_engine("concurrent_linearizable").await;
    let client = engine.client();

    let tasks: Vec<_> = (0..100)
        .map(|i| {
            let c = client.clone();
            tokio::spawn(async move {
                let key = format!("key_{i}").into_bytes();
                let value = format!("value_{i}").into_bytes();

                // Keep the underlying error in the panic message.
                c.put(&key, &value)
                    .await
                    .unwrap_or_else(|e| panic!("PUT key_{i} failed: {e:?}"));
                let result = c
                    .get_linearizable(&key)
                    .await
                    .unwrap_or_else(|e| panic!("GET key_{i} failed: {e:?}"));

                // Comparing against `Some(..)` reports a missing key through
                // the assertion message instead of an opaque unwrap panic.
                assert_eq!(
                    result,
                    Some(Bytes::from(value)),
                    "Linearizable read must see its own write for key_{i}"
                );
            })
        })
        .collect();

    for (i, task) in tasks.into_iter().enumerate() {
        task.await
            .unwrap_or_else(|e| panic!("task for key_{i} did not complete: {e:?}"));
    }
    println!("✅ All 100 concurrent linearizable reads saw their own writes");
}
/// A value stored with `put_with_ttl` is returned by a linearizable GET issued
/// right after the write.
#[tokio::test]
async fn test_put_with_ttl_readable_immediately() {
    let (engine, _temp_dir) = create_test_engine("put_with_ttl").await;
    let client = engine.client();
    let key = b"ttl-key";

    let write = client.put_with_ttl(key, b"ttl-value", 60).await;
    write.expect("put_with_ttl should succeed");

    let read = client.get_linearizable(key).await;
    let value = read.expect("get_linearizable should succeed");

    assert_eq!(
        value,
        Some(Bytes::from("ttl-value")),
        "value written with TTL must be readable immediately after write completes"
    );
}
/// `get_with_consistency` with the `LeaseRead` policy returns a previously
/// written value.
#[tokio::test]
async fn test_get_with_consistency_lease_read() {
    use d_engine_proto::client::ReadConsistencyPolicy;

    let (engine, _temp_dir) = create_test_engine("lease_read").await;
    let client = engine.client();
    let key = b"lease-key";

    client.put(key, b"lease-val").await.expect("put should succeed");

    let read = client.get_with_consistency(key, ReadConsistencyPolicy::LeaseRead).await;
    let value = read.expect("get_with_consistency LeaseRead should succeed");

    assert_eq!(value, Some(Bytes::from("lease-val")));
}
/// `get_multi_linearizable` returns one slot per requested key, in request
/// order, with `None` for a key that was never written.
#[tokio::test]
async fn test_get_multi_linearizable_aligned_with_missing_keys() {
    let (engine, _temp_dir) = create_test_engine("get_multi_linearizable").await;
    let client = engine.client();

    // Only ml1 and ml3 are written; ml2 is left absent on purpose.
    client.put(b"ml1", b"v1").await.expect("put ml1 should succeed");
    client.put(b"ml3", b"v3").await.expect("put ml3 should succeed");

    let keys = vec![Bytes::from("ml1"), Bytes::from("ml2"), Bytes::from("ml3")];
    let lookup = client.get_multi_linearizable(&keys).await;
    let results = lookup.expect("get_multi_linearizable should succeed");

    assert_eq!(results.len(), 3, "result length must equal key count");
    assert_eq!(results[0], Some(Bytes::from("v1")), "ml1 must be present");
    assert_eq!(results[1], None, "ml2 was not written — must be None");
    assert_eq!(results[2], Some(Bytes::from("v3")), "ml3 must be present");
}
/// `get_multi_eventual` returns one slot per requested key, in request order,
/// with `None` for a key that was never written.
#[tokio::test]
async fn test_get_multi_eventual_aligned_with_missing_keys() {
    let (engine, _temp_dir) = create_test_engine("get_multi_eventual").await;
    let client = engine.client();

    // me1 and me2 are written; me3 is left absent on purpose.
    client.put(b"me1", b"a").await.expect("put me1 should succeed");
    client.put(b"me2", b"b").await.expect("put me2 should succeed");
    tokio::time::sleep(Duration::from_millis(100)).await;

    let keys = vec![Bytes::from("me1"), Bytes::from("me2"), Bytes::from("me3")];
    let lookup = client.get_multi_eventual(&keys).await;
    let results = lookup.expect("get_multi_eventual should succeed");

    assert_eq!(results.len(), 3, "result length must equal key count");
    assert_eq!(results[0], Some(Bytes::from("a")));
    assert_eq!(results[1], Some(Bytes::from("b")));
    assert_eq!(results[2], None, "me3 was not written — must be None");
}
/// An empty key slice passed to `get_multi_linearizable` yields an empty,
/// non-error result.
#[tokio::test]
async fn test_get_multi_linearizable_empty_keys() {
    let (engine, _temp_dir) = create_test_engine("get_multi_empty").await;

    let lookup = engine.client().get_multi_linearizable(&[]).await;
    let results = lookup.expect("empty key slice should not error");

    assert!(results.is_empty(), "empty input must produce empty output");
}
/// Starts a single-node engine with the watch feature enabled, on top of a
/// fresh temporary directory.
///
/// Writes `d-engine.toml` into the temporary directory with a `127.0.0.1`
/// listen address (port taken from `get_available_ports`), `db_root_dir` set
/// to `<temp_dir>/<test_name>`, `single_node = true`, a 100 ms general raft
/// timeout, commit-handler and replication batch sizes of 1, and
/// `[raft.watch]` enabled. The engine is started from that file and the
/// function waits up to five seconds for `wait_ready` to succeed.
///
/// Returns the engine together with the temp-dir guard; callers keep the guard
/// alive for the duration of the test.
///
/// # Panics
///
/// Panics if the temporary directory or config file cannot be created, if the
/// config path is not valid UTF-8, if the engine fails to start, or if it is
/// not ready within five seconds.
#[cfg(feature = "watch")]
async fn create_watch_engine(
    test_name: &str
) -> (d_engine_server::EmbeddedEngine, tempfile::TempDir) {
    let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
    let db_path = temp_dir.path().join(test_name);
    let config_path = temp_dir.path().join("d-engine.toml");

    // The listeners held by the guard are released before the engine starts,
    // presumably so the engine can bind the reserved port itself.
    let mut port_guard = get_available_ports(1).await;
    port_guard.release_listeners();
    let port = port_guard.as_slice()[0];

    let config_content = format!(
        r#"
[cluster]
listen_address = "127.0.0.1:{}"
db_root_dir = "{}"
single_node = true
[raft]
general_raft_timeout_duration_in_ms = 100
[raft.commit_handler]
batch_size_threshold = 1
process_interval_ms = 1
[replication]
max_batch_size = 1
[raft.watch]
enabled = true
event_queue_size = 1000
watcher_buffer_size = 10
"#,
        port,
        db_path.display()
    );
    std::fs::write(&config_path, config_content).expect("Failed to write watch config");

    // State the invariant instead of a bare `unwrap()` so a failure is diagnosable.
    let config_file = config_path
        .to_str()
        .expect("Watch config path is not valid UTF-8");
    let engine = d_engine_server::EmbeddedEngine::start_with(config_file)
        .await
        .expect("Failed to start watch engine");
    engine.wait_ready(Duration::from_secs(5)).await.expect("Watch engine not ready");
    (engine, temp_dir)
}
/// Registering a watcher on a key returns `Ok` when the watch feature is
/// enabled in the engine config.
#[cfg(feature = "watch")]
#[tokio::test]
async fn test_watch_registration_succeeds() {
    let (engine, _temp_dir) = create_watch_engine("watch_reg").await;
    let client = engine.client();

    let registration = client.watch(b"watched-key");

    assert!(registration.is_ok(), "watch registration should succeed");
}
/// A watcher registered on a key receives a Put event carrying that key and
/// the written value.
#[cfg(feature = "watch")]
#[tokio::test]
async fn test_watch_receives_put_event() {
    use d_engine_core::watch::WatchEventType;

    let (engine, _temp_dir) = create_watch_engine("watch_put").await;
    let client = engine.client();
    let key = b"w-key";
    // Upper bound on how long to wait for the event.
    let deadline = Duration::from_secs(1);

    let mut watcher = client.watch(key).expect("watch should succeed");
    client.put(key, b"w-val").await.expect("put should succeed");

    let event = tokio::time::timeout(deadline, watcher.receiver_mut().recv())
        .await
        .expect("timed out waiting for Put watch event")
        .expect("watch channel should not be closed");

    assert_eq!(
        event.event_type,
        WatchEventType::Put as i32,
        "expected Put event"
    );
    assert_eq!(event.key, Bytes::from("w-key"));
    assert_eq!(event.value, Bytes::from("w-val"));
}
/// A watcher registered after a key was written receives a Delete event when
/// that key is removed.
#[cfg(feature = "watch")]
#[tokio::test]
async fn test_watch_receives_delete_event() {
    use d_engine_core::watch::WatchEventType;

    let (engine, _temp_dir) = create_watch_engine("watch_delete").await;
    let client = engine.client();
    let key = b"d-key";
    // Upper bound on how long to wait for the event.
    let deadline = Duration::from_secs(1);

    // The key is written first; the watcher is only registered afterwards.
    client.put(key, b"d-val").await.expect("put should succeed");
    let mut watcher = client.watch(key).expect("watch should succeed");
    client.delete(key).await.expect("delete should succeed");

    let event = tokio::time::timeout(deadline, watcher.receiver_mut().recv())
        .await
        .expect("timed out waiting for Delete watch event")
        .expect("watch channel should not be closed");

    assert_eq!(
        event.event_type,
        WatchEventType::Delete as i32,
        "expected Delete event"
    );
    assert_eq!(event.key, Bytes::from("d-key"));
}
/// A watcher on one key receives nothing when a different key is written.
#[cfg(feature = "watch")]
#[tokio::test]
async fn test_watch_key_isolation() {
    use tokio::sync::mpsc::error::TryRecvError;

    let (engine, _temp_dir) = create_watch_engine("watch_isolation").await;
    let client = engine.client();

    let mut watcher = client.watch(b"my-key").expect("watch should succeed");
    client.put(b"other-key", b"other-val").await.expect("put should succeed");

    // Wait before polling so a wrongly routed event would have had time to arrive.
    tokio::time::sleep(Duration::from_millis(200)).await;

    match watcher.receiver_mut().try_recv() {
        Ok(event) => panic!("unexpected event for unrelated key: {event:?}"),
        // An empty channel is the only acceptable outcome.
        Err(TryRecvError::Empty) => {}
        Err(e) => panic!("unexpected channel error: {e:?}"),
    }
}