use std::sync::Arc;
use std::vec;
use arc_swap::ArcSwap;
use bytes::Bytes;
use d_engine_core::ClientApi;
use d_engine_core::client::ClientApiResult;
use d_engine_core::client::ClientResponse;
use d_engine_core::client::ErrorCode;
use d_engine_core::client::KvEntry;
use d_engine_core::config::ReadConsistencyPolicy;
use d_engine_proto::client::WatchEventType;
use d_engine_proto::client::WatchResponse;
use d_engine_proto::server::cluster::ClusterMembership;
use tokio::sync::oneshot;
use tracing_test::traced_test;
use crate::Client;
use crate::ClientConfig;
use crate::ClientInner;
use crate::ConnectionPool;
use crate::GrpcClient;
use crate::mock_rpc_service::MockNode;
/// `put` returns `Ok` when the mock write server replies with
/// `ClientResponse::write_success()`.
#[tokio::test]
#[traced_test]
async fn test_put_success() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) = MockNode::simulate_client_write_mock_server(
        shutdown_rx,
        membership_fn,
        ClientResponse::write_success(),
    )
    .await
    .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let outcome = client.put(b"test_key".to_vec(), b"test_value".to_vec()).await;
    println!("Result: {outcome:?}");
    assert!(outcome.is_ok());
}
/// `put_with_ttl` with a one-hour TTL returns `Ok` when the mock write server
/// replies with `ClientResponse::write_success()`.
#[tokio::test]
#[traced_test]
async fn test_put_with_ttl_success() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) = MockNode::simulate_client_write_mock_server(
        shutdown_rx,
        membership_fn,
        ClientResponse::write_success(),
    )
    .await
    .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let outcome = client
        .put_with_ttl(b"ttl_key".to_vec(), b"ttl_value".to_vec(), 3600)
        .await;
    println!("Result: {outcome:?}");
    assert!(outcome.is_ok());
}
/// `put_with_ttl` surfaces `ErrorCode::ConnectionTimeout` when the mock write
/// server replies with that client error.
#[tokio::test]
#[traced_test]
async fn test_put_with_ttl_failure() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) = MockNode::simulate_client_write_mock_server(
        shutdown_rx,
        membership_fn,
        ClientResponse::client_error(ErrorCode::ConnectionTimeout),
    )
    .await
    .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let outcome = client
        .put_with_ttl(b"ttl_key".to_vec(), b"ttl_value".to_vec(), 3600)
        .await;
    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().code(), ErrorCode::ConnectionTimeout);
}
/// `put_with_ttl` with a TTL of `0` returns `Ok` when the mock write server
/// replies with `ClientResponse::write_success()`.
#[tokio::test]
#[traced_test]
async fn test_put_with_zero_ttl() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) = MockNode::simulate_client_write_mock_server(
        shutdown_rx,
        membership_fn,
        ClientResponse::write_success(),
    )
    .await
    .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let outcome = client
        .put_with_ttl(b"no_ttl_key".to_vec(), b"no_ttl_value".to_vec(), 0)
        .await;
    assert!(outcome.is_ok());
}
/// `put` surfaces `ErrorCode::ConnectionTimeout` when the mock write server
/// replies with that client error.
#[tokio::test]
#[traced_test]
async fn test_put_failure() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) = MockNode::simulate_client_write_mock_server(
        shutdown_rx,
        membership_fn,
        ClientResponse::client_error(ErrorCode::ConnectionTimeout),
    )
    .await
    .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let key = b"test_key".to_vec();
    let value = b"test_value".to_vec();
    let outcome = client.put(key, value).await;
    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().code(), ErrorCode::ConnectionTimeout);
}
/// `get_with_policy` with `LinearizableRead` returns the entry whose value
/// matches the single `KvEntry` served by the mock read server.
#[tokio::test]
#[traced_test]
async fn test_get_success() {
    let key = Bytes::from(String::from("test_key"));
    let value = Bytes::from(String::from("test_value"));
    let canned_reply = ClientResponse::read_results(vec![KvEntry {
        key: key.clone(),
        value: value.clone(),
    }]);

    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) =
        MockNode::simulate_client_read_mock_server(shutdown_rx, membership_fn, canned_reply)
            .await
            .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let policy = Some(ReadConsistencyPolicy::LinearizableRead);
    let outcome = client.get_with_policy(key, policy).await;
    println!("{outcome:?}");
    assert!(outcome.is_ok());
    let fetched = outcome.unwrap();
    assert_eq!(fetched.as_ref().map(|entry| &entry.value), Some(&value));
}
/// `get_with_policy` yields `Ok(None)` when the mock read server replies with
/// an empty result list.
#[tokio::test]
#[traced_test]
async fn test_get_not_found() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) = MockNode::simulate_client_read_mock_server(
        shutdown_rx,
        membership_fn,
        ClientResponse::read_results(vec![]),
    )
    .await
    .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let missing_key = b"nonexistent_key".to_vec();
    let outcome = client
        .get_with_policy(missing_key, Some(ReadConsistencyPolicy::LinearizableRead))
        .await;
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), None);
}
/// `delete` returns `Ok` when the mock write server replies with
/// `ClientResponse::write_success()`.
#[tokio::test]
#[traced_test]
async fn test_delete_success() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) = MockNode::simulate_client_write_mock_server(
        shutdown_rx,
        membership_fn,
        ClientResponse::write_success(),
    )
    .await
    .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let outcome = client.delete(b"test_key".to_vec()).await;
    assert!(outcome.is_ok());
}
/// `delete` surfaces `ErrorCode::ConnectionTimeout` when the mock write server
/// replies with that client error.
#[tokio::test]
#[traced_test]
async fn test_delete_failure() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) = MockNode::simulate_client_write_mock_server(
        shutdown_rx,
        membership_fn,
        ClientResponse::client_error(ErrorCode::ConnectionTimeout),
    )
    .await
    .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let outcome = client.delete(b"test_key".to_vec()).await;
    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().code(), ErrorCode::ConnectionTimeout);
}
#[tokio::test]
#[traced_test]
async fn test_get_multi_success_linear() {
let keys = vec![
Bytes::from("key1".to_string()),
Bytes::from("key2".to_string()),
Bytes::from("key3".to_string()),
];
let values = [
Bytes::from("value1".to_string()),
Bytes::from("value2".to_string()),
Bytes::from("value3".to_string()),
];
let mut client_results = Vec::new();
for (i, key) in keys.iter().enumerate() {
client_results.push(KvEntry {
key: key.clone(),
value: values[i].clone(),
});
}
let response = ClientResponse::read_results(client_results);
let (_tx, rx) = oneshot::channel::<()>();
let (_channel, port) = MockNode::simulate_client_read_mock_server(
rx,
None::<
Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
>,
response,
)
.await
.unwrap();
let endpoints = vec![format!("http://localhost:{}", port)];
let config = ClientConfig::default();
let pool = ConnectionPool::create(endpoints.clone(), config.clone())
.await
.expect("Should create connection pool");
let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(ClientInner {
pool,
client_id: 123,
config,
endpoints,
})));
let result = client
.get_multi_with_policy(
keys.clone().into_iter(),
Some(ReadConsistencyPolicy::LinearizableRead),
)
.await;
assert!(result.is_ok());
let results = result.unwrap();
assert_eq!(results.len(), keys.len());
for (i, key) in keys.iter().enumerate() {
assert_eq!(results[i].as_ref().map(|r| &r.key), Some(&key.clone()));
assert_eq!(results[i].as_ref().map(|r| &r.value), Some(&values[i]));
}
}
#[tokio::test]
#[traced_test]
async fn test_get_multi_success_non_linear() {
let keys = vec![
Bytes::from("key1"),
Bytes::from("key2"),
Bytes::from("key3"),
];
let values = [
Some(Bytes::from_owner("value1".to_string())),
Some(Bytes::from_owner("value2".to_string())),
None, ];
let mut client_results = Vec::new();
for (i, key) in keys.iter().enumerate() {
client_results.push(KvEntry {
key: key.clone(),
value: match &values[i] {
Some(value) => value.clone(),
None => Bytes::copy_from_slice(&[]), },
});
}
let response = ClientResponse::read_results(client_results);
let (_tx, rx) = oneshot::channel::<()>();
let (_channel, port) = MockNode::simulate_client_read_mock_server(
rx,
None::<
Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
>,
response,
)
.await
.unwrap();
let endpoints = vec![format!("http://localhost:{}", port)];
let config = ClientConfig::default();
let pool = ConnectionPool::create(endpoints.clone(), config.clone())
.await
.expect("Should create connection pool");
let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(ClientInner {
pool,
client_id: 123,
config,
endpoints,
})));
let result = client.get_multi_with_policy(keys.clone().into_iter(), None).await;
assert!(result.is_ok());
let results = result.unwrap();
assert_eq!(results.len(), keys.len());
for (i, key) in keys.iter().enumerate() {
assert_eq!(results[i].as_ref().map(|r| &r.key), Some(&key.clone()));
assert_eq!(
results[i].as_ref().map(|r| r.value.clone()),
values[i].clone().or(Some(Bytes::new()))
);
}
}
/// `get_multi_with_policy` surfaces `ErrorCode::ConnectionTimeout` both with an
/// explicit `LinearizableRead` policy and with no policy.
#[tokio::test]
#[traced_test]
async fn test_get_multi_failure() {
    let keys = vec![String::from("key1")];

    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) = MockNode::simulate_client_read_mock_server(
        shutdown_rx,
        membership_fn,
        ClientResponse::client_error(ErrorCode::ConnectionTimeout),
    )
    .await
    .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let policies = [Some(ReadConsistencyPolicy::LinearizableRead), None];
    for policy in policies {
        let outcome = client
            .get_multi_with_policy(keys.clone().into_iter(), policy)
            .await;
        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err().code(), ErrorCode::ConnectionTimeout);
    }
}
/// `get_multi_with_policy` rejects an empty key iterator with
/// `ErrorCode::InvalidRequest`.
///
/// The client is built from the same `config` and `endpoints` that created the
/// pool, matching every other test in this file, instead of a second default
/// config and an empty endpoint list that disagreed with the pool.
#[tokio::test]
#[traced_test]
async fn test_get_multi_empty_keys() {
    let response = ClientResponse::read_results(vec![]);

    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_tx, rx) = oneshot::channel::<()>();
    let (_channel, port) = MockNode::simulate_client_read_mock_server(
        rx,
        None::<
            Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
        >,
        response,
    )
    .await
    .unwrap();

    let endpoints = vec![format!("http://localhost:{}", port)];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    })));

    let result = client
        .get_multi_with_policy(std::iter::empty::<String>(), None)
        .await;
    assert!(result.is_err());
    assert_eq!(result.unwrap_err().code(), ErrorCode::InvalidRequest);
}
/// `get_linearizable` returns `Some(value)` for the single entry served by the
/// mock read server.
#[tokio::test]
#[traced_test]
async fn test_get_linearizable_success() {
    let key = Bytes::from(String::from("test_key"));
    let value = Bytes::from(String::from("test_value"));
    let canned_reply = ClientResponse::read_results(vec![KvEntry {
        key: key.clone(),
        value: value.clone(),
    }]);

    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) =
        MockNode::simulate_client_read_mock_server(shutdown_rx, membership_fn, canned_reply)
            .await
            .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let outcome = client.get_linearizable(key).await;
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), Some(value));
}
/// `get_lease` returns `Some(value)` for the single entry served by the mock
/// read server.
#[tokio::test]
#[traced_test]
async fn test_get_lease_success() {
    let key = Bytes::from(String::from("test_key"));
    let value = Bytes::from(String::from("test_value"));
    let canned_reply = ClientResponse::read_results(vec![KvEntry {
        key: key.clone(),
        value: value.clone(),
    }]);

    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) =
        MockNode::simulate_client_read_mock_server(shutdown_rx, membership_fn, canned_reply)
            .await
            .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let outcome = client.get_lease(key).await;
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), Some(value));
}
/// `get_eventual` returns `Some(value)` for the single entry served by the mock
/// read server.
#[tokio::test]
#[traced_test]
async fn test_get_eventual_success() {
    let key = Bytes::from(String::from("test_key"));
    let value = Bytes::from(String::from("test_value"));
    let canned_reply = ClientResponse::read_results(vec![KvEntry {
        key: key.clone(),
        value: value.clone(),
    }]);

    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) =
        MockNode::simulate_client_read_mock_server(shutdown_rx, membership_fn, canned_reply)
            .await
            .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let outcome = client.get_eventual(key).await;
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap(), Some(value));
}
#[tokio::test]
#[traced_test]
async fn test_get_multi_success() {
let keys = vec![
Bytes::from("key1".to_string()),
Bytes::from("key2".to_string()),
Bytes::from("key3".to_string()),
];
let values = [
Bytes::from("value1".to_string()),
Bytes::from("value2".to_string()),
Bytes::from("value3".to_string()),
];
let mut client_results = Vec::new();
for (i, key) in keys.iter().enumerate() {
client_results.push(KvEntry {
key: key.clone(),
value: values[i].clone(),
});
}
let response = ClientResponse::read_results(client_results);
let (_tx, rx) = oneshot::channel::<()>();
let (_channel, port) = MockNode::simulate_client_read_mock_server(
rx,
None::<
Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
>,
response,
)
.await
.unwrap();
let endpoints = vec![format!("http://localhost:{}", port)];
let config = ClientConfig::default();
let pool = ConnectionPool::create(endpoints.clone(), config.clone())
.await
.expect("Should create connection pool");
let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(ClientInner {
pool,
client_id: 123,
config,
endpoints,
})));
let result = client.get_multi(&keys).await;
assert!(result.is_ok());
let results = result.unwrap();
assert_eq!(results.len(), keys.len());
for (i, _key) in keys.iter().enumerate() {
assert_eq!(results[i].as_ref(), Some(&values[i]));
}
}
/// `get_multi` keeps results aligned with the requested keys when the server
/// only returns entries for some of them: the missing middle key maps to
/// `None`.
#[tokio::test]
#[traced_test]
async fn test_get_multi_with_mixed_results() {
    let keys = vec![
        Bytes::from(String::from("key1")),
        Bytes::from(String::from("key2")),
        Bytes::from(String::from("key3")),
    ];
    // The server reply deliberately omits `key2`.
    let first_hit = KvEntry {
        key: keys[0].clone(),
        value: Bytes::from(String::from("value1")),
    };
    let third_hit = KvEntry {
        key: keys[2].clone(),
        value: Bytes::from(String::from("value3")),
    };
    let response = ClientResponse::read_results(vec![first_hit, third_hit]);

    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) =
        MockNode::simulate_client_read_mock_server(shutdown_rx, membership_fn, response)
            .await
            .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let outcome = client.get_multi(&keys).await;
    assert!(outcome.is_ok());
    let results = outcome.unwrap();
    assert_eq!(
        results.len(),
        keys.len(),
        "Result count must match input key count"
    );
    assert_eq!(
        results[0],
        Some(Bytes::from("value1")),
        "key1 should have value1"
    );
    assert_eq!(results[1], None, "key2 is missing, must be None");
    assert_eq!(
        results[2],
        Some(Bytes::from("value3")),
        "key3 should have value3"
    );
}
/// `get_linearizable`, `get_lease`, `get_eventual` and `get` all surface
/// `ErrorCode::ConnectionTimeout` when the mock read server replies with that
/// client error.
#[tokio::test]
#[traced_test]
async fn test_get_consistency_methods_failure() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let (_channel, port) = MockNode::simulate_client_read_mock_server(
        shutdown_rx,
        membership_fn,
        ClientResponse::client_error(ErrorCode::ConnectionTimeout),
    )
    .await
    .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let key = b"test_key".to_vec();

    // Issue the four requests in the same order as before, then check them.
    let linearizable = client.get_linearizable(key.clone()).await;
    let lease = client.get_lease(key.clone()).await;
    let eventual = client.get_eventual(key.clone()).await;
    let plain: ClientApiResult<Option<Bytes>> = client.get(key.clone()).await;

    let named_outcomes: [(&str, ClientApiResult<Option<Bytes>>); 3] = [
        ("get_linearizable", linearizable),
        ("get_lease", lease),
        ("get_eventual", eventual),
    ];
    for (method_name, outcome) in named_outcomes {
        assert!(outcome.is_err(), "{method_name} should fail");
        assert_eq!(
            outcome.unwrap_err().code(),
            ErrorCode::ConnectionTimeout,
            "{method_name} should return ConnectionTimeout",
        );
    }

    assert!(plain.is_err(), "get should fail");
    assert_eq!(
        plain.unwrap_err().code(),
        ErrorCode::ConnectionTimeout,
        "get should return ConnectionTimeout",
    );
}
/// `Client::refresh(Some(..))` swaps the stored endpoints for the new list,
/// keeps `client_id`, and leaves the client able to serve a `get`.
#[tokio::test]
#[traced_test]
async fn test_client_refresh_with_new_endpoints() {
    // First mock server: the one the client initially connects to.
    // Senders stay bound for the whole test; the receivers are presumably the
    // mock servers' shutdown signals (confirm in `MockNode`).
    let (_first_tx, first_rx) = oneshot::channel::<()>();
    let first_membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let first_reply = ClientResponse::read_results(vec![KvEntry {
        key: Bytes::from("key1"),
        value: Bytes::from("value1"),
    }]);
    let (_first_channel, first_port) =
        MockNode::simulate_client_read_mock_server(first_rx, first_membership_fn, first_reply)
            .await
            .unwrap();

    let initial_endpoints = vec![format!("http://localhost:{first_port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(initial_endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let client_inner = Arc::new(ArcSwap::from_pointee(ClientInner {
        pool,
        client_id: 123,
        config: config.clone(),
        endpoints: initial_endpoints.clone(),
    }));
    let client = Client {
        inner: Arc::new(GrpcClient::new(Arc::clone(&client_inner))),
    };

    let before = client.inner.client_inner.load();
    assert_eq!(before.endpoints, initial_endpoints);

    // Second mock server: the refresh target.
    let (_second_tx, second_rx) = oneshot::channel::<()>();
    let second_membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let second_reply = ClientResponse::read_results(vec![KvEntry {
        key: Bytes::from("key2"),
        value: Bytes::from("value2"),
    }]);
    let (_second_channel, second_port) =
        MockNode::simulate_client_read_mock_server(second_rx, second_membership_fn, second_reply)
            .await
            .unwrap();

    let new_endpoints = vec![format!("http://localhost:{second_port}")];
    let refresh_outcome = client.refresh(Some(new_endpoints.clone())).await;
    assert!(refresh_outcome.is_ok(), "Refresh should succeed");

    let after = client.inner.client_inner.load();
    assert_eq!(after.endpoints, new_endpoints);
    assert_eq!(after.client_id, 123);
    assert!(client.get("test_key").await.is_ok());
}
/// `Client::refresh(None)` succeeds, keeps the original endpoints and
/// `client_id`, and leaves the client able to serve a `get`.
#[tokio::test]
#[traced_test]
async fn test_client_refresh_with_none_endpoints() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let canned_reply = ClientResponse::read_results(vec![KvEntry {
        key: Bytes::from("key1"),
        value: Bytes::from("value1"),
    }]);
    let (_channel, port) =
        MockNode::simulate_client_read_mock_server(shutdown_rx, membership_fn, canned_reply)
            .await
            .unwrap();

    let initial_endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(initial_endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let client_inner = Arc::new(ArcSwap::from_pointee(ClientInner {
        pool,
        client_id: 456,
        config: config.clone(),
        endpoints: initial_endpoints.clone(),
    }));
    let client = Client {
        inner: Arc::new(GrpcClient::new(Arc::clone(&client_inner))),
    };

    let before = client.inner.client_inner.load();
    assert_eq!(before.endpoints, initial_endpoints);

    let refresh_outcome = client.refresh(None).await;
    assert!(refresh_outcome.is_ok(), "Refresh with None should succeed");

    let after = client.inner.client_inner.load();
    assert_eq!(after.endpoints, initial_endpoints);
    assert_eq!(after.client_id, 456);
    assert!(client.get("test_key").await.is_ok());
}
/// `Client::refresh` accepts a list of two endpoints: afterwards the stored
/// endpoints equal that list and `client_id` is unchanged.
#[tokio::test]
#[traced_test]
async fn test_client_refresh_with_multiple_endpoints() {
    // Senders stay bound for the whole test; the receivers are presumably the
    // mock servers' shutdown signals (confirm in `MockNode`).
    let (_first_tx, first_rx) = oneshot::channel::<()>();
    let first_membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let first_reply = ClientResponse::read_results(vec![KvEntry {
        key: Bytes::from("key1"),
        value: Bytes::from("value1"),
    }]);
    let (_first_channel, first_port) =
        MockNode::simulate_client_read_mock_server(first_rx, first_membership_fn, first_reply)
            .await
            .unwrap();

    let (_second_tx, second_rx) = oneshot::channel::<()>();
    let second_membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let second_reply = ClientResponse::read_results(vec![KvEntry {
        key: Bytes::from("key2"),
        value: Bytes::from("value2"),
    }]);
    let (_second_channel, second_port) =
        MockNode::simulate_client_read_mock_server(second_rx, second_membership_fn, second_reply)
            .await
            .unwrap();

    // Start with only the first server configured.
    let initial_endpoints = vec![format!("http://localhost:{first_port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(initial_endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let client_inner = Arc::new(ArcSwap::from_pointee(ClientInner {
        pool,
        client_id: 789,
        config: config.clone(),
        endpoints: initial_endpoints.clone(),
    }));
    let client = Client {
        inner: Arc::new(GrpcClient::new(Arc::clone(&client_inner))),
    };

    let before = client.inner.client_inner.load();
    assert_eq!(before.endpoints.len(), 1);

    let multiple_endpoints = vec![
        format!("http://localhost:{first_port}"),
        format!("http://localhost:{second_port}"),
    ];
    let refresh_outcome = client.refresh(Some(multiple_endpoints.clone())).await;
    assert!(
        refresh_outcome.is_ok(),
        "Refresh with multiple endpoints should succeed"
    );

    let after = client.inner.client_inner.load();
    assert_eq!(after.endpoints, multiple_endpoints);
    assert_eq!(after.endpoints.len(), 2);
    assert_eq!(after.client_id, 789);
}
/// `Client::refresh` with the endpoint `http://invalid-host:9999` fails and
/// leaves the stored endpoints and `client_id` untouched.
#[tokio::test]
#[traced_test]
async fn test_client_refresh_failure_invalid_endpoints() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let membership_fn: Option<
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
    > = None;
    let canned_reply = ClientResponse::read_results(vec![KvEntry {
        key: Bytes::from("key1"),
        value: Bytes::from("value1"),
    }]);
    let (_channel, port) =
        MockNode::simulate_client_read_mock_server(shutdown_rx, membership_fn, canned_reply)
            .await
            .unwrap();

    let initial_endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(initial_endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let client_inner = Arc::new(ArcSwap::from_pointee(ClientInner {
        pool,
        client_id: 999,
        config: config.clone(),
        endpoints: initial_endpoints.clone(),
    }));
    let client = Client {
        inner: Arc::new(GrpcClient::new(Arc::clone(&client_inner))),
    };

    let before = client.inner.client_inner.load();
    assert_eq!(before.endpoints, initial_endpoints);

    let invalid_endpoints = vec![String::from("http://invalid-host:9999")];
    let refresh_outcome = client.refresh(Some(invalid_endpoints)).await;
    assert!(
        refresh_outcome.is_err(),
        "Refresh with invalid endpoints should fail"
    );

    // A failed refresh must not replace the stored state.
    let after = client.inner.client_inner.load();
    assert_eq!(after.endpoints, initial_endpoints);
    assert_eq!(after.client_id, 999);
}
/// After a successful `Client::refresh` to a second mock server, both the KV
/// path (`get`) and the cluster path (`list_members`) still succeed.
///
/// The previous trailing checks had the form `x.is_ok() || x.is_err()`, which
/// is always true and verified nothing. Each call is now made once and asserted
/// `is_ok()` with a message that names the failing path.
#[tokio::test]
#[traced_test]
async fn test_client_refresh_preserves_kv_and_cluster_clients() {
    // Keep the senders bound for the whole test; the receivers are presumably
    // the mock servers' shutdown signals (confirm in `MockNode`).
    let (_tx, rx) = oneshot::channel::<()>();
    let (_channel, port) = MockNode::simulate_client_read_mock_server(
        rx,
        None::<
            Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
        >,
        ClientResponse::read_results(vec![KvEntry {
            key: Bytes::from("key1"),
            value: Bytes::from("value1"),
        }]),
    )
    .await
    .unwrap();

    let initial_endpoints = vec![format!("http://localhost:{}", port)];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(initial_endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let client_inner = Arc::new(ArcSwap::from_pointee(ClientInner {
        pool,
        client_id: 111,
        config: config.clone(),
        endpoints: initial_endpoints.clone(),
    }));
    let grpc_client = GrpcClient::new(client_inner.clone());
    let client = Client {
        inner: Arc::new(grpc_client),
    };

    // Second mock server: the refresh target.
    let (_tx2, rx2) = oneshot::channel::<()>();
    let (_channel2, port2) = MockNode::simulate_client_read_mock_server(
        rx2,
        None::<
            Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
        >,
        ClientResponse::read_results(vec![KvEntry {
            key: Bytes::from("key2"),
            value: Bytes::from("value2"),
        }]),
    )
    .await
    .unwrap();

    let new_endpoints = vec![format!("http://localhost:{}", port2)];
    let result = client.refresh(Some(new_endpoints)).await;
    assert!(result.is_ok(), "Refresh should succeed");

    let kv_result = client.get("test_key").await;
    assert!(kv_result.is_ok(), "KV get should succeed after refresh");

    let cluster_result = client.list_members().await;
    assert!(
        cluster_result.is_ok(),
        "list_members should succeed after refresh"
    );
}
mod cas_operations {
use super::*;
/// `compare_and_swap` with no expected value returns `Ok(true)` when the CAS
/// mock server is configured with `true`.
#[tokio::test]
#[traced_test]
async fn test_cas_acquire_lock_success() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let (_channel, port) = MockNode::simulate_cas_mock_server(shutdown_rx, true)
        .await
        .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let lock_key = b"distributed_lock";
    let owner = b"client_a";
    let outcome = client
        .compare_and_swap(lock_key, None::<&[u8]>, owner)
        .await;
    println!("CAS acquire lock result: {outcome:?}");
    assert!(outcome.is_ok());
    assert!(outcome.unwrap(), "CAS should succeed");
}
/// `compare_and_swap` with no expected value returns `Ok(false)` when the CAS
/// mock server is configured with `false`.
#[tokio::test]
#[traced_test]
async fn test_cas_lock_conflict() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let (_channel, port) = MockNode::simulate_cas_mock_server(shutdown_rx, false)
        .await
        .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let lock_key = b"distributed_lock";
    let contender = b"client_b";
    let outcome = client
        .compare_and_swap(lock_key, None::<&[u8]>, contender)
        .await;
    println!("CAS conflict result: {outcome:?}");
    assert!(outcome.is_ok());
    assert!(!outcome.unwrap(), "CAS should fail due to conflict");
}
/// `compare_and_swap` from the expected owner to an empty value returns
/// `Ok(true)` when the CAS mock server is configured with `true`.
#[tokio::test]
#[traced_test]
async fn test_cas_release_lock() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let (_channel, port) = MockNode::simulate_cas_mock_server(shutdown_rx, true)
        .await
        .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let lock_key = b"distributed_lock";
    let current_owner = b"client_a";
    let outcome = client
        .compare_and_swap(lock_key, Some(current_owner), b"")
        .await;
    println!("CAS release lock result: {outcome:?}");
    assert!(outcome.is_ok());
    assert!(outcome.unwrap(), "CAS should succeed - correct owner");
}
/// `compare_and_swap` with a non-matching expected owner returns `Ok(false)`
/// when the CAS mock server is configured with `false`.
#[tokio::test]
#[traced_test]
async fn test_cas_prevent_wrong_release() {
    // Keep the sender bound for the whole test; the receiver is presumably the
    // mock server's shutdown signal (confirm in `MockNode`).
    let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let (_channel, port) = MockNode::simulate_cas_mock_server(shutdown_rx, false)
        .await
        .unwrap();

    let endpoints = vec![format!("http://localhost:{port}")];
    let config = ClientConfig::default();
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let lock_key = b"distributed_lock";
    let impostor = b"client_b";
    let outcome = client
        .compare_and_swap(lock_key, Some(impostor), b"")
        .await;
    println!("CAS wrong release result: {outcome:?}");
    assert!(outcome.is_ok());
    assert!(!outcome.unwrap(), "CAS should fail - wrong owner");
}
/// Exercises two boundary payloads for CAS against a mock server started with
/// the flag `true`: an empty new value and a 1 MiB new value. Both calls pass
/// `None` as the expected value and must return `Ok(true)`.
#[tokio::test]
#[traced_test]
async fn test_cas_edge_cases() {
    // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
    let (_tx, server_rx) = oneshot::channel::<()>();
    let (_channel, port) = MockNode::simulate_cas_mock_server(server_rx, true)
        .await
        .unwrap();

    // Client wired to the single mock endpoint.
    let config = ClientConfig::default();
    let endpoints = vec![format!("http://localhost:{}", port)];
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    // Case 1: empty new value.
    let result = client
        .compare_and_swap(b"empty_key", None::<&[u8]>, b"")
        .await;
    println!("CAS empty value result: {result:?}");
    assert!(result.is_ok());
    let swapped = result.unwrap();
    assert!(swapped, "CAS with empty value should succeed");

    // Case 2: 1 MiB new value made of repeated `x` bytes.
    let large_value = vec![b'x'; 1024 * 1024];
    let result = client
        .compare_and_swap(b"large_key", None::<&[u8]>, &large_value)
        .await;
    println!("CAS large value result: {result:?}");
    assert!(result.is_ok());
    let swapped = result.unwrap();
    assert!(swapped, "CAS with large value should succeed");
}
/// CAS on the key `does_not_exist` with `None` as the expected value must
/// return `Ok(true)` when the mock CAS server is started with the flag `true`.
#[tokio::test]
#[traced_test]
async fn test_cas_nonexistent_key_success() {
    // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
    let (_tx, server_rx) = oneshot::channel::<()>();
    let (_channel, port) = MockNode::simulate_cas_mock_server(server_rx, true)
        .await
        .unwrap();

    // Client wired to the single mock endpoint.
    let config = ClientConfig::default();
    let endpoints = vec![format!("http://localhost:{}", port)];
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let result = client
        .compare_and_swap(b"does_not_exist", None::<&[u8]>, b"new_value")
        .await;
    println!("CAS nonexistent key result: {result:?}");
    assert!(result.is_ok());
    let swapped = result.unwrap();
    assert!(swapped, "CAS on non-existent key with None should succeed");
}
/// CAS on the key `does_not_exist` with `Some(b"wrong")` as the expected value
/// must return `Ok(false)` when the mock CAS server is started with the flag
/// `false`.
#[tokio::test]
#[traced_test]
async fn test_cas_nonexistent_key_failure() {
    // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
    let (_tx, server_rx) = oneshot::channel::<()>();
    let (_channel, port) = MockNode::simulate_cas_mock_server(server_rx, false)
        .await
        .unwrap();

    // Client wired to the single mock endpoint.
    let config = ClientConfig::default();
    let endpoints = vec![format!("http://localhost:{}", port)];
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .expect("Should create connection pool");
    let inner = ClientInner {
        pool,
        client_id: 123,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let result = client
        .compare_and_swap(b"does_not_exist", Some(b"wrong"), b"value")
        .await;
    println!("CAS nonexistent with wrong expected result: {result:?}");
    assert!(result.is_ok());
    let swapped = result.unwrap();
    assert!(
        !swapped,
        "CAS on non-existent key with Some(wrong) should fail"
    );
}
}
/// Tests for `watch_membership` on the gRPC client, run against mock
/// membership-watch servers provided by `MockNode`.
mod watch_membership_tests {
    use super::*;
    use d_engine_proto::client::MembershipSnapshot;
    use tokio_stream::StreamExt;

    /// Builds a `GrpcClient` with client id 42 and the default `ClientConfig`,
    /// whose only endpoint is `http://localhost:{port}`.
    ///
    /// # Panics
    /// Panics if the connection pool cannot be created.
    async fn make_client(port: u16) -> GrpcClient {
        let endpoints = vec![format!("http://localhost:{}", port)];
        let config = ClientConfig::default();
        let pool = ConnectionPool::create(endpoints.clone(), config.clone())
            .await
            .expect("Should create connection pool");
        GrpcClient::new(Arc::new(ArcSwap::from_pointee(ClientInner {
            pool,
            client_id: 42,
            config,
            endpoints,
        })))
    }

    /// When the mock server is started with a `permission_denied` status,
    /// `watch_membership` must return `Err`.
    #[tokio::test]
    #[traced_test]
    async fn test_watch_membership_returns_err_when_server_rejects() {
        // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
        let (_tx, rx) = oneshot::channel::<()>();
        let (_channel, port) = MockNode::simulate_watch_membership_error_mock_server(
            rx,
            tonic::Status::permission_denied("membership watch not allowed"),
        )
        .await
        .unwrap();
        let client = make_client(port).await;

        let result = client.watch_membership().await;
        assert!(
            result.is_err(),
            "Expected Err when server rejects watch_membership"
        );
    }

    /// The stream must yield the two configured snapshots in the order they
    /// were handed to the mock server, then end.
    #[tokio::test]
    #[traced_test]
    async fn test_watch_membership_receives_snapshots_in_order() {
        let snapshots = vec![
            MembershipSnapshot {
                members: vec![1, 2, 3],
                learners: vec![],
                committed_index: 10,
            },
            MembershipSnapshot {
                members: vec![1, 2, 3, 4],
                learners: vec![],
                committed_index: 11,
            },
        ];
        // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
        let (_tx, rx) = oneshot::channel::<()>();
        // `snapshots` is not read again, so it is moved instead of cloned.
        let (_channel, port) = MockNode::simulate_watch_membership_mock_server(rx, snapshots)
            .await
            .unwrap();
        let client = make_client(port).await;

        let mut stream = client
            .watch_membership()
            .await
            .expect("watch_membership should succeed");

        let s1 = stream
            .next()
            .await
            .expect("expected first snapshot")
            .expect("no error");
        assert_eq!(s1.members, vec![1, 2, 3]);
        assert_eq!(s1.committed_index, 10);

        let s2 = stream
            .next()
            .await
            .expect("expected second snapshot")
            .expect("no error");
        assert_eq!(s2.members, vec![1, 2, 3, 4]);
        assert_eq!(s2.committed_index, 11);

        assert!(
            stream.next().await.is_none(),
            "stream should close after all snapshots"
        );
    }

    /// With no snapshots configured on the mock server, the stream must end
    /// on the first poll.
    #[tokio::test]
    #[traced_test]
    async fn test_watch_membership_empty_stream_closes_cleanly() {
        // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
        let (_tx, rx) = oneshot::channel::<()>();
        let (_channel, port) = MockNode::simulate_watch_membership_mock_server(rx, vec![])
            .await
            .unwrap();
        let client = make_client(port).await;

        let mut stream = client
            .watch_membership()
            .await
            .expect("watch_membership should succeed");
        assert!(
            stream.next().await.is_none(),
            "empty membership stream should close immediately"
        );
    }
}
/// Tests for `watch` on the gRPC client, run against mock watch servers
/// provided by `MockNode`.
mod watch_tests {
    use super::*;
    use tokio_stream::StreamExt;

    /// Builds a `GrpcClient` with client id 42 and the default `ClientConfig`,
    /// whose only endpoint is `http://localhost:{port}`.
    ///
    /// # Panics
    /// Panics if the connection pool cannot be created.
    async fn make_client(port: u16) -> GrpcClient {
        let endpoints = vec![format!("http://localhost:{}", port)];
        let config = ClientConfig::default();
        let pool = ConnectionPool::create(endpoints.clone(), config.clone())
            .await
            .expect("Should create connection pool");
        GrpcClient::new(Arc::new(ArcSwap::from_pointee(ClientInner {
            pool,
            client_id: 42,
            config,
            endpoints,
        })))
    }

    /// When the mock server is started with a `permission_denied` status,
    /// `watch` must return `Err`.
    #[tokio::test]
    #[traced_test]
    async fn test_watch_server_error() {
        // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
        let (_tx, rx) = oneshot::channel::<()>();
        let (_channel, port) = MockNode::simulate_watch_error_mock_server(
            rx,
            tonic::Status::permission_denied("watch not allowed"),
        )
        .await
        .unwrap();
        let client = make_client(port).await;

        let result = client.watch(b"my-key").await;
        assert!(result.is_err(), "Expected error when server rejects watch");
    }

    /// The stream must yield the configured Put and Delete events in the order
    /// they were handed to the mock server, then end.
    #[tokio::test]
    #[traced_test]
    async fn test_watch_success_receives_events() {
        let events = vec![
            WatchResponse {
                key: Bytes::from("lock"),
                value: Bytes::from("owner-a"),
                prev_value: Bytes::new(),
                event_type: WatchEventType::Put as i32,
                error: 0,
                revision: 0,
            },
            WatchResponse {
                key: Bytes::from("lock"),
                value: Bytes::new(),
                prev_value: Bytes::new(),
                event_type: WatchEventType::Delete as i32,
                error: 0,
                revision: 0,
            },
        ];
        // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
        let (_tx, rx) = oneshot::channel::<()>();
        // `events` is not read again, so it is moved instead of cloned.
        let (_channel, port) = MockNode::simulate_watch_mock_server(rx, events)
            .await
            .unwrap();
        let client = make_client(port).await;

        let mut stream = client.watch(b"lock").await.expect("watch should succeed");

        let ev1 = stream
            .next()
            .await
            .expect("expected first event")
            .expect("no error");
        assert_eq!(ev1.key, Bytes::from("lock"));
        assert_eq!(ev1.value, Bytes::from("owner-a"));
        assert_eq!(ev1.event_type, WatchEventType::Put as i32);

        let ev2 = stream
            .next()
            .await
            .expect("expected second event")
            .expect("no error");
        assert_eq!(ev2.key, Bytes::from("lock"));
        assert_eq!(ev2.event_type, WatchEventType::Delete as i32);

        assert!(
            stream.next().await.is_none(),
            "stream should be closed after all events"
        );
    }

    /// With no events configured on the mock server, the stream must end on
    /// the first poll.
    #[tokio::test]
    #[traced_test]
    async fn test_watch_empty_stream() {
        // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
        let (_tx, rx) = oneshot::channel::<()>();
        let (_channel, port) = MockNode::simulate_watch_mock_server(rx, vec![])
            .await
            .unwrap();
        let client = make_client(port).await;

        let mut stream = client
            .watch(b"no-changes")
            .await
            .expect("watch should succeed");
        assert!(
            stream.next().await.is_none(),
            "empty event stream should close immediately"
        );
    }
}
/// Tests for `scan_prefix` on the gRPC client, run against a mock scan server
/// that replies with a caller-supplied `ScanResponse`.
mod scan_tests {
    use std::sync::Arc;

    use arc_swap::ArcSwap;
    use bytes::Bytes;
    use d_engine_core::ClientApi;
    use d_engine_proto::client::{KvEntry, ScanResponse};
    use tokio::sync::oneshot;
    use tracing_test::traced_test;

    use crate::mock_rpc_service::MockNode;
    use crate::{ClientConfig, ClientInner, ConnectionPool, GrpcClient};

    /// Builds a `GrpcClient` with client id 42 and the default `ClientConfig`,
    /// whose only endpoint is `http://localhost:{port}`.
    ///
    /// # Panics
    /// Panics if the connection pool cannot be created.
    async fn make_client(port: u16) -> GrpcClient {
        let config = ClientConfig::default();
        let endpoints = vec![format!("http://localhost:{port}")];
        let pool = ConnectionPool::create(endpoints.clone(), config.clone())
            .await
            .expect("Should create connection pool");
        let inner = ClientInner {
            pool,
            client_id: 42,
            config,
            endpoints,
        };
        GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)))
    }

    /// A scan over `/services/` must surface the mock's revision and both
    /// key/value pairs.
    #[tokio::test]
    #[traced_test]
    async fn test_scan_prefix_success() {
        // Small constructor for the proto-level entries served by the mock.
        let entry = |key: &'static str, value: &'static str| KvEntry {
            key: Bytes::from(key),
            value: Bytes::from(value),
        };

        // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
        let (_tx, server_rx) = oneshot::channel::<()>();
        let served = ScanResponse {
            entries: vec![
                entry("/services/node1", "10.0.0.1"),
                entry("/services/node2", "10.0.0.2"),
            ],
            revision: 7,
        };
        let (_channel, port) = MockNode::simulate_scan_mock_server(server_rx, served)
            .await
            .unwrap();
        let client = make_client(port).await;

        let scanned = client
            .scan_prefix(b"/services/")
            .await
            .expect("scan should succeed");
        assert_eq!(scanned.revision, 7);
        assert_eq!(scanned.entries.len(), 2);

        // Look entries up by key so the check does not depend on their order.
        let by_key: std::collections::HashMap<_, _> = scanned.entries.into_iter().collect();
        for (key, value) in [
            ("/services/node1", "10.0.0.1"),
            ("/services/node2", "10.0.0.2"),
        ] {
            assert_eq!(by_key.get(&Bytes::from(key)), Some(&Bytes::from(value)));
        }
    }

    /// A scan answered with no entries must still surface the mock's revision
    /// and an empty entry list.
    #[tokio::test]
    #[traced_test]
    async fn test_scan_prefix_empty_result() {
        // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
        let (_tx, server_rx) = oneshot::channel::<()>();
        let served = ScanResponse {
            entries: vec![],
            revision: 3,
        };
        let (_channel, port) = MockNode::simulate_scan_mock_server(server_rx, served)
            .await
            .unwrap();
        let client = make_client(port).await;

        let scanned = client
            .scan_prefix(b"/missing/")
            .await
            .expect("scan should succeed");
        assert_eq!(scanned.revision, 3);
        assert!(scanned.entries.is_empty());
    }
}
/// `get_with_policy` with `LinearizableRead` must return the single `KvEntry`
/// that the mock read server is configured to reply with.
#[tokio::test]
#[traced_test]
async fn test_get_with_policy_returns_native_kv_entry() {
    // Type of the optional hook accepted by the mock server; left as `None` here.
    type ClusterMembershipFn =
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>;

    let key = Bytes::from("kv_key");
    let value = Bytes::from("kv_value");
    let served = vec![KvEntry {
        key: key.clone(),
        value: value.clone(),
    }];

    // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
    let (_tx, server_rx) = oneshot::channel::<()>();
    let (_channel, port) = MockNode::simulate_client_read_mock_server(
        server_rx,
        None::<ClusterMembershipFn>,
        ClientResponse::read_results(served),
    )
    .await
    .unwrap();

    // Client wired to the single mock endpoint.
    let config = ClientConfig::default();
    let endpoints = vec![format!("http://localhost:{}", port)];
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .unwrap();
    let inner = ClientInner {
        pool,
        client_id: 1,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let policy = Some(ReadConsistencyPolicy::LinearizableRead);
    let fetched: Option<KvEntry> = client
        .get_with_policy(key.clone(), policy)
        .await
        .unwrap();
    assert_eq!(fetched, Some(KvEntry { key, value }));
}
/// `get_multi_with_policy` with no explicit policy must return one
/// `Some(KvEntry)` per requested key, in the same order as the entries the
/// mock read server is configured to reply with.
#[tokio::test]
#[traced_test]
async fn test_get_multi_with_policy_returns_native_kv_entries() {
    // Type of the optional hook accepted by the mock server; left as `None` here.
    type ClusterMembershipFn =
        Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>;

    let k1 = Bytes::from("mk1");
    let v1 = Bytes::from("mv1");
    let k2 = Bytes::from("mk2");
    let v2 = Bytes::from("mv2");
    let served = vec![
        KvEntry {
            key: k1.clone(),
            value: v1.clone(),
        },
        KvEntry {
            key: k2.clone(),
            value: v2.clone(),
        },
    ];

    // Bound as `_tx` rather than `_` so the sender is only dropped when the test ends.
    let (_tx, server_rx) = oneshot::channel::<()>();
    let (_channel, port) = MockNode::simulate_client_read_mock_server(
        server_rx,
        None::<ClusterMembershipFn>,
        ClientResponse::read_results(served),
    )
    .await
    .unwrap();

    // Client wired to the single mock endpoint.
    let config = ClientConfig::default();
    let endpoints = vec![format!("http://localhost:{}", port)];
    let pool = ConnectionPool::create(endpoints.clone(), config.clone())
        .await
        .unwrap();
    let inner = ClientInner {
        pool,
        client_id: 1,
        config,
        endpoints,
    };
    let client = GrpcClient::new(Arc::new(ArcSwap::from_pointee(inner)));

    let keys = vec![k1.clone(), k2.clone()];
    let fetched: Vec<Option<KvEntry>> = client
        .get_multi_with_policy(keys.into_iter(), None)
        .await
        .unwrap();

    assert_eq!(fetched.len(), 2);
    assert_eq!(fetched[0], Some(KvEntry { key: k1, value: v1 }));
    assert_eq!(fetched[1], Some(KvEntry { key: k2, value: v2 }));
}