Skip to main content

oximedia_cache/
cache_partitioning.rs

1//! Cache partitioning: isolate cache space per tenant, stream, or workload.
2//!
3//! [`PartitionedCache`] manages a collection of named [`CachePartition`]s,
4//! each with an independent LRU policy and byte-level capacity budget.
5
6use std::collections::HashMap;
7use std::time::{Duration, Instant};
8
9// ── CacheEntry ────────────────────────────────────────────────────────────────
10
/// A single cached value together with its bookkeeping metadata.
#[derive(Debug, Clone)]
pub struct CacheEntry {
    /// Raw value bytes.
    pub data: Vec<u8>,
    /// Number of bytes this entry is accounted as.  The constructors replace
    /// a `0` hint with `data.len()`.
    pub size_bytes: usize,
    /// Instant (on the monotonic [`Instant`] clock) from which the entry
    /// counts as expired; `None` means the entry never expires.
    pub expires_at: Option<Instant>,
    /// Logical priority tag (higher = more important to keep).
    pub priority: u32,
    /// Instant at which this entry was last read; starts at creation time.
    pub last_accessed: Instant,
    /// Number of times this entry has been accessed.
    pub access_count: u64,
}

impl CacheEntry {
    /// Create a new `CacheEntry` with default metadata: size taken from
    /// `data.len()`, no expiry, priority `0`.
    pub fn new(data: Vec<u8>) -> Self {
        // A zero size hint makes `with_meta` fall back to `data.len()`.
        Self::with_meta(data, 0, None, 0)
    }

    /// Create a `CacheEntry` with an explicit byte-size hint, TTL, and priority.
    ///
    /// * `size_bytes` – accounting size; `0` means "use `data.len()`".
    /// * `ttl` – time to live measured from now; `None` means no expiry.  A
    ///   TTL so large that the deadline cannot be represented as an
    ///   [`Instant`] is treated as "never expires" instead of panicking.
    /// * `priority` – logical priority tag stored verbatim.
    pub fn with_meta(
        data: Vec<u8>,
        size_bytes: usize,
        ttl: Option<Duration>,
        priority: u32,
    ) -> Self {
        // One clock read so the deadline and `last_accessed` share a baseline.
        let now = Instant::now();
        let size_bytes = if size_bytes == 0 {
            data.len()
        } else {
            size_bytes
        };
        Self {
            data,
            size_bytes,
            // `Instant + Duration` panics on overflow; `checked_add` yields
            // `None` instead, which here means "no expiry".
            expires_at: ttl.and_then(|d| now.checked_add(d)),
            priority,
            last_accessed: now,
            access_count: 0,
        }
    }

    /// Return `true` if this entry has an expiry instant and the current
    /// time has reached it.  Entries without an expiry never expire.
    pub fn is_expired(&self) -> bool {
        matches!(self.expires_at, Some(deadline) if Instant::now() >= deadline)
    }
}
71
72// ── CachePartition ────────────────────────────────────────────────────────────
73
74/// An isolated cache partition with its own byte-level capacity.
75///
76/// Internally uses an insertion-ordered key list (`Vec<String>`) for LRU
77/// tracking, and a `HashMap` for O(1) value access.  Eviction walks from
78/// the tail (oldest) towards the head (newest), skipping high-priority items.
79pub struct CachePartition {
80    /// Human-readable name for this partition.
81    pub name: String,
82    /// Maximum byte budget for this partition.
83    pub max_bytes: usize,
84    /// Key→entry store.
85    entries: HashMap<String, CacheEntry>,
86    /// LRU ordering: front = MRU, back = LRU.
87    lru_order: Vec<String>,
88    /// Currently used bytes.
89    used_bytes: usize,
90    /// Partition-level hit counter.
91    hits: u64,
92    /// Partition-level miss counter.
93    misses: u64,
94    /// Partition-level eviction counter.
95    evictions: u64,
96}
97
98impl CachePartition {
99    /// Create a new partition with the given name and byte capacity.
100    pub fn new(name: impl Into<String>, max_bytes: usize) -> Self {
101        Self {
102            name: name.into(),
103            max_bytes,
104            entries: HashMap::new(),
105            lru_order: Vec::new(),
106            used_bytes: 0,
107            hits: 0,
108            misses: 0,
109            evictions: 0,
110        }
111    }
112
113    /// Return the number of entries in this partition.
114    pub fn len(&self) -> usize {
115        self.entries.len()
116    }
117
118    /// Return `true` when the partition is empty.
119    pub fn is_empty(&self) -> bool {
120        self.entries.is_empty()
121    }
122
123    /// Return currently used bytes.
124    pub fn used_bytes(&self) -> usize {
125        self.used_bytes
126    }
127
128    /// Return partition statistics.
129    pub fn stats(&self) -> PartitionStats {
130        PartitionStats {
131            name: self.name.clone(),
132            entry_count: self.entries.len(),
133            used_bytes: self.used_bytes,
134            max_bytes: self.max_bytes,
135            hits: self.hits,
136            misses: self.misses,
137            evictions: self.evictions,
138        }
139    }
140
141    /// Retrieve the entry for `key`.
142    ///
143    /// Returns `None` if the key is absent or its TTL has expired (in which
144    /// case the entry is lazily removed).
145    pub fn get(&mut self, key: &str) -> Option<&CacheEntry> {
146        // Lazy TTL eviction.
147        let expired = self
148            .entries
149            .get(key)
150            .map(|e| e.is_expired())
151            .unwrap_or(false);
152        if expired {
153            self.remove_entry(key);
154            self.misses += 1;
155            return None;
156        }
157
158        if let Some(entry) = self.entries.get_mut(key) {
159            entry.last_accessed = Instant::now();
160            entry.access_count += 1;
161            self.hits += 1;
162            // Promote to MRU head.
163            self.lru_order.retain(|k| k != key);
164            self.lru_order.insert(0, key.to_string());
165            // SAFETY: we just confirmed the key exists.
166            self.entries.get(key)
167        } else {
168            self.misses += 1;
169            None
170        }
171    }
172
173    /// Peek at an entry without updating LRU order or access statistics.
174    pub fn peek(&self, key: &str) -> Option<&CacheEntry> {
175        self.entries.get(key)
176    }
177
178    /// Insert or update `(key, entry)` in this partition.
179    ///
180    /// If the entry does not fit even after evicting all lower-priority
181    /// entries the insert is silently dropped.
182    pub fn put(&mut self, key: String, entry: CacheEntry) {
183        // Remove existing entry if present so we can replace it.
184        if self.entries.contains_key(&key) {
185            self.remove_entry(&key);
186        }
187
188        // Evict until there is enough room.
189        while self.used_bytes + entry.size_bytes > self.max_bytes {
190            if self.evict_one_lru().is_none() {
191                break;
192            }
193        }
194
195        // If still no room, drop the insert.
196        if self.used_bytes + entry.size_bytes > self.max_bytes {
197            return;
198        }
199
200        self.used_bytes += entry.size_bytes;
201        self.lru_order.insert(0, key.clone());
202        self.entries.insert(key, entry);
203    }
204
205    /// Remove `key` from this partition. Returns `true` if it was present.
206    pub fn remove(&mut self, key: &str) -> bool {
207        self.remove_entry(key)
208    }
209
210    /// Evict the least-recently-used entry.
211    ///
212    /// Returns the evicted key or `None` if the partition is empty.
213    pub fn evict_one_lru(&mut self) -> Option<String> {
214        // Walk from LRU end to find an evictable entry.
215        let victim = self
216            .lru_order
217            .iter()
218            .rev()
219            .find(|k| {
220                // Skip high-priority entries (priority > 0) as a simple heuristic.
221                self.entries
222                    .get(*k)
223                    .map(|e| e.priority == 0)
224                    .unwrap_or(false)
225            })
226            .cloned()
227            .or_else(|| {
228                // If all entries have priority > 0, fall back to true LRU.
229                self.lru_order.last().cloned()
230            });
231
232        if let Some(key) = victim {
233            self.remove_entry(&key);
234            self.evictions += 1;
235            Some(key)
236        } else {
237            None
238        }
239    }
240
241    /// Evict entries from this partition until `bytes_to_free` bytes have been
242    /// freed or the partition is empty.
243    ///
244    /// Returns the number of bytes actually freed.
245    pub fn evict_bytes(&mut self, bytes_to_free: usize) -> usize {
246        let start_used = self.used_bytes;
247        while self.used_bytes + bytes_to_free > start_used {
248            if self.used_bytes == 0 {
249                break;
250            }
251            // Check if we've freed enough.
252            if start_used.saturating_sub(self.used_bytes) >= bytes_to_free {
253                break;
254            }
255            if self.evict_one_lru().is_none() {
256                break;
257            }
258        }
259        start_used.saturating_sub(self.used_bytes)
260    }
261
262    /// Purge all expired entries. Returns the count of entries removed.
263    pub fn purge_expired(&mut self) -> usize {
264        let expired: Vec<String> = self
265            .entries
266            .iter()
267            .filter(|(_, e)| e.is_expired())
268            .map(|(k, _)| k.clone())
269            .collect();
270        let count = expired.len();
271        for key in expired {
272            self.remove_entry(&key);
273        }
274        count
275    }
276
277    // ── Private helpers ───────────────────────────────────────────────────────
278
279    fn remove_entry(&mut self, key: &str) -> bool {
280        if let Some(entry) = self.entries.remove(key) {
281            self.used_bytes = self.used_bytes.saturating_sub(entry.size_bytes);
282            self.lru_order.retain(|k| k != key);
283            true
284        } else {
285            false
286        }
287    }
288}
289
290// ── PartitionStats ────────────────────────────────────────────────────────────
291
/// Point-in-time copy of one partition's counters and sizes.
#[derive(Debug, Clone)]
pub struct PartitionStats {
    /// Name of the partition the snapshot was taken from.
    pub name: String,
    /// Number of entries held at snapshot time.
    pub entry_count: usize,
    /// Bytes in use at snapshot time.
    pub used_bytes: usize,
    /// Byte budget of the partition.
    pub max_bytes: usize,
    /// Cumulative hit count.
    pub hits: u64,
    /// Cumulative miss count.
    pub misses: u64,
    /// Cumulative eviction count.
    pub evictions: u64,
}

impl PartitionStats {
    /// Fraction of the byte budget in use (`used_bytes / max_bytes`), capped
    /// at `1.0`.  A partition with a zero budget reports `1.0`.
    pub fn utilisation(&self) -> f64 {
        match self.max_bytes {
            // Avoid dividing by zero: a zero-capacity partition counts as full.
            0 => 1.0,
            capacity => {
                let ratio = self.used_bytes as f64 / capacity as f64;
                ratio.min(1.0)
            }
        }
    }
}
321
322// ── PartitionedCache ──────────────────────────────────────────────────────────
323
324/// A cache composed of multiple named [`CachePartition`]s.
325///
326/// Each partition is isolated: operations on one partition never evict entries
327/// from another.  A configurable `default_partition` is used when callers
328/// omit the partition name.
329pub struct PartitionedCache {
330    partitions: HashMap<String, CachePartition>,
331    /// Name of the default partition used by `*_default` methods.
332    pub default_partition: String,
333}
334
335impl PartitionedCache {
336    /// Create a new `PartitionedCache` with a single default partition.
337    pub fn new(default_partition: impl Into<String>, default_capacity_bytes: usize) -> Self {
338        let name = default_partition.into();
339        let mut partitions = HashMap::new();
340        partitions.insert(
341            name.clone(),
342            CachePartition::new(name.clone(), default_capacity_bytes),
343        );
344        Self {
345            partitions,
346            default_partition: name,
347        }
348    }
349
350    /// Add a new named partition.  If a partition with the same name already
351    /// exists it is replaced.
352    pub fn add_partition(&mut self, name: impl Into<String>, max_bytes: usize) {
353        let n = name.into();
354        self.partitions
355            .insert(n.clone(), CachePartition::new(n, max_bytes));
356    }
357
358    /// Remove a partition by name.  Returns `true` if it existed.
359    ///
360    /// The default partition cannot be removed.
361    pub fn remove_partition(&mut self, name: &str) -> bool {
362        if name == self.default_partition {
363            return false;
364        }
365        self.partitions.remove(name).is_some()
366    }
367
368    /// Return `true` if a partition with `name` exists.
369    pub fn has_partition(&self, name: &str) -> bool {
370        self.partitions.contains_key(name)
371    }
372
373    /// Return a list of all partition names.
374    pub fn partition_names(&self) -> Vec<String> {
375        self.partitions.keys().cloned().collect()
376    }
377
378    /// Get the entry for `key` from `partition`.
379    ///
380    /// Returns `None` if the partition does not exist, the key is absent, or
381    /// the entry has expired.
382    pub fn get(&mut self, partition: &str, key: &str) -> Option<&CacheEntry> {
383        self.partitions.get_mut(partition)?.get(key)
384    }
385
386    /// Insert `(key, entry)` into `partition`.
387    ///
388    /// If `partition` does not exist the call is silently dropped.
389    pub fn put(&mut self, partition: &str, key: String, entry: CacheEntry) {
390        if let Some(p) = self.partitions.get_mut(partition) {
391            p.put(key, entry);
392        }
393    }
394
395    /// Remove `key` from `partition`.  Returns `true` if it was found.
396    pub fn remove(&mut self, partition: &str, key: &str) -> bool {
397        self.partitions
398            .get_mut(partition)
399            .map(|p| p.remove(key))
400            .unwrap_or(false)
401    }
402
403    /// Evict `bytes` of data from `partition`.
404    ///
405    /// Returns the number of bytes freed.
406    pub fn evict_from(&mut self, partition: &str, bytes: usize) -> usize {
407        self.partitions
408            .get_mut(partition)
409            .map(|p| p.evict_bytes(bytes))
410            .unwrap_or(0)
411    }
412
413    /// Return partition statistics for `partition`, or `None` if it does not
414    /// exist.
415    pub fn partition_stats(&self, partition: &str) -> Option<PartitionStats> {
416        self.partitions.get(partition).map(|p| p.stats())
417    }
418
419    /// Return statistics for all partitions.
420    pub fn all_stats(&self) -> Vec<PartitionStats> {
421        self.partitions.values().map(|p| p.stats()).collect()
422    }
423
424    /// Total number of entries across all partitions.
425    pub fn total_entries(&self) -> usize {
426        self.partitions.values().map(|p| p.len()).sum()
427    }
428
429    /// Total bytes used across all partitions.
430    pub fn total_used_bytes(&self) -> usize {
431        self.partitions.values().map(|p| p.used_bytes()).sum()
432    }
433
434    /// Purge expired entries from all partitions. Returns total count removed.
435    pub fn purge_all_expired(&mut self) -> usize {
436        self.partitions
437            .values_mut()
438            .map(|p| p.purge_expired())
439            .sum()
440    }
441}
442
443// ── Tests ─────────────────────────────────────────────────────────────────────
444
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a cache whose only partition is `"default"` with `default_cap` bytes.
    fn make_cache(default_cap: usize) -> PartitionedCache {
        PartitionedCache::new("default", default_cap)
    }

    /// Entry whose accounted size is the length of `data`.
    fn entry(data: &[u8]) -> CacheEntry {
        CacheEntry::new(data.to_vec())
    }

    /// Entry with an explicit accounted size, no TTL and priority 0.
    fn entry_sized(data: &[u8], size: usize) -> CacheEntry {
        CacheEntry::with_meta(data.to_vec(), size, None, 0)
    }

    // 1. Basic put and get on default partition
    #[test]
    fn test_put_get_default() {
        let mut cache = make_cache(1024);
        cache.put("default", "key1".to_string(), entry(b"hello"));
        let e = cache.get("default", "key1").expect("entry should be found");
        assert_eq!(e.data, b"hello");
    }

    // 2. Miss on absent key returns None
    #[test]
    fn test_get_absent() {
        let mut cache = make_cache(1024);
        assert!(cache.get("default", "missing").is_none());
    }

    // 3. Put on non-existent partition is silently dropped
    #[test]
    fn test_put_nonexistent_partition_ignored() {
        let mut cache = make_cache(1024);
        cache.put("ghost", "k".to_string(), entry(b"v"));
        assert!(cache.get("ghost", "k").is_none());
    }

    // 4. add_partition creates isolated namespace
    #[test]
    fn test_add_partition_isolation() {
        let mut cache = make_cache(1024);
        cache.add_partition("tenant_a", 512);
        cache.add_partition("tenant_b", 512);
        cache.put("tenant_a", "shared".to_string(), entry(b"a-data"));
        cache.put("tenant_b", "shared".to_string(), entry(b"b-data"));
        let a = cache.get("tenant_a", "shared").expect("tenant_a entry");
        assert_eq!(a.data, b"a-data");
        let b = cache.get("tenant_b", "shared").expect("tenant_b entry");
        assert_eq!(b.data, b"b-data");
    }

    // 5. evict_from frees space in the target partition
    #[test]
    fn test_evict_from() {
        let mut cache = make_cache(10_000);
        cache.add_partition("stream", 1000);
        for i in 0..5u8 {
            cache.put("stream", format!("seg-{i}"), entry_sized(&[i; 100], 100));
        }
        let freed = cache.evict_from("stream", 150);
        // Entries are 100 bytes each, so freeing at least 150 bytes takes
        // exactly two evictions.
        assert_eq!(freed, 200, "two 100-byte entries should be evicted");

        let stats = cache
            .partition_stats("stream")
            .expect("stream partition should exist");
        assert_eq!(stats.entry_count, 3);
        assert_eq!(stats.used_bytes, 300);
        assert_eq!(stats.evictions, 2);

        // The two oldest segments are the victims; the newest one survives.
        assert!(cache.get("stream", "seg-0").is_none());
        assert!(cache.get("stream", "seg-1").is_none());
        assert!(cache.get("stream", "seg-4").is_some());

        // Unknown partitions free nothing.
        assert_eq!(cache.evict_from("ghost", 10), 0);
    }

    // 6. remove_partition removes it from the cache
    #[test]
    fn test_remove_partition() {
        let mut cache = make_cache(1024);
        cache.add_partition("temp", 256);
        assert!(cache.has_partition("temp"));
        let removed = cache.remove_partition("temp");
        assert!(removed);
        assert!(!cache.has_partition("temp"));
    }

    // 7. Default partition cannot be removed
    #[test]
    fn test_cannot_remove_default_partition() {
        let mut cache = make_cache(1024);
        let removed = cache.remove_partition("default");
        assert!(!removed);
        assert!(cache.has_partition("default"));
    }

    // 8. Partition stats track entries and bytes correctly
    #[test]
    fn test_partition_stats() {
        let mut cache = make_cache(10_000);
        cache.add_partition("analytics", 5000);
        for i in 0u8..10 {
            cache.put("analytics", format!("e-{i}"), entry_sized(&[i; 50], 50));
        }
        let stats = cache
            .partition_stats("analytics")
            .expect("stats should exist");
        assert_eq!(stats.entry_count, 10);
        assert_eq!(stats.used_bytes, 500);
    }

    // 9. LRU eviction respects partition boundary
    #[test]
    fn test_lru_eviction_within_partition() {
        let mut cache = make_cache(10_000);
        cache.add_partition("p1", 300); // fits 3 × 100-byte entries
        cache.put("p1", "a".to_string(), entry_sized(b"A", 100));
        cache.put("p1", "b".to_string(), entry_sized(b"B", 100));
        cache.put("p1", "c".to_string(), entry_sized(b"C", 100));
        // Access "a" to make "b" the LRU.
        cache.get("p1", "a");
        // Insert "d" → should evict "b" and nothing else.
        cache.put("p1", "d".to_string(), entry_sized(b"D", 100));
        assert!(cache.get("p1", "b").is_none(), "b should be evicted");
        assert!(cache.get("p1", "a").is_some());
        assert!(cache.get("p1", "c").is_some(), "c should survive");
        assert!(cache.get("p1", "d").is_some());
    }

    // 10. total_entries sums across partitions
    #[test]
    fn test_total_entries() {
        let mut cache = make_cache(10_000);
        cache.add_partition("x", 1000);
        cache.add_partition("y", 1000);
        cache.put("default", "d1".to_string(), entry(b"d"));
        cache.put("x", "x1".to_string(), entry(b"x"));
        cache.put("x", "x2".to_string(), entry(b"x"));
        cache.put("y", "y1".to_string(), entry(b"y"));
        assert_eq!(cache.total_entries(), 4);
    }

    // 11. purge_all_expired removes TTL-expired entries
    #[test]
    fn test_purge_all_expired() {
        let mut cache = make_cache(10_000);
        cache.add_partition("ttl_test", 1000);
        let expired_entry =
            CacheEntry::with_meta(b"expire".to_vec(), 6, Some(Duration::from_millis(0)), 0);
        let live_entry =
            CacheEntry::with_meta(b"live".to_vec(), 4, Some(Duration::from_secs(3600)), 0);
        cache.put("ttl_test", "expired".to_string(), expired_entry);
        cache.put("ttl_test", "live".to_string(), live_entry);
        std::thread::sleep(Duration::from_millis(2));
        let removed = cache.purge_all_expired();
        assert_eq!(removed, 1);
        assert!(cache.get("ttl_test", "expired").is_none());
        assert!(cache.get("ttl_test", "live").is_some());
    }

    // 12. partition_stats utilisation calculation
    #[test]
    fn test_partition_utilisation() {
        let mut cache = make_cache(10_000);
        cache.add_partition("util", 1000);
        for i in 0u8..5 {
            cache.put("util", format!("k{i}"), entry_sized(&[i; 100], 100));
        }
        let stats = cache.partition_stats("util").expect("should exist");
        let util = stats.utilisation();
        assert!(
            (util - 0.5).abs() < 1e-9,
            "expected 50% utilisation, got {util}"
        );
    }
}