use std::time::Instant;
use super::blob::BlobCache;
/// Upper bound on the work a single sweeper pass is allowed to do.
///
/// A limit carries an entry-count cap, a wall-clock cap in milliseconds, or both.
/// The sweeper's internal budget reports exhaustion as soon as any cap that is
/// present has been reached.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SweepLimit {
    /// Entry-count cap only; no time cap.
    Entries(usize),
    /// Time cap in milliseconds only; no entry-count cap.
    Millis(u32),
    /// Both an entry-count cap and a millisecond cap.
    Either { entries: usize, millis: u32 },
}

impl SweepLimit {
    /// Returns the entry-count cap, or `None` when the limit is time-only.
    fn entries_cap(self) -> Option<usize> {
        match self {
            Self::Entries(entries) | Self::Either { entries, .. } => Some(entries),
            Self::Millis(_) => None,
        }
    }

    /// Returns the millisecond cap, or `None` when the limit is entries-only.
    fn millis_cap(self) -> Option<u32> {
        match self {
            Self::Millis(millis) | Self::Either { millis, .. } => Some(millis),
            Self::Entries(_) => None,
        }
    }
}
/// Per-namespace counters, paired with a namespace name in
/// [`SweepReport::by_namespace`].
///
/// All counters default to zero. NOTE(review): no code visible in this file fills
/// these in yet; the field meanings below are inferred from their names and should
/// be confirmed once a scan loop exists.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NamespaceSweepStats {
/// Presumably the number of entries examined in this namespace.
pub entries_scanned: usize,
/// Presumably the number of entries removed from this namespace.
pub entries_evicted: usize,
/// Presumably the bytes freed by those removals.
pub bytes_reclaimed: u64,
}
/// Result of [`BlobCacheSweeper::sweep_expired`].
///
/// As `sweep_expired` is written in this file, only `elapsed_ms` is ever set to a
/// non-default value; every other field keeps its `Default` (zero / `false` / empty).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SweepReport {
/// Presumably the total number of entries examined (always 0 today).
pub entries_scanned: usize,
/// Presumably the total number of entries removed (always 0 today).
pub entries_evicted: usize,
/// Presumably the total bytes freed (always 0 today).
pub bytes_reclaimed: u64,
/// Milliseconds elapsed since the pass started, saturating at `u32::MAX`.
pub elapsed_ms: u32,
/// Explicitly set to `false` by the current `sweep_expired`.
/// NOTE(review): presumably meant to flag a pass cut short by its `SweepLimit`.
pub truncated_due_to_limit: bool,
/// `(namespace name, stats)` pairs; left empty by the current `sweep_expired`.
pub by_namespace: Vec<(String, NamespaceSweepStats)>,
}
/// Result of [`BlobCacheSweeper::reclaim_orphans`].
///
/// As `reclaim_orphans` is written in this file, only `elapsed_ms` is ever set to a
/// non-default value; the counters stay at zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct OrphanReport {
/// Presumably the number of blob chains examined (always 0 today).
pub blob_chains_scanned: usize,
/// Presumably the number of blob chains freed (always 0 today).
pub blob_chains_reclaimed: usize,
/// Presumably the bytes freed by reclaimed chains (always 0 today).
pub bytes_reclaimed: u64,
/// Milliseconds elapsed since the pass started, saturating at `u32::MAX`.
pub elapsed_ms: u32,
/// Explicitly set to `false` by the current `reclaim_orphans`.
/// NOTE(review): presumably meant to flag a pass cut short by its `SweepLimit`.
pub truncated_due_to_limit: bool,
}
/// Result of [`BlobCacheSweeper::flush_namespace`].
///
/// Unlike the other reports in this file, this type does not derive `Default`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NamespaceFlushReport {
/// Owned copy of the namespace name that was passed to `flush_namespace`.
pub namespace: String,
/// Hard-coded to 0 by the current `flush_namespace`.
/// NOTE(review): presumably the namespace generation before the flush — confirm.
pub generation_before: u64,
/// Hard-coded to 0 by the current `flush_namespace`.
/// NOTE(review): presumably the namespace generation after the flush — confirm.
pub generation_after: u64,
/// Microseconds spent in the `invalidate_namespace` call, saturating at `u32::MAX`.
pub elapsed_micros: u32,
}
pub struct BlobCacheSweeper;
impl BlobCacheSweeper {
pub fn sweep_expired(cache: &BlobCache, limit: SweepLimit) -> SweepReport {
let started = Instant::now();
let budget = Budget::new(limit, started);
let _ = cache.stats();
let mut report = SweepReport::default();
budget.observe(&mut report.elapsed_ms);
report.truncated_due_to_limit = false;
report
}
pub fn reclaim_orphans(cache: &BlobCache, limit: SweepLimit) -> OrphanReport {
let started = Instant::now();
let budget = Budget::new(limit, started);
let _ = cache.stats();
let mut report = OrphanReport::default();
budget.observe(&mut report.elapsed_ms);
report.truncated_due_to_limit = false;
report
}
pub fn flush_namespace(cache: &BlobCache, namespace: &str) -> NamespaceFlushReport {
let started = Instant::now();
let _flushed = cache.invalidate_namespace(namespace);
let elapsed = started.elapsed();
let elapsed_micros = u32::try_from(elapsed.as_micros()).unwrap_or(u32::MAX);
NamespaceFlushReport {
namespace: namespace.to_string(),
generation_before: 0,
generation_after: 0,
elapsed_micros,
}
}
}
/// Tracks how much of a [`SweepLimit`] one pass has used up.
///
/// Holds the pass start time, the optional caps taken from the limit, and a
/// running count of entries ticked so far.
struct Budget {
    started: Instant,
    entries_cap: Option<usize>,
    millis_cap: Option<u32>,
    entries_seen: usize,
}

impl Budget {
    /// Creates a budget for `limit`, measuring elapsed time from `started`.
    fn new(limit: SweepLimit, started: Instant) -> Self {
        let entries_cap = limit.entries_cap();
        let millis_cap = limit.millis_cap();
        Self {
            started,
            entries_cap,
            millis_cap,
            entries_seen: 0,
        }
    }

    /// Returns `true` once any cap that is present has been reached.
    ///
    /// The entry cap is checked first; the clock is only read when the entry cap
    /// is absent or not yet reached. A cap of zero counts as reached immediately.
    // Not called by the sweeper entry points visible in this file; only the unit
    // tests exercise it, hence the lint allowance.
    #[allow(dead_code)]
    fn exhausted(&self) -> bool {
        let entries_spent = matches!(self.entries_cap, Some(cap) if self.entries_seen >= cap);
        entries_spent || matches!(self.millis_cap, Some(cap) if self.elapsed_ms_capped() >= cap)
    }

    /// Counts one more entry (saturating) and returns whether the budget is now
    /// exhausted.
    // Same situation as `exhausted`: only the unit tests call this.
    #[allow(dead_code)]
    fn tick(&mut self) -> bool {
        self.entries_seen = self.entries_seen.saturating_add(1);
        self.exhausted()
    }

    /// Milliseconds since `started`, clamped to `u32::MAX` if the value does not fit.
    fn elapsed_ms_capped(&self) -> u32 {
        let millis = self.started.elapsed().as_millis();
        u32::try_from(millis).unwrap_or(u32::MAX)
    }

    /// Consumes the budget and writes the elapsed milliseconds into
    /// `elapsed_ms_field`.
    fn observe(self, elapsed_ms_field: &mut u32) {
        let elapsed = self.elapsed_ms_capped();
        *elapsed_ms_field = elapsed;
    }
}
// Unit tests for the sweeper. They cover three areas: the pure `SweepLimit` /
// `Budget` arithmetic, the current placeholder behaviour of the three
// `BlobCacheSweeper` entry points, and wall-clock checks that maintenance calls
// running on another thread do not stall reads.
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use super::*;
use crate::storage::cache::blob::{BlobCache, BlobCacheConfig, BlobCachePolicy, BlobCachePut};
// Shared fixture: the default config overridden with `with_l1_bytes_max(64 * 1024)`,
// `with_shard_count(4)` and `with_max_namespaces(8)`.
fn cache() -> BlobCache {
BlobCache::new(
BlobCacheConfig::default()
.with_l1_bytes_max(64 * 1024)
.with_shard_count(4)
.with_max_namespaces(8),
)
}
// `Entries` exposes an entry cap and no millisecond cap.
#[test]
fn sweep_limit_entries_only_caps_entries() {
let limit = SweepLimit::Entries(42);
assert_eq!(limit.entries_cap(), Some(42));
assert_eq!(limit.millis_cap(), None);
}
// `Millis` exposes a millisecond cap and no entry cap.
#[test]
fn sweep_limit_millis_only_caps_millis() {
let limit = SweepLimit::Millis(7);
assert_eq!(limit.entries_cap(), None);
assert_eq!(limit.millis_cap(), Some(7));
}
// `Either` exposes both caps.
#[test]
fn sweep_limit_either_caps_both() {
let limit = SweepLimit::Either {
entries: 100,
millis: 5,
};
assert_eq!(limit.entries_cap(), Some(100));
assert_eq!(limit.millis_cap(), Some(5));
}
// With a cap of 3, the budget reports exhaustion on exactly the third tick and
// stays exhausted afterwards.
#[test]
fn budget_entries_bound_fires_first() {
let mut budget = Budget::new(SweepLimit::Entries(3), Instant::now());
assert!(!budget.exhausted());
assert!(!budget.tick());
assert!(!budget.tick());
assert!(budget.tick());
assert!(budget.exhausted());
}
// One tick is far below the 1_000-entry cap, so after sleeping 5 ms against a
// 2 ms cap only the time bound can be what reports exhaustion.
#[test]
fn budget_either_uses_first_bound_to_fire() {
let mut budget = Budget::new(
SweepLimit::Either {
entries: 1_000,
millis: 2,
},
Instant::now(),
);
budget.tick();
thread::sleep(Duration::from_millis(5));
assert!(budget.exhausted(), "millis bound should fire first");
}
// `sweep_expired` does not scan anything in its current form, so every counter
// stays at its default even though one entry was inserted beforehand.
#[test]
fn sweep_expired_with_entries_limit_returns_report_within_bound() {
let cache = cache();
cache
.put("n", "k", BlobCachePut::new(b"v".to_vec()))
.unwrap();
let report = BlobCacheSweeper::sweep_expired(&cache, SweepLimit::Entries(10));
assert_eq!(report.entries_scanned, 0);
assert_eq!(report.entries_evicted, 0);
assert_eq!(report.bytes_reclaimed, 0);
assert!(!report.truncated_due_to_limit);
assert!(report.by_namespace.is_empty());
}
// Checks both the externally observed duration and the reported `elapsed_ms`
// against the cap plus 50 ms of slack (presumably to absorb scheduler jitter).
#[test]
fn sweep_expired_with_millis_limit_honors_wall_clock_bound() {
let cache = cache();
let limit_ms = 50u32;
let started = Instant::now();
let report = BlobCacheSweeper::sweep_expired(&cache, SweepLimit::Millis(limit_ms));
let observed_ms = started.elapsed().as_millis() as u32;
assert!(
observed_ms <= limit_ms + 50,
"sweep_expired should not block beyond limit; observed {observed_ms}ms vs cap {limit_ms}ms",
);
assert!(
report.elapsed_ms <= limit_ms + 50,
"report.elapsed_ms ({}) should be near or under cap ({})",
report.elapsed_ms,
limit_ms,
);
}
// An entry stored without an explicit policy must still be readable after a sweep.
#[test]
fn sweep_expired_does_not_remove_unexpired_entries() {
let cache = cache();
cache
.put("alive", "k", BlobCachePut::new(b"v".to_vec()))
.unwrap();
let _ = BlobCacheSweeper::sweep_expired(&cache, SweepLimit::Entries(100));
let hit = cache.get("alive", "k").expect("entry survives sweep");
assert_eq!(hit.value(), b"v");
}
// Ignored for the reason given in the attribute. Stores one entry with
// `expires_at_unix_ms(1)` and one without a policy, then expects the sweep to
// evict exactly the former.
#[test]
#[ignore = "requires BlobCache::for_each_l1_entry accessor (flag #2)"]
fn sweep_expired_evicts_expired_but_not_unexpired() {
let cache = cache();
let policy = BlobCachePolicy::default().expires_at_unix_ms(1);
cache
.put(
"n",
"expired",
BlobCachePut::new(b"x".to_vec()).with_policy(policy),
)
.unwrap();
cache
.put("n", "alive", BlobCachePut::new(b"y".to_vec()))
.unwrap();
let report = BlobCacheSweeper::sweep_expired(&cache, SweepLimit::Entries(10));
assert_eq!(report.entries_evicted, 1);
assert!(cache.get("n", "expired").is_none());
assert!(cache.get("n", "alive").is_some());
}
// `reclaim_orphans` on an empty cache returns all-zero counters and no truncation.
#[test]
fn reclaim_orphans_returns_well_formed_report() {
let cache = cache();
let report = BlobCacheSweeper::reclaim_orphans(&cache, SweepLimit::Entries(10));
assert_eq!(report.blob_chains_scanned, 0);
assert_eq!(report.blob_chains_reclaimed, 0);
assert_eq!(report.bytes_reclaimed, 0);
assert!(!report.truncated_due_to_limit);
}
// Ignored placeholder with an intentionally empty body; the attribute names the
// missing prerequisites.
#[test]
#[ignore = "requires BlobCache::for_each_l2_record + cross-module fault hook (flag #2)"]
fn reclaim_orphans_reclaims_chain_left_by_interrupted_write() {
}
// Flushing a populated namespace must finish in under 5 ms (both observed and
// reported) and must make the stored entry unreadable. Despite the test name,
// both generation fields are asserted to be 0 because `flush_namespace`
// hard-codes them.
#[test]
fn flush_namespace_returns_within_foreground_fast_bound_and_bumps_generation() {
let cache = cache();
cache
.put("ns", "k", BlobCachePut::new(b"v".to_vec()))
.unwrap();
let started = Instant::now();
let report = BlobCacheSweeper::flush_namespace(&cache, "ns");
let observed = started.elapsed();
assert_eq!(report.namespace, "ns");
assert_eq!(report.generation_before, 0);
assert_eq!(report.generation_after, 0);
assert!(
observed < Duration::from_millis(5),
"flush_namespace should be foreground-fast; observed {observed:?}",
);
assert!(
report.elapsed_micros < 5_000,
"report.elapsed_micros should be <5ms; observed {}µs",
report.elapsed_micros,
);
assert!(
cache.get("ns", "k").is_none(),
"entry should be invisible after generation bump",
);
}
// Flushing a namespace that was never written to still yields a report echoing
// the name, with both generation fields at 0.
#[test]
fn flush_namespace_on_unknown_namespace_still_returns_well_formed_report() {
let cache = cache();
let report = BlobCacheSweeper::flush_namespace(&cache, "never-existed");
assert_eq!(report.namespace, "never-existed");
assert_eq!(report.generation_before, 0);
assert_eq!(report.generation_after, 0);
}
// One thread loops `sweep_expired` + `reclaim_orphans` until told to stop while
// 8 reader threads each perform 5_000 `get` calls over 64 preloaded keys. Every
// reader must finish within the 500 ms soft cap.
#[test]
fn concurrent_reads_never_block_during_sweep() {
const READER_THREADS: usize = 8;
const READS_PER_THREAD: usize = 5_000;
const READER_SOFT_CAP: Duration = Duration::from_millis(500);
let cache = Arc::new(cache());
for i in 0..64 {
cache
.put("ns", &format!("k{i}"), BlobCachePut::new(vec![i as u8; 32]))
.unwrap();
}
let stop = Arc::new(AtomicBool::new(false));
let sweeper_cache = Arc::clone(&cache);
let sweeper_stop = Arc::clone(&stop);
let sweeper = thread::spawn(move || {
while !sweeper_stop.load(Ordering::Relaxed) {
let _ = BlobCacheSweeper::sweep_expired(
&sweeper_cache,
SweepLimit::Either {
entries: 1_000,
millis: 5,
},
);
let _ = BlobCacheSweeper::reclaim_orphans(&sweeper_cache, SweepLimit::Millis(5));
}
});
let reader_handles: Vec<_> = (0..READER_THREADS)
.map(|tid| {
let reader_cache = Arc::clone(&cache);
thread::spawn(move || {
let started = Instant::now();
for i in 0..READS_PER_THREAD {
let key = format!("k{}", (tid * 7 + i) % 64);
let _ = reader_cache.get("ns", &key);
}
started.elapsed()
})
})
.collect();
// Join every reader first so the sweeper keeps running for the whole read phase.
let elapsed_per_reader: Vec<Duration> = reader_handles
.into_iter()
.map(|h| h.join().expect("reader thread panicked"))
.collect();
stop.store(true, Ordering::Relaxed);
sweeper.join().expect("sweeper thread panicked");
for (tid, elapsed) in elapsed_per_reader.iter().enumerate() {
assert!(
*elapsed < READER_SOFT_CAP,
"reader {tid} took {elapsed:?}, exceeding soft cap {READER_SOFT_CAP:?} \
— sweeper appears to be blocking reads",
);
}
}
// A background thread flushes the "flushed" namespace in a tight loop while the
// test thread reads the same key from "readers" 10_000 times. Every read must
// return the original value, and the whole read loop must finish within 500 ms.
#[test]
fn flush_namespace_storm_does_not_block_other_namespace_reads() {
let cache = Arc::new(cache());
cache
.put("readers", "k", BlobCachePut::new(b"hello".to_vec()))
.unwrap();
cache
.put("flushed", "k", BlobCachePut::new(b"x".to_vec()))
.unwrap();
let stop = Arc::new(AtomicBool::new(false));
let flush_cache = Arc::clone(&cache);
let flush_stop = Arc::clone(&stop);
let flusher = thread::spawn(move || {
while !flush_stop.load(Ordering::Relaxed) {
let _ = BlobCacheSweeper::flush_namespace(&flush_cache, "flushed");
}
});
let started = Instant::now();
for _ in 0..10_000 {
let hit = cache.get("readers", "k").expect("reader namespace alive");
assert_eq!(hit.value(), b"hello");
}
let elapsed = started.elapsed();
stop.store(true, Ordering::Relaxed);
flusher.join().expect("flusher panicked");
assert!(
elapsed < Duration::from_millis(500),
"10k reads on a quiet namespace took {elapsed:?} — flush storm appears to block reads",
);
}
}