#![allow(deprecated)]
use crate::common;
use common::setup_logging;
use oxcache::backend::l1::L1Backend;
use oxcache::backend::l2::L2Backend;
use oxcache::client::two_level::TwoLevelClient;
use oxcache::config::{L2Config, TwoLevelConfig};
use oxcache::serialization::SerializerEnum;
use oxcache::CacheOps;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Barrier;
#[tokio::test]
async fn test_single_flight_deduplication() {
setup_logging();
if !common::is_redis_available().await {
println!("Skipping test_single_flight_deduplication because Redis is not available");
return;
}
let service_name = common::generate_unique_service_name("single_flight");
let redis_url =
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let l2_config = L2Config {
connection_string: redis_url.into(),
connection_timeout_ms: 10000, ..Default::default()
};
let l2 = Arc::new(
L2Backend::new(&l2_config)
.await
.expect("Failed to create L2 backend"),
);
let _ = l2.delete("hot_key").await;
let l1 = Arc::new(L1Backend::new(1000));
let config = TwoLevelConfig {
promote_on_hit: true,
enable_batch_write: false,
batch_size: 100,
batch_interval_ms: 100,
invalidation_channel: None,
bloom_filter: None,
warmup: None,
max_key_length: Some(1024),
max_value_size: Some(1024 * 1024),
};
let client = Arc::new(
TwoLevelClient::new(
service_name.clone(),
config,
l1.clone(),
l2.clone(),
SerializerEnum::Json(oxcache::serialization::json::JsonSerializer::new()),
)
.await
.expect("Failed to create client"),
);
l2.set_with_version("hot_key", b"hot_value".to_vec(), None)
.await
.expect("Failed to set L2 value");
let concurrency = 50;
let barrier = Arc::new(Barrier::new(concurrency));
let mut handles = vec![];
for _ in 0..concurrency {
let c = client.clone();
let b = barrier.clone();
handles.push(tokio::spawn(async move {
b.wait().await;
c.get_bytes("hot_key").await
}));
}
let mut success_count = 0;
for handle in handles {
if let Ok(Ok(Some(val))) = handle.await {
if val == b"hot_value" {
success_count += 1;
}
}
}
assert_eq!(success_count, concurrency, "All requests should succeed");
tokio::time::sleep(Duration::from_millis(500)).await;
let l1_val = l1.get_with_metadata("hot_key").await.unwrap();
assert!(l1_val.is_some(), "L1 should be populated after L2 hit");
assert_eq!(l1_val.unwrap().0, b"hot_value");
let _ = l2.delete("hot_key").await;
common::cleanup_service(&service_name).await;
}