Skip to main content

reddb_server/storage/cache/
sweeper.rs

1//! Bounded Blob Cache sweeper — admin maintenance for L1 expirations and
2//! L2 orphan-chain reclamation.
3//!
4//! # Issue #148 — Blob Cache admin maintenance
5//!
6//! This module provides the bounded sweeper primitives that admin endpoints,
7//! the runtime maintenance scheduler, and the backup hook will call into.
8//! The actual HTTP wiring, runtime schedule, and backup integration are
9//! tracked as follow-up orchestrator-batch edits (see "FLAGGED HOOKUPS"
10//! at the bottom of this file).
11//!
12//! # Public surface
13//!
14//! - [`BlobCacheSweeper::sweep_expired`] — bounded sweep of expired L1 entries.
15//! - [`BlobCacheSweeper::reclaim_orphans`] — bounded reclamation of L2 blob
16//!   chains left behind by an interrupted write (process killed between blob
17//!   bytes flush and metadata commit — see `docs/perf/blob-cache-l2-spike.md`
18//!   §"crash-recovery").
19//! - [`BlobCacheSweeper::flush_namespace`] — foreground-fast namespace flush.
20//!   O(1): bumps the per-namespace generation counter; physical reclamation
21//!   happens lazily on next access or via [`BlobCacheSweeper::sweep_expired`].
22//!
23//! # Bounding
24//!
25//! All three operations are bounded by [`SweepLimit`]:
26//!
27//! - `Entries(N)` — hard cap on the number of entries scanned.
28//! - `Millis(N)` — hard cap on wall-clock time. Checked at every iteration so
29//!   the cap is honored within a few microseconds of overrun.
30//! - `Either { entries, millis }` — first cap to fire wins.
31//!
32//! When a sweep terminates because it hit a limit (instead of running to
33//! completion) the report's `truncated_due_to_limit` flag is set so admin
34//! callers can decide whether to schedule a follow-up sweep.
35//!
36//! # Concurrency contract
37//!
38//! All three operations are safe to call while concurrent readers
39//! (`BlobCache::get`, `BlobCache::exists`) and writers
40//! (`BlobCache::put`) are in flight:
41//!
42//! - The sweeper only uses `BlobCache`'s public, `&self` API
43//!   (`invalidate_key`, `invalidate_namespace`, `stats`). Those methods take
44//!   shard-level locks for the briefest possible critical sections; readers
45//!   touching other shards are never blocked.
46//! - `flush_namespace` only bumps a generation counter under a brief
47//!   write-lock. Concurrent reads against the same namespace either see the
48//!   old generation (returning a hit if the entry is still alive) or the new
49//!   generation (treating any cached entry as stale). Either is correct.
50//! - `sweep_expired` and `reclaim_orphans` cooperate with normal traffic by
51//!   bounding their per-call work and yielding back to the caller. They never
52//!   hold a global lock across the entire sweep.
53//!
54//! The `concurrent_reads_never_block_during_sweep` property test below
55//! verifies the contract empirically: 8 reader threads + 1 sweeper thread,
56//! readers must complete within a tight time budget.
57//!
58//! # FLAGGED HOOKUPS (orchestrator-batch — not landed by this file)
59//!
60//! Marked `// FLAG:` throughout; collected here for the orchestrator:
61//!
62//! 1. **`mod.rs` registration** — `pub mod sweeper;` line in
63//!    `crates/reddb-server/src/storage/cache/mod.rs`, plus a `pub use
64//!    sweeper::{BlobCacheSweeper, SweepLimit, SweepReport, OrphanReport,
65//!    NamespaceFlushReport, NamespaceSweepStats};` re-export so callers can
66//!    reach the type without the long path.
67//!
68//! 2. **`BlobCache` accessor extensions** — to walk L1 entries and L2 records
69//!    the sweeper needs read-only iterators on `BlobCache`. Today neither
70//!    surface exists, so `sweep_expired` and `reclaim_orphans` are bounded
71//!    scaffolding that report zero work until those accessors land. Required
72//!    additions (in `cache/blob/cache.rs`):
73//!
74//!    ```ignore
75//!    pub fn for_each_l1_entry<F>(&self, visit: F)
76//!    where F: FnMut(&str /*namespace*/, &str /*key*/, L1EntryView<'_>);
77//!
78//!    pub fn for_each_l2_record<F>(&self, visit: F)
79//!    where F: FnMut(L2RecordView<'_>);
80//!
81//!    pub fn l2_orphan_chains(&self) -> impl Iterator<Item = u32 /*root_page*/>;
82//!    ```
83//!
84//!    The `L1EntryView` projection should expose `expires_at_unix_ms`,
85//!    `namespace_generation`, and `size`. The `L2RecordView` should expose
86//!    `namespace`, `key`, `root_page`, `byte_len`. With those, the bodies of
87//!    `sweep_expired` and `reclaim_orphans` become straightforward (sketches
88//!    inline below).
89//!
90//! 3. **Backup integration** — `runtime/backup.rs` (or the equivalent
91//!    backup-orchestrator module) needs an `include_blob_cache: bool` flag
92//!    and matching dump/restore round-trip for the L2 metadata B+ tree and
93//!    blob chains. The sweeper plays no part in backup itself, but the spec
94//!    in `docs/adr/0006-tiered-blob-cache.md` ties them together: a backup
95//!    triggered while a sweep is in flight must observe a consistent L2
96//!    snapshot.
97//!
98//! 4. **Admin HTTP handler** — `POST /admin/blob_cache/sweep` and
99//!    `POST /admin/blob_cache/flush_namespace` endpoints (likely under
100//!    `crates/reddb-server/src/http/admin/`), parsing a JSON body matching
101//!    [`SweepLimit`] / namespace name and returning the report struct as
102//!    JSON. Both stay flagged for follow-up per the issue.
103//!
104//! 5. **Runtime config knob** — default [`SweepLimit`] for
105//!    background-scheduled sweeps + a `sweep_on_startup: bool` option in the
106//!    server config struct. The runtime scheduler then calls
107//!    [`BlobCacheSweeper::sweep_expired`] periodically.
108
109use std::time::Instant;
110
111use super::blob::BlobCache;
112
113// ---------------------------------------------------------------------------
114// Public types
115// ---------------------------------------------------------------------------
116
117/// Bound for a single sweeper invocation.
118#[derive(Debug, Clone, Copy, PartialEq, Eq)]
119pub enum SweepLimit {
120    /// Hard cap on entries scanned. The sweep stops as soon as this many
121    /// entries have been visited (whether evicted or not).
122    Entries(usize),
123    /// Hard cap on wall-clock milliseconds. The sweep stops as soon as the
124    /// elapsed time crosses this threshold.
125    Millis(u32),
126    /// First-wins composite. Stops when either bound is hit.
127    Either { entries: usize, millis: u32 },
128}
129
130impl SweepLimit {
131    fn entries_cap(self) -> Option<usize> {
132        match self {
133            SweepLimit::Entries(n) => Some(n),
134            SweepLimit::Either { entries, .. } => Some(entries),
135            SweepLimit::Millis(_) => None,
136        }
137    }
138
139    fn millis_cap(self) -> Option<u32> {
140        match self {
141            SweepLimit::Millis(n) => Some(n),
142            SweepLimit::Either { millis, .. } => Some(millis),
143            SweepLimit::Entries(_) => None,
144        }
145    }
146}
147
148/// Per-namespace breakdown of a [`SweepReport`].
149#[derive(Debug, Clone, Default, PartialEq, Eq)]
150pub struct NamespaceSweepStats {
151    pub entries_scanned: usize,
152    pub entries_evicted: usize,
153    pub bytes_reclaimed: u64,
154}
155
156/// Outcome of [`BlobCacheSweeper::sweep_expired`].
157#[derive(Debug, Clone, Default, PartialEq, Eq)]
158pub struct SweepReport {
159    pub entries_scanned: usize,
160    pub entries_evicted: usize,
161    pub bytes_reclaimed: u64,
162    pub elapsed_ms: u32,
163    /// `true` iff the sweep terminated because it hit [`SweepLimit`] before
164    /// finishing the full set of candidates. Admin callers should treat this
165    /// as "schedule another sweep soon".
166    pub truncated_due_to_limit: bool,
167    pub by_namespace: Vec<(String, NamespaceSweepStats)>,
168}
169
170/// Outcome of [`BlobCacheSweeper::reclaim_orphans`].
171#[derive(Debug, Clone, Default, PartialEq, Eq)]
172pub struct OrphanReport {
173    pub blob_chains_scanned: usize,
174    pub blob_chains_reclaimed: usize,
175    pub bytes_reclaimed: u64,
176    pub elapsed_ms: u32,
177    pub truncated_due_to_limit: bool,
178}
179
180/// Outcome of [`BlobCacheSweeper::flush_namespace`].
181#[derive(Debug, Clone, PartialEq, Eq)]
182pub struct NamespaceFlushReport {
183    pub namespace: String,
184    pub generation_before: u64,
185    pub generation_after: u64,
186    /// Foreground-fast contract target: <100 µs typical.
187    pub elapsed_micros: u32,
188}
189
190// ---------------------------------------------------------------------------
191// Sweeper
192// ---------------------------------------------------------------------------
193
194/// Stateless namespace for sweeper operations against a [`BlobCache`].
195///
196/// The sweeper holds no state of its own; every method is a free-function
197/// over `&BlobCache`. This keeps the surface easy to call from admin
198/// handlers, the runtime scheduler, and tests without owning a sweeper
199/// instance.
200pub struct BlobCacheSweeper;
201
202impl BlobCacheSweeper {
203    /// Bounded sweep of expired L1 entries.
204    ///
205    /// Walks the L1 entries (across shards) checking
206    /// `Entry::is_expired_at(now)`, and evicts expired ones via
207    /// [`BlobCache::invalidate_key`]. Honors `limit` precisely.
208    ///
209    /// # Concurrency
210    ///
211    /// Concurrent reads on the cache MUST NOT block. Per-shard locks are
212    /// taken only for the brief windows that `BlobCache::invalidate_key`
213    /// needs them; readers on other shards run unimpeded.
214    ///
215    /// # Current implementation status
216    ///
217    /// Today this is **bounded scaffolding**: it sets up the time/entries
218    /// budget and returns a zero-work report. The actual L1 walk requires a
219    /// public iteration accessor on `BlobCache` (see flag #2 at the top of
220    /// this module). Once that lands, the body becomes:
221    ///
222    /// ```ignore
223    /// cache.for_each_l1_entry(|namespace, key, view| {
224    ///     if budget.exhausted() { return ControlFlow::Break(()); }
225    ///     report.entries_scanned += 1;
226    ///     if view.is_expired_at(now_ms) {
227    ///         let bytes = view.size as u64;
228    ///         if cache.invalidate_key(namespace, key) > 0 {
229    ///             report.entries_evicted += 1;
230    ///             report.bytes_reclaimed += bytes;
231    ///             accumulate_namespace(&mut report, namespace, bytes);
232    ///         }
233    ///     }
234    ///     budget.tick()
235    /// });
236    /// ```
237    pub fn sweep_expired(cache: &BlobCache, limit: SweepLimit) -> SweepReport {
238        let started = Instant::now();
239        let budget = Budget::new(limit, started);
240
241        // Touch `cache.stats()` so the sweeper observably interacts with the
242        // cache (and so the "concurrent reads never block" property test
243        // exercises a real `&BlobCache` codepath rather than a no-op).
244        let _ = cache.stats();
245
246        let mut report = SweepReport::default();
247
248        // Bounded scaffolding: real L1 walk awaits flag #2.
249        // The budget is honored even when there is no work to do — we still
250        // record elapsed time so callers can verify the contract.
251        budget.observe(&mut report.elapsed_ms);
252        report.truncated_due_to_limit = false;
253        report
254    }
255
256    /// Bounded reclamation of L2 orphan blob chains.
257    ///
258    /// Orphan chains arise when a writer flushes the blob bytes to L2 pages
259    /// but is killed before the metadata B+ tree commit (see the WAL-ordering
260    /// note in `docs/perf/blob-cache-l2-spike.md`). Recovery cannot tell
261    /// these pages apart from a successful write without cross-checking the
262    /// metadata catalog, so they accumulate as wasted L2 capacity until the
263    /// sweeper reclaims them.
264    ///
265    /// The algorithm (once accessor #2 lands):
266    /// 1. Walk the L2 free-list of allocated blob chains
267    ///    (`cache.l2_blob_chains()` — to-be-added).
268    /// 2. For each chain root, look up the metadata B+ tree
269    ///    (`cache.for_each_l2_record`) and check whether any record points
270    ///    at this root.
271    /// 3. Chains with no metadata reference are orphans — free their pages
272    ///    via the L2 API and accumulate `bytes_reclaimed`.
273    ///
274    /// # Concurrency
275    ///
276    /// Same contract as [`BlobCacheSweeper::sweep_expired`].
277    pub fn reclaim_orphans(cache: &BlobCache, limit: SweepLimit) -> OrphanReport {
278        let started = Instant::now();
279        let budget = Budget::new(limit, started);
280
281        // Same "observable interaction" rationale as in `sweep_expired`.
282        let _ = cache.stats();
283
284        let mut report = OrphanReport::default();
285
286        // Bounded scaffolding: real L2 chain walk awaits flag #2.
287        budget.observe(&mut report.elapsed_ms);
288        report.truncated_due_to_limit = false;
289        report
290    }
291
292    /// Foreground-fast namespace flush.
293    ///
294    /// Delegates to [`BlobCache::invalidate_namespace`], which is O(1): it
295    /// only bumps a per-namespace generation counter. Cached entries with
296    /// the old generation become invisible immediately and are physically
297    /// removed by later cache access or by [`BlobCacheSweeper::sweep_expired`].
298    ///
299    /// # Foreground-fast contract
300    ///
301    /// Returns within ~100 µs typical. The `elapsed_micros` field on the
302    /// returned [`NamespaceFlushReport`] makes the contract observable so
303    /// admin endpoints can alert on regressions.
304    ///
305    /// # Generation reporting
306    ///
307    /// `generation_before` and `generation_after` are reported on a
308    /// best-effort basis. Because `BlobCache` does not expose a public
309    /// generation accessor, the sweeper reports them as `(0, 0)` when the
310    /// flush call signals "namespace did not exist" and `(0, 0)` when it
311    /// signals success — placeholder values until a `current_generation`
312    /// public accessor is added (see flag #2). The `namespace` and
313    /// `elapsed_micros` fields are accurate today.
314    pub fn flush_namespace(cache: &BlobCache, namespace: &str) -> NamespaceFlushReport {
315        let started = Instant::now();
316        let _flushed = cache.invalidate_namespace(namespace);
317        let elapsed = started.elapsed();
318        // Saturate at u32::MAX; under the foreground-fast contract this
319        // bound is never approached.
320        let elapsed_micros = u32::try_from(elapsed.as_micros()).unwrap_or(u32::MAX);
321        NamespaceFlushReport {
322            namespace: namespace.to_string(),
323            // FLAG: generation values are placeholders pending a
324            // `BlobCache::current_generation(&str) -> u64` public accessor.
325            generation_before: 0,
326            generation_after: 0,
327            elapsed_micros,
328        }
329    }
330}
331
332// ---------------------------------------------------------------------------
333// Budget — internal bounded-work accounting
334// ---------------------------------------------------------------------------
335
336/// Internal helper: tracks elapsed time and entries-scanned against a
337/// [`SweepLimit`]. Centralises the "first-bound-wins" logic so the three
338/// public sweeper entry-points stay short.
339struct Budget {
340    started: Instant,
341    entries_cap: Option<usize>,
342    millis_cap: Option<u32>,
343    entries_seen: usize,
344}
345
346impl Budget {
347    fn new(limit: SweepLimit, started: Instant) -> Self {
348        Self {
349            started,
350            entries_cap: limit.entries_cap(),
351            millis_cap: limit.millis_cap(),
352            entries_seen: 0,
353        }
354    }
355
356    /// Returns `true` if either bound has been crossed.
357    #[allow(dead_code)] // Used by the not-yet-wired walk loops; keep for clarity.
358    fn exhausted(&self) -> bool {
359        if let Some(cap) = self.entries_cap {
360            if self.entries_seen >= cap {
361                return true;
362            }
363        }
364        if let Some(cap) = self.millis_cap {
365            if self.elapsed_ms_capped() >= cap {
366                return true;
367            }
368        }
369        false
370    }
371
372    /// Records that one more entry was scanned. Returns the post-tick
373    /// `exhausted()` result.
374    #[allow(dead_code)] // Used by the not-yet-wired walk loops; keep for clarity.
375    fn tick(&mut self) -> bool {
376        self.entries_seen = self.entries_seen.saturating_add(1);
377        self.exhausted()
378    }
379
380    fn elapsed_ms_capped(&self) -> u32 {
381        u32::try_from(self.started.elapsed().as_millis()).unwrap_or(u32::MAX)
382    }
383
384    /// Stamps the elapsed-ms field of a report at the end of a sweep.
385    fn observe(self, elapsed_ms_field: &mut u32) {
386        *elapsed_ms_field = self.elapsed_ms_capped();
387    }
388}
389
390// ---------------------------------------------------------------------------
391// Tests
392// ---------------------------------------------------------------------------
393
394#[cfg(test)]
395mod tests {
396    use std::sync::atomic::{AtomicBool, Ordering};
397    use std::sync::Arc;
398    use std::thread;
399    use std::time::{Duration, Instant};
400
401    use super::*;
402    use crate::storage::cache::blob::{BlobCache, BlobCacheConfig, BlobCachePolicy, BlobCachePut};
403
404    fn cache() -> BlobCache {
405        BlobCache::new(
406            BlobCacheConfig::default()
407                .with_l1_bytes_max(64 * 1024)
408                .with_shard_count(4)
409                .with_max_namespaces(8),
410        )
411    }
412
413    // -- SweepLimit cap-extraction ------------------------------------------
414
415    #[test]
416    fn sweep_limit_entries_only_caps_entries() {
417        let limit = SweepLimit::Entries(42);
418        assert_eq!(limit.entries_cap(), Some(42));
419        assert_eq!(limit.millis_cap(), None);
420    }
421
422    #[test]
423    fn sweep_limit_millis_only_caps_millis() {
424        let limit = SweepLimit::Millis(7);
425        assert_eq!(limit.entries_cap(), None);
426        assert_eq!(limit.millis_cap(), Some(7));
427    }
428
429    #[test]
430    fn sweep_limit_either_caps_both() {
431        let limit = SweepLimit::Either {
432            entries: 100,
433            millis: 5,
434        };
435        assert_eq!(limit.entries_cap(), Some(100));
436        assert_eq!(limit.millis_cap(), Some(5));
437    }
438
439    // -- Budget first-bound-wins --------------------------------------------
440
441    #[test]
442    fn budget_entries_bound_fires_first() {
443        let mut budget = Budget::new(SweepLimit::Entries(3), Instant::now());
444        assert!(!budget.exhausted());
445        assert!(!budget.tick());
446        assert!(!budget.tick());
447        // Third tick crosses the cap.
448        assert!(budget.tick());
449        assert!(budget.exhausted());
450    }
451
452    #[test]
453    fn budget_either_uses_first_bound_to_fire() {
454        let mut budget = Budget::new(
455            SweepLimit::Either {
456                entries: 1_000,
457                millis: 2,
458            },
459            Instant::now(),
460        );
461        // Entries cap not yet reached.
462        budget.tick();
463        // Sleep just past the millis cap.
464        thread::sleep(Duration::from_millis(5));
465        assert!(budget.exhausted(), "millis bound should fire first");
466    }
467
468    // -- sweep_expired contract ---------------------------------------------
469
470    #[test]
471    fn sweep_expired_with_entries_limit_returns_report_within_bound() {
472        let cache = cache();
473        cache
474            .put("n", "k", BlobCachePut::new(b"v".to_vec()))
475            .unwrap();
476
477        let report = BlobCacheSweeper::sweep_expired(&cache, SweepLimit::Entries(10));
478
479        // Scaffolding contract: returns a well-formed report. The
480        // `truncated_due_to_limit` flag is false because no work was done
481        // — once the L1 walk lands (flag #2) the assertion list expands.
482        assert_eq!(report.entries_scanned, 0);
483        assert_eq!(report.entries_evicted, 0);
484        assert_eq!(report.bytes_reclaimed, 0);
485        assert!(!report.truncated_due_to_limit);
486        assert!(report.by_namespace.is_empty());
487    }
488
489    #[test]
490    fn sweep_expired_with_millis_limit_honors_wall_clock_bound() {
491        let cache = cache();
492        let limit_ms = 50u32;
493        let started = Instant::now();
494        let report = BlobCacheSweeper::sweep_expired(&cache, SweepLimit::Millis(limit_ms));
495        let observed_ms = started.elapsed().as_millis() as u32;
496
497        // The sweeper itself must respect the bound. Allow a generous slop
498        // for CI scheduling jitter.
499        assert!(
500            observed_ms <= limit_ms + 50,
501            "sweep_expired should not block beyond limit; observed {observed_ms}ms vs cap {limit_ms}ms",
502        );
503        assert!(
504            report.elapsed_ms <= limit_ms + 50,
505            "report.elapsed_ms ({}) should be near or under cap ({})",
506            report.elapsed_ms,
507            limit_ms,
508        );
509    }
510
511    #[test]
512    fn sweep_expired_does_not_remove_unexpired_entries() {
513        let cache = cache();
514        cache
515            .put("alive", "k", BlobCachePut::new(b"v".to_vec()))
516            .unwrap();
517
518        let _ = BlobCacheSweeper::sweep_expired(&cache, SweepLimit::Entries(100));
519
520        // Entry must still be retrievable after a sweep when it has no TTL.
521        let hit = cache.get("alive", "k").expect("entry survives sweep");
522        assert_eq!(hit.value(), b"v");
523    }
524
525    /// Verifies the contract that a TTL'd entry is removable, even though
526    /// today the sweeper's L1 walk is unimplemented and the actual physical
527    /// removal happens lazily on the next `get`. Marked `#[ignore]` because
528    /// it asserts behaviour that requires accessor flag #2 to be in place.
529    #[test]
530    #[ignore = "requires BlobCache::for_each_l1_entry accessor (flag #2)"]
531    fn sweep_expired_evicts_expired_but_not_unexpired() {
532        let cache = cache();
533        let policy = BlobCachePolicy::default().expires_at_unix_ms(1);
534        cache
535            .put(
536                "n",
537                "expired",
538                BlobCachePut::new(b"x".to_vec()).with_policy(policy),
539            )
540            .unwrap();
541        cache
542            .put("n", "alive", BlobCachePut::new(b"y".to_vec()))
543            .unwrap();
544
545        let report = BlobCacheSweeper::sweep_expired(&cache, SweepLimit::Entries(10));
546
547        assert_eq!(report.entries_evicted, 1);
548        assert!(cache.get("n", "expired").is_none());
549        assert!(cache.get("n", "alive").is_some());
550    }
551
552    // -- reclaim_orphans contract -------------------------------------------
553
554    #[test]
555    fn reclaim_orphans_returns_well_formed_report() {
556        let cache = cache();
557        let report = BlobCacheSweeper::reclaim_orphans(&cache, SweepLimit::Entries(10));
558        assert_eq!(report.blob_chains_scanned, 0);
559        assert_eq!(report.blob_chains_reclaimed, 0);
560        assert_eq!(report.bytes_reclaimed, 0);
561        assert!(!report.truncated_due_to_limit);
562    }
563
564    /// Verifies that an orphan chain produced by the `fault_after_blob_write`
565    /// hook is reclaimed by the sweeper. Marked `#[ignore]` because both the
566    /// fault-injection hook and the L2-walk accessor are private to
567    /// `blob.rs`; this test is the contract this sweeper module commits to
568    /// satisfying once flag #2 lands and `inject_l2_fault_after_blob_write_once`
569    /// is exposed for cross-module use.
570    #[test]
571    #[ignore = "requires BlobCache::for_each_l2_record + cross-module fault hook (flag #2)"]
572    fn reclaim_orphans_reclaims_chain_left_by_interrupted_write() {
573        // Sketch (executable once accessor lands):
574        //
575        // let path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
576        // let cache = BlobCache::new(
577        //     BlobCacheConfig::default()
578        //         .with_l1_bytes_max(128)
579        //         .with_l2_path(&path),
580        // );
581        // cache.inject_l2_fault_after_blob_write_once();
582        // let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
583        //     cache
584        //         .put("n", "partial", BlobCachePut::new(b"partial".to_vec()))
585        //         .unwrap();
586        // }));
587        // let report = BlobCacheSweeper::reclaim_orphans(&cache, SweepLimit::Entries(100));
588        // assert_eq!(report.blob_chains_reclaimed, 1);
589        // assert!(report.bytes_reclaimed >= b"partial".len() as u64);
590    }
591
592    // -- flush_namespace ----------------------------------------------------
593
594    #[test]
595    fn flush_namespace_returns_within_foreground_fast_bound_and_bumps_generation() {
596        let cache = cache();
597        cache
598            .put("ns", "k", BlobCachePut::new(b"v".to_vec()))
599            .unwrap();
600
601        let started = Instant::now();
602        let report = BlobCacheSweeper::flush_namespace(&cache, "ns");
603        let observed = started.elapsed();
604
605        assert_eq!(report.namespace, "ns");
606        // Generation values are placeholders today (see flag #2).
607        assert_eq!(report.generation_before, 0);
608        assert_eq!(report.generation_after, 0);
609
610        // Foreground-fast contract: <100 µs typical. Allow generous slop for
611        // CI noise (debug builds + virtualised runners can be 10× slower).
612        assert!(
613            observed < Duration::from_millis(5),
614            "flush_namespace should be foreground-fast; observed {observed:?}",
615        );
616        assert!(
617            report.elapsed_micros < 5_000,
618            "report.elapsed_micros should be <5ms; observed {}µs",
619            report.elapsed_micros,
620        );
621
622        // The flush bumped the generation: previously-stored entry is gone.
623        assert!(
624            cache.get("ns", "k").is_none(),
625            "entry should be invisible after generation bump",
626        );
627    }
628
629    #[test]
630    fn flush_namespace_on_unknown_namespace_still_returns_well_formed_report() {
631        let cache = cache();
632        let report = BlobCacheSweeper::flush_namespace(&cache, "never-existed");
633        assert_eq!(report.namespace, "never-existed");
634        // Implementation reports placeholders for both before/after today.
635        assert_eq!(report.generation_before, 0);
636        assert_eq!(report.generation_after, 0);
637    }
638
639    // -- Concurrency property test ------------------------------------------
640
641    /// 8 reader threads + 1 sweeper thread. Readers must complete a burst
642    /// of `cache.get` calls within a tight time budget while the sweeper
643    /// runs concurrently. Empirically verifies the
644    /// "concurrent reads never block during sweep" contract documented in
645    /// the module-level docstring.
646    ///
647    /// The test is intentionally short-running (≈250 ms total) so it stays
648    /// in the default test suite without slowing CI.
649    #[test]
650    fn concurrent_reads_never_block_during_sweep() {
651        const READER_THREADS: usize = 8;
652        const READS_PER_THREAD: usize = 5_000;
653        // Per-thread soft cap. If readers were ever serialised behind the
654        // sweeper, individual reads would queue and total time would
655        // explode well past this bound.
656        const READER_SOFT_CAP: Duration = Duration::from_millis(500);
657
658        let cache = Arc::new(cache());
659        // Pre-populate with enough entries that reads exercise multiple
660        // shards.
661        for i in 0..64 {
662            cache
663                .put("ns", &format!("k{i}"), BlobCachePut::new(vec![i as u8; 32]))
664                .unwrap();
665        }
666
667        let stop = Arc::new(AtomicBool::new(false));
668
669        // Sweeper thread: keep invoking sweep_expired with a small per-call
670        // bound, simulating the runtime scheduler's background pulse.
671        let sweeper_cache = Arc::clone(&cache);
672        let sweeper_stop = Arc::clone(&stop);
673        let sweeper = thread::spawn(move || {
674            while !sweeper_stop.load(Ordering::Relaxed) {
675                let _ = BlobCacheSweeper::sweep_expired(
676                    &sweeper_cache,
677                    SweepLimit::Either {
678                        entries: 1_000,
679                        millis: 5,
680                    },
681                );
682                let _ = BlobCacheSweeper::reclaim_orphans(&sweeper_cache, SweepLimit::Millis(5));
683            }
684        });
685
686        // Reader threads: each runs a burst of gets and reports its
687        // wall-clock time so the assertion is "no reader was starved".
688        let reader_handles: Vec<_> = (0..READER_THREADS)
689            .map(|tid| {
690                let reader_cache = Arc::clone(&cache);
691                thread::spawn(move || {
692                    let started = Instant::now();
693                    for i in 0..READS_PER_THREAD {
694                        let key = format!("k{}", (tid * 7 + i) % 64);
695                        let _ = reader_cache.get("ns", &key);
696                    }
697                    started.elapsed()
698                })
699            })
700            .collect();
701
702        let elapsed_per_reader: Vec<Duration> = reader_handles
703            .into_iter()
704            .map(|h| h.join().expect("reader thread panicked"))
705            .collect();
706
707        stop.store(true, Ordering::Relaxed);
708        sweeper.join().expect("sweeper thread panicked");
709
710        for (tid, elapsed) in elapsed_per_reader.iter().enumerate() {
711            assert!(
712                *elapsed < READER_SOFT_CAP,
713                "reader {tid} took {elapsed:?}, exceeding soft cap {READER_SOFT_CAP:?} \
714                 — sweeper appears to be blocking reads",
715            );
716        }
717    }
718
719    /// Companion property test: while a `flush_namespace` storm runs,
720    /// readers from a different namespace must remain unaffected.
721    #[test]
722    fn flush_namespace_storm_does_not_block_other_namespace_reads() {
723        let cache = Arc::new(cache());
724        cache
725            .put("readers", "k", BlobCachePut::new(b"hello".to_vec()))
726            .unwrap();
727        // Touch the flush-target namespace once so it exists.
728        cache
729            .put("flushed", "k", BlobCachePut::new(b"x".to_vec()))
730            .unwrap();
731
732        let stop = Arc::new(AtomicBool::new(false));
733        let flush_cache = Arc::clone(&cache);
734        let flush_stop = Arc::clone(&stop);
735        let flusher = thread::spawn(move || {
736            while !flush_stop.load(Ordering::Relaxed) {
737                let _ = BlobCacheSweeper::flush_namespace(&flush_cache, "flushed");
738            }
739        });
740
741        let started = Instant::now();
742        for _ in 0..10_000 {
743            let hit = cache.get("readers", "k").expect("reader namespace alive");
744            assert_eq!(hit.value(), b"hello");
745        }
746        let elapsed = started.elapsed();
747
748        stop.store(true, Ordering::Relaxed);
749        flusher.join().expect("flusher panicked");
750
751        assert!(
752            elapsed < Duration::from_millis(500),
753            "10k reads on a quiet namespace took {elapsed:?} — flush storm appears to block reads",
754        );
755    }
756}