Skip to main content

oxigdal_cache_advanced/
multi_tier.rs

//! Multi-tier cache implementation
//!
//! Implements a three-tier caching system:
//! - L1: In-memory cache (fastest, smallest)
//! - L2: SSD cache (fast, medium size)
//! - L3: Network/disk cache (slower, largest)
//!
//! Features:
//! - Automatic promotion/demotion between tiers
//! - Per-tier eviction policies
//! - Tier usage statistics
//! - Async operations
14use crate::compression::{AdaptiveCompressor, CompressedData, CompressionCodec, DataType};
15use crate::error::{CacheError, Result};
16use crate::eviction::{EvictionPolicy, LruEviction};
17use crate::{CacheConfig, CacheStats};
18use async_trait::async_trait;
19use bytes::Bytes;
20use dashmap::DashMap;
21use std::collections::HashMap;
22use std::path::PathBuf;
23use std::sync::Arc;
24use tokio::fs;
25use tokio::io::{AsyncReadExt, AsyncWriteExt};
26use tokio::sync::RwLock;
27
/// Cache key type.
///
/// Keys are plain strings. The L2 tier uses the key directly as a file
/// stem (`<key>.cache`), so keys containing path separators would escape
/// the cache directory — not validated anywhere visible here, TODO confirm.
pub type CacheKey = String;

/// Cache value with metadata.
///
/// Stored whole in L1; in the L2 index only the metadata is meaningful
/// (`data` is an empty placeholder and `size` is the on-disk file size).
#[derive(Debug, Clone)]
pub struct CacheValue {
    /// Actual data
    pub data: Bytes,
    /// Data type hint for compression codec selection
    pub data_type: DataType,
    /// Creation timestamp (UTC)
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Last access timestamp (UTC), refreshed by `record_access`
    pub last_accessed: chrono::DateTime<chrono::Utc>,
    /// Access count, incremented by `record_access`
    pub access_count: u64,
    /// Size in bytes (payload length in L1; compressed file size in the L2 index)
    pub size: usize,
}
47
48impl CacheValue {
49    /// Create new cache value
50    pub fn new(data: Bytes, data_type: DataType) -> Self {
51        let now = chrono::Utc::now();
52        let size = data.len();
53
54        Self {
55            data,
56            data_type,
57            created_at: now,
58            last_accessed: now,
59            access_count: 0,
60            size,
61        }
62    }
63
64    /// Record an access
65    pub fn record_access(&mut self) {
66        self.last_accessed = chrono::Utc::now();
67        self.access_count += 1;
68    }
69
70    /// Get age in seconds
71    pub fn age_seconds(&self) -> i64 {
72        let now = chrono::Utc::now();
73        (now - self.created_at).num_seconds()
74    }
75
76    /// Get time since last access in seconds
77    pub fn idle_seconds(&self) -> i64 {
78        let now = chrono::Utc::now();
79        (now - self.last_accessed).num_seconds()
80    }
81}
82
/// Cache tier trait.
///
/// Uniform async interface implemented by each storage level (L1 memory,
/// L2 disk, and any future L3). Implementations must be `Send + Sync` so
/// tiers can be shared across tasks behind `Arc<dyn CacheTier>`.
#[async_trait]
pub trait CacheTier: Send + Sync {
    /// Get value from this tier; `Ok(None)` is a miss, `Err` an I/O or
    /// decode failure.
    async fn get(&self, key: &CacheKey) -> Result<Option<CacheValue>>;

    /// Put value into this tier (may evict other entries to make room).
    async fn put(&self, key: CacheKey, value: CacheValue) -> Result<()>;

    /// Remove value from this tier; `Ok(true)` if the key was present.
    async fn remove(&self, key: &CacheKey) -> Result<bool>;

    /// Check if key exists (does not update access metadata or stats).
    async fn contains(&self, key: &CacheKey) -> bool;

    /// Get a snapshot copy of this tier's statistics.
    async fn stats(&self) -> CacheStats;

    /// Clear the tier, dropping all entries and resetting statistics.
    async fn clear(&self) -> Result<()>;

    /// Get tier name (used as the map key in per-tier stats reporting).
    fn name(&self) -> &str;

    /// Get tier capacity in bytes.
    fn capacity(&self) -> usize;

    /// Get current size in bytes.
    async fn current_size(&self) -> usize;
}
113
/// L1 in-memory cache tier.
///
/// Fastest, smallest tier. Entries live in a concurrent `DashMap`; the
/// byte budget (`max_size`) is enforced by evicting before inserts.
pub struct L1MemoryTier {
    /// Cache storage
    cache: Arc<DashMap<CacheKey, CacheValue>>,
    /// Maximum size in bytes
    max_size: usize,
    /// Current size in bytes (sum of stored `CacheValue::size`)
    current_size: Arc<RwLock<usize>>,
    /// Eviction policy (LRU installed by `new`; boxed trait object)
    eviction: Arc<RwLock<Box<dyn EvictionPolicy<CacheKey>>>>,
    /// Statistics
    stats: Arc<RwLock<CacheStats>>,
}
127
128impl L1MemoryTier {
129    /// Create new L1 memory tier
130    pub fn new(max_size: usize) -> Self {
131        Self {
132            cache: Arc::new(DashMap::new()),
133            max_size,
134            current_size: Arc::new(RwLock::new(0)),
135            eviction: Arc::new(RwLock::new(Box::new(LruEviction::new()))),
136            stats: Arc::new(RwLock::new(CacheStats::new())),
137        }
138    }
139
140    /// Evict items until we have enough space
141    async fn make_space(&self, needed: usize) -> Result<()> {
142        let mut current = self.current_size.write().await;
143
144        while *current + needed > self.max_size {
145            let mut eviction = self.eviction.write().await;
146
147            if let Some(victim_key) = eviction.select_victim() {
148                if let Some((_, victim_value)) = self.cache.remove(&victim_key) {
149                    *current = current.saturating_sub(victim_value.size);
150                    eviction.on_remove(&victim_key);
151
152                    let mut stats = self.stats.write().await;
153                    stats.evictions += 1;
154                    stats.item_count = stats.item_count.saturating_sub(1);
155                } else {
156                    // Victim not in cache, try another
157                    continue;
158                }
159            } else {
160                // No victims available
161                return Err(CacheError::CacheFull("L1 cache full".to_string()));
162            }
163        }
164
165        Ok(())
166    }
167}
168
169#[async_trait]
170impl CacheTier for L1MemoryTier {
171    async fn get(&self, key: &CacheKey) -> Result<Option<CacheValue>> {
172        let mut stats = self.stats.write().await;
173
174        if let Some(mut entry) = self.cache.get_mut(key) {
175            entry.record_access();
176            stats.hits += 1;
177
178            let mut eviction = self.eviction.write().await;
179            eviction.on_access(key);
180
181            Ok(Some(entry.clone()))
182        } else {
183            stats.misses += 1;
184            Ok(None)
185        }
186    }
187
188    async fn put(&self, key: CacheKey, value: CacheValue) -> Result<()> {
189        let size = value.size;
190
191        // Make space if needed
192        self.make_space(size).await?;
193
194        // Insert into cache
195        self.cache.insert(key.clone(), value);
196
197        // Update size
198        let mut current_size = self.current_size.write().await;
199        *current_size += size;
200
201        // Update eviction policy
202        let mut eviction = self.eviction.write().await;
203        eviction.on_insert(key.clone(), size);
204
205        // Update stats
206        let mut stats = self.stats.write().await;
207        stats.bytes_stored = *current_size as u64;
208        stats.item_count += 1;
209
210        Ok(())
211    }
212
213    async fn remove(&self, key: &CacheKey) -> Result<bool> {
214        if let Some((_, value)) = self.cache.remove(key) {
215            let mut current_size = self.current_size.write().await;
216            *current_size = current_size.saturating_sub(value.size);
217
218            let mut eviction = self.eviction.write().await;
219            eviction.on_remove(key);
220
221            let mut stats = self.stats.write().await;
222            stats.bytes_stored = *current_size as u64;
223            stats.item_count = stats.item_count.saturating_sub(1);
224
225            Ok(true)
226        } else {
227            Ok(false)
228        }
229    }
230
231    async fn contains(&self, key: &CacheKey) -> bool {
232        self.cache.contains_key(key)
233    }
234
235    async fn stats(&self) -> CacheStats {
236        self.stats.read().await.clone()
237    }
238
239    async fn clear(&self) -> Result<()> {
240        self.cache.clear();
241
242        let mut current_size = self.current_size.write().await;
243        *current_size = 0;
244
245        let mut eviction = self.eviction.write().await;
246        eviction.clear();
247
248        let mut stats = self.stats.write().await;
249        *stats = CacheStats::new();
250
251        Ok(())
252    }
253
254    fn name(&self) -> &str {
255        "L1-Memory"
256    }
257
258    fn capacity(&self) -> usize {
259        self.max_size
260    }
261
262    async fn current_size(&self) -> usize {
263        *self.current_size.read().await
264    }
265}
266
/// L2 SSD/disk cache tier.
///
/// Persists each value as compressed, JSON-serialized `CompressedData`
/// in one `<key>.cache` file under `cache_dir`. An in-memory `index`
/// mirrors the metadata; index entries hold an empty `data` placeholder
/// and their `size` is the on-disk (compressed) file size, not the
/// logical payload size.
pub struct L2DiskTier {
    /// Cache directory
    cache_dir: PathBuf,
    /// Maximum size in bytes (sum of on-disk file sizes)
    max_size: usize,
    /// Index of cached files (metadata only; payloads live on disk)
    index: Arc<DashMap<CacheKey, CacheValue>>,
    /// Current size in bytes
    current_size: Arc<RwLock<usize>>,
    /// Eviction policy
    eviction: Arc<RwLock<Box<dyn EvictionPolicy<CacheKey>>>>,
    /// Compressor (shared, so codec selection can adapt across entries)
    compressor: Arc<RwLock<AdaptiveCompressor>>,
    /// Statistics
    stats: Arc<RwLock<CacheStats>>,
}
284
285impl L2DiskTier {
286    /// Create new L2 disk tier
287    pub async fn new(cache_dir: PathBuf, max_size: usize) -> Result<Self> {
288        // Create cache directory
289        fs::create_dir_all(&cache_dir).await?;
290
291        let tier = Self {
292            cache_dir,
293            max_size,
294            index: Arc::new(DashMap::new()),
295            current_size: Arc::new(RwLock::new(0)),
296            eviction: Arc::new(RwLock::new(Box::new(LruEviction::new()))),
297            compressor: Arc::new(RwLock::new(AdaptiveCompressor::new())),
298            stats: Arc::new(RwLock::new(CacheStats::new())),
299        };
300
301        // Load existing cache files
302        tier.load_index().await?;
303
304        Ok(tier)
305    }
306
307    /// Load cache index from disk
308    async fn load_index(&self) -> Result<()> {
309        let mut entries = fs::read_dir(&self.cache_dir).await?;
310        let mut total_size = 0;
311
312        while let Some(entry) = entries.next_entry().await? {
313            if let Ok(metadata) = entry.metadata().await {
314                if metadata.is_file() {
315                    let file_size = metadata.len() as usize;
316                    total_size += file_size;
317
318                    // Extract key from filename (remove .cache extension)
319                    if let Some(file_name) = entry.file_name().to_str() {
320                        if file_name.ends_with(".cache") {
321                            let key = file_name.trim_end_matches(".cache").to_string();
322
323                            // Create minimal cache value for index
324                            let value = CacheValue {
325                                data: Bytes::new(),
326                                data_type: DataType::Binary,
327                                created_at: chrono::Utc::now(),
328                                last_accessed: chrono::Utc::now(),
329                                access_count: 0,
330                                size: file_size,
331                            };
332
333                            self.index.insert(key.clone(), value);
334
335                            let mut eviction = self.eviction.write().await;
336                            eviction.on_insert(key, file_size);
337                        }
338                    }
339                }
340            }
341        }
342
343        let mut current_size = self.current_size.write().await;
344        *current_size = total_size;
345
346        let mut stats = self.stats.write().await;
347        stats.bytes_stored = total_size as u64;
348        stats.item_count = self.index.len();
349
350        Ok(())
351    }
352
353    /// Get file path for key
354    fn get_file_path(&self, key: &CacheKey) -> PathBuf {
355        self.cache_dir.join(format!("{}.cache", key))
356    }
357
358    /// Evict items until we have enough space
359    async fn make_space(&self, needed: usize) -> Result<()> {
360        let mut current = self.current_size.write().await;
361
362        while *current + needed > self.max_size {
363            let mut eviction = self.eviction.write().await;
364
365            if let Some(victim_key) = eviction.select_victim() {
366                let file_path = self.get_file_path(&victim_key);
367
368                if let Some((_, victim_value)) = self.index.remove(&victim_key) {
369                    // Delete file
370                    let _ = fs::remove_file(file_path).await;
371
372                    *current = current.saturating_sub(victim_value.size);
373                    eviction.on_remove(&victim_key);
374
375                    let mut stats = self.stats.write().await;
376                    stats.evictions += 1;
377                    stats.item_count = stats.item_count.saturating_sub(1);
378                } else {
379                    continue;
380                }
381            } else {
382                return Err(CacheError::CacheFull("L2 cache full".to_string()));
383            }
384        }
385
386        Ok(())
387    }
388}
389
390#[async_trait]
391impl CacheTier for L2DiskTier {
392    async fn get(&self, key: &CacheKey) -> Result<Option<CacheValue>> {
393        let mut stats = self.stats.write().await;
394
395        if let Some(mut index_entry) = self.index.get_mut(key) {
396            let file_path = self.get_file_path(key);
397
398            // Read from disk
399            let mut file = fs::File::open(file_path).await?;
400            let mut compressed_bytes = Vec::new();
401            file.read_to_end(&mut compressed_bytes).await?;
402
403            // Deserialize compressed data
404            let compressed: CompressedData = serde_json::from_slice(&compressed_bytes)?;
405
406            // Decompress
407            let mut compressor = self.compressor.write().await;
408            let data = compressed.decompress(&mut compressor)?;
409
410            index_entry.record_access();
411            stats.hits += 1;
412
413            let mut eviction = self.eviction.write().await;
414            eviction.on_access(key);
415
416            Ok(Some(CacheValue {
417                data,
418                data_type: index_entry.data_type,
419                created_at: index_entry.created_at,
420                last_accessed: index_entry.last_accessed,
421                access_count: index_entry.access_count,
422                size: index_entry.size,
423            }))
424        } else {
425            stats.misses += 1;
426            Ok(None)
427        }
428    }
429
430    async fn put(&self, key: CacheKey, value: CacheValue) -> Result<()> {
431        // Compress data
432        let mut compressor = self.compressor.write().await;
433        let codec = compressor.select_codec(value.data_type);
434        let compressed_data = compressor.compress(&value.data, codec, value.data_type)?;
435
436        // If data was too small to compress, use CompressionCodec::None
437        let actual_codec = if compressed_data.len() == value.data.len() && value.data.len() < 1024 {
438            CompressionCodec::None
439        } else {
440            codec
441        };
442        drop(compressor);
443
444        let compressed =
445            CompressedData::new(compressed_data.to_vec(), actual_codec, value.data.len());
446
447        // Serialize
448        let serialized = serde_json::to_vec(&compressed)?;
449        let file_size = serialized.len();
450
451        // Make space
452        self.make_space(file_size).await?;
453
454        // Write to disk
455        let file_path = self.get_file_path(&key);
456        let mut file = fs::File::create(file_path).await?;
457        file.write_all(&serialized).await?;
458        file.flush().await?;
459
460        // Update index
461        let index_value = CacheValue {
462            data: Bytes::new(),
463            data_type: value.data_type,
464            created_at: value.created_at,
465            last_accessed: value.last_accessed,
466            access_count: value.access_count,
467            size: file_size,
468        };
469
470        self.index.insert(key.clone(), index_value);
471
472        // Update size
473        let mut current_size = self.current_size.write().await;
474        *current_size += file_size;
475
476        // Update eviction policy
477        let mut eviction = self.eviction.write().await;
478        eviction.on_insert(key, file_size);
479
480        // Update stats
481        let mut stats = self.stats.write().await;
482        stats.bytes_stored = *current_size as u64;
483        stats.item_count += 1;
484
485        Ok(())
486    }
487
488    async fn remove(&self, key: &CacheKey) -> Result<bool> {
489        if let Some((_, value)) = self.index.remove(key) {
490            let file_path = self.get_file_path(key);
491            let _ = fs::remove_file(file_path).await;
492
493            let mut current_size = self.current_size.write().await;
494            *current_size = current_size.saturating_sub(value.size);
495
496            let mut eviction = self.eviction.write().await;
497            eviction.on_remove(key);
498
499            let mut stats = self.stats.write().await;
500            stats.bytes_stored = *current_size as u64;
501            stats.item_count = stats.item_count.saturating_sub(1);
502
503            Ok(true)
504        } else {
505            Ok(false)
506        }
507    }
508
509    async fn contains(&self, key: &CacheKey) -> bool {
510        self.index.contains_key(key)
511    }
512
513    async fn stats(&self) -> CacheStats {
514        self.stats.read().await.clone()
515    }
516
517    async fn clear(&self) -> Result<()> {
518        // Remove all cache files
519        let mut entries = fs::read_dir(&self.cache_dir).await?;
520
521        while let Some(entry) = entries.next_entry().await? {
522            if entry.path().extension().and_then(|s| s.to_str()) == Some("cache") {
523                let _ = fs::remove_file(entry.path()).await;
524            }
525        }
526
527        self.index.clear();
528
529        let mut current_size = self.current_size.write().await;
530        *current_size = 0;
531
532        let mut eviction = self.eviction.write().await;
533        eviction.clear();
534
535        let mut stats = self.stats.write().await;
536        *stats = CacheStats::new();
537
538        Ok(())
539    }
540
541    fn name(&self) -> &str {
542        "L2-Disk"
543    }
544
545    fn capacity(&self) -> usize {
546        self.max_size
547    }
548
549    async fn current_size(&self) -> usize {
550        *self.current_size.read().await
551    }
552}
553
/// Multi-tier cache.
///
/// Front door over up to three `CacheTier`s: reads fall through
/// L1 -> L2 -> L3 and promote hits back into faster tiers; writes go to
/// every configured tier.
pub struct MultiTierCache {
    /// L1 tier (always present)
    l1: Arc<dyn CacheTier>,
    /// L2 tier (present when `l2_size > 0` and a cache dir is configured)
    l2: Option<Arc<dyn CacheTier>>,
    /// L3 tier (optional; not constructed anywhere yet)
    l3: Option<Arc<dyn CacheTier>>,
    /// Configuration (retained for future use; currently unread after `new`)
    #[allow(dead_code)]
    config: CacheConfig,
    /// Global statistics (hit/miss counts aggregated across tiers)
    global_stats: Arc<RwLock<CacheStats>>,
}
568
569impl MultiTierCache {
570    /// Create new multi-tier cache
571    pub async fn new(config: CacheConfig) -> Result<Self> {
572        let l1 = Arc::new(L1MemoryTier::new(config.l1_size)) as Arc<dyn CacheTier>;
573
574        let l2 = if config.l2_size > 0 {
575            if let Some(cache_dir) = &config.cache_dir {
576                let l2_dir = cache_dir.join("l2");
577                Some(Arc::new(L2DiskTier::new(l2_dir, config.l2_size).await?) as Arc<dyn CacheTier>)
578            } else {
579                None
580            }
581        } else {
582            None
583        };
584
585        Ok(Self {
586            l1,
587            l2,
588            l3: None, // L3 network tier can be added later
589            config,
590            global_stats: Arc::new(RwLock::new(CacheStats::new())),
591        })
592    }
593
594    /// Get value from cache (checks all tiers)
595    pub async fn get(&self, key: &CacheKey) -> Result<Option<CacheValue>> {
596        // Try L1 first
597        if let Some(value) = self.l1.get(key).await? {
598            let mut stats = self.global_stats.write().await;
599            stats.hits += 1;
600            return Ok(Some(value));
601        }
602
603        // Try L2
604        if let Some(l2) = &self.l2 {
605            if let Some(value) = l2.get(key).await? {
606                // Promote to L1
607                let _ = self.l1.put(key.clone(), value.clone()).await;
608
609                let mut stats = self.global_stats.write().await;
610                stats.hits += 1;
611                return Ok(Some(value));
612            }
613        }
614
615        // Try L3
616        if let Some(l3) = &self.l3 {
617            if let Some(value) = l3.get(key).await? {
618                // Promote to L2 and L1
619                if let Some(l2) = &self.l2 {
620                    let _ = l2.put(key.clone(), value.clone()).await;
621                }
622                let _ = self.l1.put(key.clone(), value.clone()).await;
623
624                let mut stats = self.global_stats.write().await;
625                stats.hits += 1;
626                return Ok(Some(value));
627            }
628        }
629
630        let mut stats = self.global_stats.write().await;
631        stats.misses += 1;
632        Ok(None)
633    }
634
635    /// Put value into cache (writes to all tiers)
636    pub async fn put(&self, key: CacheKey, value: CacheValue) -> Result<()> {
637        // Write to L1
638        self.l1.put(key.clone(), value.clone()).await?;
639
640        // Write to L2
641        if let Some(l2) = &self.l2 {
642            let _ = l2.put(key.clone(), value.clone()).await;
643        }
644
645        // Write to L3
646        if let Some(l3) = &self.l3 {
647            let _ = l3.put(key, value).await;
648        }
649
650        Ok(())
651    }
652
653    /// Remove value from all tiers
654    pub async fn remove(&self, key: &CacheKey) -> Result<bool> {
655        let mut removed = false;
656
657        removed |= self.l1.remove(key).await?;
658
659        if let Some(l2) = &self.l2 {
660            removed |= l2.remove(key).await?;
661        }
662
663        if let Some(l3) = &self.l3 {
664            removed |= l3.remove(key).await?;
665        }
666
667        Ok(removed)
668    }
669
670    /// Check if key exists in any tier
671    pub async fn contains(&self, key: &CacheKey) -> bool {
672        if self.l1.contains(key).await {
673            return true;
674        }
675
676        if let Some(l2) = &self.l2 {
677            if l2.contains(key).await {
678                return true;
679            }
680        }
681
682        if let Some(l3) = &self.l3 {
683            if l3.contains(key).await {
684                return true;
685            }
686        }
687
688        false
689    }
690
691    /// Get global statistics
692    pub async fn stats(&self) -> CacheStats {
693        self.global_stats.read().await.clone()
694    }
695
696    /// Get per-tier statistics
697    pub async fn tier_stats(&self) -> HashMap<String, CacheStats> {
698        let mut stats = HashMap::new();
699
700        stats.insert(self.l1.name().to_string(), self.l1.stats().await);
701
702        if let Some(l2) = &self.l2 {
703            stats.insert(l2.name().to_string(), l2.stats().await);
704        }
705
706        if let Some(l3) = &self.l3 {
707            stats.insert(l3.name().to_string(), l3.stats().await);
708        }
709
710        stats
711    }
712
713    /// Clear all tiers
714    pub async fn clear(&self) -> Result<()> {
715        self.l1.clear().await?;
716
717        if let Some(l2) = &self.l2 {
718            l2.clear().await?;
719        }
720
721        if let Some(l3) = &self.l3 {
722            l3.clear().await?;
723        }
724
725        let mut stats = self.global_stats.write().await;
726        *stats = CacheStats::new();
727
728        Ok(())
729    }
730}
731
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trip through the in-memory tier plus stats accounting.
    #[tokio::test]
    async fn test_l1_memory_tier() {
        let tier = L1MemoryTier::new(1024 * 1024); // 1MB

        let key = "test_key".to_string();
        let value = CacheValue::new(Bytes::from("test data"), DataType::Binary);

        // Put and get
        tier.put(key.clone(), value.clone())
            .await
            .expect("put failed");
        let retrieved = tier.get(&key).await.expect("get failed");

        assert!(retrieved.is_some());
        assert_eq!(retrieved.as_ref().map(|v| &v.data), Some(&value.data));

        // Stats: exactly one hit, no misses, one stored item
        let stats = tier.stats().await;
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 0);
        assert_eq!(stats.item_count, 1);
    }

    /// Overfilling a tiny tier must trigger at least one eviction:
    /// three 40-byte values cannot all fit in a 100-byte budget.
    #[tokio::test]
    async fn test_l1_eviction() {
        let tier = L1MemoryTier::new(100); // Very small cache

        let value1 = CacheValue::new(Bytes::from("a".repeat(40)), DataType::Binary);
        let value2 = CacheValue::new(Bytes::from("b".repeat(40)), DataType::Binary);
        let value3 = CacheValue::new(Bytes::from("c".repeat(40)), DataType::Binary);

        tier.put("key1".to_string(), value1)
            .await
            .expect("put failed");
        tier.put("key2".to_string(), value2)
            .await
            .expect("put failed");

        // This should trigger eviction
        tier.put("key3".to_string(), value3)
            .await
            .expect("put failed");

        let stats = tier.stats().await;
        assert!(stats.evictions > 0);
    }

    /// End-to-end put/get through the combined L1+L2 stack, backed by a
    /// temp directory that is removed afterwards (best effort).
    #[tokio::test]
    async fn test_multi_tier_cache() {
        let temp_dir = std::env::temp_dir().join("oxigdal_cache_test");
        let config = CacheConfig {
            l1_size: 1024,
            l2_size: 4096,
            l3_size: 0,
            enable_compression: true,
            enable_prefetch: false,
            enable_distributed: false,
            cache_dir: Some(temp_dir.clone()),
        };

        let cache = MultiTierCache::new(config)
            .await
            .expect("cache creation failed");

        let key = "test_multi".to_string();
        let value = CacheValue::new(Bytes::from("multi-tier test data"), DataType::Text);

        // Put
        cache
            .put(key.clone(), value.clone())
            .await
            .expect("put failed");

        // Get
        let retrieved = cache.get(&key).await.expect("get failed");
        assert!(retrieved.is_some());

        // Clean up
        let _ = tokio::fs::remove_dir_all(temp_dir).await;
    }
}