mod common;
use std::collections::HashSet;
use std::sync::Arc;
use std::thread;
use fibre::spsc;
use crate::common::build_test_cache;
/// Checks that a snapshot iterator taken over an otherwise idle cache yields
/// exactly the entries that were inserted: nothing missing, nothing extra.
#[test]
fn snapshot_iter_visits_all_items() {
    let cache = build_test_cache(4);

    // Fill the cache and build the reference set in one pass so the two are
    // derived from the same keys and values.
    let expected: HashSet<_> = (0..100)
        .map(|key| {
            let text = key.to_string();
            cache.insert(key, text.clone(), 1);
            (key, Arc::new(text))
        })
        .collect();

    // Comparing as sets keeps the check independent of iteration order.
    let seen = cache.iter_snapshot().collect::<HashSet<_>>();
    assert_eq!(seen, expected);
}
/// Verifies that an entry inserted by another thread only after a snapshot
/// scan has finished does not show up in the results of that scan.
///
/// The writer thread is released through a channel once the snapshot has been
/// fully collected, so the "scan first, insert afterwards" ordering is
/// guaranteed instead of being approximated with a sleep. That removes both
/// the timing-dependent flakiness and the fixed delay from the test run.
#[test]
fn snapshot_iter_misses_insert_after_shard_scan() {
    let cache = Arc::new(build_test_cache(4));
    cache.insert(0, "a".to_string(), 1);
    cache.insert(2, "b".to_string(), 1);

    // One-shot "scan finished" signal from the main thread to the writer.
    let (go_tx, go_rx) = std::sync::mpsc::channel::<()>();

    let cache_clone = Arc::clone(&cache);
    let thread_handle = thread::spawn(move || {
        // Block until the main thread reports that its scan is complete.
        go_rx.recv().unwrap();
        cache_clone.insert(4, "new".to_string(), 1);
    });

    // Drain the whole snapshot before the writer is allowed to run.
    let collected: Vec<_> = cache.iter_snapshot().collect();

    go_tx.send(()).unwrap();
    thread_handle.join().unwrap();

    assert_eq!(collected.len(), 2, "Should only see the original 2 items");
    assert!(
        !collected.iter().any(|(k, _)| *k == 4),
        "Should miss item inserted into a past shard"
    );
}
/// Verifies that an insert performed while a snapshot iteration is paused is
/// still observed by the remainder of that iteration.
///
/// The first item is pulled from the iterator, then a writer thread is
/// released to insert key 2, and only after the writer confirms completion is
/// the rest of the iterator drained.
///
/// NOTE(review): the assertions rely on key 2 landing in a part of the cache
/// the iterator has not reached yet after yielding one item; this depends on
/// how keys map to shards, so confirm against the cache implementation.
#[test]
fn snapshot_iter_sees_insert_before_shard_scan() {
    let cache = Arc::new(build_test_cache(4));
    cache.insert(0, "a".to_string(), 1);

    // `go` releases the writer; `done` reports that its insert has finished.
    let (go_tx, go_rx) = spsc::bounded_sync::<()>(1);
    let (done_tx, done_rx) = spsc::bounded_sync::<()>(1);

    let writer_cache = Arc::clone(&cache);
    let writer = thread::spawn(move || {
        go_rx.recv().unwrap();
        writer_cache.insert(2, "new".to_string(), 1);
        done_tx.send(()).unwrap();
    });

    // Start the iteration and take at most one item before pausing.
    let mut snapshot = cache.iter_snapshot();
    let first = snapshot.next();

    // Let the writer run to completion while the iteration is paused.
    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();

    // Resume: the item taken earlier (if any) followed by everything left.
    let collected: Vec<_> = first.into_iter().chain(snapshot).collect();
    writer.join().unwrap();

    assert_eq!(
        collected.len(),
        2,
        "Should see the original and the new item"
    );
    assert!(
        collected.iter().any(|(k, _)| *k == 2),
        "Should see item inserted into a future shard"
    );
}
/// Verifies that an entry invalidated after the snapshot iterator was created,
/// but before any item was pulled from it, is not yielded.
///
/// Keys 0 and 1 are inserted, the iterator is created, a second thread
/// invalidates key 0, and only then is the iterator drained. The test expects
/// key 1 to be the single item produced.
#[test]
fn snapshot_iter_skips_deleted_item() {
    let cache = Arc::new(build_test_cache(4));
    cache.insert(0, "a".to_string(), 1);
    cache.insert(1, "b".to_string(), 1);

    // `go` releases the remover; `done` reports that the invalidation ran.
    let (go_tx, go_rx) = spsc::bounded_sync::<()>(1);
    let (done_tx, done_rx) = spsc::bounded_sync::<()>(1);

    let remover_cache = Arc::clone(&cache);
    let remover = thread::spawn(move || {
        go_rx.recv().unwrap();
        remover_cache.invalidate(&0);
        done_tx.send(()).unwrap();
    });

    // Create the iterator first, then let the invalidation happen before the
    // first item is requested.
    let snapshot = cache.iter_snapshot();
    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();

    let survivors = snapshot.collect::<Vec<_>>();
    remover.join().unwrap();

    assert_eq!(
        survivors.len(),
        1,
        "Should only collect the item that was not deleted"
    );
    let (remaining_key, _) = &survivors[0];
    assert_eq!(*remaining_key, 1, "The remaining item should be key 1");
}