#![cfg(feature = "distributed-cache")]
use std::time::{Duration, Instant};
use oxirs_arq::cache::{
CacheCoherenceProtocol, CacheKey, CacheValue, CoherenceConfig, CoherenceProtocol,
ConsistencyLevel, DistributedCache, DistributedCacheConfig,
};
/// Probe the local Redis instance; `true` only when a client can be opened
/// AND an async multiplexed connection succeeds.
async fn is_redis_available() -> bool {
    let Ok(client) = redis::Client::open("redis://localhost:6379") else {
        return false;
    };
    client.get_multiplexed_async_connection().await.is_ok()
}
/// Build a single `DistributedCache` against local Redis.
///
/// Returns `None` (after logging to stderr) when Redis is unreachable or
/// cache construction fails, so callers can skip the test gracefully.
async fn setup_cache() -> Option<DistributedCache> {
    if !is_redis_available().await {
        eprintln!("Redis not available, skipping test");
        return None;
    }
    let config = DistributedCacheConfig {
        l1_max_size: 100,
        l1_ttl_seconds: 10,
        l2_redis_url: "redis://localhost:6379".to_string(),
        l2_ttl_seconds: 30,
        compression: true,
        // Unique channel per test so concurrently running tests never
        // cross-invalidate each other.
        invalidation_channel: format!("oxirs:test:{}", uuid::Uuid::new_v4()),
    };
    DistributedCache::new(config)
        .await
        .map_err(|e| eprintln!("Failed to create cache: {:?}", e))
        .ok()
}
/// Build `count` cache nodes that all point at the same Redis instance and
/// share one invalidation channel, simulating a multi-node deployment.
///
/// Returns `None` (after logging to stderr) when Redis is unavailable or any
/// node fails to construct.
async fn setup_multi_cache(count: usize) -> Option<Vec<DistributedCache>> {
    if !is_redis_available().await {
        eprintln!("Redis not available, skipping test");
        return None;
    }
    // One shared channel so pub/sub invalidations reach every peer node.
    let shared_channel = format!("oxirs:test:{}", uuid::Uuid::new_v4());
    let mut nodes = Vec::with_capacity(count);
    for _ in 0..count {
        let built = DistributedCache::new(DistributedCacheConfig {
            l1_max_size: 100,
            l1_ttl_seconds: 10,
            l2_redis_url: "redis://localhost:6379".to_string(),
            l2_ttl_seconds: 30,
            compression: true,
            invalidation_channel: shared_channel.clone(),
        })
        .await;
        match built {
            Ok(cache) => nodes.push(cache),
            Err(e) => {
                eprintln!("Failed to create cache: {:?}", e);
                return None;
            }
        }
    }
    Some(nodes)
}
/// A freshly written entry should be served from L1, fast.
#[tokio::test]
async fn test_l1_hit() {
    let Some(cache) = setup_cache().await else {
        return;
    };
    let key = CacheKey::new("query1".to_string());
    let value = CacheValue::new(vec![1, 2, 3, 4, 5]);
    cache.put(key.clone(), value.clone()).await.unwrap();
    // Time the read to confirm it took the in-process fast path.
    let started = Instant::now();
    let fetched = cache.get(&key).await.unwrap();
    let took = started.elapsed();
    assert!(fetched.is_some());
    assert_eq!(fetched.unwrap().data, value.data);
    assert!(took.as_millis() < 10);
    assert!(cache.metrics().l1_hits.get() > 0);
}
/// After L1 is cleared, reads fall through to L2 (Redis) and the hit
/// re-populates L1 for subsequent reads.
#[tokio::test]
async fn test_l2_hit() {
    let Some(cache) = setup_cache().await else {
        return;
    };
    let key = CacheKey::new("query2".to_string());
    let value = CacheValue::new(vec![10, 20, 30, 40, 50]);
    cache.put(key.clone(), value.clone()).await.unwrap();
    // Wipe L1 so the next read must go to Redis.
    cache.clear_l1();
    let from_l2 = cache.get(&key).await.unwrap();
    assert!(from_l2.is_some());
    assert_eq!(from_l2.unwrap().data, value.data);
    assert!(cache.metrics().l2_hits.get() > 0);
    // The L2 hit should have promoted the entry back into L1.
    let promoted = cache.get(&key).await.unwrap();
    assert!(promoted.is_some());
    assert!(cache.metrics().l1_hits.get() > 0);
}
/// A never-stored key must miss both tiers and bump both miss counters.
#[tokio::test]
async fn test_cache_miss() {
    let Some(cache) = setup_cache().await else {
        return;
    };
    let key = CacheKey::new("nonexistent".to_string());
    let lookup = cache.get(&key).await.unwrap();
    assert!(lookup.is_none());
    assert!(cache.metrics().l1_misses.get() > 0);
    assert!(cache.metrics().l2_misses.get() > 0);
}
/// A single put must land in L1 immediately and also persist to L2.
#[tokio::test]
async fn test_put_both_levels() {
    let Some(cache) = setup_cache().await else {
        return;
    };
    let key = CacheKey::new("query3".to_string());
    let value = CacheValue::new(vec![100; 1000]);
    cache.put(key.clone(), value.clone()).await.unwrap();
    // Visible in L1 right away...
    assert_eq!(cache.l1_size(), 1);
    // ...and still retrievable after an L1 wipe, proving the L2 write.
    cache.clear_l1();
    assert!(cache.get(&key).await.unwrap().is_some());
}
/// Invalidating a key removes it from the cache and publishes an
/// invalidation message (tracked via `invalidations_sent`).
#[tokio::test]
async fn test_invalidation() {
    let Some(cache) = setup_cache().await else {
        return;
    };
    let key = CacheKey::new("query4".to_string());
    let value = CacheValue::new(vec![1, 2, 3]);
    cache.put(key.clone(), value.clone()).await.unwrap();
    assert!(cache.get(&key).await.unwrap().is_some());
    cache.invalidate(&key).await.unwrap();
    assert!(cache.get(&key).await.unwrap().is_none());
    assert!(cache.metrics().invalidations_sent.get() > 0);
}
/// One node's invalidation is published over pub/sub to peer listeners.
#[tokio::test]
async fn test_pubsub_invalidation() {
    let Some(nodes) = setup_multi_cache(3).await else {
        return;
    };
    for node in &nodes {
        node.start_invalidation_listener().await.unwrap();
    }
    // Give the subscribers a moment to attach before publishing.
    tokio::time::sleep(Duration::from_millis(100)).await;
    let key = CacheKey::new("query5".to_string());
    let value = CacheValue::new(vec![1, 2, 3]);
    for node in &nodes {
        node.put(key.clone(), value.clone()).await.unwrap();
    }
    nodes[0].invalidate(&key).await.unwrap();
    // Allow the invalidation message to propagate.
    tokio::time::sleep(Duration::from_millis(500)).await;
    // NOTE(review): only the publishing node (idx 0) is asserted here; the
    // peers are read but their state is not asserted — presumably to avoid
    // pub/sub timing flakiness. Confirm whether peer eviction should be
    // asserted once propagation is reliable.
    for (idx, node) in nodes.iter().enumerate() {
        let found = node.get(&key).await.unwrap();
        if idx == 0 {
            assert!(found.is_none(), "Cache {} should not have the key", idx);
        }
    }
    assert!(nodes[0].metrics().invalidations_sent.get() > 0);
}
/// A 1 MiB all-zero payload is highly compressible, so the recorded
/// compression ratio must exceed 1.0 while the data round-trips intact.
#[tokio::test]
async fn test_compression() {
    let Some(cache) = setup_cache().await else {
        return;
    };
    let key = CacheKey::new("large_value".to_string());
    let value = CacheValue::new(vec![0u8; 1024 * 1024]);
    cache.put(key.clone(), value.clone()).await.unwrap();
    let round_trip = cache.get(&key).await.unwrap();
    assert!(round_trip.is_some());
    assert_eq!(round_trip.unwrap().data.len(), value.data.len());
    let ratio = *cache.metrics().compression_ratio.read();
    assert!(
        ratio > 1.0,
        "Compression ratio should be > 1.0, got {}",
        ratio
    );
}
/// With 50 entries (well under the 100-entry L1 cap) and 40 reads of stored
/// keys, the L1 hit rate should be high.
#[tokio::test]
async fn test_hit_rates() {
    let Some(cache) = setup_cache().await else {
        return;
    };
    for i in 0..50 {
        cache
            .put(
                CacheKey::new(format!("query_{}", i)),
                CacheValue::new(vec![i as u8; 10]),
            )
            .await
            .unwrap();
    }
    for i in 0..40 {
        let key = CacheKey::new(format!("query_{}", i));
        cache.get(&key).await.unwrap();
    }
    let l1_hit_rate = cache.metrics().l1_hit_rate();
    assert!(
        l1_hit_rate > 0.8,
        "L1 hit rate is {:.2}, expected > 0.8",
        l1_hit_rate
    );
}
/// Measures average `get()` latency for the L1 (in-process) and L2 (Redis)
/// paths, asserting loose upper bounds on each.
#[tokio::test]
async fn test_latency() {
    let cache = match setup_cache().await {
        Some(c) => c,
        None => return,
    };
    let key = CacheKey::new("latency_test".to_string());
    let value = CacheValue::new(vec![1, 2, 3, 4, 5]);
    cache.put(key.clone(), value.clone()).await.unwrap();

    // L1 path: the entry stays resident, so every read is an in-process hit.
    let mut l1_times = Vec::new();
    for _ in 0..10 {
        let start = Instant::now();
        cache.get(&key).await.unwrap();
        l1_times.push(start.elapsed());
    }
    let avg_l1 = l1_times.iter().sum::<Duration>() / l1_times.len() as u32;
    println!("Average L1 latency: {:?}", avg_l1);
    assert!(avg_l1.as_millis() < 10);

    // L2 path: clear L1 before every read so each get falls through to Redis
    // (a successful L2 hit re-populates L1, hence the per-iteration clear;
    // the redundant pre-loop clear the original had is removed).
    let mut l2_times = Vec::new();
    for _ in 0..10 {
        cache.clear_l1();
        let start = Instant::now();
        cache.get(&key).await.unwrap();
        l2_times.push(start.elapsed());
    }
    let avg_l2 = l2_times.iter().sum::<Duration>() / l2_times.len() as u32;
    println!("Average L2 latency: {:?}", avg_l2);
    assert!(avg_l2.as_millis() < 50);
}
/// Two nodes write the same key; after clearing every L1, reads against the
/// shared L2 should find the value on at least two nodes.
#[tokio::test]
async fn test_multi_node_consistency() {
    let Some(nodes) = setup_multi_cache(3).await else {
        return;
    };
    for node in &nodes {
        node.start_invalidation_listener().await.unwrap();
    }
    tokio::time::sleep(Duration::from_millis(100)).await;
    let key = CacheKey::new("shared_query".to_string());
    let value1 = CacheValue::new(vec![1, 2, 3]);
    let value2 = CacheValue::new(vec![4, 5, 6]);
    nodes[0].put(key.clone(), value1.clone()).await.unwrap();
    nodes[1].put(key.clone(), value2.clone()).await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;
    // Drop each node's L1 so every read reflects the shared L2 state.
    for node in &nodes {
        node.clear_l1();
    }
    let lookups: Vec<_> =
        futures::future::join_all(nodes.iter().map(|node| node.get(&key))).await;
    let hits = lookups
        .iter()
        .filter(|r| r.as_ref().ok().and_then(|v| v.as_ref()).is_some())
        .count();
    assert!(
        hits >= 2,
        "Expected at least 2 nodes to have the value"
    );
}
/// After writing identical values to every node, the coherence protocol's
/// verification report should show a high coherence rate.
#[tokio::test]
async fn test_coherence_verification() {
    let Some(nodes) = setup_multi_cache(3).await else {
        return;
    };
    // Seed all nodes with the same value per key so they agree.
    for i in 0..10 {
        let key = CacheKey::new(format!("coherence_test_{}", i));
        let value = CacheValue::new(vec![i as u8; 10]);
        for node in &nodes {
            node.put(key.clone(), value.clone()).await.unwrap();
        }
    }
    let protocol = CacheCoherenceProtocol::new(
        CoherenceProtocol::PubSub,
        CoherenceConfig {
            consistency_level: ConsistencyLevel::Eventual,
            max_staleness_seconds: 60,
        },
    );
    let node_refs: Vec<&DistributedCache> = nodes.iter().collect();
    let report = protocol.verify_coherence(&node_refs).await.unwrap();
    println!("Coherence report: {}", report.summary());
    assert!(
        report.coherence_rate >= 0.8,
        "Coherence rate should be >= 0.8, got {}",
        report.coherence_rate
    );
}
/// Identical query text stored under different namespaces must not collide.
#[tokio::test]
async fn test_cache_key_namespaces() {
    let Some(cache) = setup_cache().await else {
        return;
    };
    let key1 = CacheKey::with_namespace("query1".to_string(), "tenant1".to_string());
    let key2 = CacheKey::with_namespace("query1".to_string(), "tenant2".to_string());
    let value1 = CacheValue::new(vec![1, 2, 3]);
    let value2 = CacheValue::new(vec![4, 5, 6]);
    cache.put(key1.clone(), value1.clone()).await.unwrap();
    cache.put(key2.clone(), value2.clone()).await.unwrap();
    // Each namespace must get its own value back.
    assert_eq!(cache.get(&key1).await.unwrap().unwrap().data, value1.data);
    assert_eq!(cache.get(&key2).await.unwrap().unwrap().data, value2.data);
}
/// With a 1-second L1 TTL and a 30-second L2 TTL, an entry should expire out
/// of L1 while remaining retrievable from L2.
#[tokio::test]
async fn test_l1_expiration() {
    if !is_redis_available().await {
        eprintln!("Redis not available, skipping test");
        return;
    }
    // Custom config: deliberately short L1 TTL, long L2 TTL.
    let config = DistributedCacheConfig {
        l1_max_size: 100,
        l1_ttl_seconds: 1,
        l2_redis_url: "redis://localhost:6379".to_string(),
        l2_ttl_seconds: 30,
        compression: false,
        invalidation_channel: format!("oxirs:test:{}", uuid::Uuid::new_v4()),
    };
    let Ok(cache) = DistributedCache::new(config).await else {
        return;
    };
    let key = CacheKey::new("expiring_key".to_string());
    let value = CacheValue::new(vec![1, 2, 3]);
    // Every cache call is wrapped in a timeout so a hung Redis connection
    // fails fast instead of stalling the whole suite.
    let put_result = tokio::time::timeout(
        Duration::from_secs(2),
        cache.put(key.clone(), value.clone()),
    )
    .await;
    assert!(put_result.is_ok(), "Put operation timed out");
    put_result.unwrap().unwrap();
    let get_result = tokio::time::timeout(Duration::from_millis(500), cache.get(&key)).await;
    assert!(get_result.is_ok(), "First get operation timed out");
    assert!(get_result.unwrap().unwrap().is_some());
    // Wait past the L1 TTL, wipe L1, then confirm L2 still serves the key.
    tokio::time::sleep(Duration::from_millis(1100)).await;
    cache.clear_l1();
    let l2_result = tokio::time::timeout(Duration::from_secs(2), cache.get(&key)).await;
    assert!(
        l2_result.is_ok(),
        "L2 get operation timed out - Redis may be hanging"
    );
    let after_expiry = l2_result.unwrap().unwrap();
    assert!(after_expiry.is_some(), "L2 should still have the value");
}
#[tokio::test]
async fn test_concurrent_access() {
let cache = match setup_cache().await {
Some(c) => c,
None => return,
};
let cache = std::sync::Arc::new(cache);
let mut handles = Vec::new();
for i in 0..10 {
let cache = cache.clone();
let handle = tokio::spawn(async move {
let key = CacheKey::new(format!("concurrent_{}", i));
let value = CacheValue::new(vec![i as u8; 100]);
cache.put(key.clone(), value.clone()).await.unwrap();
for _ in 0..10 {
let result = cache.get(&key).await.unwrap();
assert!(result.is_some());
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
assert!(cache.l1_size() > 0);
}
/// Smoke benchmark: warm 1000 keys, time 1000 reads, then assert the L1 hit
/// rate and a loose per-operation latency bound.
#[tokio::test]
async fn bench_distributed_cache() {
    let Some(cache) = setup_cache().await else {
        return;
    };
    // NOTE: the final `as_millis()` comparison fixes this integer as u128.
    let num_queries = 1000;
    // Warm the cache first so the timed loop measures hits, not misses.
    for i in 0..num_queries {
        cache
            .put(
                CacheKey::new(format!("bench_query_{}", i)),
                CacheValue::new(vec![i as u8; 100]),
            )
            .await
            .unwrap();
    }
    let start = Instant::now();
    for i in 0..num_queries {
        let key = CacheKey::new(format!("bench_query_{}", i));
        let _val = cache.get(&key).await.unwrap();
    }
    let elapsed = start.elapsed();
    let l1_hit_rate = cache.metrics().l1_hit_rate();
    let throughput = num_queries as f64 / elapsed.as_secs_f64();
    println!("Benchmark results:");
    println!(" L1 hit rate: {:.2}%", l1_hit_rate * 100.0);
    println!(" Throughput: {:.0} ops/sec", throughput);
    println!(
        " Average latency: {:.2}ms",
        elapsed.as_millis() as f64 / num_queries as f64
    );
    assert!(
        l1_hit_rate > 0.8,
        "L1 hit rate should be > 80%, got {:.2}%",
        l1_hit_rate * 100.0
    );
    assert!(
        elapsed.as_millis() < num_queries * 2,
        "Average latency should be < 2ms per operation"
    );
}