use super::*;
use crate::cache::CacheRead;
use crate::cache::disk::DiskCache;
use crate::cache::readonly::ReadOnlyStorage;
use crate::config::Config;
use crate::config::PreprocessorCacheModeConfig;
use bytes::Bytes;
use std::collections::HashMap;
use std::env;
use std::fs;
use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
use tempfile::Builder as TempBuilder;
use tokio::runtime::Builder as RuntimeBuilder;
use tokio::sync::Mutex;
use tokio::time::sleep;
#[test]
fn test_multi_level_storage_get() {
    // Sequential scenario — a single worker thread is plenty.
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let l1_dir = TempBuilder::new()
        .prefix("sccache_test_l1_")
        .tempdir()
        .unwrap();
    let l1_cache_path = l1_dir.path().join("cache");
    fs::create_dir(&l1_cache_path).unwrap();
    let l2_dir = TempBuilder::new()
        .prefix("sccache_test_l2_")
        .tempdir()
        .unwrap();
    let l2_cache_path = l2_dir.path().join("cache");
    fs::create_dir(&l2_cache_path).unwrap();
    let level1: Arc<dyn Storage> = Arc::new(DiskCache::new(
        &l1_cache_path,
        1024 * 1024 * 100,
        rt.handle(),
        PreprocessorCacheModeConfig::default(),
        CacheMode::ReadWrite,
        vec![],
    ));
    let level2: Arc<dyn Storage> = Arc::new(DiskCache::new(
        &l2_cache_path,
        1024 * 1024 * 100,
        rt.handle(),
        PreprocessorCacheModeConfig::default(),
        CacheMode::ReadWrite,
        vec![],
    ));
    let multi = MultiLevelStorage::new(vec![Arc::clone(&level1), Arc::clone(&level2)]);
    rt.block_on(async {
        // Seed only the second level.
        level2.put("test_key", CacheWrite::default()).await.unwrap();
        // A read through the chain must find the entry at level 2.
        assert!(
            matches!(multi.get("test_key").await.unwrap(), Cache::Hit(_)),
            "Expected cache hit at level 2"
        );
        // Unknown keys miss at every level.
        assert!(
            matches!(multi.get("nonexistent").await.unwrap(), Cache::Miss),
            "Expected cache miss"
        );
    });
}
#[test]
fn test_multi_level_storage_backfill_on_hit() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let l1_dir = TempBuilder::new()
        .prefix("sccache_test_bf_l1_")
        .tempdir()
        .unwrap();
    let l1_cache_path = l1_dir.path().join("cache");
    fs::create_dir(&l1_cache_path).unwrap();
    let l2_dir = TempBuilder::new()
        .prefix("sccache_test_bf_l2_")
        .tempdir()
        .unwrap();
    let l2_cache_path = l2_dir.path().join("cache");
    fs::create_dir(&l2_cache_path).unwrap();
    let level1: Arc<dyn Storage> = Arc::new(DiskCache::new(
        &l1_cache_path,
        1024 * 1024 * 100,
        rt.handle(),
        PreprocessorCacheModeConfig::default(),
        CacheMode::ReadWrite,
        vec![],
    ));
    let level2: Arc<dyn Storage> = Arc::new(DiskCache::new(
        &l2_cache_path,
        1024 * 1024 * 100,
        rt.handle(),
        PreprocessorCacheModeConfig::default(),
        CacheMode::ReadWrite,
        vec![],
    ));
    let multi = MultiLevelStorage::new(vec![Arc::clone(&level1), Arc::clone(&level2)]);
    rt.block_on(async {
        // Seed only the lower level.
        level2
            .put("backfill_key", CacheWrite::default())
            .await
            .unwrap();
        // Precondition: level 1 starts out empty.
        assert!(
            matches!(level1.get("backfill_key").await.unwrap(), Cache::Miss),
            "Level 1 should be empty"
        );
        // Reading through the chain hits at level 2 ...
        assert!(
            matches!(multi.get("backfill_key").await.unwrap(), Cache::Hit(_)),
            "Expected cache hit at level 2"
        );
        // ... and backfills level 1 asynchronously; give that a moment.
        sleep(Duration::from_millis(200)).await;
        assert!(
            matches!(level1.get("backfill_key").await.unwrap(), Cache::Hit(_)),
            "Level 1 should now have the data (backfilled)"
        );
    });
}
/// Simple in-memory `Storage` implementation used as a fast, inspectable
/// cache level in the multi-level tests.
struct InMemoryStorage {
    // Key -> serialized cache entry bytes.
    data: Arc<Mutex<HashMap<String, Vec<u8>>>>,
    // Ordered record of "get:<key>" / "put:<key>" operations; tests that
    // assert on level access order read this after the fact.
    access_log: Arc<Mutex<Vec<String>>>,
}
impl InMemoryStorage {
fn new() -> Self {
Self {
data: Arc::new(Mutex::new(HashMap::new())),
access_log: Arc::new(Mutex::new(Vec::new())),
}
}
fn get_access_log(&self) -> Arc<Mutex<Vec<String>>> {
Arc::clone(&self.access_log)
}
}
#[async_trait]
impl Storage for InMemoryStorage {
    async fn get(&self, key: &str) -> Result<Cache> {
        // Record the probe before touching the map so the access-order
        // tests see every read attempt.
        self.access_log.lock().await.push(format!("get:{}", key));
        let map = self.data.lock().await;
        if let Some(bytes) = map.get(key) {
            // Entries that fail to decode are reported as misses.
            return match CacheRead::from(Cursor::new(bytes.clone())) {
                Ok(read) => Ok(Cache::Hit(read)),
                Err(_) => Ok(Cache::Miss),
            };
        }
        Ok(Cache::Miss)
    }
    async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration> {
        // Log first (matching `get`), then serialize and store.
        self.access_log.lock().await.push(format!("put:{}", key));
        let serialized = entry.finish()?;
        self.data.lock().await.insert(key.to_owned(), serialized);
        Ok(Duration::ZERO)
    }
    async fn check(&self) -> Result<CacheMode> {
        Ok(CacheMode::ReadWrite)
    }
    fn location(&self) -> String {
        String::from("InMemory")
    }
    async fn current_size(&self) -> Result<Option<u64>> {
        Ok(None)
    }
    async fn max_size(&self) -> Result<Option<u64>> {
        Ok(None)
    }
    // NOTE: the raw accessors do not touch the access log; the ordering
    // tests only track the high-level get/put calls.
    async fn get_raw(&self, key: &str) -> Result<Option<Bytes>> {
        let map = self.data.lock().await;
        Ok(map.get(key).map(|bytes| Bytes::from(bytes.clone())))
    }
    async fn put_raw(&self, key: &str, data: Bytes) -> Result<Duration> {
        self.data.lock().await.insert(key.to_owned(), data.to_vec());
        Ok(Duration::ZERO)
    }
}
#[test]
fn test_disk_plus_remote_to_remote_backfill() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let tmp = TempBuilder::new()
        .prefix("sccache_test_multilevel_")
        .tempdir()
        .unwrap();
    let disk_dir = tmp.path().join("cache");
    fs::create_dir(&disk_dir).unwrap();
    let disk_cache = Arc::new(DiskCache::new(
        &disk_dir,
        1024 * 1024 * 100,
        rt.handle(),
        PreprocessorCacheModeConfig::default(),
        CacheMode::ReadWrite,
        vec![],
    ));
    let remote_l1 = Arc::new(InMemoryStorage::new());
    let remote_l2 = Arc::new(InMemoryStorage::new());
    let remote_l3 = Arc::new(InMemoryStorage::new());
    // Four-level chain: local disk in front of three remote levels.
    let storage = MultiLevelStorage::new(vec![
        disk_cache.clone() as Arc<dyn Storage>,
        remote_l1.clone() as Arc<dyn Storage>,
        remote_l2.clone() as Arc<dyn Storage>,
        remote_l3.clone() as Arc<dyn Storage>,
    ]);
    rt.block_on(async {
        // Only the deepest level holds the entry initially.
        remote_l3
            .put("global_key", CacheWrite::default())
            .await
            .unwrap();
        assert!(matches!(
            disk_cache.get("global_key").await.unwrap(),
            Cache::Miss
        ));
        assert!(matches!(
            remote_l1.get("global_key").await.unwrap(),
            Cache::Miss
        ));
        assert!(matches!(
            remote_l2.get("global_key").await.unwrap(),
            Cache::Miss
        ));
        // A chained read must find it at L3 ...
        assert!(
            matches!(storage.get("global_key").await.unwrap(), Cache::Hit(_)),
            "Expected cache hit at L3"
        );
        // ... and backfill every level above it in the background.
        sleep(Duration::from_millis(400)).await;
        assert!(
            matches!(disk_cache.get("global_key").await.unwrap(), Cache::Hit(_)),
            "Disk cache should be backfilled from L3"
        );
        assert!(
            matches!(remote_l1.get("global_key").await.unwrap(), Cache::Hit(_)),
            "Remote L1 should be backfilled from L3"
        );
        assert!(
            matches!(remote_l2.get("global_key").await.unwrap(), Cache::Hit(_)),
            "Remote L2 should be backfilled from L3"
        );
        // A second chained read now resolves at the top-most level.
        assert!(
            matches!(storage.get("global_key").await.unwrap(), Cache::Hit(_)),
            "Should hit at disk cache (L0)"
        );
    });
}
#[test]
fn test_disk_plus_remotes_write_to_all() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let tmp = TempBuilder::new()
        .prefix("sccache_test_write_all_")
        .tempdir()
        .unwrap();
    let disk_dir = tmp.path().join("cache");
    fs::create_dir(&disk_dir).unwrap();
    let disk_cache = Arc::new(DiskCache::new(
        &disk_dir,
        1024 * 1024 * 100,
        rt.handle(),
        PreprocessorCacheModeConfig::default(),
        CacheMode::ReadWrite,
        vec![],
    ));
    let remote_l1 = Arc::new(InMemoryStorage::new());
    let remote_l2 = Arc::new(InMemoryStorage::new());
    let storage = MultiLevelStorage::new(vec![
        disk_cache.clone() as Arc<dyn Storage>,
        remote_l1.clone() as Arc<dyn Storage>,
        remote_l2.clone() as Arc<dyn Storage>,
    ]);
    rt.block_on(async {
        // A single put through the chain should fan out to every level.
        storage
            .put("write_test_key", CacheWrite::default())
            .await
            .unwrap();
        // Writes to the lower levels may complete asynchronously.
        sleep(Duration::from_millis(200)).await;
        assert!(
            matches!(disk_cache.get("write_test_key").await.unwrap(), Cache::Hit(_)),
            "Disk cache should have data after put"
        );
        assert!(
            matches!(remote_l1.get("write_test_key").await.unwrap(), Cache::Hit(_)),
            "Remote L1 should have data after put"
        );
        assert!(
            matches!(remote_l2.get("write_test_key").await.unwrap(), Cache::Hit(_)),
            "Remote L2 should have data after put"
        );
    });
}
#[test]
fn test_remote_to_remote_backfill() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let l0 = Arc::new(InMemoryStorage::new());
    let l1 = Arc::new(InMemoryStorage::new());
    let l2 = Arc::new(InMemoryStorage::new());
    let storage = MultiLevelStorage::new(vec![
        l0.clone() as Arc<dyn Storage>,
        l1.clone() as Arc<dyn Storage>,
        l2.clone() as Arc<dyn Storage>,
    ]);
    rt.block_on(async {
        // Seed only the deepest level.
        l2.put("remote_key", CacheWrite::default()).await.unwrap();
        assert!(
            matches!(l0.get("remote_key").await.unwrap(), Cache::Miss),
            "L0 should be empty initially"
        );
        assert!(
            matches!(l1.get("remote_key").await.unwrap(), Cache::Miss),
            "L1 should be empty initially"
        );
        // The chained read resolves at L2 ...
        assert!(
            matches!(storage.get("remote_key").await.unwrap(), Cache::Hit(_)),
            "Expected cache hit at L2"
        );
        // ... then both upper levels are backfilled asynchronously.
        sleep(Duration::from_millis(300)).await;
        assert!(
            matches!(l0.get("remote_key").await.unwrap(), Cache::Hit(_)),
            "L0 should be backfilled from L2"
        );
        assert!(
            matches!(l1.get("remote_key").await.unwrap(), Cache::Hit(_)),
            "L1 should be backfilled from L2"
        );
    });
}
#[test]
#[serial_test::serial(multilevel_env)]
fn test_config_validation_invalid_level_name() {
    /// Removes the environment variables this test sets, even when an
    /// assertion below panics. Previously a failed assertion skipped the
    /// trailing `remove_var` calls and leaked the variables into later
    /// (serialized, but not isolated) tests.
    struct EnvCleanup;
    impl Drop for EnvCleanup {
        fn drop(&mut self) {
            // SAFETY: this test is serialized via `serial_test`, so no other
            // thread is concurrently reading or writing the environment.
            unsafe {
                env::remove_var("SCCACHE_MULTILEVEL_CHAIN");
                env::remove_var("SCCACHE_DIR");
            }
        }
    }
    let runtime = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    // Configure a chain containing an unknown backend name.
    // SAFETY: serialized test, no concurrent env access.
    unsafe {
        env::set_var("SCCACHE_MULTILEVEL_CHAIN", "disk,invalid_backend,s3");
        env::set_var("SCCACHE_DIR", "/tmp/test-cache");
    }
    let _cleanup = EnvCleanup;
    let config = Config::load().unwrap();
    let result = MultiLevelStorage::from_config(&config, runtime.handle());
    let err = result.expect_err("an unknown level name must be rejected");
    let err_msg = format!("{}", err);
    assert!(
        err_msg.contains("Unknown cache level") || err_msg.contains("invalid_backend"),
        "unexpected error message: {}",
        err_msg
    );
}
#[test]
fn test_config_validation_empty_levels() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    // A chain with zero levels is constructible but can never hit.
    let storage = MultiLevelStorage::new(Vec::new());
    rt.block_on(async {
        assert!(
            matches!(storage.get("test_key").await.unwrap(), Cache::Miss),
            "Empty levels should always miss"
        );
    });
}
#[test]
fn test_config_validation_single_level() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let only_level = Arc::new(InMemoryStorage::new());
    let storage = MultiLevelStorage::new(vec![only_level.clone() as Arc<dyn Storage>]);
    rt.block_on(async {
        storage
            .put("single_key", CacheWrite::default())
            .await
            .unwrap();
        // A one-level chain behaves as a passthrough ...
        assert!(
            matches!(storage.get("single_key").await.unwrap(), Cache::Hit(_)),
            "Single level should work as passthrough"
        );
        // ... and the data actually lives in that single level.
        assert!(
            matches!(only_level.get("single_key").await.unwrap(), Cache::Hit(_)),
            "Data should be in the single level"
        );
    });
}
#[test]
#[serial_test::serial(multilevel_env)]
fn test_config_level_not_configured() {
    /// Restores the environment on drop so a failing assertion below does
    /// not leak `SCCACHE_MULTILEVEL_CHAIN` into later tests. Previously a
    /// panic skipped the trailing `remove_var` call.
    struct EnvCleanup;
    impl Drop for EnvCleanup {
        fn drop(&mut self) {
            // SAFETY: this test is serialized via `serial_test`, so no other
            // thread touches the process environment concurrently.
            unsafe {
                env::remove_var("SCCACHE_MULTILEVEL_CHAIN");
            }
        }
    }
    let runtime = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    // Request a redis level while ensuring redis is NOT configured.
    // SAFETY: serialized test, no concurrent env access.
    unsafe {
        env::set_var("SCCACHE_MULTILEVEL_CHAIN", "redis");
        env::remove_var("SCCACHE_REDIS");
        env::remove_var("SCCACHE_REDIS_ENDPOINT");
    }
    let _cleanup = EnvCleanup;
    let config = Config::load().unwrap();
    let result = MultiLevelStorage::from_config(&config, runtime.handle());
    let err = result.expect_err("an unconfigured level must be rejected");
    let err_msg = format!("{}", err);
    assert!(
        err_msg.contains("not configured")
            || err_msg.contains("missing")
            || err_msg.contains("requires"),
        "Expected error about missing config or feature, got: {}",
        err_msg
    );
}
#[test]
fn test_concurrent_reads() {
    // Multiple worker threads so the three gets can truly overlap.
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .build()
        .unwrap();
    let l0 = Arc::new(InMemoryStorage::new());
    let l1 = Arc::new(InMemoryStorage::new());
    let l2 = Arc::new(InMemoryStorage::new());
    let storage = Arc::new(MultiLevelStorage::new(vec![
        l0.clone() as Arc<dyn Storage>,
        l1.clone() as Arc<dyn Storage>,
        l2.clone() as Arc<dyn Storage>,
    ]));
    rt.block_on(async {
        // One key per level so the concurrent gets resolve at different depths.
        l0.put("key_l0", CacheWrite::default()).await.unwrap();
        l1.put("key_l1", CacheWrite::default()).await.unwrap();
        l2.put("key_l2", CacheWrite::default()).await.unwrap();
        let (a, b, c) = (
            Arc::clone(&storage),
            Arc::clone(&storage),
            Arc::clone(&storage),
        );
        let (r1, r2, r3) = tokio::join!(
            async move { a.get("key_l0").await },
            async move { b.get("key_l1").await },
            async move { c.get("key_l2").await },
        );
        assert!(matches!(r1.unwrap(), Cache::Hit(_)));
        assert!(matches!(r2.unwrap(), Cache::Hit(_)));
        assert!(matches!(r3.unwrap(), Cache::Hit(_)));
    });
}
#[test]
fn test_concurrent_write_and_read() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .build()
        .unwrap();
    let l0 = Arc::new(InMemoryStorage::new());
    let l1 = Arc::new(InMemoryStorage::new());
    let storage = Arc::new(MultiLevelStorage::new(vec![
        l0.clone() as Arc<dyn Storage>,
        l1.clone() as Arc<dyn Storage>,
    ]));
    rt.block_on(async {
        let writer = Arc::clone(&storage);
        let reader = Arc::clone(&storage);
        let write_task = tokio::spawn(async move {
            writer.put("concurrent_key", CacheWrite::default()).await
        });
        let read_task = tokio::spawn(async move {
            // Delay slightly so the read races the in-flight write.
            sleep(Duration::from_millis(10)).await;
            reader.get("concurrent_key").await
        });
        let (write_result, read_result) = tokio::join!(write_task, read_task);
        write_result.unwrap().unwrap();
        // Depending on timing the read may land before or after the write;
        // either outcome is acceptable — it just must not error.
        match read_result.unwrap().unwrap() {
            Cache::Hit(_) | Cache::Miss => {}
            _ => panic!("Unexpected cache result"),
        }
    });
}
#[test]
fn test_large_data_handling() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let l0 = Arc::new(InMemoryStorage::new());
    let l1 = Arc::new(InMemoryStorage::new());
    let storage = MultiLevelStorage::new(vec![
        l0.clone() as Arc<dyn Storage>,
        l1.clone() as Arc<dyn Storage>,
    ]);
    rt.block_on(async {
        // 1 MiB payload to check that backfill copies large entries intact.
        let payload = vec![0xAB; 1024 * 1024];
        let mut entry = CacheWrite::new();
        entry.put_stdout(&payload).unwrap();
        l1.put("large_key", entry).await.unwrap();
        assert!(
            matches!(storage.get("large_key").await.unwrap(), Cache::Hit(_)),
            "Should hit at L1"
        );
        // Backfill of L0 happens asynchronously after the hit.
        sleep(Duration::from_millis(200)).await;
        assert!(
            matches!(l0.get("large_key").await.unwrap(), Cache::Hit(_)),
            "L0 should have backfilled data from L1"
        );
    });
}
#[test]
fn test_storage_trait_methods() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let storage = MultiLevelStorage::new(vec![
        Arc::new(InMemoryStorage::new()) as Arc<dyn Storage>,
        Arc::new(InMemoryStorage::new()) as Arc<dyn Storage>,
    ]);
    rt.block_on(async {
        // Both levels are read-write, so the aggregate mode is ReadWrite.
        assert!(
            matches!(storage.check().await.unwrap(), CacheMode::ReadWrite),
            "Expected ReadWrite mode"
        );
        let location = storage.location();
        assert!(
            location.contains("Multi-level"),
            "Location should mention Multi-level: {}",
            location
        );
        // Size queries must not error, whatever they report.
        let _ = storage.current_size().await.unwrap();
        let _ = storage.max_size().await.unwrap();
    });
}
#[test]
fn test_all_levels_fail_on_put() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    // Every level is read-only; writes should be skipped, not attempted.
    let storage = MultiLevelStorage::new(vec![
        Arc::new(ReadOnlyStorage(Arc::new(InMemoryStorage::new()))) as Arc<dyn Storage>,
        Arc::new(ReadOnlyStorage(Arc::new(InMemoryStorage::new()))) as Arc<dyn Storage>,
    ]);
    rt.block_on(async {
        let result = storage.put("fail_key", CacheWrite::new()).await;
        assert!(result.is_ok(), "Put should succeed with read-only levels");
    });
}
#[test]
fn test_preprocessor_cache_mode() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let tmp = TempBuilder::new()
        .prefix("sccache_test_preprocessor_")
        .tempdir()
        .unwrap();
    let disk_dir = tmp.path().join("cache");
    fs::create_dir(&disk_dir).unwrap();
    // Enable preprocessor cache mode on the disk (L0) level only.
    let disk_cache = Arc::new(DiskCache::new(
        &disk_dir,
        1024 * 1024 * 100,
        rt.handle(),
        PreprocessorCacheModeConfig {
            use_preprocessor_cache_mode: true,
            ..Default::default()
        },
        CacheMode::ReadWrite,
        vec![],
    ));
    let storage = MultiLevelStorage::new(vec![
        disk_cache as Arc<dyn Storage>,
        Arc::new(InMemoryStorage::new()) as Arc<dyn Storage>,
    ]);
    // The multi-level wrapper reports L0's preprocessor configuration.
    let config = storage.preprocessor_cache_mode_config();
    assert!(config.use_preprocessor_cache_mode);
}
#[test]
fn test_empty_levels_new() {
    // Constructing with no levels is allowed; the location string still
    // reports the (zero) level count.
    let storage = MultiLevelStorage::new(Vec::new());
    assert_eq!(storage.levels.len(), 0);
    assert!(storage.location().contains("0"));
}
#[test]
fn test_preprocessor_cache_methods() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let tmp = TempBuilder::new()
        .prefix("sccache_test_prep_")
        .tempdir()
        .unwrap();
    let disk_dir = tmp.path().join("cache");
    fs::create_dir(&disk_dir).unwrap();
    let disk_cache = Arc::new(DiskCache::new(
        &disk_dir,
        1024 * 1024 * 100,
        rt.handle(),
        PreprocessorCacheModeConfig::default(),
        CacheMode::ReadWrite,
        vec![],
    ));
    let storage = MultiLevelStorage::new(vec![disk_cache as Arc<dyn Storage>]);
    rt.block_on(async {
        // An unknown key yields Ok(None), not an error.
        let lookup = storage.get_preprocessor_cache_entry("test_key").await;
        assert!(lookup.is_ok());
        assert!(lookup.unwrap().is_none());
        // Storing an entry goes through the disk level without error.
        let stored = storage
            .put_preprocessor_cache_entry("test_key", PreprocessorCacheEntry::default())
            .await;
        assert!(stored.is_ok());
    });
}
#[test]
fn test_readonly_level_in_check() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let tmp = TempBuilder::new()
        .prefix("sccache_test_ro_")
        .tempdir()
        .unwrap();
    let disk_dir = tmp.path().join("cache");
    fs::create_dir(&disk_dir).unwrap();
    let inner = DiskCache::new(
        &disk_dir,
        1024 * 1024 * 100,
        rt.handle(),
        PreprocessorCacheModeConfig::default(),
        CacheMode::ReadWrite,
        vec![],
    );
    // Wrapping in ReadOnlyStorage downgrades the level to read-only,
    // which the multi-level check() must surface.
    let storage = MultiLevelStorage::new(vec![
        Arc::new(ReadOnlyStorage(Arc::new(inner))) as Arc<dyn Storage>
    ]);
    rt.block_on(async {
        assert!(
            matches!(storage.check().await.unwrap(), CacheMode::ReadOnly),
            "Should detect read-only mode"
        );
    });
}
#[test]
fn test_sequential_read_order() {
    // Current-thread runtime keeps task interleaving deterministic enough
    // for the access-log assertions below.
    let rt = RuntimeBuilder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let l0 = Arc::new(InMemoryStorage::new());
    let l1 = Arc::new(InMemoryStorage::new());
    let l2 = Arc::new(InMemoryStorage::new());
    let log0 = l0.get_access_log();
    let log1 = l1.get_access_log();
    let log2 = l2.get_access_log();
    let key = "test_key_12345678901234567890";
    // Seed the deepest level before building the chain.
    rt.block_on(async {
        let mut entry = CacheWrite::default();
        entry.put_stdout(b"test data").unwrap();
        l2.put(key, entry).await.unwrap();
    });
    let storage = MultiLevelStorage::new(vec![
        l0 as Arc<dyn Storage>,
        l1 as Arc<dyn Storage>,
        l2 as Arc<dyn Storage>,
    ]);
    rt.block_on(async {
        assert!(matches!(storage.get(key).await.unwrap(), Cache::Hit(_)));
        // Levels must have been probed strictly in order: L0, L1, then L2.
        let log0 = log0.lock().await;
        let log1 = log1.lock().await;
        let log2 = log2.lock().await;
        assert_eq!(log0.len(), 1, "L0 should be checked first");
        assert_eq!(log1.len(), 1, "L1 should be checked second");
        assert_eq!(log2.len(), 2, "L2: put (setup) + get (check)");
        assert_eq!(log0[0], format!("get:{}", key));
        assert_eq!(log1[0], format!("get:{}", key));
        assert_eq!(log2[0], format!("put:{}", key));
        assert_eq!(log2[1], format!("get:{}", key));
    });
}
#[test]
fn test_read_stops_at_first_hit_not_parallel() {
    let rt = RuntimeBuilder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let l0 = Arc::new(InMemoryStorage::new());
    let l1 = Arc::new(InMemoryStorage::new());
    let l2 = Arc::new(InMemoryStorage::new());
    let log0 = l0.get_access_log();
    let log1 = l1.get_access_log();
    let log2 = l2.get_access_log();
    let key = "test_key_early_hit_1234567890ab";
    // Seed the middle level; the bottom level must never be touched.
    rt.block_on(async {
        let mut entry = CacheWrite::default();
        entry.put_stdout(b"L1 data").unwrap();
        l1.put(key, entry).await.unwrap();
    });
    let storage = MultiLevelStorage::new(vec![
        l0 as Arc<dyn Storage>,
        l1 as Arc<dyn Storage>,
        l2 as Arc<dyn Storage>,
    ]);
    rt.block_on(async {
        assert!(matches!(storage.get(key).await.unwrap(), Cache::Hit(_)));
        let log0 = log0.lock().await;
        let log1 = log1.lock().await;
        let log2 = log2.lock().await;
        assert_eq!(log0.len(), 1, "L0 should be checked first");
        assert_eq!(log1.len(), 2, "L1: put (setup) + get (check)");
        assert_eq!(
            log2.len(),
            0,
            "L2 should NOT be checked (sequential read stops at first hit)"
        );
    });
}
/// `Storage` stub whose write paths always fail; used to exercise
/// `WriteErrorPolicy` handling in the multi-level put tests below.
struct FailingStorage;
#[async_trait]
impl Storage for FailingStorage {
    // Reads are harmless: they always report a miss.
    async fn get(&self, _key: &str) -> Result<Cache> {
        Ok(Cache::Miss)
    }
    // Every write path errors out so tests can observe how
    // MultiLevelStorage reacts to write failures.
    async fn put(&self, _key: &str, _entry: CacheWrite) -> Result<Duration> {
        Err(anyhow!("Intentional failure for testing"))
    }
    async fn put_raw(&self, _key: &str, _entry: Bytes) -> Result<Duration> {
        Err(anyhow!("Intentional failure for testing"))
    }
    // Claims ReadWrite so the multi-level wrapper does not skip this level
    // as read-only — the failing writes must actually be attempted.
    async fn check(&self) -> Result<CacheMode> {
        Ok(CacheMode::ReadWrite) }
    fn location(&self) -> String {
        "FailingStorage".to_string()
    }
    async fn current_size(&self) -> Result<Option<u64>> {
        Ok(None)
    }
    async fn max_size(&self) -> Result<Option<u64>> {
        Ok(None)
    }
    fn preprocessor_cache_mode_config(&self) -> PreprocessorCacheModeConfig {
        PreprocessorCacheModeConfig::default()
    }
    // The preprocessor-cache paths fail as well, mirroring the put paths.
    async fn get_preprocessor_cache_entry(
        &self,
        _key: &str,
    ) -> Result<Option<Box<dyn crate::lru_disk_cache::ReadSeek>>> {
        Err(anyhow!("Intentional failure for testing"))
    }
    async fn put_preprocessor_cache_entry(
        &self,
        _key: &str,
        _entry: PreprocessorCacheEntry,
    ) -> Result<()> {
        Err(anyhow!("Intentional failure for testing"))
    }
}
#[test]
fn test_put_mode_ignore() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    // Both levels reject writes, but the Ignore policy swallows every error.
    let storage = MultiLevelStorage::with_write_error_policy(
        vec![
            Arc::new(FailingStorage) as Arc<dyn Storage>,
            Arc::new(FailingStorage) as Arc<dyn Storage>,
        ],
        WriteErrorPolicy::Ignore,
    );
    rt.block_on(async {
        let result = storage.put("test_key", CacheWrite::new()).await;
        assert!(
            result.is_ok(),
            "WriteErrorPolicy::Ignore should never fail, even when all levels error"
        );
    });
}
#[test]
fn test_put_mode_l0_fails_on_error() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    // L0 fails: under the L0 policy that is fatal regardless of L1's health.
    let storage = MultiLevelStorage::with_write_error_policy(
        vec![
            Arc::new(FailingStorage) as Arc<dyn Storage>,
            Arc::new(InMemoryStorage::new()) as Arc<dyn Storage>,
        ],
        WriteErrorPolicy::L0,
    );
    rt.block_on(async {
        let result = storage.put("test_key", CacheWrite::new()).await;
        assert!(
            result.is_err(),
            "WriteErrorPolicy::L0 should fail when L0 write fails"
        );
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("Intentional") || err_msg.contains("put_raw not implemented"),
            "Expected failure message, got: {}",
            err_msg
        );
    });
}
#[test]
fn test_put_mode_l0_succeeds_if_l0_ok() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    // Only L0's outcome matters under the L0 policy; a failing L1 is ignored.
    let storage = MultiLevelStorage::with_write_error_policy(
        vec![
            Arc::new(InMemoryStorage::new()) as Arc<dyn Storage>,
            Arc::new(FailingStorage) as Arc<dyn Storage>,
        ],
        WriteErrorPolicy::L0,
    );
    rt.block_on(async {
        let result = storage.put("test_key", CacheWrite::new()).await;
        assert!(
            result.is_ok(),
            "WriteErrorPolicy::L0 should succeed when L0 succeeds, even if L1+ fails"
        );
    });
}
#[test]
fn test_put_mode_all_fails_on_any_error() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    // One healthy level plus one failing level: All must report the failure.
    let storage = MultiLevelStorage::with_write_error_policy(
        vec![
            Arc::new(InMemoryStorage::new()) as Arc<dyn Storage>,
            Arc::new(FailingStorage) as Arc<dyn Storage>,
        ],
        WriteErrorPolicy::All,
    );
    rt.block_on(async {
        let result = storage.put("test_key", CacheWrite::new()).await;
        // Let any background write activity settle before asserting.
        sleep(Duration::from_millis(100)).await;
        assert!(
            result.is_err(),
            "WriteErrorPolicy::All should fail when any RW level fails"
        );
    });
}
#[test]
fn test_put_mode_all_succeeds_when_all_ok() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let l0 = Arc::new(InMemoryStorage::new());
    let l1 = Arc::new(InMemoryStorage::new());
    let storage = MultiLevelStorage::with_write_error_policy(
        vec![
            l0.clone() as Arc<dyn Storage>,
            l1.clone() as Arc<dyn Storage>,
        ],
        WriteErrorPolicy::All,
    );
    rt.block_on(async {
        let result = storage.put("test_key", CacheWrite::new()).await;
        // Let any background write activity settle before asserting.
        sleep(Duration::from_millis(100)).await;
        assert!(
            result.is_ok(),
            "WriteErrorPolicy::All should succeed when all levels succeed"
        );
        // Both levels must actually contain the entry.
        assert!(matches!(
            l0.get("test_key").await.unwrap(),
            Cache::Hit(_)
        ));
        assert!(matches!(
            l1.get("test_key").await.unwrap(),
            Cache::Hit(_)
        ));
    });
}
#[test]
fn test_put_mode_all_skips_readonly() {
    let rt = RuntimeBuilder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
    let l0 = Arc::new(InMemoryStorage::new());
    // The middle level is read-only and must simply be skipped on writes
    // rather than counting as a failure under the All policy.
    let ro_level = Arc::new(ReadOnlyStorage(Arc::new(InMemoryStorage::new())));
    let l2 = Arc::new(InMemoryStorage::new());
    let storage = MultiLevelStorage::with_write_error_policy(
        vec![
            l0.clone() as Arc<dyn Storage>,
            ro_level as Arc<dyn Storage>,
            l2.clone() as Arc<dyn Storage>,
        ],
        WriteErrorPolicy::All,
    );
    rt.block_on(async {
        let result = storage.put("test_key", CacheWrite::new()).await;
        // Let any background write activity settle before asserting.
        sleep(Duration::from_millis(100)).await;
        assert!(
            result.is_ok(),
            "WriteErrorPolicy::All should succeed when read-only levels are skipped"
        );
        // The writable levels must both hold the entry.
        assert!(matches!(
            l0.get("test_key").await.unwrap(),
            Cache::Hit(_)
        ));
        assert!(matches!(
            l2.get("test_key").await.unwrap(),
            Cache::Hit(_)
        ));
    });
}