1use std::collections::{HashMap, VecDeque};
21use std::path::PathBuf;
22
/// Strategy a tier uses to choose which entry to evict when it runs out of space.
///
/// The enum is a plain fieldless tag, so it is `Copy` (and `Hash`) to make it
/// cheap to pass around and usable as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EvictionPolicy {
    /// Evict the entry with the oldest last-access tick.
    Lru,
    /// Evict the entry with the fewest recorded accesses; ties go to the
    /// least recently accessed entry.
    Lfu,
    /// Evict the entry that was inserted first.
    Fifo,
    /// Evict a pseudo-randomly chosen entry (xorshift32 with a fixed seed,
    /// so the sequence of choices is deterministic per tier).
    Random,
    /// Lightweight TinyLFU-style heuristic: ranks entries by access count
    /// modulo 4 (then recency) and, for frequently used candidates, may fall
    /// back to plain LFU at random.
    TinyLfu,
}
40
41#[derive(Debug, Clone)]
43pub struct TierConfig {
44 pub name: String,
46 pub capacity_bytes: usize,
48 pub access_latency_us: u64,
50 pub eviction_policy: EvictionPolicy,
52 pub disk_path: Option<PathBuf>,
58 pub promotion_threshold: u64,
65 pub compress: bool,
71}
72
73impl TierConfig {
74 pub fn memory(name: impl Into<String>, capacity_bytes: usize) -> Self {
76 Self {
77 name: name.into(),
78 capacity_bytes,
79 access_latency_us: 1,
80 eviction_policy: EvictionPolicy::Lru,
81 disk_path: None,
82 promotion_threshold: 0,
83 compress: false,
84 }
85 }
86
87 pub fn disk(name: impl Into<String>, capacity_bytes: usize, path: impl Into<PathBuf>) -> Self {
89 Self {
90 name: name.into(),
91 capacity_bytes,
92 access_latency_us: 1_000,
93 eviction_policy: EvictionPolicy::Lru,
94 disk_path: Some(path.into()),
95 promotion_threshold: 1,
96 compress: true,
97 }
98 }
99}
100
/// Per-tier counters reported by `TieredCache::stats`.
#[derive(Clone, Debug)]
pub struct TierStats {
    /// Name copied from the tier's configuration.
    pub name: String,
    /// Lookups served by this tier.
    pub hits: u64,
    /// Bytes currently stored in the tier (after optional compression).
    pub size_used_bytes: usize,
    /// Number of entries currently held.
    pub entry_count: usize,
    /// Number of times a hit in this tier copied the entry up one tier.
    pub promotions: u64,
    /// Number of payloads run-length encoded on insertion.
    pub compressions: u64,
}
119
120#[derive(Debug, Clone)]
122pub struct TieredCacheStats {
123 pub total_hits: u64,
125 pub total_misses: u64,
127 pub hit_rate: f64,
129 pub tier_stats: Vec<TierStats>,
131}
132
/// Run-length encodes `data` as a flat sequence of `(count, byte)` pairs.
///
/// Runs longer than 255 bytes are split so every count fits in one byte.
/// Input with no repeated bytes doubles in size, so this only pays off for
/// highly repetitive payloads.
fn rle_compress(data: &[u8]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(data.len());
    let mut remaining = data;
    while let Some(&current) = remaining.first() {
        // Cap the run at u8::MAX so its length fits in the single count byte.
        let run_len = remaining
            .iter()
            .take(usize::from(u8::MAX))
            .take_while(|&&b| b == current)
            .count();
        // run_len is in 1..=255 here, so the cast cannot truncate.
        encoded.extend_from_slice(&[run_len as u8, current]);
        remaining = &remaining[run_len..];
    }
    encoded
}
157
/// Inverse of `rle_compress`: expands each `(count, byte)` pair into `count`
/// copies of `byte`.
///
/// A trailing unpaired byte (malformed input) is silently ignored, and a
/// zero count contributes nothing.
fn rle_decompress(data: &[u8]) -> Vec<u8> {
    let mut decoded = Vec::with_capacity(data.len() * 2);
    for pair in data.chunks_exact(2) {
        let run_end = decoded.len() + usize::from(pair[0]);
        decoded.resize(run_end, pair[1]);
    }
    decoded
}
175
176struct CacheTier {
179 config: TierConfig,
180 data: HashMap<String, (Vec<u8>, u64, u64)>,
186 size_used: usize,
187 fifo_order: VecDeque<String>,
189 hits: u64,
191 promotions: u64,
193 compressions: u64,
195 tick: u64,
197 rng_state: u32,
199}
200
201impl CacheTier {
202 fn new(config: TierConfig) -> Self {
203 if let Some(ref path) = config.disk_path {
205 let _ = std::fs::create_dir_all(path);
206 }
207 Self {
208 config,
209 data: HashMap::new(),
210 size_used: 0,
211 fifo_order: VecDeque::new(),
212 hits: 0,
213 promotions: 0,
214 compressions: 0,
215 tick: 1,
216 rng_state: 0xDEAD_BEEF,
217 }
218 }
219
220 fn xorshift32(&mut self) -> u32 {
222 let mut x = self.rng_state;
223 x ^= x << 13;
224 x ^= x >> 17;
225 x ^= x << 5;
226 self.rng_state = x;
227 x
228 }
229
230 fn disk_path_for(&self, key: &str) -> Option<PathBuf> {
232 self.config.disk_path.as_ref().map(|base| {
233 let mut h: u64 = 0xcbf2_9ce4_8422_2325;
236 for b in key.as_bytes() {
237 h ^= u64::from(*b);
238 h = h.wrapping_mul(0x0000_0100_0000_01b3);
239 }
240 base.join(format!("{h:016x}"))
241 })
242 }
243
244 fn flush_to_disk(&self, key: &str, bytes: &[u8]) {
246 if let Some(path) = self.disk_path_for(key) {
247 let _ = std::fs::write(path, bytes);
248 }
249 }
250
251 fn read_from_disk(&self, key: &str) -> Option<Vec<u8>> {
253 let path = self.disk_path_for(key)?;
254 std::fs::read(path).ok()
255 }
256
257 fn remove_from_disk(&self, key: &str) {
259 if let Some(path) = self.disk_path_for(key) {
260 let _ = std::fs::remove_file(path);
261 }
262 }
263
264 fn encode(&mut self, raw: &[u8]) -> Vec<u8> {
266 if self.config.compress {
267 self.compressions += 1;
268 rle_compress(raw)
269 } else {
270 raw.to_vec()
271 }
272 }
273
274 fn decode(&self, stored: &[u8]) -> Vec<u8> {
276 if self.config.compress {
277 rle_decompress(stored)
278 } else {
279 stored.to_vec()
280 }
281 }
282
283 fn get(&mut self, key: &str) -> Option<Vec<u8>> {
284 let tick = self.tick;
285 self.tick += 1;
286
287 if self.config.disk_path.is_some() {
290 if let Some(entry) = self.data.get_mut(key) {
291 entry.1 = tick;
292 entry.2 += 1;
293 self.hits += 1;
294 return self.read_from_disk(key).map(|stored| self.decode(&stored));
296 }
297 return None;
298 }
299
300 if let Some(entry) = self.data.get_mut(key) {
301 entry.1 = tick; entry.2 += 1; self.hits += 1;
304 let raw = entry.0.clone();
305 let decoded = self.decode(&raw);
306 Some(decoded)
307 } else {
308 None
309 }
310 }
311
312 fn put(&mut self, key: String, data: Vec<u8>) {
315 let encoded = self.encode(&data);
316 let stored_len = encoded.len();
317
318 if stored_len > self.config.capacity_bytes {
320 return;
321 }
322
323 while self.size_used + stored_len > self.config.capacity_bytes {
325 if self.evict_one().is_none() {
326 break;
327 }
328 }
329
330 let tick = self.tick;
331 self.tick += 1;
332 self.size_used += stored_len;
333 self.fifo_order.push_back(key.clone());
334
335 if self.config.disk_path.is_some() {
338 self.flush_to_disk(&key, &encoded);
339 self.data.insert(key, (Vec::new(), tick, 1));
340 } else {
341 self.data.insert(key, (encoded, tick, 1));
342 }
343 }
344
345 fn frequency(&self, key: &str) -> u64 {
347 self.data.get(key).map(|(_, _, f)| *f).unwrap_or(0)
348 }
349
350 fn remove(&mut self, key: &str) -> bool {
352 if let Some((data, _, _)) = self.data.remove(key) {
353 let stored_len = if self.config.disk_path.is_some() {
354 data.len()
357 } else {
358 data.len()
359 };
360 self.size_used = self.size_used.saturating_sub(stored_len);
361 self.fifo_order.retain(|k| k != key);
362 if self.config.disk_path.is_some() {
363 self.remove_from_disk(key);
364 }
365 true
366 } else {
367 false
368 }
369 }
370
371 fn evict_one(&mut self) -> Option<(String, Vec<u8>)> {
373 if self.data.is_empty() {
374 return None;
375 }
376 let victim_key = match &self.config.eviction_policy {
377 EvictionPolicy::Lru => self.pick_lru(),
378 EvictionPolicy::Lfu => self.pick_lfu(),
379 EvictionPolicy::Fifo => self.pick_fifo(),
380 EvictionPolicy::Random => self.pick_random(),
381 EvictionPolicy::TinyLfu => self.pick_tiny_lfu(),
382 }?;
383
384 let (stored, _, _) = self.data.remove(&victim_key)?;
385 let data = if self.config.disk_path.is_some() {
386 let from_disk = self.read_from_disk(&victim_key).unwrap_or_default();
388 self.remove_from_disk(&victim_key);
389 self.decode(&from_disk)
390 } else {
391 self.decode(&stored)
392 };
393 let size_removed = if stored.is_empty() && self.config.disk_path.is_some() {
394 data.len()
397 } else {
398 stored.len()
399 };
400 self.size_used = self.size_used.saturating_sub(size_removed);
401 self.fifo_order.retain(|k| *k != victim_key);
402 Some((victim_key, data))
403 }
404
405 fn pick_lru(&self) -> Option<String> {
406 self.data
407 .iter()
408 .min_by_key(|(_, (_, last_access, _))| *last_access)
409 .map(|(k, _)| k.clone())
410 }
411
412 fn pick_lfu(&self) -> Option<String> {
413 self.data
414 .iter()
415 .min_by(|(_, (_, la_a, freq_a)), (_, (_, la_b, freq_b))| {
416 freq_a.cmp(freq_b).then(la_a.cmp(la_b))
417 })
418 .map(|(k, _)| k.clone())
419 }
420
421 fn pick_fifo(&self) -> Option<String> {
422 self.fifo_order.front().cloned()
423 }
424
425 fn pick_random(&mut self) -> Option<String> {
426 if self.data.is_empty() {
427 return None;
428 }
429 let count = self.data.len();
430 let rnd = self.xorshift32() as usize % count;
431 self.data.keys().nth(rnd).cloned()
432 }
433
434 fn pick_tiny_lfu(&mut self) -> Option<String> {
436 let candidate = self
437 .data
438 .iter()
439 .min_by(|(_, (_, la_a, freq_a)), (_, (_, la_b, freq_b))| {
440 let sketch_a = freq_a % 4;
441 let sketch_b = freq_b % 4;
442 sketch_a.cmp(&sketch_b).then(la_a.cmp(la_b))
443 })
444 .map(|(k, v)| (k.clone(), v.2))?;
445
446 let (key, freq) = candidate;
447 if freq >= 2 {
448 let rnd = self.xorshift32() as u64;
449 if rnd % freq >= freq / 2 {
450 return self.pick_lfu();
451 }
452 }
453 Some(key)
454 }
455}
456
457impl Drop for CacheTier {
458 fn drop(&mut self) {
459 if let Some(ref base) = self.config.disk_path {
463 for key in self.data.keys() {
464 if let Some(path) = self.disk_path_for(key) {
465 let _ = std::fs::remove_file(path);
466 }
467 }
468 let _ = std::fs::remove_dir(base);
470 }
471 }
472}
473
474pub struct TieredCache {
483 tiers: Vec<CacheTier>,
484 total_hits: u64,
485 total_misses: u64,
486 tier_hits: Vec<u64>,
488}
489
490impl TieredCache {
491 pub fn new(tiers: Vec<TierConfig>) -> Self {
494 let n = tiers.len();
495 Self {
496 tiers: tiers.into_iter().map(CacheTier::new).collect(),
497 total_hits: 0,
498 total_misses: 0,
499 tier_hits: vec![0; n],
500 }
501 }
502
503 pub fn get(&mut self, key: &str) -> Option<Vec<u8>> {
508 for tier_idx in 0..self.tiers.len() {
509 if let Some(data) = self.tiers[tier_idx].get(key) {
510 self.total_hits += 1;
511 self.tier_hits[tier_idx] += 1;
512 if tier_idx > 0 {
514 let freq = self.tiers[tier_idx].frequency(key);
515 let threshold = self.tiers[tier_idx].config.promotion_threshold;
516 if freq >= threshold {
517 self.tiers[tier_idx].promotions += 1;
518 let key_owned = key.to_string();
519 self.tiers[tier_idx - 1].put(key_owned, data.clone());
520 }
521 }
522 return Some(data);
523 }
524 }
525 self.total_misses += 1;
526 None
527 }
528
529 pub fn put(&mut self, key: &str, data: Vec<u8>) {
531 self.tiers[0].put(key.to_string(), data);
532 }
533
534 pub fn put_at_tier(&mut self, tier_idx: usize, key: &str, data: Vec<u8>) {
538 if let Some(tier) = self.tiers.get_mut(tier_idx) {
539 tier.put(key.to_string(), data);
540 }
541 }
542
543 pub fn evict_tier(&mut self, tier_idx: usize) -> Option<(String, Vec<u8>)> {
546 self.tiers.get_mut(tier_idx)?.evict_one()
547 }
548
549 pub fn stats(&self) -> TieredCacheStats {
551 let total = self.total_hits + self.total_misses;
552 let hit_rate = if total == 0 {
553 0.0
554 } else {
555 self.total_hits as f64 / total as f64
556 };
557 let tier_stats = self
558 .tiers
559 .iter()
560 .enumerate()
561 .map(|(i, t)| TierStats {
562 name: t.config.name.clone(),
563 hits: self.tier_hits[i],
564 size_used_bytes: t.size_used,
565 entry_count: t.data.len(),
566 promotions: t.promotions,
567 compressions: t.compressions,
568 })
569 .collect();
570 TieredCacheStats {
571 total_hits: self.total_hits,
572 total_misses: self.total_misses,
573 hit_rate,
574 tier_stats,
575 }
576 }
577
578 pub fn warmup(&mut self, entries: &[(String, Vec<u8>)]) {
580 for (key, data) in entries {
581 let data_len = data.len();
582 if self.tiers[0].size_used + data_len <= self.tiers[0].config.capacity_bytes {
583 let tick = self.tiers[0].tick;
584 self.tiers[0].tick += 1;
585 self.tiers[0].size_used += data_len;
586 self.tiers[0].fifo_order.push_back(key.clone());
587 self.tiers[0]
588 .data
589 .insert(key.clone(), (data.clone(), tick, 1));
590 }
591 }
592 }
593
594 pub fn invalidate(&mut self, key: &str) -> bool {
597 let mut found = false;
598 for tier in &mut self.tiers {
599 if tier.remove(key) {
600 found = true;
601 }
602 }
603 found
604 }
605
606 pub fn tier_count(&self) -> usize {
608 self.tiers.len()
609 }
610}
611
#[cfg(test)]
mod tests {
    use super::*;

    /// Two in-memory, uncompressed tiers: an LRU "L1" of `l1_bytes` over an
    /// LFU "L2" of `l2_bytes`. L2 has promotion threshold 0, so every L2 hit
    /// promotes into L1.
    fn two_tier_cache(l1_bytes: usize, l2_bytes: usize) -> TieredCache {
        TieredCache::new(vec![
            TierConfig {
                name: "L1".into(),
                capacity_bytes: l1_bytes,
                access_latency_us: 1,
                eviction_policy: EvictionPolicy::Lru,
                disk_path: None,
                promotion_threshold: 0,
                compress: false,
            },
            TierConfig {
                name: "L2".into(),
                capacity_bytes: l2_bytes,
                access_latency_us: 10,
                eviction_policy: EvictionPolicy::Lfu,
                disk_path: None,
                promotion_threshold: 0,
                compress: false,
            },
        ])
    }

    // A value put into the cache (tier 0) can be read back.
    #[test]
    fn test_basic_put_get() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("key1", b"hello".to_vec());
        assert_eq!(cache.get("key1"), Some(b"hello".to_vec()));
    }

    // Looking up an absent key returns None and records exactly one miss.
    #[test]
    fn test_miss() {
        let mut cache = two_tier_cache(1024, 4096);
        assert_eq!(cache.get("absent"), None);
        assert_eq!(cache.stats().total_misses, 1);
    }

    // One hit plus one miss gives a 50% hit rate.
    #[test]
    fn test_hit_rate() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("k", b"v".to_vec());
        cache.get("k");
        cache.get("nope");
        let s = cache.stats();
        assert!((s.hit_rate - 0.5).abs() < 1e-9);
    }

    // L1 holds three 1-byte values. Touching "a" makes "b" the least recently
    // used entry, so inserting "d" evicts "b". L2 is empty, so "b" is gone.
    #[test]
    fn test_l1_lru_eviction() {
        let mut cache = two_tier_cache(3, 1024);
        cache.put("a", b"1".to_vec());
        cache.put("b", b"2".to_vec());
        cache.put("c", b"3".to_vec());
        cache.get("a");
        cache.put("d", b"4".to_vec());
        assert_eq!(cache.get("b"), None);
        assert!(cache.get("a").is_some());
    }

    // Invalidation reports that the key existed and removes it.
    #[test]
    fn test_invalidate() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("x", b"data".to_vec());
        assert!(cache.invalidate("x"));
        assert_eq!(cache.get("x"), None);
    }

    // Invalidating an unknown key reports false.
    #[test]
    fn test_invalidate_absent() {
        let mut cache = two_tier_cache(1024, 4096);
        assert!(!cache.invalidate("ghost"));
    }

    // Warmed-up entries are readable like normal puts.
    #[test]
    fn test_warmup() {
        let mut cache = two_tier_cache(1024, 4096);
        let entries = vec![
            ("alpha".to_string(), b"AAA".to_vec()),
            ("beta".to_string(), b"BBB".to_vec()),
        ];
        cache.warmup(&entries);
        assert_eq!(cache.get("alpha"), Some(b"AAA".to_vec()));
        assert_eq!(cache.get("beta"), Some(b"BBB".to_vec()));
    }

    // Stats report the number of entries held by tier 0.
    #[test]
    fn test_stats_entry_count() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("a", b"1".to_vec());
        cache.put("b", b"2".to_vec());
        assert_eq!(cache.stats().tier_stats[0].entry_count, 2);
    }

    // With a 3-byte FIFO tier, the fourth 1-byte insert evicts the first one.
    #[test]
    fn test_fifo_eviction() {
        let mut cache = TieredCache::new(vec![TierConfig {
            name: "fifo".into(),
            capacity_bytes: 3,
            access_latency_us: 0,
            eviction_policy: EvictionPolicy::Fifo,
            disk_path: None,
            promotion_threshold: 0,
            compress: false,
        }]);
        cache.put("first", b"1".to_vec());
        cache.put("second", b"2".to_vec());
        cache.put("third", b"3".to_vec());
        cache.put("fourth", b"4".to_vec());
        assert_eq!(cache.get("first"), None);
    }

    // Random eviction keeps a 5-byte tier at or below five 1-byte entries.
    #[test]
    fn test_random_eviction_no_panic() {
        let mut cache = TieredCache::new(vec![TierConfig {
            name: "rand".into(),
            capacity_bytes: 5,
            access_latency_us: 0,
            eviction_policy: EvictionPolicy::Random,
            disk_path: None,
            promotion_threshold: 0,
            compress: false,
        }]);
        for i in 0..20u8 {
            cache.put(&i.to_string(), vec![i]);
        }
        assert!(cache.stats().tier_stats[0].entry_count <= 5);
    }

    // TinyLfu eviction keeps a 5-byte tier at or below five 1-byte entries.
    #[test]
    fn test_tiny_lfu_eviction_no_panic() {
        let mut cache = TieredCache::new(vec![TierConfig {
            name: "tiny".into(),
            capacity_bytes: 5,
            access_latency_us: 0,
            eviction_policy: EvictionPolicy::TinyLfu,
            disk_path: None,
            promotion_threshold: 0,
            compress: false,
        }]);
        for i in 0..20u8 {
            cache.put(&i.to_string(), vec![i]);
        }
        assert!(cache.stats().tier_stats[0].entry_count <= 5);
    }

    // Forcing an eviction on a one-entry tier returns that entry's key.
    #[test]
    fn test_evict_tier() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("a", b"data".to_vec());
        let evicted = cache.evict_tier(0);
        assert!(evicted.is_some());
        let (k, _) = evicted.expect("eviction should succeed");
        assert_eq!(k, "a");
    }

    // Evicting from an empty tier yields nothing.
    #[test]
    fn test_evict_empty_tier() {
        let mut cache = two_tier_cache(1024, 4096);
        assert!(cache.evict_tier(0).is_none());
    }

    // An uncompressed tier accounts the raw payload lengths.
    #[test]
    fn test_size_used_bytes() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("a", vec![0u8; 100]);
        cache.put("b", vec![0u8; 200]);
        assert_eq!(cache.stats().tier_stats[0].size_used_bytes, 300);
    }

    // Each hit on tier 0 bumps that tier's hit counter.
    #[test]
    fn test_tier_hit_counters() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put("k", b"v".to_vec());
        cache.get("k");
        cache.get("k");
        let s = cache.stats();
        assert_eq!(s.tier_stats[0].hits, 2);
    }

    // A highly repetitive payload survives RLE encode/decode unchanged.
    #[test]
    fn test_compression_roundtrip() {
        let mut cache = TieredCache::new(vec![TierConfig {
            name: "compressed".into(),
            capacity_bytes: 1024 * 1024,
            access_latency_us: 10,
            eviction_policy: EvictionPolicy::Lru,
            disk_path: None,
            promotion_threshold: 0,
            compress: true,
        }]);
        let data = vec![0xABu8; 512];
        cache.put("k", data.clone());
        let retrieved = cache.get("k").expect("should be present");
        assert_eq!(
            retrieved, data,
            "compressed entry should decompress correctly"
        );
    }

    // Every put into a compressing tier is counted as one compression.
    #[test]
    fn test_compression_stats() {
        let mut cache = TieredCache::new(vec![TierConfig {
            name: "c".into(),
            capacity_bytes: 1024 * 1024,
            access_latency_us: 0,
            eviction_policy: EvictionPolicy::Lru,
            disk_path: None,
            promotion_threshold: 0,
            compress: true,
        }]);
        cache.put("a", vec![1u8; 64]);
        cache.put("b", vec![2u8; 64]);
        let s = cache.stats();
        assert_eq!(
            s.tier_stats[0].compressions, 2,
            "two puts should compress twice"
        );
    }

    // L2 has promotion threshold 3. The insert records one access, so the
    // second lookup brings the count to 3 and promotes "hot" into L1.
    #[test]
    fn test_adaptive_promotion_threshold() {
        let mut cache = TieredCache::new(vec![
            TierConfig {
                name: "L1".into(),
                capacity_bytes: 10,
                access_latency_us: 1,
                eviction_policy: EvictionPolicy::Lru,
                disk_path: None,
                promotion_threshold: 0,
                compress: false,
            },
            TierConfig {
                name: "L2".into(),
                capacity_bytes: 1024,
                access_latency_us: 10,
                eviction_policy: EvictionPolicy::Lru,
                disk_path: None,
                promotion_threshold: 3,
                compress: false,
            },
        ]);

        cache.put_at_tier(1, "hot", b"v".to_vec());

        cache.get("hot");
        cache.get("hot");
        cache.get("hot");
        let s = cache.stats();
        assert!(
            s.tier_stats[1].promotions >= 1,
            "entry should have been promoted after reaching threshold"
        );
    }

    // Round-trip through a disk tier. The directory name is made unique per
    // run from the clock's sub-second nanoseconds. The tier's Drop impl
    // deletes the entry files and the (then empty) directory.
    #[test]
    fn test_disk_tier_basic() {
        let dir = std::env::temp_dir().join(format!(
            "oximedia_tiered_disk_{}",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.subsec_nanos())
                .unwrap_or(42)
        ));
        let mut cache = TieredCache::new(vec![TierConfig::disk("disk", 1024 * 1024, &dir)]);
        cache.put("segment-001", b"media data here".to_vec());
        let got = cache.get("segment-001");
        assert_eq!(
            got,
            Some(b"media data here".to_vec()),
            "disk tier should retrieve the value correctly"
        );
    }

    // The memory preset sets name/capacity and disables disk and compression.
    #[test]
    fn test_tier_config_memory_helper() {
        let cfg = TierConfig::memory("L1", 4096);
        assert_eq!(cfg.name, "L1");
        assert_eq!(cfg.capacity_bytes, 4096);
        assert!(cfg.disk_path.is_none());
        assert!(!cfg.compress);
    }

    #[test]
    fn test_tier_count() {
        let cache = two_tier_cache(1024, 4096);
        assert_eq!(cache.tier_count(), 2);
    }

    // An entry placed directly in L2 is found by `get`. That lookup also
    // promotes it into L1, because L2's threshold is 0.
    #[test]
    fn test_put_at_tier() {
        let mut cache = two_tier_cache(1024, 4096);
        cache.put_at_tier(1, "l2-key", b"l2-value".to_vec());
        assert_eq!(cache.stats().tier_stats[1].entry_count, 1);
        assert_eq!(cache.get("l2-key"), Some(b"l2-value".to_vec()));
    }

    // RLE compress/decompress are exact inverses on mixed inputs, including empty input.
    #[test]
    fn test_rle_roundtrip() {
        for input in [
            b"".as_ref(),
            b"hello",
            b"\x00\x00\x00\x00",
            b"AAABBBCCC",
            b"abcdefghij",
        ] {
            let compressed = rle_compress(input);
            let decompressed = rle_decompress(&compressed);
            assert_eq!(decompressed, input, "rle roundtrip failed for {:?}", input);
        }
    }
}