// skp_cache/manager/mod.rs

1//! High-level cache manager
2
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5use std::collections::{HashSet, VecDeque};
6
7use skp_cache_core::{
8    CacheBackend, CacheEntry, CacheKey, CacheMetrics, CacheOperation, CacheOptions,
9    CacheResult, CacheTier, DependencyBackend, JsonSerializer, NoopMetrics, Result, Serializer,
10    TaggableBackend,
11};
12
13mod coalescer;
14use coalescer::Coalescer;
15
16mod read_through;
17pub use read_through::{Loader, ReadThroughCache, CacheManagerReadThroughExt};
18
19mod groups;
20pub use groups::CacheGroup;
21
22/// Configuration for CacheManager
#[derive(Debug, Clone)]
pub struct CacheManagerConfig {
    /// Default TTL applied to entries stored without an explicit TTL.
    /// `None` means such entries get no TTL at all.
    pub default_ttl: Option<Duration>,
    /// Optional namespace prefix; when set, every key becomes `"<ns>:<key>"`.
    pub namespace: Option<String>,
    /// TTL jitter fraction (0.0 - 1.0) to prevent thundering herd on expiry.
    /// Jitter only ever lengthens a TTL, never shortens it.
    pub ttl_jitter: f64,
}
32
33impl Default for CacheManagerConfig {
34    fn default() -> Self {
35        Self {
36            default_ttl: Some(Duration::from_secs(300)),
37            namespace: None,
38            ttl_jitter: 0.1, // 10% jitter
39        }
40    }
41}
42
43impl CacheManagerConfig {
44    /// Create config with specific default TTL
45    pub fn with_ttl(ttl: Duration) -> Self {
46        Self {
47            default_ttl: Some(ttl),
48            ..Default::default()
49        }
50    }
51
52    /// Create config with namespace
53    pub fn with_namespace(namespace: impl Into<String>) -> Self {
54        Self {
55            namespace: Some(namespace.into()),
56            ..Default::default()
57        }
58    }
59
60    /// Disable TTL jitter
61    pub fn no_jitter(mut self) -> Self {
62        self.ttl_jitter = 0.0;
63        self
64    }
65}
66
67/// High-level cache manager with pluggable serialization and metrics
68///
69/// Generic over:
70/// - `B`: The cache backend (Memory, Redis, MultiTier)
71/// - `S`: The serializer (JSON, MessagePack, Bincode)
72/// - `M`: The metrics collector
73pub struct CacheManager<B, S = JsonSerializer, M = NoopMetrics>
74where
75    B: CacheBackend + DependencyBackend,
76    S: Serializer,
77    M: CacheMetrics,
78{
79    backend: Arc<B>,
80    serializer: Arc<S>,
81    metrics: Arc<M>,
82    config: CacheManagerConfig,
83    coalescer: Coalescer,
84}
85
86// Constructors for default serializer/metrics
87impl<B: CacheBackend + DependencyBackend> CacheManager<B, JsonSerializer, NoopMetrics> {
88    /// Create a new CacheManager with default JSON serializer and no metrics
89    pub fn new(backend: B) -> Self {
90        Self::with_config(backend, CacheManagerConfig::default())
91    }
92
93    /// Create with custom config
94    pub fn with_config(backend: B, config: CacheManagerConfig) -> Self {
95        Self {
96            backend: Arc::new(backend),
97            serializer: Arc::new(JsonSerializer),
98            metrics: Arc::new(NoopMetrics),
99            config,
100            coalescer: Coalescer::new(),
101        }
102    }
103}
104
// Full generic implementation
impl<B, S, M> CacheManager<B, S, M>
where
    B: CacheBackend + DependencyBackend,
    S: Serializer,
    M: CacheMetrics,
{
    /// Create a CacheManager with custom serializer and metrics
    pub fn with_serializer_and_metrics(
        backend: B,
        serializer: S,
        metrics: M,
        config: CacheManagerConfig,
    ) -> Self {
        Self {
            backend: Arc::new(backend),
            serializer: Arc::new(serializer),
            metrics: Arc::new(metrics),
            config,
            coalescer: Coalescer::new(),
        }
    }

    /// Create a namespaced cache group that borrows this manager.
    pub fn group(&self, namespace: impl Into<String>) -> CacheGroup<'_, B, S, M> {
        CacheGroup::new(self, namespace.into())
    }

    /// Prepend the configured namespace (`"<ns>:<key>"`) when one is set;
    /// otherwise return the key unchanged.
    fn full_key(&self, key: &str) -> String {
        match &self.config.namespace {
            Some(ns) => format!("{}:{}", ns, key),
            None => key.to_string(),
        }
    }

    /// Apply TTL jitter to prevent thundering herd on expiry.
    ///
    /// Extends `ttl` by a random 0..range whole seconds, where the range is
    /// `ttl * config.ttl_jitter`. Jitter only ever lengthens a TTL. TTLs
    /// short enough that the range truncates to zero seconds pass through
    /// unmodified, as do configs with jitter disabled.
    fn apply_ttl_jitter(&self, ttl: Duration) -> Duration {
        if self.config.ttl_jitter > 0.0 {
            let jitter_range = (ttl.as_secs_f64() * self.config.ttl_jitter) as u64;
            if jitter_range > 0 {
                let jitter = rand::random::<u64>() % jitter_range;
                return ttl + Duration::from_secs(jitter);
            }
        }
        ttl
    }

    /// Get a value from cache.
    ///
    /// Concurrent gets for the same key are coalesced into a single backend
    /// read. An entry past its TTL but inside its stale window is returned
    /// as `CacheResult::Stale`; an entry past both counts as a miss (the
    /// dead entry is left for the backend to evict).
    pub async fn get<T>(&self, key: impl CacheKey) -> Result<CacheResult<T>>
    where
        T: serde::de::DeserializeOwned,
    {
        let full_key = self.full_key(&key.full_key());
        let start = Instant::now();

        // Use coalescer to prevent thundering herd: one backend read per key
        // is in flight; other concurrent callers await its result.
        let backend = self.backend.clone();
        let key_clone = full_key.clone();

        let req_result = self.coalescer.do_request(&full_key, move || async move {
            backend.get(&key_clone).await
        }).await?;

        let result = match req_result {
            Some(entry) => {
                if entry.is_expired() && !entry.is_stale() {
                    // Expired past the stale-while-revalidate window: a miss.
                    self.metrics.record_miss(&full_key);
                    CacheResult::Miss
                } else if entry.is_stale() {
                    self.metrics.record_stale_hit(&full_key);
                    CacheResult::Stale(self.deserialize_entry(entry)?)
                } else {
                    // NOTE(review): the hit tier is hard-coded to L1Memory
                    // even for multi-tier backends — confirm whether the
                    // backend can report which tier actually served the hit.
                    self.metrics.record_hit(&full_key, CacheTier::L1Memory);
                    CacheResult::Hit(self.deserialize_entry(entry)?)
                }
            }
            None => {
                self.metrics.record_miss(&full_key);
                CacheResult::Miss
            }
        };

        self.metrics
            .record_latency(CacheOperation::Get, start.elapsed());
        Ok(result)
    }

    /// Set a value in cache.
    ///
    /// Serializes `value` with the configured serializer (latency recorded
    /// separately), then delegates to `set_raw` for TTL defaults, jitter,
    /// the backend write, and cascade invalidation of dependents.
    pub async fn set<T>(
        &self,
        key: impl CacheKey,
        value: T,
        options: impl Into<CacheOptions>,
    ) -> Result<()>
    where
        T: serde::Serialize,
    {
        let full_key = self.full_key(&key.full_key());
        let options = options.into();

        // Serialize
        let serialize_start = Instant::now();
        let serialized = self.serializer.serialize(&value)?;
        self.metrics
            .record_latency(CacheOperation::Serialize, serialize_start.elapsed());

        self.set_raw(&full_key, serialized, options).await
    }

    /// Internal set with full logic (jitter, cascade, metrics).
    ///
    /// `full_key` must already carry the namespace prefix. Fills in the
    /// configured default TTL when `options.ttl` is unset, applies jitter,
    /// writes the value, then cascade-invalidates every entry registered as
    /// a dependent of this key (their cached data was derived from the old
    /// value). Dependents are snapshotted BEFORE the write so the links of
    /// the outgoing entry are still visible; invalidation errors are
    /// deliberately ignored (best effort).
    async fn set_raw(&self, full_key: &str, value: Vec<u8>, mut options: CacheOptions) -> Result<()> {
        // Apply default TTL if not specified
        if options.ttl.is_none() {
            options.ttl = self.config.default_ttl;
        }

        // Apply jitter
        if let Some(ttl) = options.ttl {
            options.ttl = Some(self.apply_ttl_jitter(ttl));
        }

        // Snapshot dependents before overwriting: entries that depended on
        // the old value become stale once the new value lands.
        let dependents = self.backend.get_dependents(full_key).await.unwrap_or_default();

        // Store
        let set_start = Instant::now();
        self.backend.set(full_key, value, &options).await?;
        self.metrics
            .record_latency(CacheOperation::Set, set_start.elapsed());

        // Cascade invalidation (best effort; errors ignored)
        for dep in dependents {
             let _ = self.invalidate_recursive(&dep).await;
        }

        Ok(())
    }

    /// Get a value from cache, or compute it if missing (coalesced).
    ///
    /// Concurrent callers for the same key share one lookup/compute. Fresh
    /// hits return immediately. Stale hits return the stale entry and spawn
    /// at most one background refresh (stale-while-revalidate), which runs
    /// `computer`, serializes the result, and stores it via `set_raw`,
    /// swallowing errors. On a miss (or full expiry) the value is computed
    /// inline, stored, and returned as a freshly built entry.
    pub async fn get_or_compute<T, F, Fut>(
        &self,
        key: impl CacheKey,
        computer: F,
        options: Option<CacheOptions>,
    ) -> Result<CacheResult<T>>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<T>> + Send + 'static,
    {
        let full_key = self.full_key(&key.full_key());
        let backend = self.backend.clone();
        let key_str = full_key.clone();
        let opts = options.unwrap_or_default();
        let manager = self.clone();

        // Coalesce the request: only one closure runs per key at a time.
        // `computer` is an FnOnce consumed on exactly one path below —
        // either by the background refresh (then we return early) or by the
        // inline miss computation.
        let req_result = self.coalescer.do_request(&full_key, move || async move {
             // 1. Fresh entry in the backend: serve it as-is.
             if let Some(entry) = backend.get(&key_str).await? {
                 if !entry.is_expired() {
                      return Ok(Some(entry));
                 }

                 // 2. Stale-while-revalidate: hand back the stale entry and
                 // refresh it in the background (at most one refresh per key).
                 if entry.is_stale() {
                      let manager_bg = manager.clone();
                      let key_bg = key_str.clone();
                      let opts_bg = opts.clone();

                      manager.coalescer.try_spawn_refresh(&key_str, move || async move {
                           if let Ok(val) = computer().await {
                                // Best effort: serialize the recomputed value
                                // and store it via set_raw so TTL defaults,
                                // jitter, and cascade invalidation apply.
                                // Errors are dropped — a failed refresh just
                                // leaves the stale entry in place.
                                if let Ok(serialized) = manager_bg.serializer.serialize(&val) {
                                     let _ = manager_bg.set_raw(&key_bg, serialized, opts_bg).await;
                                }
                           }
                      });

                      return Ok(Some(entry));
                 }
             }

             // 3. Miss (or expired beyond the stale window): compute inline.
             let val = computer().await?;
             let serialized = manager.serializer.serialize(&val)?;
             let size = serialized.len();

             // Persist through set_raw so the full set pipeline applies.
             manager.set_raw(&key_str, serialized.clone(), opts).await?;

             // Hand waiters a synthetic entry holding the same bytes.
             Ok(Some(CacheEntry::new(serialized, size)))
        }).await?;

        match req_result {
            Some(entry) => {
                if entry.is_stale() {
                    Ok(CacheResult::Stale(self.deserialize_entry(entry)?))
                } else {
                    Ok(CacheResult::Hit(self.deserialize_entry(entry)?))
                }
            },
            // Unreachable in practice: every branch above yields Some(entry).
            None => Err(skp_cache_core::CacheError::Internal("Compute returned None".into()))
        }
    }

    /// Delete a key from cache (with cascade invalidation).
    ///
    /// Returns whether the key itself existed and was deleted; its
    /// dependents are invalidated too but do not affect the return value.
    pub async fn delete(&self, key: impl CacheKey) -> Result<bool> {
        let full_key = self.full_key(&key.full_key());
        let start = Instant::now();

        // Use recursive invalidation
        let result = self.invalidate_recursive(&full_key).await?;

        self.metrics
            .record_latency(CacheOperation::Delete, start.elapsed());
        Ok(result.0)
    }

    /// Invalidate a key and all its dependents (cascade invalidation)
    ///
    /// Returns the number of entries invalidated
    pub async fn invalidate(&self, key: impl CacheKey) -> Result<u64> {
        let full_key = self.full_key(&key.full_key());
        let start = Instant::now();

        let result = self.invalidate_recursive(&full_key).await?;

        self.metrics
            .record_latency(CacheOperation::Invalidate, start.elapsed());
        Ok(result.1)
    }

    /// Recursive invalidation of dependents.
    ///
    /// Breadth-first walk of the dependency graph starting at `key`; the
    /// `visited` set guards against cycles and duplicate work. Dependent
    /// lookups are best effort (errors skipped), while delete failures
    /// propagate. Returns (initial_key_deleted, total_deleted_count).
    async fn invalidate_recursive(&self, key: &str) -> Result<(bool, u64)> {
        let mut queue = VecDeque::new();
        queue.push_back(key.to_string());
        let mut visited = HashSet::new();
        visited.insert(key.to_string());

        let mut initial_deleted = false;
        let mut first = true;
        let mut count = 0u64;

        while let Some(k) = queue.pop_front() {
             // Collect dependents before deleting: deletion may drop the
             // dependency links along with the entry.
             if let Ok(deps) = self.backend.get_dependents(&k).await {
                  for dep in deps {
                      if visited.insert(dep.clone()) {
                          queue.push_back(dep);
                      }
                  }
             }
             // Delete
             let deleted = self.backend.delete(&k).await?;
             if deleted {
                 count += 1;
             }
             if first {
                 initial_deleted = deleted;
                 first = false;
             }
        }
        Ok((initial_deleted, count))
    }

    /// Check if key exists in cache (no metrics recorded).
    pub async fn exists(&self, key: impl CacheKey) -> Result<bool> {
        let full_key = self.full_key(&key.full_key());
        self.backend.exists(&full_key).await
    }

    /// Clear all entries from cache (not just this manager's namespace).
    pub async fn clear(&self) -> Result<()> {
        self.backend.clear().await
    }

    /// Get cache statistics straight from the backend.
    pub async fn stats(&self) -> Result<skp_cache_core::CacheStats> {
        self.backend.stats().await
    }

    /// Get the number of entries in the backend.
    pub async fn len(&self) -> Result<usize> {
        self.backend.len().await
    }

    /// Check if cache is empty.
    pub async fn is_empty(&self) -> Result<bool> {
        self.backend.is_empty().await
    }

    /// Deserialize a cache entry's bytes into `T`, preserving all entry
    /// metadata (timestamps, TTL, tags, dependencies, etag, version, ...).
    /// Deserialization latency is recorded separately from Get latency.
    fn deserialize_entry<T>(&self, entry: CacheEntry<Vec<u8>>) -> Result<CacheEntry<T>>
    where
        T: serde::de::DeserializeOwned,
    {
        let deserialize_start = Instant::now();
        let value: T = self.serializer.deserialize(&entry.value)?;
        self.metrics
            .record_latency(CacheOperation::Deserialize, deserialize_start.elapsed());

        Ok(CacheEntry {
            value,
            created_at: entry.created_at,
            last_accessed: entry.last_accessed,
            access_count: entry.access_count,
            ttl: entry.ttl,
            stale_while_revalidate: entry.stale_while_revalidate,
            tags: entry.tags,
            dependencies: entry.dependencies,
            cost: entry.cost,
            size: entry.size,
            etag: entry.etag,
            version: entry.version,
        })
    }
}
435
436impl<B, S, M> Clone for CacheManager<B, S, M>
437where
438    B: CacheBackend + DependencyBackend,
439    S: Serializer,
440    M: CacheMetrics,
441{
442    fn clone(&self) -> Self {
443        Self {
444            backend: self.backend.clone(),
445            serializer: self.serializer.clone(),
446            metrics: self.metrics.clone(),
447            config: self.config.clone(),
448            coalescer: self.coalescer.clone(),
449        }
450    }
451}
452
453// Taggable operations
454impl<B, S, M> CacheManager<B, S, M>
455where
456    B: CacheBackend + DependencyBackend + TaggableBackend,
457    S: Serializer,
458    M: CacheMetrics,
459{
460    /// Delete all entries with a specific tag
461    pub async fn delete_by_tag(&self, tag: &str) -> Result<u64> {
462        let start = Instant::now();
463        let count = self.backend.delete_by_tag(tag).await?;
464        self.metrics
465            .record_latency(CacheOperation::Invalidate, start.elapsed());
466        Ok(count)
467    }
468
469    /// Get all keys by tag
470    pub async fn get_keys_by_tag(&self, tag: &str) -> Result<Vec<String>> {
471        self.backend.get_by_tag(tag).await
472    }
473}