mod common;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Barrier;
use crate::common::build_test_async_cache;
/// Inserts 100 entries and checks that draining the snapshot iterator yields
/// exactly the same set of `(key, Arc<value>)` pairs.
#[tokio::test]
async fn snapshot_iter_visits_all_items() {
    // NOTE(review): the argument `4` is presumably the shard count — confirm in `common`.
    let cache = build_test_async_cache(4);

    // Populate the cache and record what the iterator is expected to yield.
    let mut want = HashSet::new();
    for key in 0..100 {
        let text = key.to_string();
        cache.insert(key, text.clone(), 1).await;
        want.insert((key, Arc::new(text)));
    }

    // Drain the iterator into a set so that ordering does not matter.
    let mut seen = HashSet::new();
    let mut snapshot = cache.iter_snapshot_async();
    loop {
        match snapshot.next().await {
            Some(entry) => {
                seen.insert(entry);
            }
            None => break,
        }
    }

    assert_eq!(seen, want);
}
#[tokio::test]
async fn snapshot_iter_misses_insert_after_shard_scan() {
let cache = Arc::new(build_test_async_cache(4));
cache.insert(0, "a".to_string(), 1).await; cache.insert(2, "b".to_string(), 1).await;
let barrier = Arc::new(Barrier::new(2));
let cache_clone = cache.clone();
let barrier_clone = barrier.clone();
let task_handle = tokio::spawn(async move {
barrier_clone.wait().await;
cache_clone.insert(4, "new".to_string(), 1).await; });
let mut collected = Vec::new();
let mut iter = cache.iter_snapshot_async();
if let Some(item) = iter.next().await {
collected.push(item);
}
barrier.wait().await;
while let Some(item) = iter.next().await {
collected.push(item);
}
task_handle.await.unwrap();
assert_eq!(collected.len(), 2, "Should only see the original 2 items");
assert!(!collected.iter().any(|(k, _)| *k == 4));
}
#[tokio::test]
async fn snapshot_iter_sees_insert_before_shard_scan() {
let cache = Arc::new(build_test_async_cache(4));
cache.insert(0, "a".to_string(), 1).await;
let barrier = Arc::new(Barrier::new(2));
let cache_clone = cache.clone();
let barrier_clone = barrier.clone();
let task_handle = tokio::spawn(async move {
barrier_clone.wait().await;
cache_clone.insert(2, "new".to_string(), 1).await;
});
let mut collected = Vec::new();
let mut iter = cache.iter_snapshot_async();
if let Some(item) = iter.next().await {
collected.push(item);
}
barrier.wait().await;
while let Some(item) = iter.next().await {
collected.push(item);
}
task_handle.await.unwrap();
assert_eq!(
collected.len(),
2,
"Should see the original and the new item"
);
assert!(collected.iter().any(|(k, _)| *k == 2));
}
#[tokio::test]
async fn snapshot_iter_skips_deleted_item() {
let cache = Arc::new(build_test_async_cache(4));
cache.insert(0, "a".to_string(), 1).await;
cache.insert(1, "b".to_string(), 1).await;
let barrier = Arc::new(Barrier::new(2));
let cache_clone = cache.clone();
let barrier_clone = barrier.clone();
let task_handle = tokio::spawn(async move {
barrier_clone.wait().await;
cache_clone.invalidate(&0).await;
});
let mut collected = Vec::new();
let mut iter = cache.iter_snapshot_async();
barrier.wait().await;
while let Some(item) = iter.next().await {
collected.push(item);
}
task_handle.await.unwrap();
assert_eq!(
collected.len(),
1,
"Should only collect the item that was not deleted"
);
assert_eq!(collected[0].0, 1, "The remaining item should be key 1");
}