#[cfg(test)]
mod tests {
use std::collections::HashMap;
use bytes::Bytes;
/// Rebuilds a positional multi-get result from a sparse key/value reply.
///
/// Three keys are requested while the simulated reply only contains two of
/// them; the reconstructed vector must keep one slot per requested key, in
/// request order, with `None` for the key that was not returned.
#[test]
fn test_get_multi_result_reconstruction() {
    let wanted = vec![
        Bytes::from("key1"),
        Bytes::from("key2"),
        Bytes::from("key3"),
    ];

    // The simulated reply deliberately omits "key2".
    let mut found: HashMap<Bytes, Bytes> = HashMap::new();
    found.insert(Bytes::from("key1"), Bytes::from("value1"));
    found.insert(Bytes::from("key3"), Bytes::from("value3"));

    // Walk the request in order so each output slot lines up with its key.
    let mut reconstructed: Vec<Option<Bytes>> = Vec::with_capacity(wanted.len());
    for key in &wanted {
        reconstructed.push(found.get(key).cloned());
    }

    assert_eq!(
        reconstructed.len(),
        3,
        "Result count must match request count"
    );
    assert_eq!(
        reconstructed[0],
        Some(Bytes::from("value1")),
        "Position 0 is key1"
    );
    assert_eq!(reconstructed[1], None, "Position 1 is key2 (missing)");
    assert_eq!(
        reconstructed[2],
        Some(Bytes::from("value3")),
        "Position 2 is key3"
    );
}
/// When every requested key is present in the reply, each slot of the
/// reconstructed result holds the matching value, in request order.
#[test]
fn test_get_multi_all_keys_exist() {
    let wanted = vec![Bytes::from("a"), Bytes::from("b")];

    let mut found: HashMap<Bytes, Bytes> = HashMap::with_capacity(2);
    found.insert(Bytes::from("a"), Bytes::from("1"));
    found.insert(Bytes::from("b"), Bytes::from("2"));

    let mut reconstructed: Vec<Option<Bytes>> = Vec::new();
    reconstructed.extend(wanted.iter().map(|key| found.get(key).cloned()));

    assert_eq!(reconstructed.len(), 2);
    assert_eq!(reconstructed[0], Some(Bytes::from("1")));
    assert_eq!(reconstructed[1], Some(Bytes::from("2")));
}
/// With an empty reply, the reconstructed result still has one slot per
/// requested key and every slot is `None`.
#[test]
fn test_get_multi_no_keys_exist() {
    let wanted = vec![Bytes::from("x"), Bytes::from("y"), Bytes::from("z")];

    // Nothing came back from the simulated server.
    let found: HashMap<Bytes, Bytes> = HashMap::new();

    let mut reconstructed: Vec<Option<Bytes>> = Vec::with_capacity(wanted.len());
    for key in &wanted {
        reconstructed.push(found.get(key).cloned());
    }

    assert_eq!(reconstructed.len(), 3);
    assert!(reconstructed.iter().all(Option::is_none));
}
/// A key stored with an empty value must come back as `Some(empty)`, not
/// `None`: "present but empty" and "missing" are different answers.
#[test]
fn test_get_multi_preserves_empty_values() {
    let requested_keys: Vec<_> = [Bytes::from("empty"), Bytes::from("nonempty")].to_vec();
    let server_results: Vec<_> = vec![
        (Bytes::from("empty"), Bytes::new()),
        (Bytes::from("nonempty"), Bytes::from("v")),
    ];
    let results_by_key: HashMap<_, _> = server_results.into_iter().collect();
    let reconstructed: Vec<Option<Bytes>> =
        requested_keys.iter().map(|k| results_by_key.get(k).cloned()).collect();

    // Pin the length like the sibling tests do, so an over-long result
    // cannot slip past the positional checks below.
    assert_eq!(reconstructed.len(), 2);
    assert_eq!(reconstructed[0], Some(Bytes::new()));
    assert_eq!(reconstructed[1], Some(Bytes::from("v")));
}
}
#[cfg(test)]
mod integration_tests {
use std::time::Duration;
use d_engine_core::ClientApi;
use tempfile::TempDir;
use crate::api::EmbeddedEngine;
/// Starts a single-node [`EmbeddedEngine`] for a test and waits until it
/// reports ready.
///
/// A fresh temporary directory holds both the database root (`db`) and the
/// generated `d-engine.toml`. The `TempDir` guard is returned with the
/// engine so the caller decides how long the directory stays alive.
///
/// # Panics
///
/// Panics if the temp dir, the free port, or the config file cannot be
/// created, if the engine fails to start, or if it is not ready within
/// five seconds.
async fn create_test_engine() -> (EmbeddedEngine, TempDir) {
    let temp_dir = TempDir::new().expect("Failed to create temp dir");
    let db_path = temp_dir.path().join("db");
    let config_path = temp_dir.path().join("d-engine.toml");

    // Ask the OS for a currently unused port instead of deriving one from the
    // process id: all tests inside one `cargo test` process share the same
    // pid, so a pid-based port would hand every concurrently running test the
    // same listen address. The probe listener is dropped at the end of this
    // statement, which frees the port for the engine to bind.
    let port = std::net::TcpListener::bind("127.0.0.1:0")
        .and_then(|listener| listener.local_addr())
        .map(|addr| addr.port())
        .expect("Failed to reserve a free port");

    let config_content = format!(
        r#"
[cluster]
listen_address = "127.0.0.1:{}"
db_root_dir = "{}"
single_node = true
[raft]
general_raft_timeout_duration_in_ms = 100
[raft.commit_handler]
batch_size_threshold = 1
process_interval_ms = 1
[replication]
max_batch_size = 1
"#,
        port,
        db_path.display()
    );
    std::fs::write(&config_path, config_content).expect("Failed to write config");

    let engine = EmbeddedEngine::start_with(config_path.to_str().unwrap())
        .await
        .expect("Failed to start engine");
    engine.wait_ready(Duration::from_secs(5)).await.expect("Engine not ready");

    (engine, temp_dir)
}
mod cas_operations {
use super::*;
/// Acquiring a lock on a key with no expected value: the CAS must report
/// success and a following read must return the new owner.
#[tokio::test]
async fn test_cas_acquire_lock_success() {
    let (engine, _temp_dir) = create_test_engine().await;
    let client = engine.client();

    let key = b"distributed_lock";
    let holder = b"client_a";

    // Expected value `None` means "only write if nothing is stored yet".
    let outcome = client.compare_and_swap(key, None::<&[u8]>, holder).await;
    assert!(outcome.is_ok(), "CAS should not error");
    let swapped = outcome.unwrap();
    assert!(swapped, "CAS should succeed");

    let stored = client.get(key).await.expect("get should succeed");
    assert_eq!(stored, Some(holder.to_vec().into()));

    engine.stop().await.expect("Failed to stop engine");
}
/// Two clients race for the same lock key: the first CAS wins, the second
/// (still expecting an absent key) is rejected, and the stored owner stays
/// the first client.
#[tokio::test]
async fn test_cas_lock_conflict() {
    let (engine, _temp_dir) = create_test_engine().await;
    let client = engine.client();

    let key = b"distributed_lock";
    let first = b"client_a";
    let second = b"client_b";

    let first_won = client.compare_and_swap(key, None::<&[u8]>, first).await.unwrap();
    assert!(first_won, "Client A should acquire lock");

    let second_won = client.compare_and_swap(key, None::<&[u8]>, second).await.unwrap();
    assert!(!second_won, "Client B should fail - lock held");

    let stored = client.get(key).await.expect("get should succeed");
    assert_eq!(stored, Some(first.to_vec().into()));

    engine.stop().await.expect("Failed to stop engine");
}
/// The current owner releases the lock by swapping its own id for an empty
/// value; afterwards the key reads back as an empty value.
#[tokio::test]
async fn test_cas_release_lock() {
    let (engine, _temp_dir) = create_test_engine().await;
    let client = engine.client();

    let key = b"distributed_lock";
    let holder = b"client_a";

    let acquired = client.compare_and_swap(key, None::<&[u8]>, holder).await.unwrap();
    assert!(acquired);

    // Release: expected value is the holder itself, new value is empty.
    let released = client.compare_and_swap(key, Some(holder), b"").await.unwrap();
    assert!(released, "Correct owner should release lock");

    let stored = client.get(key).await.expect("get should succeed");
    assert_eq!(stored, Some(b"".to_vec().into()));

    engine.stop().await.expect("Failed to stop engine");
}
/// A client that does not hold the lock tries to release it: the CAS must
/// be rejected and the stored owner must remain unchanged.
#[tokio::test]
async fn test_cas_prevent_wrong_release() {
    let (engine, _temp_dir) = create_test_engine().await;
    let client = engine.client();

    let key = b"distributed_lock";
    let holder = b"client_a";
    let intruder = b"client_b";

    let acquired = client.compare_and_swap(key, None::<&[u8]>, holder).await.unwrap();
    assert!(acquired);

    // The expected value names the wrong owner, so nothing may be written.
    let released = client.compare_and_swap(key, Some(intruder), b"").await.unwrap();
    assert!(!released, "Wrong owner cannot release lock");

    let stored = client.get(key).await.expect("get should succeed");
    assert_eq!(stored, Some(holder.to_vec().into()));

    engine.stop().await.expect("Failed to stop engine");
}
/// Exercises two value-size extremes through CAS on fresh keys: an empty
/// value and a 1 MiB value. Both must be accepted and read back unchanged.
#[tokio::test]
async fn test_cas_edge_cases() {
    let (engine, _temp_dir) = create_test_engine().await;
    let client = engine.client();

    // Empty value on a fresh key.
    let empty_swapped = client
        .compare_and_swap(b"empty_key", None::<&[u8]>, b"")
        .await
        .unwrap();
    assert!(empty_swapped, "CAS with empty value should succeed");
    let empty_stored = client.get(b"empty_key").await.expect("get should succeed");
    assert_eq!(empty_stored, Some(b"".to_vec().into()));

    // 1 MiB value on a fresh key.
    let large_value = vec![b'x'; 1024 * 1024];
    let large_swapped = client
        .compare_and_swap(b"large_key", None::<&[u8]>, &large_value)
        .await
        .unwrap();
    assert!(large_swapped, "CAS with large value should succeed");
    let large_stored = client.get(b"large_key").await.expect("get should succeed");
    assert_eq!(large_stored, Some(large_value.into()));

    engine.stop().await.expect("Failed to stop engine");
}
/// CAS against a key that was never written, with no expected value: the
/// swap must succeed and the new value must be readable afterwards.
#[tokio::test]
async fn test_cas_nonexistent_key_success() {
    let (engine, _temp_dir) = create_test_engine().await;
    let client = engine.client();

    let key = b"does_not_exist";

    let swapped = client
        .compare_and_swap(key, None::<&[u8]>, b"new_value")
        .await
        .unwrap();
    assert!(
        swapped,
        "CAS on non-existent key with None should succeed"
    );

    let stored = client.get(key).await.expect("get should succeed");
    assert_eq!(stored, Some(b"new_value".to_vec().into()));

    engine.stop().await.expect("Failed to stop engine");
}
/// CAS against a key that was never written, but with a concrete expected
/// value: the swap must be rejected and the key must still be absent.
#[tokio::test]
async fn test_cas_nonexistent_key_failure() {
    let (engine, _temp_dir) = create_test_engine().await;
    let client = engine.client();

    let key = b"does_not_exist";

    let swapped = client
        .compare_and_swap(key, Some(b"wrong"), b"value")
        .await
        .unwrap();
    assert!(
        !swapped,
        "CAS on non-existent key with Some(wrong) should fail"
    );

    // A rejected CAS must not have created the key.
    let stored = client.get(key).await.expect("get should succeed");
    assert_eq!(stored, None);

    engine.stop().await.expect("Failed to stop engine");
}
}
}