Skip to main content

reddb_server/storage/cache/blob/
cache.rs

1//! Byte-oriented Blob Cache.
2//!
3//! This is the first internal tracer for RedDB's exact-key blob cache. It is
4//! intentionally L1-only: a sharded, byte-bounded, in-process cache with SIEVE
5//! eviction, namespace caps, and opaque content metadata. Durable L2 storage,
6//! dependency invalidation, and public APIs land in follow-up slices.
7
8use super::config::{BlobCacheConfig, L2Compression};
9use super::entry::{effective_expires_at_unix_ms, jitter_seed, Entry};
10use super::l2::BlobCacheL2;
11use super::shard::{InsertOutcome, Lookup, Shard};
12
13use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
14use std::hash::{Hash, Hasher};
15#[cfg(test)]
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
18use std::sync::{Arc, OnceLock, Weak};
19use std::time::{SystemTime, UNIX_EPOCH};
20
21use parking_lot::RwLock;
22
23use super::super::compressor::{CompressOpts, Compressed, L2BlobCompressor};
24use super::super::extended_ttl::ExtendedTtlPolicy;
25use super::super::promotion_pool::{
26    AsyncPromotionPool, PoolOpts, PromotionExecutor, PromotionRequest,
27};
28
29// Test-only thread-local counter of how many times
30// `EffectiveExpiry::compute` is invoked from `Shard::get`. Thread-local
31// (rather than a global atomic) so the off-fast-path test does not race
32// with other tests in the harness's parallel executor.
33#[cfg(test)]
34thread_local! {
35    pub(super) static EFFECTIVE_EXPIRY_COMPUTE_CALLS: std::cell::Cell<u64> = const { std::cell::Cell::new(0) };
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub enum CacheError {
40    BlobTooLarge {
41        size: usize,
42        max: usize,
43    },
44    MetadataTooLarge {
45        keys: usize,
46        bytes: usize,
47        max_keys: usize,
48        max_bytes: usize,
49    },
50    TooManyNamespaces {
51        max: usize,
52    },
53    VersionMismatch {
54        existing: u64,
55        attempted: u64,
56    },
57    L2Full {
58        size: u64,
59        max: u64,
60    },
61    L2Io(String),
62}
63
64#[derive(Debug, Clone, PartialEq, Eq, Hash)]
65pub(super) struct BlobCacheKey {
66    pub(super) namespace: String,
67    pub(super) key: String,
68}
69
70impl BlobCacheKey {
71    pub(super) fn new(namespace: impl Into<String>, key: impl Into<String>) -> Self {
72        Self {
73            namespace: namespace.into(),
74            key: key.into(),
75        }
76    }
77}
78
79#[derive(Debug, Clone, PartialEq, Eq, Hash)]
80struct ScopedLabel {
81    namespace: String,
82    label: String,
83}
84
85impl ScopedLabel {
86    fn new(namespace: impl Into<String>, label: impl Into<String>) -> Self {
87        Self {
88            namespace: namespace.into(),
89            label: label.into(),
90        }
91    }
92}
93
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct BlobCacheHit {
96    pub(super) bytes: Arc<[u8]>,
97    pub(super) content_metadata: BTreeMap<String, String>,
98    pub(super) version: Option<u64>,
99    /// `Some(remaining_ms)` when the hit came from the stale-while-revalidate
100    /// window of an `ExtendedTtlPolicy`; `None` when the entry was fresh.
101    /// Boolean staleness is just `.is_some()`.
102    pub(super) stale_window_remaining_ms: Option<u64>,
103}
104
105impl BlobCacheHit {
106    pub(crate) fn new(
107        bytes: Arc<[u8]>,
108        content_metadata: BTreeMap<String, String>,
109        version: Option<u64>,
110    ) -> Self {
111        Self {
112            bytes,
113            content_metadata,
114            version,
115            stale_window_remaining_ms: None,
116        }
117    }
118
119    pub(crate) fn new_stale(
120        bytes: Arc<[u8]>,
121        content_metadata: BTreeMap<String, String>,
122        version: Option<u64>,
123        window_remaining_ms: u64,
124    ) -> Self {
125        Self {
126            bytes,
127            content_metadata,
128            version,
129            stale_window_remaining_ms: Some(window_remaining_ms),
130        }
131    }
132
133    /// Cached payload, refcounted so duplicate readers share the buffer.
134    pub fn bytes(&self) -> &Arc<[u8]> {
135        &self.bytes
136    }
137
138    /// Convenience accessor returning a `&[u8]` view into [`bytes`](Self::bytes).
139    pub fn value(&self) -> &[u8] {
140        &self.bytes
141    }
142
143    /// Opaque content metadata captured on `put`.
144    pub fn content_metadata(&self) -> &BTreeMap<String, String> {
145        &self.content_metadata
146    }
147
148    /// Optional CAS / freshness version stamped on `put`.
149    pub fn version(&self) -> Option<u64> {
150        self.version
151    }
152
153    /// `true` when the hit was served from the stale-while-revalidate window
154    /// of an `ExtendedTtlPolicy`. Always `false` when the extended policy is
155    /// `off()` or the entry was within its hard expiry.
156    pub fn is_stale(&self) -> bool {
157        self.stale_window_remaining_ms.is_some()
158    }
159
160    /// Remaining stale-window milliseconds when [`is_stale`](Self::is_stale)
161    /// is `true`; `None` when the hit was fresh.
162    pub fn stale_window_remaining_ms(&self) -> Option<u64> {
163        self.stale_window_remaining_ms
164    }
165}
166
167#[derive(Debug, Clone, Default, PartialEq, Eq)]
168pub struct BlobCachePut {
169    pub bytes: Vec<u8>,
170    pub content_metadata: BTreeMap<String, String>,
171    pub tags: BTreeSet<String>,
172    pub dependencies: BTreeSet<String>,
173    pub policy: BlobCachePolicy,
174}
175
176impl BlobCachePut {
177    pub fn new(bytes: impl Into<Vec<u8>>) -> Self {
178        Self {
179            bytes: bytes.into(),
180            content_metadata: BTreeMap::new(),
181            tags: BTreeSet::new(),
182            dependencies: BTreeSet::new(),
183            policy: BlobCachePolicy::default(),
184        }
185    }
186
187    pub fn with_content_metadata(mut self, content_metadata: BTreeMap<String, String>) -> Self {
188        self.content_metadata = content_metadata;
189        self
190    }
191
192    pub fn with_tags(mut self, tags: impl IntoIterator<Item = impl Into<String>>) -> Self {
193        self.tags = tags.into_iter().map(Into::into).collect();
194        self
195    }
196
197    pub fn with_dependencies(
198        mut self,
199        dependencies: impl IntoIterator<Item = impl Into<String>>,
200    ) -> Self {
201        self.dependencies = dependencies.into_iter().map(Into::into).collect();
202        self
203    }
204
205    pub fn with_policy(mut self, policy: BlobCachePolicy) -> Self {
206        self.policy = policy;
207        self
208    }
209}
210
211#[derive(Debug, Clone, Copy, PartialEq, Eq)]
212pub enum L1Admission {
213    Always,
214    Auto,
215    Never,
216}
217
218/// Three-valued answer for [`BlobCache::exists`].
219///
220/// Today the implementation always returns [`Present`](Self::Present) or
221/// [`Absent`](Self::Absent) — it tracks the answer authoritatively. The
222/// [`MaybePresent`](Self::MaybePresent) variant exists in the type so the
223/// upcoming Bloom synopsis (#146) can answer "probably yes" without forcing
224/// a metadata read, all without breaking the `exists` contract.
225#[derive(Debug, Clone, Copy, PartialEq, Eq)]
226pub enum CachePresence {
227    /// The cache holds a live entry for this key.
228    Present,
229    /// The cache definitely does not hold this key (negative cache hit).
230    Absent,
231    /// A probabilistic synopsis cannot rule the key out without a deeper
232    /// lookup. Treat as a hit prospect: the caller should fetch.
233    MaybePresent,
234}
235
236impl From<bool> for CachePresence {
237    fn from(present: bool) -> Self {
238        if present {
239            CachePresence::Present
240        } else {
241            CachePresence::Absent
242        }
243    }
244}
245
246#[derive(Debug, Clone, Copy, PartialEq, Eq)]
247pub struct BlobCachePolicy {
248    ttl_ms: Option<u64>,
249    expires_at_unix_ms: Option<u64>,
250    max_blob_bytes: Option<usize>,
251    l1_admission: L1Admission,
252    priority: u8,
253    version: Option<u64>,
254    /// Extended TTL knobs (idle / stale-while-revalidate / jitter).
255    /// Defaults to [`ExtendedTtlPolicy::off`] so existing call sites and
256    /// stored entries continue to behave with hard-expiry-only semantics.
257    /// Wired into [`BlobCache::get`] behind the
258    /// `cache.blob.policy.extended` config knob (#194).
259    extended: ExtendedTtlPolicy,
260}
261
262impl Default for BlobCachePolicy {
263    fn default() -> Self {
264        Self {
265            ttl_ms: None,
266            expires_at_unix_ms: None,
267            max_blob_bytes: None,
268            l1_admission: L1Admission::Auto,
269            priority: 128,
270            version: None,
271            extended: ExtendedTtlPolicy::off(),
272        }
273    }
274}
275
276impl BlobCachePolicy {
277    // ----- builder-style setters (consuming) -----------------------------
278
279    pub fn ttl_ms(mut self, ttl_ms: u64) -> Self {
280        self.ttl_ms = Some(ttl_ms);
281        self
282    }
283
284    pub fn expires_at_unix_ms(mut self, expires_at_unix_ms: u64) -> Self {
285        self.expires_at_unix_ms = Some(expires_at_unix_ms);
286        self
287    }
288
289    pub fn max_blob_bytes(mut self, max_blob_bytes: usize) -> Self {
290        self.max_blob_bytes = Some(max_blob_bytes);
291        self
292    }
293
294    pub fn l1_admission(mut self, l1_admission: L1Admission) -> Self {
295        self.l1_admission = l1_admission;
296        self
297    }
298
299    pub fn priority(mut self, priority: u8) -> Self {
300        self.priority = priority;
301        self
302    }
303
304    pub fn version(mut self, version: u64) -> Self {
305        self.version = Some(version);
306        self
307    }
308
309    /// Replace the extended TTL knobs in one chainable call. Defaults to
310    /// [`ExtendedTtlPolicy::off`]; setting an active policy turns on the
311    /// idle / stale-serve / jitter behaviours in [`BlobCache::get`] and
312    /// [`BlobCache::put`] for entries written with this policy.
313    pub fn extended(mut self, extended: ExtendedTtlPolicy) -> Self {
314        self.extended = extended;
315        self
316    }
317
318    // ----- read-back accessors -------------------------------------------
319    //
320    // Setter methods consume `self` and return `Self`, so they cannot share
321    // a name with `&self` getters. The `*_value` suffix keeps both surfaces
322    // available without renaming the public builder API.
323
324    pub fn ttl_ms_value(&self) -> Option<u64> {
325        self.ttl_ms
326    }
327
328    pub fn expires_at_unix_ms_value(&self) -> Option<u64> {
329        self.expires_at_unix_ms
330    }
331
332    pub fn max_blob_bytes_value(&self) -> Option<usize> {
333        self.max_blob_bytes
334    }
335
336    pub fn l1_admission_value(&self) -> L1Admission {
337        self.l1_admission
338    }
339
340    pub fn priority_value(&self) -> u8 {
341        self.priority
342    }
343
344    pub fn version_value(&self) -> Option<u64> {
345        self.version
346    }
347
348    /// Read-back accessor for the extended TTL knobs. Mirrors the
349    /// `*_value` getter pattern used by every other [`BlobCachePolicy`]
350    /// field (#151 — fields are private; readers go through getters).
351    pub fn extended_value(&self) -> ExtendedTtlPolicy {
352        self.extended
353    }
354}
355
356#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
357pub struct BlobCacheStats {
358    pub(super) hits: u64,
359    pub(super) misses: u64,
360    pub(super) insertions: u64,
361    pub(super) evictions: u64,
362    pub(super) expirations: u64,
363    pub(super) invalidations: u64,
364    pub(super) namespace_flushes: u64,
365    pub(super) version_mismatches: u64,
366    pub(super) entries: usize,
367    pub(super) bytes_in_use: usize,
368    pub(super) l1_bytes_max: usize,
369    pub(super) l2_bytes_in_use: u64,
370    pub(super) l2_bytes_max: u64,
371    pub(super) l2_full_rejections: u64,
372    pub(super) l2_metadata_reads: u64,
373    pub(super) l2_negative_skips: u64,
374    /// Times the per-namespace Bloom synopsis answered `MaybePresent` but the
375    /// authoritative L2 metadata B+ tree said `Absent` (the false-positive
376    /// cost of the probabilistic synopsis).
377    pub(super) synopsis_metadata_reads: u64,
378    /// Total bytes used by all per-namespace Bloom synopsis filters.
379    pub(super) synopsis_bytes: u64,
380    pub(super) namespaces: usize,
381    pub(super) max_namespaces: usize,
382    /// Async promotion pool counters (issue #193). All zero when the
383    /// pool is not enabled (default — `cache.blob.async_promotion = "off"`).
384    pub(super) promotion_queued: u64,
385    pub(super) promotion_dropped: u64,
386    pub(super) promotion_completed: u64,
387    pub(super) promotion_queue_depth: usize,
388    /// Numerator of the L2 compression ratio: sum of `original_len` over
389    /// entries that actually compressed (#192). Stored as the ratio's
390    /// component so [`BlobCacheStats`] stays `Eq` (avoids `f64` fields).
391    pub(super) l2_compression_original_bytes: u64,
392    /// Denominator of the L2 compression ratio: sum of `stored_len` over
393    /// entries that actually compressed.
394    pub(super) l2_compression_stored_bytes: u64,
395    /// Counter of L2 entries the compressor returned as `Raw` (any reason).
396    pub(super) l2_compression_skipped_total: u64,
397    /// Cumulative `(original_len - stored_len)` across compressed entries.
398    pub(super) l2_bytes_saved_total: u64,
399    /// Counter — L1 hits that served a stale entry from the SWR window of
400    /// an `ExtendedTtlPolicy` (#194). Stays 0 when extended is off.
401    pub(super) l1_stale_serves_total: u64,
402    /// Counter — L1 entries evicted by the idle-TTL gate of an
403    /// `ExtendedTtlPolicy` (#194). Stays 0 when extended is off.
404    pub(super) l1_idle_evicts_total: u64,
405}
406
407impl BlobCacheStats {
408    /// Number of `get`/`exists` calls that resolved to `Present` /
409    /// `MaybePresent`. Both count as hit prospects.
410    pub fn hits(&self) -> u64 {
411        self.hits
412    }
413
414    /// Number of `get`/`exists` calls that resolved to `Absent`.
415    pub fn misses(&self) -> u64 {
416        self.misses
417    }
418
419    pub fn insertions(&self) -> u64 {
420        self.insertions
421    }
422
423    pub fn evictions(&self) -> u64 {
424        self.evictions
425    }
426
427    pub fn expirations(&self) -> u64 {
428        self.expirations
429    }
430
431    pub fn invalidations(&self) -> u64 {
432        self.invalidations
433    }
434
435    pub fn namespace_flushes(&self) -> u64 {
436        self.namespace_flushes
437    }
438
439    pub fn version_mismatches(&self) -> u64 {
440        self.version_mismatches
441    }
442
443    pub fn entries(&self) -> usize {
444        self.entries
445    }
446
447    /// Bytes resident in L1. Returned as `u64` for symmetry with
448    /// [`l2_bytes_in_use`](Self::l2_bytes_in_use); upcast is lossless.
449    pub fn bytes_in_use(&self) -> u64 {
450        self.bytes_in_use as u64
451    }
452
453    pub fn l1_bytes_max(&self) -> usize {
454        self.l1_bytes_max
455    }
456
457    pub fn l2_bytes_in_use(&self) -> u64 {
458        self.l2_bytes_in_use
459    }
460
461    pub fn l2_bytes_max(&self) -> u64 {
462        self.l2_bytes_max
463    }
464
465    pub fn l2_full_rejections(&self) -> u64 {
466        self.l2_full_rejections
467    }
468
469    pub fn l2_metadata_reads(&self) -> u64 {
470        self.l2_metadata_reads
471    }
472
473    pub fn l2_negative_skips(&self) -> u64 {
474        self.l2_negative_skips
475    }
476
477    /// Times the Bloom synopsis answered `MaybePresent` but the authoritative
478    /// L2 metadata B+ tree said `Absent`. This is the cost of the
479    /// probabilistic synopsis: a counter for the false-positive rate in
480    /// production. Negative answers from the filter never trigger a metadata
481    /// read (see [`l2_negative_skips`](Self::l2_negative_skips)).
482    pub fn synopsis_metadata_reads(&self) -> u64 {
483        self.synopsis_metadata_reads
484    }
485
486    /// Total bytes used by all per-namespace Bloom synopsis filters.
487    pub fn synopsis_bytes(&self) -> u64 {
488        self.synopsis_bytes
489    }
490
491    pub fn namespaces(&self) -> usize {
492        self.namespaces
493    }
494
495    pub fn max_namespaces(&self) -> usize {
496        self.max_namespaces
497    }
498
499    /// Total promotion requests successfully enqueued by `get` since boot.
500    /// `0` when async promotion is disabled.
501    pub fn promotion_queued(&self) -> u64 {
502        self.promotion_queued
503    }
504
505    /// Total promotion requests dropped on queue saturation since boot.
506    /// `0` when async promotion is disabled.
507    pub fn promotion_dropped(&self) -> u64 {
508        self.promotion_dropped
509    }
510
511    /// Total promotion requests executed by workers since boot.
512    /// `0` when async promotion is disabled.
513    pub fn promotion_completed(&self) -> u64 {
514        self.promotion_completed
515    }
516
517    /// Snapshot of pending requests in the promotion queue.
518    /// `0` when async promotion is disabled.
519    pub fn promotion_queue_depth(&self) -> usize {
520        self.promotion_queue_depth
521    }
522
523    /// Running average of `original_len / stored_len` for L2 entries that
524    /// the compressor actually shrank (#192). Returns `1.0` when no
525    /// compressed entry has been observed yet, regardless of how many
526    /// `Raw` entries have passed through (callers should pair this with
527    /// [`l2_compression_skipped_total`](Self::l2_compression_skipped_total)
528    /// to interpret).
529    pub fn l2_compression_ratio_observed(&self) -> f64 {
530        if self.l2_compression_stored_bytes == 0 {
531            return 1.0;
532        }
533        self.l2_compression_original_bytes as f64 / self.l2_compression_stored_bytes as f64
534    }
535
536    /// Number of L2 entries the compressor returned as `Raw` since boot —
537    /// any reason: payload below `min_bytes`, content type already
538    /// compressed, ratio gate fired, or `cache.blob.l2_compression = "off"`.
539    pub fn l2_compression_skipped_total(&self) -> u64 {
540        self.l2_compression_skipped_total
541    }
542
543    /// Cumulative `(original_len - stored_len)` across all L2 entries the
544    /// compressor shrank. Operators read this to size the L2 budget
545    /// multiplier from real workloads.
546    pub fn l2_bytes_saved_total(&self) -> u64 {
547        self.l2_bytes_saved_total
548    }
549
550    /// Counter — L1 hits served as stale by the SWR window of an
551    /// `ExtendedTtlPolicy` (#194). `0` when no entry was written with an
552    /// active extended policy.
553    pub fn l1_stale_serves_total(&self) -> u64 {
554        self.l1_stale_serves_total
555    }
556
557    /// Counter — L1 entries evicted by the idle-TTL gate of an
558    /// `ExtendedTtlPolicy` (#194). `0` when no entry was written with an
559    /// active extended policy.
560    pub fn l1_idle_evicts_total(&self) -> u64 {
561        self.l1_idle_evicts_total
562    }
563}
564
565#[derive(Clone, Copy)]
566enum IndexedKind {
567    Tag,
568    Dependency,
569}
570
571#[derive(Debug)]
572struct AtomicStats {
573    hits: AtomicU64,
574    misses: AtomicU64,
575    insertions: AtomicU64,
576    evictions: AtomicU64,
577    expirations: AtomicU64,
578    invalidations: AtomicU64,
579    namespace_flushes: AtomicU64,
580    version_mismatches: AtomicU64,
581    l2_full_rejections: AtomicU64,
582    /// Counter incremented every time `BlobCache::get` returns a stale
583    /// entry from the SWR window of an `ExtendedTtlPolicy`. Stays at 0
584    /// when extended is `off()` for every entry.
585    l1_stale_serves: AtomicU64,
586    /// Counter incremented every time the idle-TTL gate of an
587    /// `ExtendedTtlPolicy` evicts an L1 entry. Stays at 0 when extended
588    /// is `off()` for every entry.
589    l1_idle_evicts: AtomicU64,
590}
591
592impl AtomicStats {
593    fn new() -> Self {
594        Self {
595            hits: AtomicU64::new(0),
596            misses: AtomicU64::new(0),
597            insertions: AtomicU64::new(0),
598            evictions: AtomicU64::new(0),
599            expirations: AtomicU64::new(0),
600            invalidations: AtomicU64::new(0),
601            namespace_flushes: AtomicU64::new(0),
602            version_mismatches: AtomicU64::new(0),
603            l2_full_rejections: AtomicU64::new(0),
604            l1_stale_serves: AtomicU64::new(0),
605            l1_idle_evicts: AtomicU64::new(0),
606        }
607    }
608}
609
610/// Sharded, byte-bounded blob cache with optional durable L2 backing.
611///
612/// # Concurrency
613///
614/// `BlobCache` is `Send + Sync`. All public methods are safe to call from
615/// multiple threads concurrently. Internal sharding ensures disjoint-key
616/// contention does not serialize: independent keys land on independent
617/// `RwLock<Shard>` instances, and the global indexes (namespace set, tag /
618/// dependency maps) are read-mostly behind their own `RwLock`s.
619///
620/// `BlobCache` is **not** `Clone` — share ownership via `Arc<BlobCache>`.
621///
622/// # Blocking
623///
624/// All methods are synchronous. `put` may perform L2 disk I/O on the
625/// calling thread when an L2 path is configured; tokio callers should wrap
626/// `put` in `spawn_blocking`. `get`, `exists`, and the `invalidate_*`
627/// family touch L2 only on rehydrate / delete paths.
628pub struct BlobCache {
629    config: BlobCacheConfig,
630    shards: Vec<RwLock<Shard>>,
631    namespaces: RwLock<HashSet<String>>,
632    namespace_generations: RwLock<HashMap<String, u64>>,
633    tag_index: RwLock<HashMap<ScopedLabel, HashSet<BlobCacheKey>>>,
634    dependency_index: RwLock<HashMap<ScopedLabel, HashSet<BlobCacheKey>>>,
635    l2: Option<Arc<BlobCacheL2>>,
636    bytes_in_use: AtomicUsize,
637    stats: AtomicStats,
638    /// Optional async L2->L1 promotion pool (issue #193). When `None`,
639    /// `get` performs the L1 promotion synchronously on the read path.
640    /// When set via `enable_async_promotion`, L2 hits return bytes to
641    /// the caller immediately and the L1 install runs on a worker.
642    promotion_pool: OnceLock<Arc<AsyncPromotionPool>>,
643}
644
645// Compile-time guarantee that the documented `Send + Sync` contract above
646// stays in lockstep with the struct's interior. If this ever fails to
647// compile, the docstring is lying — fix the field that broke it, do not
648// remove this assertion.
649const _: fn() = || {
650    fn assert_send_sync<T: Send + Sync>() {}
651    assert_send_sync::<BlobCache>();
652};
653
654impl BlobCache {
655    /// Infallible constructor. Panics if `config.l2_path` is set and the L2
656    /// file cannot be opened — use [`BlobCache::open_with_l2`] instead for
657    /// configs that include an L2 path so boot errors are handled gracefully.
658    pub fn new(config: BlobCacheConfig) -> Self {
659        Self::try_new(config).expect("open blob-cache L2")
660    }
661
662    /// Fallible constructor for configs that include an L2 path.
663    /// Returns `Err(CacheError::L2Io(...))` on invalid path, corrupt control
664    /// sidecar, or any other recoverable I/O failure — the process stays alive.
665    pub fn open_with_l2(config: BlobCacheConfig) -> Result<Self, CacheError> {
666        Self::try_new(config)
667    }
668
669    fn try_new(config: BlobCacheConfig) -> Result<Self, CacheError> {
670        let config = BlobCacheConfig {
671            shard_count: config.shard_count.max(1),
672            ..config
673        };
674        let l2 = config
675            .l2_path
676            .clone()
677            .map(|path| BlobCacheL2::open(path, config.l2_bytes_max))
678            .transpose()?;
679        let shards = (0..config.shard_count)
680            .map(|_| RwLock::new(Shard::new()))
681            .collect();
682        Ok(Self {
683            config,
684            shards,
685            namespaces: RwLock::new(HashSet::new()),
686            namespace_generations: RwLock::new(HashMap::new()),
687            tag_index: RwLock::new(HashMap::new()),
688            dependency_index: RwLock::new(HashMap::new()),
689            l2: l2.map(Arc::new),
690            bytes_in_use: AtomicUsize::new(0),
691            stats: AtomicStats::new(),
692            promotion_pool: OnceLock::new(),
693        })
694    }
695
696    pub fn with_defaults() -> Self {
697        Self::new(BlobCacheConfig::default())
698    }
699
700    /// Path to the L2 metadata B+ tree directory, when L2 is enabled.
701    ///
702    /// Used by the backup orchestrator (`include_blob_cache=true`) so it
703    /// can locate the on-disk L2 tree for tarball / per-file upload, and
704    /// by the runbook procedures in
705    /// `docs/operations/blob-cache-backup-restore.md` §2 / §3 to confirm
706    /// where on disk the cache lives.
707    pub fn l2_path(&self) -> Option<&std::path::Path> {
708        self.config.l2_path.as_deref()
709    }
710
711    pub fn put(
712        &self,
713        namespace: impl Into<String>,
714        key: impl Into<String>,
715        input: BlobCachePut,
716    ) -> Result<(), CacheError> {
717        self.put_at(namespace, key, input, unix_now_ms())
718    }
719
720    fn put_at(
721        &self,
722        namespace: impl Into<String>,
723        key: impl Into<String>,
724        input: BlobCachePut,
725        now_ms: u64,
726    ) -> Result<(), CacheError> {
727        let namespace = namespace.into();
728        let key = BlobCacheKey::new(namespace.clone(), key);
729        self.validate_blob_size(input.bytes.len(), input.policy)?;
730        self.validate_metadata(&input.content_metadata)?;
731        self.ensure_namespace(&namespace)?;
732        let namespace_generation = self.current_generation(&namespace);
733        let tags = input.tags.clone();
734        let dependencies = input.dependencies.clone();
735
736        let shard_idx = self.shard_index(&key);
737        let mut shard = self.shards[shard_idx].write();
738        self.check_version(
739            &shard,
740            &key,
741            input.policy.version_value(),
742            namespace_generation,
743        )?;
744        let entry = Entry::new(
745            input.bytes,
746            input.content_metadata,
747            input.tags,
748            input.dependencies,
749            input.policy,
750            namespace_generation,
751            now_ms,
752            &namespace,
753            &key.key,
754        );
755        let entry_size = entry.size;
756        if let Some(l2) = &self.l2 {
757            let old_l2_size = l2.record_size(&key);
758            // Compression decision happens in the foreground put — the
759            // outcome (`Compressed::Raw` or `Compressed::Zstd`) is what
760            // gets framed and written to the chain (#192). When the knob
761            // is `Off`, skip the compressor entirely (CPU savings) and
762            // emit a `Raw` variant directly so the on-disk format stays
763            // uniform.
764            let compressed = match self.config.l2_compression {
765                L2Compression::Off => Compressed::Raw(entry.bytes.as_ref().to_vec()),
766                L2Compression::On => {
767                    let content_type = entry
768                        .content_metadata
769                        .get("content-type")
770                        .map(String::as_str);
771                    L2BlobCompressor::compress(
772                        entry.bytes.as_ref(),
773                        content_type,
774                        &CompressOpts::default(),
775                    )
776                    .map_err(|err| CacheError::L2Io(err.to_string()))?
777                }
778            };
779            match l2.put(&key, &entry, old_l2_size, compressed) {
780                Ok(()) => {}
781                Err(err @ CacheError::L2Full { .. }) => {
782                    self.stats
783                        .l2_full_rejections
784                        .fetch_add(1, Ordering::Relaxed);
785                    return Err(err);
786                }
787                Err(err) => return Err(err),
788            }
789        }
790        let outcome = if matches!(input.policy.l1_admission_value(), L1Admission::Never) {
791            let old_entry = shard.remove(&key);
792            InsertOutcome {
793                old_entry,
794                admitted: false,
795            }
796        } else {
797            shard.insert(key.clone(), entry)
798        };
799        drop(shard);
800
801        if let Some(old_entry) = outcome.old_entry.as_ref() {
802            self.deindex_entry(&key, old_entry);
803        }
804        if outcome.admitted {
805            self.index_entry(&key, &tags, &dependencies);
806        }
807
808        let old_size = outcome.old_entry.as_ref().map_or(0, |entry| entry.size);
809        let new_size = if outcome.admitted { entry_size } else { 0 };
810        if new_size >= old_size {
811            self.bytes_in_use
812                .fetch_add(new_size - old_size, Ordering::Relaxed);
813        } else {
814            self.bytes_in_use
815                .fetch_sub(old_size - new_size, Ordering::Relaxed);
816        }
817        self.stats.insertions.fetch_add(1, Ordering::Relaxed);
818        if outcome.admitted {
819            self.evict_until_within_budget(shard_idx);
820        }
821        Ok(())
822    }
823
824    pub fn get(&self, namespace: &str, key: &str) -> Option<BlobCacheHit> {
825        self.get_at(namespace, key, unix_now_ms())
826    }
827
828    fn get_at(&self, namespace: &str, key: &str, now_ms: u64) -> Option<BlobCacheHit> {
829        let cache_key = BlobCacheKey::new(namespace, key);
830        let namespace_generation = self.current_generation(namespace);
831        let shard_idx = self.shard_index(&cache_key);
832        let mut shard = self.shards[shard_idx].write();
833        match shard.get(&cache_key, now_ms, namespace_generation) {
834            Lookup::Hit(hit) => {
835                self.stats.hits.fetch_add(1, Ordering::Relaxed);
836                if hit.is_stale() {
837                    self.stats.l1_stale_serves.fetch_add(1, Ordering::Relaxed);
838                }
839                Some(hit)
840            }
841            Lookup::Expired(entry) => {
842                drop(shard);
843                self.record_removed_entry(&cache_key, &entry);
844                if let Some(l2) = &self.l2 {
845                    l2.delete_key(&cache_key);
846                }
847                self.stats.expirations.fetch_add(1, Ordering::Relaxed);
848                self.stats.misses.fetch_add(1, Ordering::Relaxed);
849                None
850            }
851            Lookup::IdleEvicted(entry) => {
852                drop(shard);
853                self.record_removed_entry(&cache_key, &entry);
854                if let Some(l2) = &self.l2 {
855                    l2.delete_key(&cache_key);
856                }
857                self.stats.expirations.fetch_add(1, Ordering::Relaxed);
858                self.stats.l1_idle_evicts.fetch_add(1, Ordering::Relaxed);
859                self.stats.misses.fetch_add(1, Ordering::Relaxed);
860                None
861            }
862            Lookup::Stale(entry) => {
863                drop(shard);
864                self.record_removed_entry(&cache_key, &entry);
865                self.stats.misses.fetch_add(1, Ordering::Relaxed);
866                None
867            }
868            Lookup::Miss => {
869                drop(shard);
870                if let Some(pool) = self.promotion_pool.get() {
871                    // Async path: do the L2 read (we owe the bytes to the
872                    // caller right now) but defer the L1 install onto the
873                    // worker pool. Caller does not pay promotion bookkeeping.
874                    if let Some(l2) = self.l2.as_ref() {
875                        if let Some(entry) = l2.get(&cache_key, now_ms, namespace_generation) {
876                            let hit = entry.hit();
877                            // Drop the freshly-fetched Entry — the worker will
878                            // re-fetch it. Cost: one extra L2 metadata read +
879                            // blob read per L2 hit while async mode is on.
880                            // Acceptable trade-off for opt-in mode; documented
881                            // in the PR.
882                            drop(entry);
883                            let request = PromotionRequest {
884                                namespace: cache_key.namespace.clone(),
885                                key: cache_key.key.clone(),
886                                bytes: Arc::clone(hit.bytes()),
887                                policy: BlobCachePolicy::default(),
888                            };
889                            let _ = pool.schedule(request);
890                            self.stats.hits.fetch_add(1, Ordering::Relaxed);
891                            return Some(hit);
892                        }
893                    }
894                    self.stats.misses.fetch_add(1, Ordering::Relaxed);
895                    return None;
896                }
897                if let Some(hit) =
898                    self.rehydrate_l2_entry(&cache_key, now_ms, namespace_generation, shard_idx)
899                {
900                    self.stats.hits.fetch_add(1, Ordering::Relaxed);
901                    return Some(hit);
902                }
903                self.stats.misses.fetch_add(1, Ordering::Relaxed);
904                None
905            }
906            Lookup::Present => unreachable!("get cannot return presence-only lookup"),
907        }
908    }
909
910    /// Probe whether `(namespace, key)` is cached.
911    ///
912    /// Returns a three-valued [`CachePresence`]:
913    ///
914    /// - `Present` when an L1-resident entry is held for the key.
915    /// - `Absent` when the cache can authoritatively rule the key out: either
916    ///   no L2 is configured, or the per-namespace Bloom synopsis
917    ///   (no-false-negatives) says the key was never inserted into L2.
918    /// - `MaybePresent` when L1 missed but the Bloom synopsis cannot rule the
919    ///   key out. Callers that need an exact answer must follow up with
920    ///   [`get`](Self::get), which performs the authoritative metadata read
921    ///   and either rehydrates a hit or surfaces a genuine miss.
922    ///
923    /// `exists` deliberately does NOT touch the L2 metadata B+ tree on a
924    /// `MaybePresent` answer — that is the whole reason the synopsis exists
925    /// (#146). The probabilistic answer is the cheap fast path; pay the
926    /// metadata-read cost only when you actually need the bytes.
927    pub fn exists(&self, namespace: &str, key: &str) -> CachePresence {
928        self.exists_at(namespace, key, unix_now_ms())
929    }
930
931    fn exists_at(&self, namespace: &str, key: &str, now_ms: u64) -> CachePresence {
932        let cache_key = BlobCacheKey::new(namespace, key);
933        let namespace_generation = self.current_generation(namespace);
934        let shard_idx = self.shard_index(&cache_key);
935        let mut shard = self.shards[shard_idx].write();
936        match shard.contains(&cache_key, now_ms, namespace_generation) {
937            Lookup::Present => {
938                self.stats.hits.fetch_add(1, Ordering::Relaxed);
939                CachePresence::Present
940            }
941            Lookup::Expired(entry) => {
942                drop(shard);
943                self.record_removed_entry(&cache_key, &entry);
944                if let Some(l2) = &self.l2 {
945                    l2.delete_key(&cache_key);
946                }
947                self.stats.expirations.fetch_add(1, Ordering::Relaxed);
948                self.stats.misses.fetch_add(1, Ordering::Relaxed);
949                CachePresence::Absent
950            }
951            Lookup::IdleEvicted(entry) => {
952                drop(shard);
953                self.record_removed_entry(&cache_key, &entry);
954                if let Some(l2) = &self.l2 {
955                    l2.delete_key(&cache_key);
956                }
957                self.stats.expirations.fetch_add(1, Ordering::Relaxed);
958                self.stats.l1_idle_evicts.fetch_add(1, Ordering::Relaxed);
959                self.stats.misses.fetch_add(1, Ordering::Relaxed);
960                CachePresence::Absent
961            }
962            Lookup::Stale(entry) => {
963                drop(shard);
964                self.record_removed_entry(&cache_key, &entry);
965                self.stats.misses.fetch_add(1, Ordering::Relaxed);
966                CachePresence::Absent
967            }
968            Lookup::Miss => {
969                drop(shard);
970                let Some(l2) = self.l2.as_ref() else {
971                    self.stats.misses.fetch_add(1, Ordering::Relaxed);
972                    return CachePresence::Absent;
973                };
974                if l2.synopsis_may_contain(namespace, key) {
975                    // Filter says maybe — the cheap fast path defers the
976                    // authoritative read to `get`. Count as a hit prospect.
977                    self.stats.hits.fetch_add(1, Ordering::Relaxed);
978                    CachePresence::MaybePresent
979                } else {
980                    // Filter says no — definitively absent (no
981                    // false-negatives).
982                    self.stats.misses.fetch_add(1, Ordering::Relaxed);
983                    CachePresence::Absent
984                }
985            }
986            Lookup::Hit(_) => unreachable!("exists cannot return a hit payload"),
987        }
988    }
989
990    /// Node-local invalidation for one exact cache key.
991    ///
992    /// This does not propagate to replicas. Cluster-wide invalidation is a
993    /// future contract; callers that need cross-node coherence must rely on the
994    /// underlying write reaching each node and triggering local eviction there.
995    pub fn invalidate_key(&self, namespace: &str, key: &str) -> usize {
996        if !self.namespace_exists(namespace) {
997            return 0;
998        }
999        let cache_key = BlobCacheKey::new(namespace, key);
1000        let shard_idx = self.shard_index(&cache_key);
1001        let mut shard = self.shards[shard_idx].write();
1002        let removed = shard.remove(&cache_key);
1003        drop(shard);
1004
1005        if let Some(entry) = removed {
1006            self.record_invalidated_entry(&cache_key, &entry);
1007            1
1008        } else {
1009            self.l2
1010                .as_ref()
1011                .and_then(|l2| l2.delete_key(&cache_key))
1012                .map(|_| {
1013                    self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
1014                    1
1015                })
1016                .unwrap_or(0)
1017        }
1018    }
1019
1020    /// Node-local invalidation for keys with a namespace-local prefix.
1021    pub fn invalidate_prefix(&self, namespace: &str, prefix: &str) -> usize {
1022        if !self.namespace_exists(namespace) {
1023            return 0;
1024        }
1025
1026        let mut removed = Vec::new();
1027        for shard in &self.shards {
1028            let mut shard = shard.write();
1029            let keys = shard
1030                .keys_matching(|key| key.namespace == namespace && key.key.starts_with(prefix));
1031            for key in keys {
1032                if let Some(entry) = shard.remove(&key) {
1033                    removed.push((key, entry));
1034                }
1035            }
1036        }
1037
1038        let count = removed.len();
1039        for (key, entry) in removed {
1040            self.record_invalidated_entry(&key, &entry);
1041        }
1042        let l2_count = self
1043            .l2
1044            .as_ref()
1045            .map_or(0, |l2| l2.delete_prefix(namespace, prefix));
1046        if l2_count > count {
1047            self.stats
1048                .invalidations
1049                .fetch_add((l2_count - count) as u64, Ordering::Relaxed);
1050        }
1051        count.max(l2_count)
1052    }
1053
1054    /// Node-local batched invalidation for all entries carrying any of `tags`.
1055    ///
1056    /// Locks each affected shard once per call, so a batched invalidation
1057    /// from a downstream adapter (#143) does not multiply lock acquisitions
1058    /// the way N singular calls would.
1059    pub fn invalidate_tags(&self, namespace: &str, tags: &[&str]) -> usize {
1060        self.invalidate_indexed_many(namespace, tags, IndexedKind::Tag)
1061    }
1062
1063    /// Node-local batched invalidation for all entries carrying any of `dependencies`.
1064    pub fn invalidate_dependencies(&self, namespace: &str, dependencies: &[&str]) -> usize {
1065        self.invalidate_indexed_many(namespace, dependencies, IndexedKind::Dependency)
1066    }
1067
1068    /// Node-local invalidation for all entries carrying `tag`.
1069    #[deprecated(
1070        since = "0.1.0",
1071        note = "use `invalidate_tags(namespace, &[tag])` for batched callers"
1072    )]
1073    pub fn invalidate_tag(&self, namespace: &str, tag: &str) -> usize {
1074        self.invalidate_indexed_many(namespace, &[tag], IndexedKind::Tag)
1075    }
1076
1077    /// Node-local invalidation for all entries carrying `dependency`.
1078    #[deprecated(
1079        since = "0.1.0",
1080        note = "use `invalidate_dependencies(namespace, &[dependency])` for batched callers"
1081    )]
1082    pub fn invalidate_dependency(&self, namespace: &str, dependency: &str) -> usize {
1083        self.invalidate_indexed_many(namespace, &[dependency], IndexedKind::Dependency)
1084    }
1085
1086    /// O(1) foreground namespace flush.
1087    ///
1088    /// The foreground path only bumps a namespace generation. Old entries become
1089    /// invisible immediately and are physically removed by later cache access or
1090    /// a future sweeper.
1091    pub fn invalidate_namespace(&self, namespace: &str) -> bool {
1092        if !self.namespace_exists(namespace) {
1093            return false;
1094        }
1095        let mut generations = self.namespace_generations.write();
1096        let generation = generations.entry(namespace.to_string()).or_insert(0);
1097        *generation = generation.saturating_add(1);
1098        if let Some(l2) = &self.l2 {
1099            l2.delete_namespace(namespace);
1100        }
1101        self.stats.namespace_flushes.fetch_add(1, Ordering::Relaxed);
1102        true
1103    }
1104
1105    pub fn stats(&self) -> BlobCacheStats {
1106        BlobCacheStats {
1107            hits: self.stats.hits.load(Ordering::Relaxed),
1108            misses: self.stats.misses.load(Ordering::Relaxed),
1109            insertions: self.stats.insertions.load(Ordering::Relaxed),
1110            evictions: self.stats.evictions.load(Ordering::Relaxed),
1111            expirations: self.stats.expirations.load(Ordering::Relaxed),
1112            invalidations: self.stats.invalidations.load(Ordering::Relaxed),
1113            namespace_flushes: self.stats.namespace_flushes.load(Ordering::Relaxed),
1114            version_mismatches: self.stats.version_mismatches.load(Ordering::Relaxed),
1115            entries: self.shards.iter().map(|shard| shard.read().len()).sum(),
1116            bytes_in_use: self.bytes_in_use.load(Ordering::Relaxed),
1117            l1_bytes_max: self.config.l1_bytes_max,
1118            l2_bytes_in_use: self.l2.as_ref().map_or(0, |l2| l2.stats_bytes_in_use()),
1119            l2_bytes_max: self.config.l2_bytes_max,
1120            l2_full_rejections: self.stats.l2_full_rejections.load(Ordering::Relaxed),
1121            l2_metadata_reads: self.l2.as_ref().map_or(0, |l2| l2.stats_metadata_reads()),
1122            l2_negative_skips: self.l2.as_ref().map_or(0, |l2| l2.stats_negative_skips()),
1123            synopsis_metadata_reads: self
1124                .l2
1125                .as_ref()
1126                .map_or(0, |l2| l2.stats_synopsis_metadata_reads()),
1127            synopsis_bytes: self.l2.as_ref().map_or(0, |l2| l2.stats_synopsis_bytes()),
1128            namespaces: self.namespaces.read().len(),
1129            max_namespaces: self.config.max_namespaces,
1130            promotion_queued: self
1131                .promotion_pool
1132                .get()
1133                .map_or(0, |p| p.metrics().queued_total),
1134            promotion_dropped: self
1135                .promotion_pool
1136                .get()
1137                .map_or(0, |p| p.metrics().dropped_total),
1138            promotion_completed: self
1139                .promotion_pool
1140                .get()
1141                .map_or(0, |p| p.metrics().completed_total),
1142            promotion_queue_depth: self
1143                .promotion_pool
1144                .get()
1145                .map_or(0, |p| p.metrics().queue_depth),
1146            l2_compression_original_bytes: self
1147                .l2
1148                .as_ref()
1149                .map_or(0, |l2| l2.stats_compression_original_bytes()),
1150            l2_compression_stored_bytes: self
1151                .l2
1152                .as_ref()
1153                .map_or(0, |l2| l2.stats_compression_stored_bytes()),
1154            l2_compression_skipped_total: self
1155                .l2
1156                .as_ref()
1157                .map_or(0, |l2| l2.stats_compression_skipped_total()),
1158            l2_bytes_saved_total: self
1159                .l2
1160                .as_ref()
1161                .map_or(0, |l2| l2.stats_bytes_saved_total()),
1162            l1_stale_serves_total: self.stats.l1_stale_serves.load(Ordering::Relaxed),
1163            l1_idle_evicts_total: self.stats.l1_idle_evicts.load(Ordering::Relaxed),
1164        }
1165    }
1166
1167    // -- Async promotion (issue #193) ---------------------------------------
1168
1169    /// Initialize the async L2->L1 promotion pool. Must be called on an
1170    /// `Arc<Self>` so the executor closure can hold a `Weak<Self>` (no
1171    /// reference cycle).
1172    ///
1173    /// Idempotent on first call only — `OnceLock` semantics: a second call
1174    /// returns the previously-installed pool unchanged. The returned `Arc`
1175    /// can be used by callers that want to inspect metrics directly; most
1176    /// callers should ignore it and read metrics via `stats()`.
1177    pub fn enable_async_promotion(self: &Arc<Self>, opts: PoolOpts) -> Arc<AsyncPromotionPool> {
1178        let weak: Weak<Self> = Arc::downgrade(self);
1179        let executor: PromotionExecutor = Arc::new(move |req| {
1180            // Upgrade only at execution time. If the cache has been
1181            // dropped, the worker silently no-ops (executor never holds
1182            // a strong ref between calls).
1183            let Some(cache) = weak.upgrade() else {
1184                return Ok(());
1185            };
1186            cache.promote_from_l2(&req)
1187        });
1188        let pool = AsyncPromotionPool::new_with_executor(opts, executor);
1189        match self.promotion_pool.set(Arc::clone(&pool)) {
1190            Ok(()) => pool,
1191            // Race: another caller already initialized. Drain ours and
1192            // return the winner. The losing pool's workers are spawned;
1193            // shutdown drains them out gracefully.
1194            Err(losing_pool) => {
1195                losing_pool.shutdown();
1196                Arc::clone(
1197                    self.promotion_pool
1198                        .get()
1199                        .expect("OnceLock set+get inconsistency"),
1200                )
1201            }
1202        }
1203    }
1204
1205    /// Drain and stop the async promotion pool, if enabled. Safe to call
1206    /// from `Drop` impls / test teardown — no-op when the pool was never
1207    /// initialized.
1208    pub fn shutdown_async_promotion(&self) {
1209        if let Some(pool) = self.promotion_pool.get() {
1210            Arc::clone(pool).shutdown();
1211        }
1212    }
1213
1214    /// Test-only escape hatch: schedule outcome of the most recent attempt
1215    /// is internal; tests assert on `stats()` counters instead.
1216    #[cfg(test)]
1217    fn promotion_pool_handle(&self) -> Option<Arc<AsyncPromotionPool>> {
1218        self.promotion_pool.get().cloned()
1219    }
1220
1221    /// Test-only: install a custom executor (e.g. one that sleeps to
1222    /// expose the hot-path / worker-path latency split). Used by the
1223    /// async-promotion wiring tests in this file.
1224    #[cfg(test)]
1225    fn enable_async_promotion_with_executor(
1226        self: &Arc<Self>,
1227        opts: PoolOpts,
1228        executor: PromotionExecutor,
1229    ) -> Arc<AsyncPromotionPool> {
1230        let pool = AsyncPromotionPool::new_with_executor(opts, executor);
1231        let _ = self.promotion_pool.set(Arc::clone(&pool));
1232        pool
1233    }
1234
1235    pub fn config(&self) -> &BlobCacheConfig {
1236        &self.config
1237    }
1238
1239    #[cfg(test)]
1240    fn inject_l2_fault_after_blob_write_once(&self) {
1241        self.l2
1242            .as_ref()
1243            .expect("L2 enabled")
1244            .inject_fault_after_blob_write_once();
1245    }
1246
1247    #[cfg(test)]
1248    fn inject_l2_synopsis_maybe_present(&self, namespace: &str, key: &str) {
1249        self.l2
1250            .as_ref()
1251            .expect("L2 enabled")
1252            .inject_synopsis_maybe_present(namespace, key);
1253    }
1254
1255    /// Test-only escape hatch (#192 lane 2/5): synthesise a legacy
1256    /// `V1Raw` L2 entry on disk so the forward-compat read test can
1257    /// verify pre-compression entries still rehydrate.
1258    #[cfg(test)]
1259    fn inject_l2_v1_entry(
1260        &self,
1261        namespace: &str,
1262        key: &str,
1263        payload: &[u8],
1264    ) -> Result<(), CacheError> {
1265        let l2 = self.l2.as_ref().expect("L2 enabled");
1266        let cache_key = BlobCacheKey::new(namespace, key);
1267        l2.inject_v1_entry(&cache_key, payload)
1268    }
1269
1270    fn validate_blob_size(&self, size: usize, policy: BlobCachePolicy) -> Result<(), CacheError> {
1271        let max = policy
1272            .max_blob_bytes_value()
1273            .unwrap_or(self.config.l1_bytes_max);
1274        if size > max {
1275            Err(CacheError::BlobTooLarge { size, max })
1276        } else {
1277            Ok(())
1278        }
1279    }
1280
1281    fn validate_metadata(&self, metadata: &BTreeMap<String, String>) -> Result<(), CacheError> {
1282        let keys = metadata.len();
1283        let bytes = metadata
1284            .iter()
1285            .map(|(key, value)| key.len() + value.len())
1286            .sum::<usize>();
1287        if keys > self.config.content_metadata_keys_max
1288            || bytes > self.config.content_metadata_bytes_max
1289        {
1290            Err(CacheError::MetadataTooLarge {
1291                keys,
1292                bytes,
1293                max_keys: self.config.content_metadata_keys_max,
1294                max_bytes: self.config.content_metadata_bytes_max,
1295            })
1296        } else {
1297            Ok(())
1298        }
1299    }
1300
1301    fn rehydrate_l2_entry(
1302        &self,
1303        key: &BlobCacheKey,
1304        now_ms: u64,
1305        namespace_generation: u64,
1306        shard_idx: usize,
1307    ) -> Option<BlobCacheHit> {
1308        let l2 = self.l2.as_ref()?;
1309        let entry = l2.get(key, now_ms, namespace_generation)?;
1310        let hit = entry.hit();
1311        self.do_l1_promotion_sync(key, entry, shard_idx);
1312        Some(hit)
1313    }
1314
1315    /// Pure L1 install bookkeeping: shard write-lock, byte accounting,
1316    /// eviction loop. Extracted so the async promotion pool can call it
1317    /// from a worker (issue #193, lane 1/5).
1318    ///
1319    /// This is intentionally side-effect-only — it does not touch hit/miss
1320    /// stats (the caller already counted the hit) and does not return the
1321    /// `BlobCacheHit` (the caller already handed bytes to the user).
1322    fn do_l1_promotion_sync(&self, key: &BlobCacheKey, entry: Entry, shard_idx: usize) {
1323        let entry_size = entry.size;
1324        let mut shard = self.shards[shard_idx].write();
1325        let outcome = shard.insert(key.clone(), entry);
1326        drop(shard);
1327        let old_size = outcome.old_entry.as_ref().map_or(0, |entry| entry.size);
1328        if entry_size >= old_size {
1329            self.bytes_in_use
1330                .fetch_add(entry_size - old_size, Ordering::Relaxed);
1331        } else {
1332            self.bytes_in_use
1333                .fetch_sub(old_size - entry_size, Ordering::Relaxed);
1334        }
1335        self.evict_until_within_budget(shard_idx);
1336    }
1337
1338    /// Worker-side promotion path: re-fetch the entry from L2 and run the
1339    /// L1 install bookkeeping. Idempotent — re-promoting a key that the
1340    /// hot path already promoted (race with another reader) is harmless.
1341    /// Returns `Err` only when L2 is unavailable or the key is no longer
1342    /// present at L2 (silently treated as a no-op upstream).
1343    fn promote_from_l2(&self, req: &PromotionRequest) -> Result<(), String> {
1344        let l2 = self
1345            .l2
1346            .as_ref()
1347            .ok_or_else(|| "promotion executor invoked without L2 configured".to_string())?;
1348        let cache_key = BlobCacheKey::new(req.namespace.as_str(), req.key.as_str());
1349        let now_ms = unix_now_ms();
1350        let namespace_generation = self.current_generation(req.namespace.as_str());
1351        if let Some(entry) = l2.get(&cache_key, now_ms, namespace_generation) {
1352            let shard_idx = self.shard_index(&cache_key);
1353            self.do_l1_promotion_sync(&cache_key, entry, shard_idx);
1354        }
1355        Ok(())
1356    }
1357
1358    fn ensure_namespace(&self, namespace: &str) -> Result<(), CacheError> {
1359        {
1360            let namespaces = self.namespaces.read();
1361            if namespaces.contains(namespace) {
1362                return Ok(());
1363            }
1364        }
1365        let mut namespaces = self.namespaces.write();
1366        if namespaces.contains(namespace) {
1367            return Ok(());
1368        }
1369        if namespaces.len() >= self.config.max_namespaces {
1370            return Err(CacheError::TooManyNamespaces {
1371                max: self.config.max_namespaces,
1372            });
1373        }
1374        namespaces.insert(namespace.to_string());
1375        self.namespace_generations
1376            .write()
1377            .entry(namespace.to_string())
1378            .or_insert(0);
1379        Ok(())
1380    }
1381
1382    fn namespace_exists(&self, namespace: &str) -> bool {
1383        self.namespaces.read().contains(namespace)
1384            || self
1385                .l2
1386                .as_ref()
1387                .is_some_and(|l2| l2.has_namespace(namespace))
1388    }
1389
1390    fn current_generation(&self, namespace: &str) -> u64 {
1391        self.namespace_generations
1392            .read()
1393            .get(namespace)
1394            .copied()
1395            .unwrap_or(0)
1396    }
1397
1398    fn index_entry(
1399        &self,
1400        key: &BlobCacheKey,
1401        tags: &BTreeSet<String>,
1402        dependencies: &BTreeSet<String>,
1403    ) {
1404        if !tags.is_empty() {
1405            let mut index = self.tag_index.write();
1406            for tag in tags {
1407                index
1408                    .entry(ScopedLabel::new(key.namespace.as_str(), tag.as_str()))
1409                    .or_default()
1410                    .insert(key.clone());
1411            }
1412        }
1413        if !dependencies.is_empty() {
1414            let mut index = self.dependency_index.write();
1415            for dependency in dependencies {
1416                index
1417                    .entry(ScopedLabel::new(
1418                        key.namespace.as_str(),
1419                        dependency.as_str(),
1420                    ))
1421                    .or_default()
1422                    .insert(key.clone());
1423            }
1424        }
1425    }
1426
1427    fn deindex_entry(&self, key: &BlobCacheKey, entry: &Entry) {
1428        Self::remove_indexed_labels(&self.tag_index, key, &entry.tags);
1429        Self::remove_indexed_labels(&self.dependency_index, key, &entry.dependencies);
1430    }
1431
1432    fn remove_indexed_labels(
1433        index: &RwLock<HashMap<ScopedLabel, HashSet<BlobCacheKey>>>,
1434        key: &BlobCacheKey,
1435        labels: &BTreeSet<String>,
1436    ) {
1437        if labels.is_empty() {
1438            return;
1439        }
1440        let mut index = index.write();
1441        for label in labels {
1442            let scoped = ScopedLabel::new(key.namespace.as_str(), label.as_str());
1443            let should_remove = if let Some(keys) = index.get_mut(&scoped) {
1444                keys.remove(key);
1445                keys.is_empty()
1446            } else {
1447                false
1448            };
1449            if should_remove {
1450                index.remove(&scoped);
1451            }
1452        }
1453    }
1454
1455    fn record_removed_entry(&self, key: &BlobCacheKey, entry: &Entry) {
1456        self.bytes_in_use.fetch_sub(entry.size, Ordering::Relaxed);
1457        self.deindex_entry(key, entry);
1458    }
1459
1460    fn record_invalidated_entry(&self, key: &BlobCacheKey, entry: &Entry) {
1461        self.record_removed_entry(key, entry);
1462        if let Some(l2) = &self.l2 {
1463            l2.delete_key(key);
1464        }
1465        self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
1466    }
1467
1468    fn invalidate_indexed_many(
1469        &self,
1470        namespace: &str,
1471        labels: &[&str],
1472        kind: IndexedKind,
1473    ) -> usize {
1474        if labels.is_empty() || !self.namespace_exists(namespace) {
1475            return 0;
1476        }
1477
1478        // Snapshot the candidate keys for every label up front so the
1479        // shard-locking pass below sees a stable set. We deduplicate by
1480        // BlobCacheKey so a key tagged with multiple invalidated labels is
1481        // still removed (and counted) exactly once.
1482        let mut candidates: HashMap<BlobCacheKey, HashSet<String>> = HashMap::new();
1483        {
1484            let index = match kind {
1485                IndexedKind::Tag => self.tag_index.read(),
1486                IndexedKind::Dependency => self.dependency_index.read(),
1487            };
1488            for label in labels {
1489                let scoped = ScopedLabel::new(namespace, *label);
1490                if let Some(keys) = index.get(&scoped) {
1491                    for key in keys {
1492                        candidates
1493                            .entry(key.clone())
1494                            .or_default()
1495                            .insert((*label).to_string());
1496                    }
1497                }
1498            }
1499        }
1500
1501        if candidates.is_empty() {
1502            return 0;
1503        }
1504
1505        // Group candidates by shard so each shard lock is taken at most
1506        // once per call.
1507        let mut by_shard: HashMap<usize, Vec<(BlobCacheKey, HashSet<String>)>> = HashMap::new();
1508        for (key, matched_labels) in candidates {
1509            let shard_idx = self.shard_index(&key);
1510            by_shard
1511                .entry(shard_idx)
1512                .or_default()
1513                .push((key, matched_labels));
1514        }
1515
1516        let mut removed = Vec::new();
1517        for (shard_idx, keys) in by_shard {
1518            let mut shard = self.shards[shard_idx].write();
1519            for (key, matched_labels) in keys {
1520                let still_matches = match kind {
1521                    IndexedKind::Tag => shard.entry_has_any_tag(&key, &matched_labels),
1522                    IndexedKind::Dependency => {
1523                        shard.entry_has_any_dependency(&key, &matched_labels)
1524                    }
1525                };
1526                if still_matches {
1527                    if let Some(entry) = shard.remove(&key) {
1528                        removed.push((key, entry));
1529                    }
1530                }
1531            }
1532        }
1533
1534        let count = removed.len();
1535        for (key, entry) in removed {
1536            self.record_invalidated_entry(&key, &entry);
1537        }
1538        count
1539    }
1540
1541    fn shard_index(&self, key: &BlobCacheKey) -> usize {
1542        let mut hasher = std::collections::hash_map::DefaultHasher::new();
1543        key.hash(&mut hasher);
1544        (hasher.finish() as usize) % self.shards.len()
1545    }
1546
1547    fn check_version(
1548        &self,
1549        shard: &Shard,
1550        key: &BlobCacheKey,
1551        attempted: Option<u64>,
1552        namespace_generation: u64,
1553    ) -> Result<(), CacheError> {
1554        let Some(attempted) = attempted else {
1555            return Ok(());
1556        };
1557        let Some(existing) = shard.existing_version(key, namespace_generation) else {
1558            return Ok(());
1559        };
1560        if existing >= attempted {
1561            self.stats
1562                .version_mismatches
1563                .fetch_add(1, Ordering::Relaxed);
1564            Err(CacheError::VersionMismatch {
1565                existing,
1566                attempted,
1567            })
1568        } else {
1569            Ok(())
1570        }
1571    }
1572
1573    fn evict_until_within_budget(&self, preferred_start: usize) {
1574        while self.bytes_in_use.load(Ordering::Relaxed) > self.config.l1_bytes_max {
1575            let mut evicted = false;
1576            for offset in 0..self.shards.len() {
1577                let idx = (preferred_start + offset) % self.shards.len();
1578                let mut shard = self.shards[idx].write();
1579                if let Some((key, entry)) = shard.evict_one() {
1580                    self.bytes_in_use.fetch_sub(entry.size, Ordering::Relaxed);
1581                    self.stats.evictions.fetch_add(1, Ordering::Relaxed);
1582                    evicted = true;
1583                    drop(shard);
1584                    self.deindex_entry(&key, &entry);
1585                    break;
1586                }
1587            }
1588            if !evicted {
1589                break;
1590            }
1591        }
1592    }
1593}
1594
1595fn unix_now_ms() -> u64 {
1596    SystemTime::now()
1597        .duration_since(UNIX_EPOCH)
1598        .map(|duration| duration.as_millis() as u64)
1599        .unwrap_or(0)
1600}
1601
1602impl Default for BlobCache {
1603    fn default() -> Self {
1604        Self::with_defaults()
1605    }
1606}
1607
1608#[cfg(test)]
1609mod tests;