mod common;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
use fibre::spsc;
use fibre_cache::CacheBuilder;
use crate::common::{ShardControllingHasher, build_test_cache_with_cap};
/// Iterating a cache that has never received an insert must yield nothing.
#[test]
fn iter_on_empty_cache() {
    let empty_cache = build_test_cache_with_cap(4, 100);
    let yielded = empty_cache.iter().count();
    assert_eq!(yielded, 0, "Iterator on empty cache should yield no items");
}
/// Inserts 20 entries and checks that a default `iter()` pass yields exactly
/// the same set of `(key, Arc<value>)` pairs.
#[test]
fn iter_visits_all_items_single_batch() {
    let cache = build_test_cache_with_cap(4, 100);

    // Populate the cache and build the reference set in the same pass.
    let expected: HashSet<_> = (0..20)
        .map(|key| {
            let text = key.to_string();
            cache.insert(key, text.clone(), 1);
            (key, Arc::new(text))
        })
        .collect();

    let seen = cache.iter().collect::<HashSet<_>>();
    assert_eq!(seen, expected);
}
/// Inserts 100 entries and iterates with a batch size of 10, so the iterator
/// has to fetch several batches; every inserted pair must still be yielded.
#[test]
fn iter_visits_all_items_multiple_batches() {
    let cache = build_test_cache_with_cap(4, 100);

    let mut reference = HashSet::new();
    for key in 0..100 {
        let text = key.to_string();
        // Record the expected pair first; the cache then takes ownership of the string.
        reference.insert((key, Arc::new(text.clone())));
        cache.insert(key, text, 1);
    }

    let seen: HashSet<_> = cache.iter_with_batch_size(10).collect();
    assert_eq!(seen, reference);
}
/// Uses the smallest batch size (1) and checks that all 10 inserted entries
/// are still yielded.
#[test]
fn iter_with_batch_size_of_one() {
    let cache = build_test_cache_with_cap(4, 100);

    let mut expected = HashSet::new();
    expected.extend((0..10).map(|key| {
        let text = key.to_string();
        cache.insert(key, text.clone(), 1);
        (key, Arc::new(text))
    }));

    let seen = cache.iter_with_batch_size(1).collect::<HashSet<_>>();
    assert_eq!(seen, expected);
}
/// Inserts only the keys in `0..20` whose remainder modulo 4 is 0 or 2 and
/// checks that exactly those 10 entries are yielded.
///
/// NOTE(review): presumably the test hasher routes a key to shard `key % 4`,
/// leaving shards 1 and 3 empty — confirm against the `common` module.
#[test]
fn iter_handles_partially_filled_shards() {
    let cache = build_test_cache_with_cap(4, 100);

    let selected_keys = (0..20).filter(|key| {
        let remainder = key % 4;
        remainder == 0 || remainder == 2
    });

    let mut expected = HashSet::new();
    for key in selected_keys {
        let text = key.to_string();
        expected.insert((key, Arc::new(text.clone())));
        cache.insert(key, text, 1);
    }

    let seen: HashSet<_> = cache.iter().collect();
    assert_eq!(seen, expected);
    assert_eq!(seen.len(), 10);
}
/// Entries older than the configured time-to-live must not be yielded.
///
/// Keys `0..5` are inserted and left to sit for 100 ms, twice the 50 ms TTL.
/// Keys `5..10` are inserted afterwards and are the only ones the iterator
/// is expected to return.
#[test]
fn iter_skips_ttl_expired_items() {
    let ttl = Duration::from_millis(50);
    let janitor_tick = Duration::from_millis(10);
    let cache = CacheBuilder::<i32, String>::new()
        .shards(1)
        .time_to_live(ttl)
        .janitor_tick_interval(janitor_tick)
        .build()
        .unwrap();

    // First wave: these entries are allowed to outlive their TTL.
    for stale_key in 0..5 {
        cache.insert(stale_key, stale_key.to_string(), 1);
    }
    thread::sleep(Duration::from_millis(100));

    // Second wave: inserted after the sleep, so still fresh when iterated.
    let fresh: HashSet<_> = (5..10)
        .map(|key| {
            let text = key.to_string();
            cache.insert(key, text.clone(), 1);
            (key, Arc::new(text))
        })
        .collect();

    let seen: HashSet<_> = cache.iter().collect();
    assert_eq!(seen, fresh, "Iterator should skip expired items");
}
/// Entries that expire while an iterator is in flight must not be yielded.
///
/// Six entries are inserted with a 100 ms TTL. Three items are pulled from a
/// batch-size-3 iterator, the test sleeps 150 ms so the remaining entries
/// outlive their TTL, and the rest of the iterator is then drained. Only the
/// first three items may end up collected.
///
/// NOTE(review): presumably `ShardControllingHasher` places keys 0/4/8 and
/// 2/6/10 in two different shards — confirm against the `common` module.
#[test]
fn iter_skips_items_expiring_mid_iteration() {
    let cache = CacheBuilder::new()
        .shards(4)
        .hasher(ShardControllingHasher)
        .time_to_live(Duration::from_millis(100))
        .janitor_tick_interval(Duration::from_millis(10))
        .build()
        .unwrap();

    let first_group = [0, 4, 8];
    let second_group = [2, 6, 10];
    for key in first_group {
        cache.insert(key, key.to_string(), 1);
    }
    for key in second_group {
        cache.insert(key, key.to_string(), 1);
    }

    let mut iter = cache.iter_with_batch_size(3);
    let mut collected = Vec::new();

    // Pull exactly three items while every entry is still within its TTL.
    for _ in 0..3 {
        collected.push(iter.next().unwrap());
    }

    // Let the remaining entries expire before resuming the iterator.
    thread::sleep(Duration::from_millis(150));
    collected.extend(iter);

    assert_eq!(
        collected.len(),
        3,
        "Only the first valid batch should be collected"
    );
}
/// An entry inserted after the iterator has moved past its location must not
/// be yielded by that iterator.
///
/// Three entries are inserted up front. Once the iterator has yielded three
/// items, a writer thread is released to insert key `100`; iteration resumes
/// only after the writer confirms the insert has completed.
///
/// NOTE(review): presumably key `100` lands in the same shard as keys 0 and 4
/// (shard `key % 4`), which has already been scanned — confirm against the
/// test hasher in `common`.
#[test]
fn iter_misses_insert_after_shard_scan() {
    let cache = Arc::new(build_test_cache_with_cap(4, 100));
    // `go` releases the writer thread; `done` reports that its insert finished.
    let (go_tx, go_rx) = spsc::bounded_sync(1);
    let (done_tx, done_rx) = spsc::bounded_sync(1);

    cache.insert(0, "a".to_string(), 1);
    cache.insert(4, "b".to_string(), 1);
    cache.insert(1, "c".to_string(), 1);

    let cache_clone = cache.clone();
    let thread_handle = thread::spawn(move || {
        go_rx.recv().unwrap();
        cache_clone.insert(100, "new".to_string(), 1);
        done_tx.send(()).unwrap();
    });

    let mut collected = HashSet::new();
    let mut items_seen = 0;
    let mut signaled = false;
    for (key, value) in cache.iter_with_batch_size(3) {
        items_seen += 1;
        collected.insert((key, value));
        if items_seen >= 3 && !signaled {
            // Pause mid-iteration until the concurrent insert has completed.
            go_tx.send(()).unwrap();
            done_rx.recv().unwrap();
            signaled = true;
        }
    }

    // Fail fast instead of blocking forever in `join()`: if the writer was
    // never released it is still waiting on `go_rx`.
    assert!(
        signaled,
        "Iterator yielded fewer than 3 items, so the writer thread was never released"
    );
    thread_handle.join().unwrap();

    assert!(
        collected.iter().all(|(k, _)| *k != 100),
        "Item inserted after its shard was scanned should be missed"
    );
    assert_eq!(
        collected.len(),
        3,
        "Should have collected all original items"
    );
}
/// An entry inserted into a part of the cache the iterator has not reached
/// yet must be yielded by that iterator.
///
/// Keys 0, 4 and 8 are inserted up front. Once the iterator has yielded three
/// items, a writer thread is released to insert key `102`; iteration resumes
/// only after the writer confirms the insert, and must then also yield `102`.
///
/// NOTE(review): presumably key `102` lands in a shard (`key % 4 == 2`) that
/// is scanned after the shard holding 0/4/8 — confirm against the test hasher
/// in `common`.
#[test]
fn iter_sees_insert_before_shard_scan() {
    let cache = Arc::new(build_test_cache_with_cap(4, 100));
    // `go` releases the writer thread; `done` reports that its insert finished.
    let (go_tx, go_rx) = spsc::bounded_sync(1);
    let (done_tx, done_rx) = spsc::bounded_sync(1);

    for i in (0..10).filter(|x| x % 4 == 0) {
        cache.insert(i, i.to_string(), 1);
    }

    let cache_clone = cache.clone();
    let thread_handle = thread::spawn(move || {
        go_rx.recv().unwrap();
        cache_clone.insert(102, "new".to_string(), 1);
        done_tx.send(()).unwrap();
    });

    let mut collected = HashSet::new();
    let mut items_seen = 0;
    let mut signaled = false;
    for (key, value) in cache.iter_with_batch_size(3) {
        items_seen += 1;
        collected.insert((key, value));
        if items_seen >= 3 && !signaled {
            // Pause mid-iteration until the concurrent insert has completed.
            go_tx.send(()).unwrap();
            done_rx.recv().unwrap();
            signaled = true;
        }
    }

    // Fail fast instead of blocking forever in `join()`: if the writer was
    // never released it is still waiting on `go_rx`.
    assert!(
        signaled,
        "Iterator yielded fewer than 3 items, so the writer thread was never released"
    );
    thread_handle.join().unwrap();

    assert!(
        collected.iter().any(|(k, _)| *k == 102),
        "Item inserted before its shard was scanned should be seen"
    );
    assert_eq!(
        collected.len(),
        4,
        "Should have collected original items plus the new one"
    );
}
/// A value overwritten before the iterator reaches it must be yielded in its
/// updated form.
///
/// Keys 0 and 2 are inserted up front. After the first item is yielded from a
/// batch-size-1 iterator, a writer thread overwrites key 2 with `"new_two"`;
/// iteration resumes only after the writer confirms the update.
///
/// NOTE(review): this relies on key 2 not being the first item yielded —
/// presumably guaranteed by shard ordering under the test hasher; confirm
/// against `common`.
#[test]
fn iter_sees_updated_value_before_scan() {
    let cache = Arc::new(build_test_cache_with_cap(4, 100));
    // `go` releases the writer thread; `done` reports that its update finished.
    let (go_tx, go_rx) = spsc::bounded_sync(1);
    let (done_tx, done_rx) = spsc::bounded_sync(1);

    cache.insert(0, "zero".to_string(), 1);
    cache.insert(2, "old_two".to_string(), 1);

    let cache_clone = cache.clone();
    let thread_handle = thread::spawn(move || {
        go_rx.recv().unwrap();
        cache_clone.insert(2, "new_two".to_string(), 1);
        done_tx.send(()).unwrap();
    });

    let mut collected = HashMap::new();
    let mut signaled = false;
    for (key, value) in cache.iter_with_batch_size(1) {
        collected.insert(key, value);
        if !signaled {
            // Pause after the first item until the concurrent update has completed.
            go_tx.send(()).unwrap();
            done_rx.recv().unwrap();
            signaled = true;
        }
    }

    // Fail fast instead of blocking forever in `join()`: if the writer was
    // never released it is still waiting on `go_rx`.
    assert!(
        signaled,
        "Iterator yielded no items, so the writer thread was never released"
    );
    thread_handle.join().unwrap();

    assert_eq!(**collected.get(&2).unwrap(), "new_two");
}
/// A live iterator must not stall inserts of keys 1, 2 and 3.
///
/// The main thread creates an iterator, advances it once, and keeps it alive
/// for 100 ms. Meanwhile a writer thread performs three inserts and asserts
/// that they complete in under 50 ms in total.
///
/// NOTE(review): the cache is empty when the iterator is created, so the
/// first `next()` presumably returns `None` — confirm that the iterator
/// actually holds a shard lock here, otherwise the test proves little.
#[test]
fn iter_does_not_block_writers_on_other_shards() {
    let cache = Arc::new(build_test_cache_with_cap(4, 100));
    let barrier = Arc::new(Barrier::new(2));

    let writer_thread = {
        let cache = Arc::clone(&cache);
        let barrier = Arc::clone(&barrier);
        thread::spawn(move || {
            // Start writing only once the main thread has advanced its iterator.
            barrier.wait();
            let started_at = std::time::Instant::now();
            for (key, label) in [(1, "shard 1"), (2, "shard 2"), (3, "shard 3")] {
                cache.insert(key, label.to_string(), 1);
            }
            assert!(started_at.elapsed() < Duration::from_millis(50));
        })
    };

    let mut iter = cache.iter();
    let _ = iter.next();
    barrier.wait();

    // Keep the iterator alive while the writer runs, then drain what is left.
    thread::sleep(Duration::from_millis(100));
    iter.for_each(drop);

    writer_thread.join().unwrap();
}