Skip to main content

oximedia_cache/
tiered_cache.rs

1//! Multi-tier cache (L1 memory → L2 memory → disk).
2//!
3//! Each tier has an independent [`EvictionPolicy`] and a byte-level capacity.
//! On a miss the implementation searches lower tiers in order; on a hit in a
//! lower tier the entry is promoted one tier up (tier *i* → tier *i-1*),
//! subject to that tier's promotion threshold.
6//!
7//! ## New in 0.1.2
8//!
9//! * **File-backed disk tier** — `TierConfig::disk_path` enables a real
10//!   file-backed tier backed by a directory on disk.  Each cache key maps to
11//!   a file inside that directory; reads and writes use `std::fs`.
//! * **Adaptive promotion thresholds** — each tier now tracks access
//!   frequency per key.  A hit in tier *i* only promotes to tier *i-1* when
//!   the key's frequency meets or exceeds the tier's `promotion_threshold`.
//!   This prevents scan pollution from one-shot accesses filling the hot tier.
//! * **Entry compression** — tiers with `compress: true` store their payload
//!   run-length encoded (a simple pure-Rust RLE, no external deps) so that
//!   L2+ tiers can occupy less memory / disk space on repetitive data.
19//!
20//! ## New in 0.1.8 Wave 13
21//!
22//! * **P² adaptive promotion** — the promotion threshold can be auto-tuned
23//!   using a P² quantile estimator (Jain & Chlamtac 1985) that tracks the
24//!   75th-percentile of per-key access frequencies.  Enable via
25//!   `TieredCacheBuilder::enable_adaptive_promotion(true)`.
26//! * **Arena allocation** — when `use_arena` is enabled, tier entry bytes are
27//!   stored in a `BumpArena` (bump allocator) so the `HashMap` holds cheap
28//!   `(offset, len)` handles instead of owned `Vec<u8>`.
29
30use std::collections::{HashMap, VecDeque};
31use std::path::PathBuf;
32
33// ── P² Quantile Estimator (Jain & Chlamtac 1985) ─────────────────────────────
34
/// Online running estimator for an arbitrary quantile using the P² algorithm
/// (Jain & Chlamtac 1985).
///
/// Five markers (minimum, p/2, p, (1+p)/2, maximum) are maintained so the
/// `p`-quantile can be estimated without storing the observations.  The
/// estimate is only available after 5 samples have been fed (warmup guard).
#[derive(Debug, Clone)]
pub struct P2QuantileEstimator {
    /// Target quantile, clamped into the open interval (0, 1).
    p: f64,
    /// Total number of (non-NaN) observations fed so far.
    n: u64,
    /// Marker heights: estimated quantile values at the 5 marker positions.
    q: [f64; 5],
    /// Desired marker positions (real-valued); recomputed on every update
    /// after warmup.
    dn: [f64; 5],
    /// Actual marker positions (whole-number ranks, stored as `f64`).
    np: [f64; 5],
}

impl P2QuantileEstimator {
    /// Create a new estimator for quantile `p` (0 < p < 1).
    ///
    /// `p` is clamped to `[1e-6, 1 - 1e-6]`.
    pub fn new(p: f64) -> Self {
        let p = p.clamp(1e-6, 1.0 - 1e-6);
        Self {
            p,
            n: 0,
            q: [0.0; 5],
            // Desired positions for the first five samples.
            dn: [1.0, 1.0 + 2.0 * p, 1.0 + 4.0 * p, 3.0 + 2.0 * p, 5.0],
            // After warmup the five sorted samples sit at ranks 1..=5; the
            // actual positions must start there regardless of `p`.
            np: [1.0, 2.0, 3.0, 4.0, 5.0],
        }
    }

    /// Feed a new observation.
    ///
    /// NaN observations are ignored (and not counted): they cannot be ordered
    /// against the markers and would otherwise corrupt the marker heights.
    pub fn update(&mut self, x: f64) {
        if x.is_nan() {
            return;
        }
        if self.n < 5 {
            // Collect the first 5 values into q[].
            self.q[self.n as usize] = x;
            self.n += 1;
            if self.n == 5 {
                // Sort the initial 5 samples to initialise the markers.
                self.q.sort_by(|a, b| a.total_cmp(b));
            }
            return;
        }
        self.n += 1;

        // Step 1: find the cell k that contains x, extending the extreme
        // markers when x falls outside the current range.
        let k = if x < self.q[0] {
            self.q[0] = x;
            0usize
        } else if x < self.q[1] {
            0
        } else if x < self.q[2] {
            1
        } else if x < self.q[3] {
            2
        } else if x <= self.q[4] {
            3
        } else {
            self.q[4] = x;
            3
        };

        // Step 2: every marker above the cell moves one rank to the right.
        for i in (k + 1)..5 {
            self.np[i] += 1.0;
        }

        // Step 3: update desired positions.
        let n_f = self.n as f64;
        self.dn[0] = 1.0;
        self.dn[1] = (n_f - 1.0) * self.p / 2.0 + 1.0;
        self.dn[2] = (n_f - 1.0) * self.p + 1.0;
        self.dn[3] = (n_f - 1.0) * (1.0 + self.p) / 2.0 + 1.0;
        self.dn[4] = n_f;

        // Step 4: adjust the three interior markers (indices 1..=3) when they
        // are at least one rank away from their desired position and there
        // is room to move in that direction.
        for i in 1..4 {
            let d = self.dn[i] - self.np[i];
            let sign_d: f64 = if d >= 0.0 { 1.0 } else { -1.0 };
            if (d >= 1.0 && self.np[i + 1] - self.np[i] > 1.0)
                || (d <= -1.0 && self.np[i - 1] - self.np[i] < -1.0)
            {
                // Try parabolic interpolation first; it is only accepted when
                // it keeps the marker heights strictly ordered.
                let qi_new = self.parabolic(i, sign_d);
                if qi_new > self.q[i - 1] && qi_new < self.q[i + 1] {
                    self.q[i] = qi_new;
                } else {
                    // Linear fallback towards the neighbour in direction d.
                    let idx = if d >= 0.0 { i + 1 } else { i.saturating_sub(1) };
                    let dq = self.q[idx] - self.q[i];
                    let dn = self.np[idx] - self.np[i];
                    self.q[i] += sign_d * dq / dn;
                }
                self.np[i] += sign_d;
            }
        }
    }

    /// Piecewise-parabolic (P²) prediction of marker `i`'s height after
    /// moving it by `sign` (+1 or -1) ranks.
    fn parabolic(&self, i: usize, sign: f64) -> f64 {
        let qi = self.q[i];
        let qi_prev = self.q[i - 1];
        let qi_next = self.q[i + 1];
        let ni = self.np[i];
        let ni_prev = self.np[i - 1];
        let ni_next = self.np[i + 1];
        let term1 = sign / (ni_next - ni_prev);
        let left = (ni - ni_prev + sign) * (qi_next - qi) / (ni_next - ni);
        let right = (ni_next - ni - sign) * (qi - qi_prev) / (ni - ni_prev);
        qi + term1 * (left + right)
    }

    /// Return the current quantile estimate (height of the middle marker).
    ///
    /// Returns `None` when fewer than 5 observations have been fed (warmup guard).
    pub fn estimate(&self) -> Option<f64> {
        if self.n < 5 {
            None
        } else {
            Some(self.q[2])
        }
    }

    /// Total number of observations seen (ignored NaN values excluded).
    pub fn count(&self) -> u64 {
        self.n
    }
}
163
164// ── BumpArena ─────────────────────────────────────────────────────────────────
165
/// Minimal bump allocator that hands out `(offset, len)` handles into one
/// contiguous byte buffer.
///
/// Allocating only advances a cursor.  Individual entries cannot be freed;
/// the whole arena is recycled at once via [`reset`] (typically after a
/// batch eviction sweep).
///
/// [`reset`]: BumpArena::reset
#[derive(Clone, Debug)]
pub struct BumpArena {
    /// Backing storage; only `data[..pos]` holds live allocations.
    data: Vec<u8>,
    /// Cursor: offset at which the next allocation will be placed.
    pos: usize,
}

impl BumpArena {
    /// Build an empty arena whose backing buffer reserves
    /// `initial_capacity` bytes up front.
    pub fn new(initial_capacity: usize) -> Self {
        let data = Vec::with_capacity(initial_capacity);
        Self { data, pos: 0 }
    }

    /// Copy `bytes` into the arena at the cursor and return the
    /// `(offset, len)` handle describing where they were placed.
    ///
    /// The backing buffer is grown (zero-filled) when it is too short.
    pub fn alloc(&mut self, bytes: &[u8]) -> (usize, usize) {
        let start = self.pos;
        let end = start + bytes.len();
        if self.data.len() < end {
            self.data.resize(end, 0u8);
        }
        self.data[start..end].copy_from_slice(bytes);
        self.pos = end;
        (start, bytes.len())
    }

    /// Borrow the bytes described by the handle `(offset, len)`.
    ///
    /// Panics if the range lies outside the backing buffer.
    pub fn get(&self, offset: usize, len: usize) -> &[u8] {
        let end = offset + len;
        &self.data[offset..end]
    }

    /// Rewind the cursor to the start so the buffer can be reused.
    ///
    /// The backing buffer is kept as-is; handles handed out earlier must be
    /// considered invalid after this call.
    pub fn reset(&mut self) {
        self.pos = 0;
    }

    /// Number of bytes handed out since creation or the last reset.
    pub fn used(&self) -> usize {
        self.pos
    }
}
221
222// ── Public configuration types ───────────────────────────────────────────────
223
/// How a tier chooses the entry to drop when it needs room.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EvictionPolicy {
    /// Least recently used: drop the entry with the smallest `last_access`
    /// tick.
    Lru,
    /// Least frequently used: drop the entry with the smallest access
    /// frequency, breaking ties by the smaller `last_access` tick.
    Lfu,
    /// First in, first out: drop the entry at the front of the insertion
    /// queue.
    Fifo,
    /// Drop an entry chosen with the tier's xorshift32 PRNG.
    Random,
    /// Approximate LFU with a small frequency-sketch style admission step.
    TinyLfu,
}
239
240/// Configuration for one tier.
241#[derive(Debug, Clone)]
242pub struct TierConfig {
243    /// Human-readable name (e.g. `"L1"`, `"L2"`, `"disk"`).
244    pub name: String,
245    /// Maximum number of bytes this tier may hold.
246    pub capacity_bytes: usize,
247    /// Simulated read latency in microseconds (used in future work / profiling).
248    pub access_latency_us: u64,
249    /// How entries are selected for eviction when the tier is full.
250    pub eviction_policy: EvictionPolicy,
251    /// Optional path to a directory on disk for file-backed storage.
252    ///
253    /// When `Some(path)`, entries evicted from this tier are stored as
254    /// individual files under `path/<key_hash>`.  Memory entries act as
255    /// an in-memory index; disk is the actual backing store.
256    pub disk_path: Option<PathBuf>,
257    /// Minimum access frequency before a hit in this tier promotes the entry
258    /// to the previous (hotter) tier.
259    ///
260    /// A value of `0` means "always promote" (original behaviour).
261    /// A value of `3` means the key must have been accessed at least 3 times
262    /// in this tier before it is considered hot enough to move up.
263    pub promotion_threshold: u64,
264    /// Whether to compress entry bytes in this tier.
265    ///
266    /// When `true`, values are compressed with a simple run-length encoder
267    /// before being stored and decoded on retrieval.  Useful for L2+ tiers to
268    /// reduce memory / disk footprint.
269    pub compress: bool,
270    /// Whether to use a P² quantile estimator to auto-tune the promotion
271    /// threshold.  When `true`, the static `promotion_threshold` acts as a
272    /// fallback during warmup (first 5 observations); afterwards the 75th
273    /// percentile of observed per-key access frequencies is used.
274    pub adaptive_promotion: bool,
275    /// When `true`, tier entry bytes are stored in a `BumpArena` and the
276    /// `HashMap` keeps lightweight `(offset, len)` handles instead of owned
277    /// `Vec<u8>`.  Falls back to owned storage when `false` (default).
278    pub use_arena: bool,
279}
280
281impl TierConfig {
282    /// Create a minimal in-memory tier config with default values.
283    pub fn memory(name: impl Into<String>, capacity_bytes: usize) -> Self {
284        Self {
285            name: name.into(),
286            capacity_bytes,
287            access_latency_us: 1,
288            eviction_policy: EvictionPolicy::Lru,
289            disk_path: None,
290            promotion_threshold: 0,
291            compress: false,
292            adaptive_promotion: false,
293            use_arena: false,
294        }
295    }
296
297    /// Create a disk-backed tier config.
298    pub fn disk(name: impl Into<String>, capacity_bytes: usize, path: impl Into<PathBuf>) -> Self {
299        Self {
300            name: name.into(),
301            capacity_bytes,
302            access_latency_us: 1_000,
303            eviction_policy: EvictionPolicy::Lru,
304            disk_path: Some(path.into()),
305            promotion_threshold: 1,
306            compress: true,
307            adaptive_promotion: false,
308            use_arena: false,
309        }
310    }
311
312    /// Enable or disable P²-adaptive promotion threshold tuning.
313    pub fn enable_adaptive_promotion(mut self, enabled: bool) -> Self {
314        self.adaptive_promotion = enabled;
315        self
316    }
317
318    /// Enable or disable arena allocation for tier entries.
319    pub fn enable_arena(mut self, enabled: bool) -> Self {
320        self.use_arena = enabled;
321        self
322    }
323}
324
325// ── Stats ────────────────────────────────────────────────────────────────────
326
/// Point-in-time statistics for one tier.
#[derive(Clone, Debug)]
pub struct TierStats {
    /// Display name of the tier.
    pub name: String,
    /// Lookups that were answered by this tier.
    pub hits: u64,
    /// Bytes currently accounted to this tier.
    pub size_used_bytes: usize,
    /// Distinct entries currently held by this tier.
    pub entry_count: usize,
    /// Promotions performed from this tier to the tier above it.
    pub promotions: u64,
    /// How many times an entry was compressed before being stored.
    pub compressions: u64,
}
343
344/// Aggregate statistics snapshot for the whole [`TieredCache`].
345#[derive(Debug, Clone)]
346pub struct TieredCacheStats {
347    /// Total successful lookups across all tiers.
348    pub total_hits: u64,
349    /// Total failed lookups (miss on every tier).
350    pub total_misses: u64,
351    /// `total_hits / (total_hits + total_misses)`, or `0.0` when no requests.
352    pub hit_rate: f64,
353    /// Per-tier detail.
354    pub tier_stats: Vec<TierStats>,
355}
356
357// ── Compression helpers (pure Rust run-length encoding) ──────────────────────
358
/// Run-length encode `data`.
///
/// The output is a sequence of `(count: u8, byte: u8)` pairs.  A run longer
/// than 255 bytes is emitted as several pairs; a byte that does not repeat
/// becomes a pair with count 1.
fn rle_compress(data: &[u8]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(data.len());
    let mut rest = data;
    while let Some(&byte) = rest.first() {
        // Length of the leading run of `byte`, capped at what one u8 can hold.
        let run = rest.iter().take(255).take_while(|&&b| b == byte).count();
        encoded.push(run as u8);
        encoded.push(byte);
        rest = &rest[run..];
    }
    encoded
}
381
/// Expand data produced by [`rle_compress`].
///
/// The input is read as `(count, byte)` pairs; a trailing unpaired byte is
/// ignored.
fn rle_decompress(data: &[u8]) -> Vec<u8> {
    let mut decoded = Vec::with_capacity(data.len() * 2);
    for pair in data.chunks_exact(2) {
        let count = usize::from(pair[0]);
        let byte = pair[1];
        let grown = decoded.len() + count;
        decoded.resize(grown, byte);
    }
    decoded
}
399
400// ── Internal per-tier storage ────────────────────────────────────────────────
401
/// Payload slot of a `CacheTier` entry.
enum TierEntry {
    /// The (possibly encoded) bytes are owned directly on the heap.
    Owned(Vec<u8>),
    /// The bytes live in a `BumpArena`; the fields are `(offset, len)`.
    Arena(usize, usize),
}
409
410struct CacheTier {
411    config: TierConfig,
412    /// `key → (entry, last_access_tick, frequency)`
413    ///
414    /// For disk-backed tiers the entry bytes are stored as a sentinel (empty)
415    /// since the canonical copy lives on disk.  For memory tiers the full
416    /// (possibly compressed) payload is stored.
417    data: HashMap<String, (TierEntry, u64, u64)>,
418    size_used: usize,
419    /// Insertion-order queue used by the FIFO policy.
420    fifo_order: VecDeque<String>,
421    /// Hit counter for this tier.
422    hits: u64,
423    /// Promotion counter: how many times an entry was promoted from this tier.
424    promotions: u64,
425    /// Compression counter.
426    compressions: u64,
427    /// Internal logical tick (monotonically increasing, per insert/get).
428    tick: u64,
429    /// xorshift32 state for the Random eviction policy.
430    rng_state: u32,
431    /// P² quantile estimator for adaptive promotion threshold (75th percentile
432    /// of per-key access frequency).  Present when `config.adaptive_promotion`.
433    p2_estimator: Option<P2QuantileEstimator>,
434    /// Optional bump arena.  Present when `config.use_arena`.
435    arena: Option<BumpArena>,
436}
437
438impl CacheTier {
439    fn new(config: TierConfig) -> Self {
440        // Create the disk directory if needed.
441        if let Some(ref path) = config.disk_path {
442            let _ = std::fs::create_dir_all(path);
443        }
444        let p2_estimator = if config.adaptive_promotion {
445            Some(P2QuantileEstimator::new(0.75))
446        } else {
447            None
448        };
449        let arena = if config.use_arena {
450            Some(BumpArena::new(config.capacity_bytes))
451        } else {
452            None
453        };
454        Self {
455            config,
456            data: HashMap::new(),
457            size_used: 0,
458            fifo_order: VecDeque::new(),
459            hits: 0,
460            promotions: 0,
461            compressions: 0,
462            tick: 1,
463            rng_state: 0xDEAD_BEEF,
464            p2_estimator,
465            arena,
466        }
467    }
468
469    /// Step the xorshift32 PRNG and return the new state.
470    fn xorshift32(&mut self) -> u32 {
471        let mut x = self.rng_state;
472        x ^= x << 13;
473        x ^= x >> 17;
474        x ^= x << 5;
475        self.rng_state = x;
476        x
477    }
478
479    /// Build the file path for a key in a disk-backed tier.
480    fn disk_path_for(&self, key: &str) -> Option<PathBuf> {
481        self.config.disk_path.as_ref().map(|base| {
482            // Use a simple FNV-1a hash of the key as the filename to avoid
483            // any filesystem-unsafe characters.
484            let mut h: u64 = 0xcbf2_9ce4_8422_2325;
485            for b in key.as_bytes() {
486                h ^= u64::from(*b);
487                h = h.wrapping_mul(0x0000_0100_0000_01b3);
488            }
489            base.join(format!("{h:016x}"))
490        })
491    }
492
493    /// Flush a key's in-memory bytes to disk (disk-backed tier only).
494    fn flush_to_disk(&self, key: &str, bytes: &[u8]) {
495        if let Some(path) = self.disk_path_for(key) {
496            let _ = std::fs::write(path, bytes);
497        }
498    }
499
500    /// Read a key's bytes from disk (disk-backed tier only).
501    fn read_from_disk(&self, key: &str) -> Option<Vec<u8>> {
502        let path = self.disk_path_for(key)?;
503        std::fs::read(path).ok()
504    }
505
506    /// Remove a key's file from disk (disk-backed tier only).
507    fn remove_from_disk(&self, key: &str) {
508        if let Some(path) = self.disk_path_for(key) {
509            let _ = std::fs::remove_file(path);
510        }
511    }
512
513    /// Encode `raw` bytes for storage (apply compression if configured).
514    fn encode(&mut self, raw: &[u8]) -> Vec<u8> {
515        if self.config.compress {
516            self.compressions += 1;
517            rle_compress(raw)
518        } else {
519            raw.to_vec()
520        }
521    }
522
523    /// Decode storage bytes back to raw (decompress if configured).
524    fn decode(&self, stored: &[u8]) -> Vec<u8> {
525        if self.config.compress {
526            rle_decompress(stored)
527        } else {
528            stored.to_vec()
529        }
530    }
531
532    /// Byte length of a `TierEntry` without cloning.
533    fn entry_len(&self, entry: &TierEntry) -> usize {
534        match entry {
535            TierEntry::Owned(v) => v.len(),
536            TierEntry::Arena(_, len) => *len,
537        }
538    }
539
540    fn get(&mut self, key: &str) -> Option<Vec<u8>> {
541        let tick = self.tick;
542        self.tick += 1;
543
544        // For disk-backed tiers, check if we have the key in the in-memory
545        // index but the data must be read from disk.
546        if self.config.disk_path.is_some() {
547            if let Some(entry) = self.data.get_mut(key) {
548                entry.1 = tick;
549                entry.2 += 1;
550                self.hits += 1;
551                // Feed frequency into P² estimator (if adaptive).
552                let freq = entry.2;
553                if let Some(ref mut est) = self.p2_estimator {
554                    est.update(freq as f64);
555                }
556                // Read canonical bytes from disk.
557                return self.read_from_disk(key).map(|stored| self.decode(&stored));
558            }
559            return None;
560        }
561
562        if let Some(entry) = self.data.get_mut(key) {
563            entry.1 = tick; // update last_access
564            entry.2 += 1; // increment frequency
565            self.hits += 1;
566            // Feed frequency into P² estimator (if adaptive).
567            let freq = entry.2;
568            if let Some(ref mut est) = self.p2_estimator {
569                est.update(freq as f64);
570            }
571            // Clone the stored bytes before immutably borrowing again.
572            let raw: Vec<u8> = match &entry.0 {
573                TierEntry::Owned(v) => v.clone(),
574                TierEntry::Arena(offset, len) => {
575                    if let Some(arena) = &self.arena {
576                        arena.get(*offset, *len).to_vec()
577                    } else {
578                        vec![]
579                    }
580                }
581            };
582            let decoded = self.decode(&raw);
583            Some(decoded)
584        } else {
585            None
586        }
587    }
588
589    /// Insert `(key, data)` into this tier, evicting entries as needed until
590    /// there is room.
591    fn put(&mut self, key: String, data: Vec<u8>) {
592        let encoded = self.encode(&data);
593        let stored_len = encoded.len();
594
595        // If the data alone exceeds the tier capacity, skip insertion.
596        if stored_len > self.config.capacity_bytes {
597            return;
598        }
599
600        // Evict until there is enough room.
601        while self.size_used + stored_len > self.config.capacity_bytes {
602            if self.evict_one().is_none() {
603                break;
604            }
605        }
606
607        let tick = self.tick;
608        self.tick += 1;
609        self.size_used += stored_len;
610        self.fifo_order.push_back(key.clone());
611
612        // For disk-backed tiers, write encoded bytes to disk and store a
613        // small sentinel (empty vec) in the in-memory map as an index entry.
614        if self.config.disk_path.is_some() {
615            self.flush_to_disk(&key, &encoded);
616            self.data
617                .insert(key, (TierEntry::Owned(Vec::new()), tick, 1));
618        } else if self.config.use_arena {
619            // Arena path: append to bump arena.
620            let (offset, len) = if let Some(ref mut arena) = self.arena {
621                arena.alloc(&encoded)
622            } else {
623                // Shouldn't happen; fall back to owned.
624                let v = encoded;
625                self.data.insert(key, (TierEntry::Owned(v), tick, 1));
626                return;
627            };
628            self.data
629                .insert(key, (TierEntry::Arena(offset, len), tick, 1));
630        } else {
631            self.data.insert(key, (TierEntry::Owned(encoded), tick, 1));
632        }
633    }
634
635    /// Return the access frequency for `key`, or 0 if not present.
636    fn frequency(&self, key: &str) -> u64 {
637        self.data.get(key).map(|(_, _, f)| *f).unwrap_or(0)
638    }
639
640    /// Return the effective promotion threshold, using the P² estimate when
641    /// available and the warmup guard has passed.
642    fn effective_promotion_threshold(&self) -> u64 {
643        if let Some(ref est) = self.p2_estimator {
644            if let Some(q75) = est.estimate() {
645                // Round up to nearest u64; at least 1.
646                return (q75.ceil() as u64).max(1);
647            }
648        }
649        self.config.promotion_threshold
650    }
651
652    /// Remove `key` from this tier.  Returns `true` if it was present.
653    fn remove(&mut self, key: &str) -> bool {
654        if let Some((entry, _, _)) = self.data.remove(key) {
655            let stored_len = self.entry_len(&entry);
656            self.size_used = self.size_used.saturating_sub(stored_len);
657            self.fifo_order.retain(|k| k != key);
658            if self.config.disk_path.is_some() {
659                self.remove_from_disk(key);
660            }
661            true
662        } else {
663            false
664        }
665    }
666
667    /// Evict one entry according to the configured policy.
668    fn evict_one(&mut self) -> Option<(String, Vec<u8>)> {
669        if self.data.is_empty() {
670            return None;
671        }
672        let victim_key = match &self.config.eviction_policy {
673            EvictionPolicy::Lru => self.pick_lru(),
674            EvictionPolicy::Lfu => self.pick_lfu(),
675            EvictionPolicy::Fifo => self.pick_fifo(),
676            EvictionPolicy::Random => self.pick_random(),
677            EvictionPolicy::TinyLfu => self.pick_tiny_lfu(),
678        }?;
679
680        let (entry, _, _) = self.data.remove(&victim_key)?;
681        let is_disk_sentinel = self.config.disk_path.is_some()
682            && matches!(&entry, TierEntry::Owned(v) if v.is_empty());
683        let stored_bytes: Vec<u8> = match &entry {
684            TierEntry::Owned(v) => v.clone(),
685            TierEntry::Arena(offset, len) => {
686                if let Some(arena) = &self.arena {
687                    arena.get(*offset, *len).to_vec()
688                } else {
689                    vec![]
690                }
691            }
692        };
693        let data = if self.config.disk_path.is_some() {
694            // Return decoded bytes from disk for possible demotion to lower tier.
695            let from_disk = self.read_from_disk(&victim_key).unwrap_or_default();
696            self.remove_from_disk(&victim_key);
697            self.decode(&from_disk)
698        } else {
699            self.decode(&stored_bytes)
700        };
701        let size_removed = if is_disk_sentinel {
702            // For disk tiers the in-memory sentinel is empty; use data.len as
703            // approximate (compression may differ, but this avoids drift).
704            data.len()
705        } else {
706            stored_bytes.len()
707        };
708        self.size_used = self.size_used.saturating_sub(size_removed);
709        self.fifo_order.retain(|k| *k != victim_key);
710        Some((victim_key, data))
711    }
712
713    fn pick_lru(&self) -> Option<String> {
714        self.data
715            .iter()
716            .min_by_key(|(_, (_, last_access, _))| *last_access)
717            .map(|(k, _)| k.clone())
718    }
719
720    fn pick_lfu(&self) -> Option<String> {
721        self.data
722            .iter()
723            .min_by(|(_, (_, la_a, freq_a)), (_, (_, la_b, freq_b))| {
724                freq_a.cmp(freq_b).then(la_a.cmp(la_b))
725            })
726            .map(|(k, _)| k.clone())
727    }
728
729    fn pick_fifo(&self) -> Option<String> {
730        self.fifo_order.front().cloned()
731    }
732
733    fn pick_random(&mut self) -> Option<String> {
734        if self.data.is_empty() {
735            return None;
736        }
737        let count = self.data.len();
738        let rnd = self.xorshift32() as usize % count;
739        self.data.keys().nth(rnd).cloned()
740    }
741
742    /// TinyLFU: use `frequency % 4` as a count-min-sketch approximation.
743    fn pick_tiny_lfu(&mut self) -> Option<String> {
744        let candidate = self
745            .data
746            .iter()
747            .min_by(|(_, (_, la_a, freq_a)), (_, (_, la_b, freq_b))| {
748                let sketch_a = freq_a % 4;
749                let sketch_b = freq_b % 4;
750                sketch_a.cmp(&sketch_b).then(la_a.cmp(la_b))
751            })
752            .map(|(k, v)| (k.clone(), v.2))?;
753
754        let (key, freq) = candidate;
755        if freq >= 2 {
756            let rnd = self.xorshift32() as u64;
757            if rnd % freq >= freq / 2 {
758                return self.pick_lfu();
759            }
760        }
761        Some(key)
762    }
763}
764
765impl Drop for CacheTier {
766    fn drop(&mut self) {
767        // For disk-backed tiers: remove all files on drop to avoid leaving
768        // stale data. In production you would persist; here we clean up the
769        // test directory.  Only do this when the base path still exists.
770        if let Some(ref base) = self.config.disk_path {
771            let keys: Vec<String> = self.data.keys().cloned().collect();
772            for key in keys {
773                if let Some(path) = self.disk_path_for(&key) {
774                    let _ = std::fs::remove_file(path);
775                }
776            }
777            // Attempt to remove the directory if empty (best-effort).
778            let _ = std::fs::remove_dir(base);
779        }
780    }
781}
782
783// ── TieredCache ───────────────────────────────────────────────────────────────
784
785/// A multi-tier cache where each tier has its own [`TierConfig`].
786///
787/// Reads check tiers in order (L1 first); a hit in tier *i* > 0 promotes the
788/// entry to tier *i-1* when the key's access frequency in that tier meets or
789/// exceeds the tier's [`TierConfig::promotion_threshold`].  Writes always go
790/// to L1 only.
791pub struct TieredCache {
792    tiers: Vec<CacheTier>,
793    total_hits: u64,
794    total_misses: u64,
795    /// Per-tier hit counters (parallel to `tiers`).
796    tier_hits: Vec<u64>,
797}
798
799impl TieredCache {
800    /// Construct a `TieredCache` from a list of tier configurations.
801    /// The first element is L1 (fastest / smallest), last is the slowest.
802    pub fn new(tiers: Vec<TierConfig>) -> Self {
803        let n = tiers.len();
804        Self {
805            tiers: tiers.into_iter().map(CacheTier::new).collect(),
806            total_hits: 0,
807            total_misses: 0,
808            tier_hits: vec![0; n],
809        }
810    }
811
812    /// Look up `key` across all tiers in order.
813    ///
814    /// On a hit in tier *i* > 0, the entry is promoted to tier *i-1* if the
815    /// key's frequency in that tier meets or exceeds the effective promotion
816    /// threshold (static or P²-adaptive depending on configuration).
817    pub fn get(&mut self, key: &str) -> Option<Vec<u8>> {
818        for tier_idx in 0..self.tiers.len() {
819            if let Some(data) = self.tiers[tier_idx].get(key) {
820                self.total_hits += 1;
821                self.tier_hits[tier_idx] += 1;
822                // Adaptive promotion: only promote if frequency threshold met.
823                if tier_idx > 0 {
824                    let freq = self.tiers[tier_idx].frequency(key);
825                    let threshold = self.tiers[tier_idx].effective_promotion_threshold();
826                    if freq >= threshold {
827                        self.tiers[tier_idx].promotions += 1;
828                        let key_owned = key.to_string();
829                        self.tiers[tier_idx - 1].put(key_owned, data.clone());
830                    }
831                }
832                return Some(data);
833            }
834        }
835        self.total_misses += 1;
836        None
837    }
838
839    /// Insert `(key, data)` into the L1 tier.
840    pub fn put(&mut self, key: &str, data: Vec<u8>) {
841        self.tiers[0].put(key.to_string(), data);
842    }
843
844    /// Insert `(key, data)` directly into tier `tier_idx`.
845    ///
846    /// Useful for pre-populating lower tiers (e.g. from a warm-up snapshot).
847    pub fn put_at_tier(&mut self, tier_idx: usize, key: &str, data: Vec<u8>) {
848        if let Some(tier) = self.tiers.get_mut(tier_idx) {
849            tier.put(key.to_string(), data);
850        }
851    }
852
853    /// Evict one entry from tier `tier_idx` according to that tier's policy.
854    /// Returns the evicted `(key, data)` or `None` if the tier is empty.
855    pub fn evict_tier(&mut self, tier_idx: usize) -> Option<(String, Vec<u8>)> {
856        self.tiers.get_mut(tier_idx)?.evict_one()
857    }
858
859    /// Return an aggregate statistics snapshot.
860    pub fn stats(&self) -> TieredCacheStats {
861        let total = self.total_hits + self.total_misses;
862        let hit_rate = if total == 0 {
863            0.0
864        } else {
865            self.total_hits as f64 / total as f64
866        };
867        let tier_stats = self
868            .tiers
869            .iter()
870            .enumerate()
871            .map(|(i, t)| TierStats {
872                name: t.config.name.clone(),
873                hits: self.tier_hits[i],
874                size_used_bytes: t.size_used,
875                entry_count: t.data.len(),
876                promotions: t.promotions,
877                compressions: t.compressions,
878            })
879            .collect();
880        TieredCacheStats {
881            total_hits: self.total_hits,
882            total_misses: self.total_misses,
883            hit_rate,
884            tier_stats,
885        }
886    }
887
888    /// Bulk-insert `entries` into L1 without triggering eviction.
889    pub fn warmup(&mut self, entries: &[(String, Vec<u8>)]) {
890        for (key, data) in entries {
891            let data_len = data.len();
892            if self.tiers[0].size_used + data_len <= self.tiers[0].config.capacity_bytes {
893                let tick = self.tiers[0].tick;
894                self.tiers[0].tick += 1;
895                self.tiers[0].size_used += data_len;
896                self.tiers[0].fifo_order.push_back(key.clone());
897                self.tiers[0]
898                    .data
899                    .insert(key.clone(), (TierEntry::Owned(data.clone()), tick, 1));
900            }
901        }
902    }
903
904    /// Remove `key` from every tier.  Returns `true` if it was found in at
905    /// least one tier.
906    pub fn invalidate(&mut self, key: &str) -> bool {
907        let mut found = false;
908        for tier in &mut self.tiers {
909            if tier.remove(key) {
910                found = true;
911            }
912        }
913        found
914    }
915
916    /// Return the number of tiers.
917    pub fn tier_count(&self) -> usize {
918        self.tiers.len()
919    }
920
921    /// Return the number of promotions from tier `tier_idx` to the tier above.
922    pub fn tier_promotions(&self, tier_idx: usize) -> u64 {
923        self.tiers.get(tier_idx).map(|t| t.promotions).unwrap_or(0)
924    }
925
926    /// Return the number of hits recorded for tier `tier_idx`.
927    pub fn tier_hit_count(&self, tier_idx: usize) -> u64 {
928        self.tier_hits.get(tier_idx).copied().unwrap_or(0)
929    }
930
931    /// Reset the bump arena of tier `tier_idx` (reclaims all arena memory).
932    ///
933    /// After a reset, all previously stored arena handles for that tier are
934    /// invalid; this is intended for use after a bulk eviction sweep.
935    pub fn reset_tier_arena(&mut self, tier_idx: usize) {
936        if let Some(tier) = self.tiers.get_mut(tier_idx) {
937            if let Some(ref mut arena) = tier.arena {
938                arena.reset();
939            }
940        }
941    }
942}
943
944// ── Tests ─────────────────────────────────────────────────────────────────────
945
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a two-tier in-memory cache: an LRU L1 of `l1_bytes` backed by an
    /// LFU L2 of `l2_bytes`.
    fn two_tier_cache(l1_bytes: usize, l2_bytes: usize) -> TieredCache {
        TieredCache::new(vec![
            TierConfig {
                eviction_policy: EvictionPolicy::Lru,
                ..TierConfig::memory("L1", l1_bytes)
            },
            TierConfig {
                eviction_policy: EvictionPolicy::Lfu,
                ..TierConfig::memory("L2", l2_bytes)
            },
        ])
    }

    // 1. Basic put and get
    #[test]
    fn test_basic_put_get() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("key1", b"hello".to_vec());
        assert_eq!(cache.get("key1"), Some(b"hello".to_vec()));
    }

    // 2. Miss returns None
    #[test]
    fn test_miss() {
        let mut cache = two_tier_cache(1024, 4096);
        assert_eq!(cache.get("absent"), None);
        assert_eq!(cache.stats().total_misses, 1);
    }

    // 3. Hit rate calculation
    #[test]
    fn test_hit_rate() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("k", b"v".to_vec());
        cache.get("k"); // hit
        cache.get("nope"); // miss
        let s = cache.stats();
        assert!((s.hit_rate - 0.5).abs() < 1e-9);
    }

    // 4. L1 eviction under LRU policy
    #[test]
    fn test_l1_lru_eviction() {
        // L1 holds three one-byte values.
        let mut cache = two_tier_cache(3, 1024);
        cache.put("a", b"1".to_vec());
        cache.put("b", b"2".to_vec());
        cache.put("c", b"3".to_vec());
        // Touch "a" so that "b" becomes the least recently used entry.
        cache.get("a");
        // Inserting a fourth value must push "b" out.
        cache.put("d", b"4".to_vec());
        assert_eq!(cache.get("b"), None);
        assert!(cache.get("a").is_some());
    }

    // 5. invalidate removes from all tiers
    #[test]
    fn test_invalidate() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("x", b"data".to_vec());
        assert!(cache.invalidate("x"));
        assert_eq!(cache.get("x"), None);
    }

    // 6. invalidate on absent key returns false
    #[test]
    fn test_invalidate_absent() {
        let mut cache = two_tier_cache(1024, 4096);
        assert!(!cache.invalidate("ghost"));
    }

    // 7. warmup populates L1 without eviction
    #[test]
    fn test_warmup() {
        let mut cache = two_tier_cache(1024, 4096);
        let entries = vec![
            ("alpha".to_string(), b"AAA".to_vec()),
            ("beta".to_string(), b"BBB".to_vec()),
        ];
        cache.warmup(&entries);
        assert_eq!(cache.get("alpha"), Some(b"AAA".to_vec()));
        assert_eq!(cache.get("beta"), Some(b"BBB".to_vec()));
    }

    // 8. stats entry_count
    #[test]
    fn test_stats_entry_count() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("a", b"1".to_vec());
        cache.put("b", b"2".to_vec());
        assert_eq!(cache.stats().tier_stats[0].entry_count, 2);
    }

    // 9. FIFO eviction policy
    #[test]
    fn test_fifo_eviction() {
        let mut cache = TieredCache::new(vec![TierConfig {
            eviction_policy: EvictionPolicy::Fifo,
            ..TierConfig::memory("fifo", 3)
        }]);
        cache.put("first", b"1".to_vec());
        cache.put("second", b"2".to_vec());
        cache.put("third", b"3".to_vec());
        // The tier is full, so the oldest insertion must make room.
        cache.put("fourth", b"4".to_vec());
        assert_eq!(cache.get("first"), None);
    }

    // 10. Random eviction policy (smoke test)
    #[test]
    fn test_random_eviction_no_panic() {
        let mut cache = TieredCache::new(vec![TierConfig {
            eviction_policy: EvictionPolicy::Random,
            ..TierConfig::memory("rand", 5)
        }]);
        for i in 0..20u8 {
            cache.put(&i.to_string(), vec![i]);
        }
        assert!(cache.stats().tier_stats[0].entry_count <= 5);
    }

    // 11. TinyLFU eviction policy (smoke test)
    #[test]
    fn test_tiny_lfu_eviction_no_panic() {
        let mut cache = TieredCache::new(vec![TierConfig {
            eviction_policy: EvictionPolicy::TinyLfu,
            ..TierConfig::memory("tiny", 5)
        }]);
        for i in 0..20u8 {
            cache.put(&i.to_string(), vec![i]);
        }
        assert!(cache.stats().tier_stats[0].entry_count <= 5);
    }

    // 12. evict_tier API
    #[test]
    fn test_evict_tier() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("a", b"data".to_vec());
        let (k, _) = cache.evict_tier(0).expect("eviction should succeed");
        assert_eq!(k, "a");
    }

    // 13. evict_tier on empty tier returns None
    #[test]
    fn test_evict_empty_tier() {
        let mut cache = two_tier_cache(1024, 4096);
        assert!(cache.evict_tier(0).is_none());
    }

    // 14. size_used_bytes tracks usage
    #[test]
    fn test_size_used_bytes() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("a", vec![0u8; 100]);
        cache.put("b", vec![0u8; 200]);
        assert_eq!(cache.stats().tier_stats[0].size_used_bytes, 300);
    }

    // 15. Tier hit counters: L1 hit increments tier 0
    #[test]
    fn test_tier_hit_counters() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("k", b"v".to_vec());
        cache.get("k");
        cache.get("k");
        let s = cache.stats();
        assert_eq!(s.tier_stats[0].hits, 2);
    }

    // 16. Compression: compressed tier stores and retrieves correctly
    #[test]
    fn test_compression_roundtrip() {
        let mut cache = TieredCache::new(vec![TierConfig {
            compress: true,
            ..TierConfig::memory("compressed", 1024 * 1024)
        }]);
        // Highly compressible data: run of the same byte.
        let data = vec![0xABu8; 512];
        cache.put("k", data.clone());
        let retrieved = cache.get("k").expect("should be present");
        assert_eq!(
            retrieved, data,
            "compressed entry should decompress correctly"
        );
    }

    // 17. Compression: stats track compression count
    #[test]
    fn test_compression_stats() {
        let mut cache = TieredCache::new(vec![TierConfig {
            compress: true,
            ..TierConfig::memory("c", 1024 * 1024)
        }]);
        cache.put("a", vec![1u8; 64]);
        cache.put("b", vec![2u8; 64]);
        let s = cache.stats();
        assert_eq!(
            s.tier_stats[0].compressions, 2,
            "two puts should compress twice"
        );
    }

    // 18. Adaptive promotion: a repeatedly accessed L2 key gets promoted
    #[test]
    fn test_adaptive_promotion_threshold() {
        // L1: tiny (10 bytes) with the default promotion threshold.
        // L2: larger, with an explicit promotion threshold of 3.
        let mut cache = TieredCache::new(vec![
            TierConfig::memory("L1", 10),
            TierConfig {
                promotion_threshold: 3,
                ..TierConfig::memory("L2", 1024)
            },
        ]);

        // Put a single-byte value directly into L2.
        cache.put_at_tier(1, "hot", b"v".to_vec());

        // Read the key three times.  By the third read its access frequency
        // in L2 is expected to have reached the threshold of 3, so at least
        // one promotion out of L2 must have been recorded.
        cache.get("hot");
        cache.get("hot");
        cache.get("hot");

        let s = cache.stats();
        assert!(
            s.tier_stats[1].promotions >= 1,
            "entry should have been promoted after reaching threshold"
        );
    }

    // 19. Disk-backed tier stores and retrieves entries
    #[test]
    fn test_disk_tier_basic() {
        // Combine the process id with a nanosecond timestamp so concurrent or
        // repeated test runs do not share (and clean up) the same directory.
        let unique = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(42);
        let dir = std::env::temp_dir().join(format!(
            "oximedia_tiered_disk_{}_{}",
            std::process::id(),
            unique
        ));
        let mut cache = TieredCache::new(vec![TierConfig::disk("disk", 1024 * 1024, &dir)]);
        cache.put("segment-001", b"media data here".to_vec());
        let got = cache.get("segment-001");
        assert_eq!(
            got,
            Some(b"media data here".to_vec()),
            "disk tier should retrieve the value correctly"
        );
        // dir is cleaned up by CacheTier::drop
    }

    // 20. TierConfig::memory helper
    #[test]
    fn test_tier_config_memory_helper() {
        let cfg = TierConfig::memory("L1", 4096);
        assert_eq!(cfg.name, "L1");
        assert_eq!(cfg.capacity_bytes, 4096);
        assert!(cfg.disk_path.is_none());
        assert!(!cfg.compress);
    }

    // 21. tier_count
    #[test]
    fn test_tier_count() {
        let cache = two_tier_cache(1024, 4096);
        assert_eq!(cache.tier_count(), 2);
    }

    // 22. put_at_tier inserts into specified tier
    #[test]
    fn test_put_at_tier() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put_at_tier(1, "l2-key", b"l2-value".to_vec());
        assert_eq!(cache.stats().tier_stats[1].entry_count, 1);
        // Can retrieve via get (searches all tiers).
        assert_eq!(cache.get("l2-key"), Some(b"l2-value".to_vec()));
    }

    // 23. RLE compress + decompress are inverse
    #[test]
    fn test_rle_roundtrip() {
        for input in [
            b"".as_ref(),
            b"hello",
            b"\x00\x00\x00\x00",
            b"AAABBBCCC",
            b"abcdefghij",
        ] {
            let compressed = rle_compress(input);
            let decompressed = rle_decompress(&compressed);
            assert_eq!(decompressed, input, "rle roundtrip failed for {:?}", input);
        }
    }
}