// oxigdal_cloud/cache/multi.rs
1//! Multi-level caching and cache warming
2
3use std::collections::{HashMap, VecDeque};
4use std::sync::Arc;
5use std::sync::atomic::Ordering;
6use std::time::Duration;
7
8#[cfg(feature = "async")]
9use tokio::sync::RwLock;
10
11use bytes::Bytes;
12
13use super::backends::PersistentDiskCache;
14use super::eviction::LruTtlCache;
15use super::metadata::{CacheKey, CacheStats};
16use super::{CacheConfig, WarmingStrategy};
17use crate::error::{CacheError, CloudError, Result};
18
/// Cache warmer for proactive cache population
///
/// Records the stream of cache-key accesses and, depending on the configured
/// [`WarmingStrategy`], predicts which keys are likely to be requested next.
#[cfg(feature = "cache")]
pub struct CacheWarmer<C> {
    /// The cache to warm
    // NOTE(review): no method visible in this file reads this field; confirm
    // it is used elsewhere, otherwise the generic parameter may be removable.
    cache: Arc<C>,
    /// Warming strategy
    strategy: WarmingStrategy,
    /// Access history for pattern detection (oldest entry at the front)
    access_history: Arc<RwLock<VecDeque<CacheKey>>>,
    /// Maximum history size (entries beyond this are evicted oldest-first)
    max_history: usize,
}
31
#[cfg(feature = "cache")]
impl<C> CacheWarmer<C> {
    /// Creates a new cache warmer
    ///
    /// `cache` is the cache this warmer populates; `strategy` selects how
    /// warming targets are predicted. The access history is capped at 1000
    /// entries.
    pub fn new(cache: Arc<C>, strategy: WarmingStrategy) -> Self {
        Self {
            cache,
            strategy,
            access_history: Arc::new(RwLock::new(VecDeque::new())),
            max_history: 1000,
        }
    }

    /// Records an access for pattern detection
    ///
    /// Appends `key` to the bounded history, evicting the oldest entries
    /// once `max_history` is exceeded.
    pub async fn record_access(&self, key: &CacheKey) {
        let mut history = self.access_history.write().await;
        history.push_back(key.clone());
        while history.len() > self.max_history {
            history.pop_front();
        }
    }

    /// Gets warming targets based on strategy
    ///
    /// Returns the keys predicted to be accessed after `current_key`.
    /// Strategies that need information not available here (spatial layout,
    /// tile pyramid) currently return no targets.
    pub async fn get_warming_targets(&self, current_key: &CacheKey) -> Vec<CacheKey> {
        match self.strategy {
            WarmingStrategy::None => Vec::new(),
            WarmingStrategy::AccessPattern => self.get_pattern_targets(current_key).await,
            WarmingStrategy::SpatialAdjacent => Vec::new(), // Requires spatial info
            WarmingStrategy::PyramidLevels => Vec::new(),   // Requires tile info
            WarmingStrategy::Custom => Vec::new(),
        }
    }

    /// Gets targets based on access patterns
    ///
    /// Counts, over the recorded history, which keys most often directly
    /// followed `current_key`, and returns up to the 5 most frequent ones,
    /// most frequent first.
    async fn get_pattern_targets(&self, current_key: &CacheKey) -> Vec<CacheKey> {
        let history = self.access_history.read().await;

        // Count the keys that immediately followed `current_key`. Pairing the
        // iterator with a skipped copy of itself walks consecutive pairs
        // without the O(n) temporary Vec the previous
        // `.collect::<Vec<_>>().windows(2)` allocated on every call.
        let mut next_keys: HashMap<CacheKey, usize> = HashMap::new();
        for (prev, next) in history.iter().zip(history.iter().skip(1)) {
            if prev == current_key {
                *next_keys.entry(next.clone()).or_insert(0) += 1;
            }
        }

        // Return the most common successors, highest count first.
        let mut targets: Vec<_> = next_keys.into_iter().collect();
        targets.sort_by_key(|entry| std::cmp::Reverse(entry.1));
        targets.into_iter().take(5).map(|(key, _)| key).collect()
    }
}
83
/// Memory cache layer (simplified for compatibility)
///
/// Alias for the in-process LRU+TTL cache used as the L1 tier.
#[cfg(feature = "cache")]
pub type MemoryCache = LruTtlCache;

/// Disk cache layer
///
/// Alias for the persistent on-disk cache used as the L2 tier.
// NOTE(review): unlike `MemoryCache`, this alias is not gated on the
// "cache" feature — confirm the asymmetry is intentional.
pub type DiskCache = PersistentDiskCache;
90
/// Multi-level cache combining memory and disk tiers
///
/// Lookups hit the in-memory L1 first and fall back to the optional
/// persistent L2; writes go to both tiers.
#[cfg(feature = "cache")]
pub struct MultiLevelCache {
    /// Memory cache (L1)
    pub(crate) memory: LruTtlCache,
    /// Disk cache (L2); `None` unless persistence is configured
    disk: Option<PersistentDiskCache>,
    /// Cache warmer
    // NOTE(review): `new` gives the warmer a fresh LruTtlCache rather than
    // sharing `memory` — confirm this is intentional.
    warmer: Option<Arc<CacheWarmer<LruTtlCache>>>,
}
101
#[cfg(feature = "cache")]
impl MultiLevelCache {
    /// Creates a new multi-level cache
    ///
    /// Always builds the in-memory L1 tier. The disk L2 tier is built only
    /// when the configuration enables persistence and names a cache
    /// directory, and a warmer only when a warming strategy is configured.
    pub fn new(config: CacheConfig) -> Result<Self> {
        let memory = LruTtlCache::new(config.clone())?;

        let mut disk = None;
        if config.persistent && config.cache_dir.is_some() {
            disk = Some(PersistentDiskCache::new(config.clone())?);
        }

        let mut warmer = None;
        if config.warming_strategy != WarmingStrategy::None {
            let warm_cache = Arc::new(LruTtlCache::new(config.clone())?);
            warmer = Some(Arc::new(CacheWarmer::new(warm_cache, config.warming_strategy)));
        }

        Ok(Self {
            memory,
            disk,
            warmer,
        })
    }

    /// Gets an entry from the cache
    ///
    /// Checks the memory tier first, then the disk tier; a disk hit is
    /// promoted back into memory. Returns a cache-miss error when neither
    /// tier holds the key.
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        // Feed the access stream to the warmer for pattern detection.
        if let Some(warmer) = self.warmer.as_ref() {
            warmer.record_access(key).await;
        }

        // L1: in-memory tier.
        if let Ok(bytes) = self.memory.get(key).await {
            tracing::trace!("Cache hit (memory): {}", key);
            return Ok(bytes);
        }

        // L2: persistent tier, when configured.
        if let Some(disk) = self.disk.as_ref() {
            if let Ok(bytes) = disk.get(key).await {
                tracing::trace!("Cache hit (disk): {}", key);

                // Best-effort promotion into L1; a failure here is not fatal.
                let _ = self.memory.put(key.clone(), bytes.clone(), None).await;

                return Ok(bytes);
            }
        }

        tracing::trace!("Cache miss: {}", key);
        Err(CloudError::Cache(CacheError::Miss { key: key.clone() }))
    }

    /// Writes `data` under `key` into every configured tier (memory first,
    /// then disk), with an optional TTL.
    async fn store(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        self.memory.put(key.clone(), data.clone(), ttl).await?;

        if let Some(disk) = self.disk.as_ref() {
            disk.put(key, data, ttl).await?;
        }

        Ok(())
    }

    /// Puts an entry into the cache
    ///
    /// The entry is written to both tiers with no TTL.
    pub async fn put(&self, key: CacheKey, data: Bytes) -> Result<()> {
        self.store(key, data, None).await
    }

    /// Puts an entry with TTL
    ///
    /// The entry is written to both tiers, expiring after `ttl`.
    pub async fn put_with_ttl(&self, key: CacheKey, data: Bytes, ttl: Duration) -> Result<()> {
        self.store(key, data, Some(ttl)).await
    }

    /// Removes an entry from the cache
    ///
    /// The key is removed from the memory tier and, when configured, the
    /// disk tier.
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        self.memory.remove(key).await?;

        if let Some(disk) = self.disk.as_ref() {
            disk.remove(key).await?;
        }

        Ok(())
    }

    /// Clears the cache
    ///
    /// Empties the memory tier and, when configured, the disk tier.
    pub async fn clear(&self) -> Result<()> {
        self.memory.clear().await?;

        if let Some(disk) = self.disk.as_ref() {
            disk.clear().await?;
        }

        Ok(())
    }

    /// Returns cache statistics
    ///
    /// Only the memory tier's counters are reported here.
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        self.memory.stats()
    }

    /// Returns the current memory cache size
    pub fn memory_size(&self) -> usize {
        self.memory.current_size.load(Ordering::SeqCst)
    }
}
212    /// Returns the current memory cache size
213    pub fn memory_size(&self) -> usize {
214        self.memory.current_size.load(Ordering::SeqCst)
215    }
216}