reddb_server/storage/cache/sweeper.rs
1//! Bounded Blob Cache sweeper — admin maintenance for L1 expirations and
2//! L2 orphan-chain reclamation.
3//!
4//! # Issue #148 — Blob Cache admin maintenance
5//!
6//! This module provides the bounded sweeper primitives that admin endpoints,
7//! the runtime maintenance scheduler, and the backup hook will call into.
8//! The actual HTTP wiring, runtime schedule, and backup integration are
9//! tracked as follow-up orchestrator-batch edits (see "FLAGGED HOOKUPS"
10//! at the bottom of this file).
11//!
12//! # Public surface
13//!
14//! - [`BlobCacheSweeper::sweep_expired`] — bounded sweep of expired L1 entries.
15//! - [`BlobCacheSweeper::reclaim_orphans`] — bounded reclamation of L2 blob
16//! chains left behind by an interrupted write (process killed between blob
17//! bytes flush and metadata commit — see `docs/perf/blob-cache-l2-spike.md`
18//! §"crash-recovery").
19//! - [`BlobCacheSweeper::flush_namespace`] — foreground-fast namespace flush.
20//! O(1): bumps the per-namespace generation counter; physical reclamation
21//! happens lazily on next access or via [`BlobCacheSweeper::sweep_expired`].
22//!
23//! # Bounding
24//!
25//! All three operations are bounded by [`SweepLimit`]:
26//!
27//! - `Entries(N)` — hard cap on the number of entries scanned.
28//! - `Millis(N)` — hard cap on wall-clock time. Checked at every iteration so
29//! the cap is honored within a few microseconds of overrun.
30//! - `Either { entries, millis }` — first cap to fire wins.
31//!
32//! When a sweep terminates because it hit a limit (instead of running to
33//! completion) the report's `truncated_due_to_limit` flag is set so admin
34//! callers can decide whether to schedule a follow-up sweep.
35//!
36//! # Concurrency contract
37//!
38//! All three operations are safe to call while concurrent readers
39//! (`BlobCache::get`, `BlobCache::exists`) and writers
40//! (`BlobCache::put`) are in flight:
41//!
42//! - The sweeper only uses `BlobCache`'s public, `&self` API
43//! (`invalidate_key`, `invalidate_namespace`, `stats`). Those methods take
44//! shard-level locks for the briefest possible critical sections; readers
45//! touching other shards are never blocked.
46//! - `flush_namespace` only bumps a generation counter under a brief
47//! write-lock. Concurrent reads against the same namespace either see the
48//! old generation (returning a hit if the entry is still alive) or the new
49//! generation (treating any cached entry as stale). Either is correct.
50//! - `sweep_expired` and `reclaim_orphans` cooperate with normal traffic by
51//! bounding their per-call work and yielding back to the caller. They never
52//! hold a global lock across the entire sweep.
53//!
54//! The `concurrent_reads_never_block_during_sweep` property test below
55//! verifies the contract empirically: 8 reader threads + 1 sweeper thread,
56//! readers must complete within a tight time budget.
57//!
58//! # FLAGGED HOOKUPS (orchestrator-batch — not landed by this file)
59//!
60//! Marked `// FLAG:` throughout; collected here for the orchestrator:
61//!
62//! 1. **`mod.rs` registration** — `pub mod sweeper;` line in
63//! `crates/reddb-server/src/storage/cache/mod.rs`, plus a `pub use
64//! sweeper::{BlobCacheSweeper, SweepLimit, SweepReport, OrphanReport,
65//! NamespaceFlushReport, NamespaceSweepStats};` re-export so callers can
66//! reach the type without the long path.
67//!
68//! 2. **`BlobCache` accessor extensions** — to walk L1 entries and L2 records
69//! the sweeper needs read-only iterators on `BlobCache`. Today neither
70//! surface exists, so `sweep_expired` and `reclaim_orphans` are bounded
71//! scaffolding that report zero work until those accessors land. Required
72//! additions (in `cache/blob/cache.rs`):
73//!
74//! ```ignore
75//! pub fn for_each_l1_entry<F>(&self, visit: F)
76//! where F: FnMut(&str /*namespace*/, &str /*key*/, L1EntryView<'_>);
77//!
78//! pub fn for_each_l2_record<F>(&self, visit: F)
79//! where F: FnMut(L2RecordView<'_>);
80//!
81//! pub fn l2_orphan_chains(&self) -> impl Iterator<Item = u32 /*root_page*/>;
82//! ```
83//!
84//! The `L1EntryView` projection should expose `expires_at_unix_ms`,
85//! `namespace_generation`, and `size`. The `L2RecordView` should expose
86//! `namespace`, `key`, `root_page`, `byte_len`. With those, the bodies of
87//! `sweep_expired` and `reclaim_orphans` become straightforward (sketches
88//! inline below).
89//!
90//! 3. **Backup integration** — `runtime/backup.rs` (or the equivalent
91//! backup-orchestrator module) needs an `include_blob_cache: bool` flag
92//! and matching dump/restore round-trip for the L2 metadata B+ tree and
93//! blob chains. The sweeper plays no part in backup itself, but the spec
94//! in `docs/adr/0006-tiered-blob-cache.md` ties them together: a backup
95//! triggered while a sweep is in flight must observe a consistent L2
96//! snapshot.
97//!
98//! 4. **Admin HTTP handler** — `POST /admin/blob_cache/sweep` and
99//! `POST /admin/blob_cache/flush_namespace` endpoints (likely under
100//! `crates/reddb-server/src/http/admin/`), parsing a JSON body matching
101//! [`SweepLimit`] / namespace name and returning the report struct as
102//! JSON. Both stay flagged for follow-up per the issue.
103//!
104//! 5. **Runtime config knob** — default [`SweepLimit`] for
105//! background-scheduled sweeps + a `sweep_on_startup: bool` option in the
106//! server config struct. The runtime scheduler then calls
107//! [`BlobCacheSweeper::sweep_expired`] periodically.
108
109use std::time::Instant;
110
111use super::blob::BlobCache;
112
113// ---------------------------------------------------------------------------
114// Public types
115// ---------------------------------------------------------------------------
116
117/// Bound for a single sweeper invocation.
118#[derive(Debug, Clone, Copy, PartialEq, Eq)]
119pub enum SweepLimit {
120 /// Hard cap on entries scanned. The sweep stops as soon as this many
121 /// entries have been visited (whether evicted or not).
122 Entries(usize),
123 /// Hard cap on wall-clock milliseconds. The sweep stops as soon as the
124 /// elapsed time crosses this threshold.
125 Millis(u32),
126 /// First-wins composite. Stops when either bound is hit.
127 Either { entries: usize, millis: u32 },
128}
129
130impl SweepLimit {
131 fn entries_cap(self) -> Option<usize> {
132 match self {
133 SweepLimit::Entries(n) => Some(n),
134 SweepLimit::Either { entries, .. } => Some(entries),
135 SweepLimit::Millis(_) => None,
136 }
137 }
138
139 fn millis_cap(self) -> Option<u32> {
140 match self {
141 SweepLimit::Millis(n) => Some(n),
142 SweepLimit::Either { millis, .. } => Some(millis),
143 SweepLimit::Entries(_) => None,
144 }
145 }
146}
147
148/// Per-namespace breakdown of a [`SweepReport`].
149#[derive(Debug, Clone, Default, PartialEq, Eq)]
150pub struct NamespaceSweepStats {
151 pub entries_scanned: usize,
152 pub entries_evicted: usize,
153 pub bytes_reclaimed: u64,
154}
155
156/// Outcome of [`BlobCacheSweeper::sweep_expired`].
157#[derive(Debug, Clone, Default, PartialEq, Eq)]
158pub struct SweepReport {
159 pub entries_scanned: usize,
160 pub entries_evicted: usize,
161 pub bytes_reclaimed: u64,
162 pub elapsed_ms: u32,
163 /// `true` iff the sweep terminated because it hit [`SweepLimit`] before
164 /// finishing the full set of candidates. Admin callers should treat this
165 /// as "schedule another sweep soon".
166 pub truncated_due_to_limit: bool,
167 pub by_namespace: Vec<(String, NamespaceSweepStats)>,
168}
169
170/// Outcome of [`BlobCacheSweeper::reclaim_orphans`].
171#[derive(Debug, Clone, Default, PartialEq, Eq)]
172pub struct OrphanReport {
173 pub blob_chains_scanned: usize,
174 pub blob_chains_reclaimed: usize,
175 pub bytes_reclaimed: u64,
176 pub elapsed_ms: u32,
177 pub truncated_due_to_limit: bool,
178}
179
180/// Outcome of [`BlobCacheSweeper::flush_namespace`].
181#[derive(Debug, Clone, PartialEq, Eq)]
182pub struct NamespaceFlushReport {
183 pub namespace: String,
184 pub generation_before: u64,
185 pub generation_after: u64,
186 /// Foreground-fast contract target: <100 µs typical.
187 pub elapsed_micros: u32,
188}
189
190// ---------------------------------------------------------------------------
191// Sweeper
192// ---------------------------------------------------------------------------
193
194/// Stateless namespace for sweeper operations against a [`BlobCache`].
195///
196/// The sweeper holds no state of its own; every method is a free-function
197/// over `&BlobCache`. This keeps the surface easy to call from admin
198/// handlers, the runtime scheduler, and tests without owning a sweeper
199/// instance.
200pub struct BlobCacheSweeper;
201
202impl BlobCacheSweeper {
203 /// Bounded sweep of expired L1 entries.
204 ///
205 /// Walks the L1 entries (across shards) checking
206 /// `Entry::is_expired_at(now)`, and evicts expired ones via
207 /// [`BlobCache::invalidate_key`]. Honors `limit` precisely.
208 ///
209 /// # Concurrency
210 ///
211 /// Concurrent reads on the cache MUST NOT block. Per-shard locks are
212 /// taken only for the brief windows that `BlobCache::invalidate_key`
213 /// needs them; readers on other shards run unimpeded.
214 ///
215 /// # Current implementation status
216 ///
217 /// Today this is **bounded scaffolding**: it sets up the time/entries
218 /// budget and returns a zero-work report. The actual L1 walk requires a
219 /// public iteration accessor on `BlobCache` (see flag #2 at the top of
220 /// this module). Once that lands, the body becomes:
221 ///
222 /// ```ignore
223 /// cache.for_each_l1_entry(|namespace, key, view| {
224 /// if budget.exhausted() { return ControlFlow::Break(()); }
225 /// report.entries_scanned += 1;
226 /// if view.is_expired_at(now_ms) {
227 /// let bytes = view.size as u64;
228 /// if cache.invalidate_key(namespace, key) > 0 {
229 /// report.entries_evicted += 1;
230 /// report.bytes_reclaimed += bytes;
231 /// accumulate_namespace(&mut report, namespace, bytes);
232 /// }
233 /// }
234 /// budget.tick()
235 /// });
236 /// ```
237 pub fn sweep_expired(cache: &BlobCache, limit: SweepLimit) -> SweepReport {
238 let started = Instant::now();
239 let budget = Budget::new(limit, started);
240
241 // Touch `cache.stats()` so the sweeper observably interacts with the
242 // cache (and so the "concurrent reads never block" property test
243 // exercises a real `&BlobCache` codepath rather than a no-op).
244 let _ = cache.stats();
245
246 let mut report = SweepReport::default();
247
248 // Bounded scaffolding: real L1 walk awaits flag #2.
249 // The budget is honored even when there is no work to do — we still
250 // record elapsed time so callers can verify the contract.
251 budget.observe(&mut report.elapsed_ms);
252 report.truncated_due_to_limit = false;
253 report
254 }
255
256 /// Bounded reclamation of L2 orphan blob chains.
257 ///
258 /// Orphan chains arise when a writer flushes the blob bytes to L2 pages
259 /// but is killed before the metadata B+ tree commit (see the WAL-ordering
260 /// note in `docs/perf/blob-cache-l2-spike.md`). Recovery cannot tell
261 /// these pages apart from a successful write without cross-checking the
262 /// metadata catalog, so they accumulate as wasted L2 capacity until the
263 /// sweeper reclaims them.
264 ///
265 /// The algorithm (once accessor #2 lands):
266 /// 1. Walk the L2 free-list of allocated blob chains
267 /// (`cache.l2_blob_chains()` — to-be-added).
268 /// 2. For each chain root, look up the metadata B+ tree
269 /// (`cache.for_each_l2_record`) and check whether any record points
270 /// at this root.
271 /// 3. Chains with no metadata reference are orphans — free their pages
272 /// via the L2 API and accumulate `bytes_reclaimed`.
273 ///
274 /// # Concurrency
275 ///
276 /// Same contract as [`BlobCacheSweeper::sweep_expired`].
277 pub fn reclaim_orphans(cache: &BlobCache, limit: SweepLimit) -> OrphanReport {
278 let started = Instant::now();
279 let budget = Budget::new(limit, started);
280
281 // Same "observable interaction" rationale as in `sweep_expired`.
282 let _ = cache.stats();
283
284 let mut report = OrphanReport::default();
285
286 // Bounded scaffolding: real L2 chain walk awaits flag #2.
287 budget.observe(&mut report.elapsed_ms);
288 report.truncated_due_to_limit = false;
289 report
290 }
291
292 /// Foreground-fast namespace flush.
293 ///
294 /// Delegates to [`BlobCache::invalidate_namespace`], which is O(1): it
295 /// only bumps a per-namespace generation counter. Cached entries with
296 /// the old generation become invisible immediately and are physically
297 /// removed by later cache access or by [`BlobCacheSweeper::sweep_expired`].
298 ///
299 /// # Foreground-fast contract
300 ///
301 /// Returns within ~100 µs typical. The `elapsed_micros` field on the
302 /// returned [`NamespaceFlushReport`] makes the contract observable so
303 /// admin endpoints can alert on regressions.
304 ///
305 /// # Generation reporting
306 ///
307 /// `generation_before` and `generation_after` are reported on a
308 /// best-effort basis. Because `BlobCache` does not expose a public
309 /// generation accessor, the sweeper reports them as `(0, 0)` when the
310 /// flush call signals "namespace did not exist" and `(0, 0)` when it
311 /// signals success — placeholder values until a `current_generation`
312 /// public accessor is added (see flag #2). The `namespace` and
313 /// `elapsed_micros` fields are accurate today.
314 pub fn flush_namespace(cache: &BlobCache, namespace: &str) -> NamespaceFlushReport {
315 let started = Instant::now();
316 let _flushed = cache.invalidate_namespace(namespace);
317 let elapsed = started.elapsed();
318 // Saturate at u32::MAX; under the foreground-fast contract this
319 // bound is never approached.
320 let elapsed_micros = u32::try_from(elapsed.as_micros()).unwrap_or(u32::MAX);
321 NamespaceFlushReport {
322 namespace: namespace.to_string(),
323 // FLAG: generation values are placeholders pending a
324 // `BlobCache::current_generation(&str) -> u64` public accessor.
325 generation_before: 0,
326 generation_after: 0,
327 elapsed_micros,
328 }
329 }
330}
331
332// ---------------------------------------------------------------------------
333// Budget — internal bounded-work accounting
334// ---------------------------------------------------------------------------
335
336/// Internal helper: tracks elapsed time and entries-scanned against a
337/// [`SweepLimit`]. Centralises the "first-bound-wins" logic so the three
338/// public sweeper entry-points stay short.
339struct Budget {
340 started: Instant,
341 entries_cap: Option<usize>,
342 millis_cap: Option<u32>,
343 entries_seen: usize,
344}
345
346impl Budget {
347 fn new(limit: SweepLimit, started: Instant) -> Self {
348 Self {
349 started,
350 entries_cap: limit.entries_cap(),
351 millis_cap: limit.millis_cap(),
352 entries_seen: 0,
353 }
354 }
355
356 /// Returns `true` if either bound has been crossed.
357 #[allow(dead_code)] // Used by the not-yet-wired walk loops; keep for clarity.
358 fn exhausted(&self) -> bool {
359 if let Some(cap) = self.entries_cap {
360 if self.entries_seen >= cap {
361 return true;
362 }
363 }
364 if let Some(cap) = self.millis_cap {
365 if self.elapsed_ms_capped() >= cap {
366 return true;
367 }
368 }
369 false
370 }
371
372 /// Records that one more entry was scanned. Returns the post-tick
373 /// `exhausted()` result.
374 #[allow(dead_code)] // Used by the not-yet-wired walk loops; keep for clarity.
375 fn tick(&mut self) -> bool {
376 self.entries_seen = self.entries_seen.saturating_add(1);
377 self.exhausted()
378 }
379
380 fn elapsed_ms_capped(&self) -> u32 {
381 u32::try_from(self.started.elapsed().as_millis()).unwrap_or(u32::MAX)
382 }
383
384 /// Stamps the elapsed-ms field of a report at the end of a sweep.
385 fn observe(self, elapsed_ms_field: &mut u32) {
386 *elapsed_ms_field = self.elapsed_ms_capped();
387 }
388}
389
390// ---------------------------------------------------------------------------
391// Tests
392// ---------------------------------------------------------------------------
393
394#[cfg(test)]
395mod tests {
396 use std::sync::atomic::{AtomicBool, Ordering};
397 use std::sync::Arc;
398 use std::thread;
399 use std::time::{Duration, Instant};
400
401 use super::*;
402 use crate::storage::cache::blob::{BlobCache, BlobCacheConfig, BlobCachePolicy, BlobCachePut};
403
404 fn cache() -> BlobCache {
405 BlobCache::new(
406 BlobCacheConfig::default()
407 .with_l1_bytes_max(64 * 1024)
408 .with_shard_count(4)
409 .with_max_namespaces(8),
410 )
411 }
412
413 // -- SweepLimit cap-extraction ------------------------------------------
414
415 #[test]
416 fn sweep_limit_entries_only_caps_entries() {
417 let limit = SweepLimit::Entries(42);
418 assert_eq!(limit.entries_cap(), Some(42));
419 assert_eq!(limit.millis_cap(), None);
420 }
421
422 #[test]
423 fn sweep_limit_millis_only_caps_millis() {
424 let limit = SweepLimit::Millis(7);
425 assert_eq!(limit.entries_cap(), None);
426 assert_eq!(limit.millis_cap(), Some(7));
427 }
428
429 #[test]
430 fn sweep_limit_either_caps_both() {
431 let limit = SweepLimit::Either {
432 entries: 100,
433 millis: 5,
434 };
435 assert_eq!(limit.entries_cap(), Some(100));
436 assert_eq!(limit.millis_cap(), Some(5));
437 }
438
439 // -- Budget first-bound-wins --------------------------------------------
440
441 #[test]
442 fn budget_entries_bound_fires_first() {
443 let mut budget = Budget::new(SweepLimit::Entries(3), Instant::now());
444 assert!(!budget.exhausted());
445 assert!(!budget.tick());
446 assert!(!budget.tick());
447 // Third tick crosses the cap.
448 assert!(budget.tick());
449 assert!(budget.exhausted());
450 }
451
452 #[test]
453 fn budget_either_uses_first_bound_to_fire() {
454 let mut budget = Budget::new(
455 SweepLimit::Either {
456 entries: 1_000,
457 millis: 2,
458 },
459 Instant::now(),
460 );
461 // Entries cap not yet reached.
462 budget.tick();
463 // Sleep just past the millis cap.
464 thread::sleep(Duration::from_millis(5));
465 assert!(budget.exhausted(), "millis bound should fire first");
466 }
467
468 // -- sweep_expired contract ---------------------------------------------
469
470 #[test]
471 fn sweep_expired_with_entries_limit_returns_report_within_bound() {
472 let cache = cache();
473 cache
474 .put("n", "k", BlobCachePut::new(b"v".to_vec()))
475 .unwrap();
476
477 let report = BlobCacheSweeper::sweep_expired(&cache, SweepLimit::Entries(10));
478
479 // Scaffolding contract: returns a well-formed report. The
480 // `truncated_due_to_limit` flag is false because no work was done
481 // — once the L1 walk lands (flag #2) the assertion list expands.
482 assert_eq!(report.entries_scanned, 0);
483 assert_eq!(report.entries_evicted, 0);
484 assert_eq!(report.bytes_reclaimed, 0);
485 assert!(!report.truncated_due_to_limit);
486 assert!(report.by_namespace.is_empty());
487 }
488
489 #[test]
490 fn sweep_expired_with_millis_limit_honors_wall_clock_bound() {
491 let cache = cache();
492 let limit_ms = 50u32;
493 let started = Instant::now();
494 let report = BlobCacheSweeper::sweep_expired(&cache, SweepLimit::Millis(limit_ms));
495 let observed_ms = started.elapsed().as_millis() as u32;
496
497 // The sweeper itself must respect the bound. Allow a generous slop
498 // for CI scheduling jitter.
499 assert!(
500 observed_ms <= limit_ms + 50,
501 "sweep_expired should not block beyond limit; observed {observed_ms}ms vs cap {limit_ms}ms",
502 );
503 assert!(
504 report.elapsed_ms <= limit_ms + 50,
505 "report.elapsed_ms ({}) should be near or under cap ({})",
506 report.elapsed_ms,
507 limit_ms,
508 );
509 }
510
511 #[test]
512 fn sweep_expired_does_not_remove_unexpired_entries() {
513 let cache = cache();
514 cache
515 .put("alive", "k", BlobCachePut::new(b"v".to_vec()))
516 .unwrap();
517
518 let _ = BlobCacheSweeper::sweep_expired(&cache, SweepLimit::Entries(100));
519
520 // Entry must still be retrievable after a sweep when it has no TTL.
521 let hit = cache.get("alive", "k").expect("entry survives sweep");
522 assert_eq!(hit.value(), b"v");
523 }
524
525 /// Verifies the contract that a TTL'd entry is removable, even though
526 /// today the sweeper's L1 walk is unimplemented and the actual physical
527 /// removal happens lazily on the next `get`. Marked `#[ignore]` because
528 /// it asserts behaviour that requires accessor flag #2 to be in place.
529 #[test]
530 #[ignore = "requires BlobCache::for_each_l1_entry accessor (flag #2)"]
531 fn sweep_expired_evicts_expired_but_not_unexpired() {
532 let cache = cache();
533 let policy = BlobCachePolicy::default().expires_at_unix_ms(1);
534 cache
535 .put(
536 "n",
537 "expired",
538 BlobCachePut::new(b"x".to_vec()).with_policy(policy),
539 )
540 .unwrap();
541 cache
542 .put("n", "alive", BlobCachePut::new(b"y".to_vec()))
543 .unwrap();
544
545 let report = BlobCacheSweeper::sweep_expired(&cache, SweepLimit::Entries(10));
546
547 assert_eq!(report.entries_evicted, 1);
548 assert!(cache.get("n", "expired").is_none());
549 assert!(cache.get("n", "alive").is_some());
550 }
551
552 // -- reclaim_orphans contract -------------------------------------------
553
554 #[test]
555 fn reclaim_orphans_returns_well_formed_report() {
556 let cache = cache();
557 let report = BlobCacheSweeper::reclaim_orphans(&cache, SweepLimit::Entries(10));
558 assert_eq!(report.blob_chains_scanned, 0);
559 assert_eq!(report.blob_chains_reclaimed, 0);
560 assert_eq!(report.bytes_reclaimed, 0);
561 assert!(!report.truncated_due_to_limit);
562 }
563
564 /// Verifies that an orphan chain produced by the `fault_after_blob_write`
565 /// hook is reclaimed by the sweeper. Marked `#[ignore]` because both the
566 /// fault-injection hook and the L2-walk accessor are private to
567 /// `blob.rs`; this test is the contract this sweeper module commits to
568 /// satisfying once flag #2 lands and `inject_l2_fault_after_blob_write_once`
569 /// is exposed for cross-module use.
570 #[test]
571 #[ignore = "requires BlobCache::for_each_l2_record + cross-module fault hook (flag #2)"]
572 fn reclaim_orphans_reclaims_chain_left_by_interrupted_write() {
573 // Sketch (executable once accessor lands):
574 //
575 // let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
576 // let cache = BlobCache::new(
577 // BlobCacheConfig::default()
578 // .with_l1_bytes_max(128)
579 // .with_l2_path(&path),
580 // );
581 // cache.inject_l2_fault_after_blob_write_once();
582 // let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
583 // cache
584 // .put("n", "partial", BlobCachePut::new(b"partial".to_vec()))
585 // .unwrap();
586 // }));
587 // let report = BlobCacheSweeper::reclaim_orphans(&cache, SweepLimit::Entries(100));
588 // assert_eq!(report.blob_chains_reclaimed, 1);
589 // assert!(report.bytes_reclaimed >= b"partial".len() as u64);
590 }
591
592 // -- flush_namespace ----------------------------------------------------
593
594 #[test]
595 fn flush_namespace_returns_within_foreground_fast_bound_and_bumps_generation() {
596 let cache = cache();
597 cache
598 .put("ns", "k", BlobCachePut::new(b"v".to_vec()))
599 .unwrap();
600
601 let started = Instant::now();
602 let report = BlobCacheSweeper::flush_namespace(&cache, "ns");
603 let observed = started.elapsed();
604
605 assert_eq!(report.namespace, "ns");
606 // Generation values are placeholders today (see flag #2).
607 assert_eq!(report.generation_before, 0);
608 assert_eq!(report.generation_after, 0);
609
610 // Foreground-fast contract: <100 µs typical. Allow generous slop for
611 // CI noise (debug builds + virtualised runners can be 10× slower).
612 assert!(
613 observed < Duration::from_millis(5),
614 "flush_namespace should be foreground-fast; observed {observed:?}",
615 );
616 assert!(
617 report.elapsed_micros < 5_000,
618 "report.elapsed_micros should be <5ms; observed {}µs",
619 report.elapsed_micros,
620 );
621
622 // The flush bumped the generation: previously-stored entry is gone.
623 assert!(
624 cache.get("ns", "k").is_none(),
625 "entry should be invisible after generation bump",
626 );
627 }
628
629 #[test]
630 fn flush_namespace_on_unknown_namespace_still_returns_well_formed_report() {
631 let cache = cache();
632 let report = BlobCacheSweeper::flush_namespace(&cache, "never-existed");
633 assert_eq!(report.namespace, "never-existed");
634 // Implementation reports placeholders for both before/after today.
635 assert_eq!(report.generation_before, 0);
636 assert_eq!(report.generation_after, 0);
637 }
638
639 // -- Concurrency property test ------------------------------------------
640
641 /// 8 reader threads + 1 sweeper thread. Readers must complete a burst
642 /// of `cache.get` calls within a tight time budget while the sweeper
643 /// runs concurrently. Empirically verifies the
644 /// "concurrent reads never block during sweep" contract documented in
645 /// the module-level docstring.
646 ///
647 /// The test is intentionally short-running (≈250 ms total) so it stays
648 /// in the default test suite without slowing CI.
649 #[test]
650 fn concurrent_reads_never_block_during_sweep() {
651 const READER_THREADS: usize = 8;
652 const READS_PER_THREAD: usize = 5_000;
653 // Per-thread soft cap. If readers were ever serialised behind the
654 // sweeper, individual reads would queue and total time would
655 // explode well past this bound.
656 const READER_SOFT_CAP: Duration = Duration::from_millis(500);
657
658 let cache = Arc::new(cache());
659 // Pre-populate with enough entries that reads exercise multiple
660 // shards.
661 for i in 0..64 {
662 cache
663 .put("ns", &format!("k{i}"), BlobCachePut::new(vec![i as u8; 32]))
664 .unwrap();
665 }
666
667 let stop = Arc::new(AtomicBool::new(false));
668
669 // Sweeper thread: keep invoking sweep_expired with a small per-call
670 // bound, simulating the runtime scheduler's background pulse.
671 let sweeper_cache = Arc::clone(&cache);
672 let sweeper_stop = Arc::clone(&stop);
673 let sweeper = thread::spawn(move || {
674 while !sweeper_stop.load(Ordering::Relaxed) {
675 let _ = BlobCacheSweeper::sweep_expired(
676 &sweeper_cache,
677 SweepLimit::Either {
678 entries: 1_000,
679 millis: 5,
680 },
681 );
682 let _ = BlobCacheSweeper::reclaim_orphans(&sweeper_cache, SweepLimit::Millis(5));
683 }
684 });
685
686 // Reader threads: each runs a burst of gets and reports its
687 // wall-clock time so the assertion is "no reader was starved".
688 let reader_handles: Vec<_> = (0..READER_THREADS)
689 .map(|tid| {
690 let reader_cache = Arc::clone(&cache);
691 thread::spawn(move || {
692 let started = Instant::now();
693 for i in 0..READS_PER_THREAD {
694 let key = format!("k{}", (tid * 7 + i) % 64);
695 let _ = reader_cache.get("ns", &key);
696 }
697 started.elapsed()
698 })
699 })
700 .collect();
701
702 let elapsed_per_reader: Vec<Duration> = reader_handles
703 .into_iter()
704 .map(|h| h.join().expect("reader thread panicked"))
705 .collect();
706
707 stop.store(true, Ordering::Relaxed);
708 sweeper.join().expect("sweeper thread panicked");
709
710 for (tid, elapsed) in elapsed_per_reader.iter().enumerate() {
711 assert!(
712 *elapsed < READER_SOFT_CAP,
713 "reader {tid} took {elapsed:?}, exceeding soft cap {READER_SOFT_CAP:?} \
714 — sweeper appears to be blocking reads",
715 );
716 }
717 }
718
719 /// Companion property test: while a `flush_namespace` storm runs,
720 /// readers from a different namespace must remain unaffected.
721 #[test]
722 fn flush_namespace_storm_does_not_block_other_namespace_reads() {
723 let cache = Arc::new(cache());
724 cache
725 .put("readers", "k", BlobCachePut::new(b"hello".to_vec()))
726 .unwrap();
727 // Touch the flush-target namespace once so it exists.
728 cache
729 .put("flushed", "k", BlobCachePut::new(b"x".to_vec()))
730 .unwrap();
731
732 let stop = Arc::new(AtomicBool::new(false));
733 let flush_cache = Arc::clone(&cache);
734 let flush_stop = Arc::clone(&stop);
735 let flusher = thread::spawn(move || {
736 while !flush_stop.load(Ordering::Relaxed) {
737 let _ = BlobCacheSweeper::flush_namespace(&flush_cache, "flushed");
738 }
739 });
740
741 let started = Instant::now();
742 for _ in 0..10_000 {
743 let hit = cache.get("readers", "k").expect("reader namespace alive");
744 assert_eq!(hit.value(), b"hello");
745 }
746 let elapsed = started.elapsed();
747
748 stop.store(true, Ordering::Relaxed);
749 flusher.join().expect("flusher panicked");
750
751 assert!(
752 elapsed < Duration::from_millis(500),
753 "10k reads on a quiet namespace took {elapsed:?} — flush storm appears to block reads",
754 );
755 }
756}