prax_query/data_cache/
tiered.rs

1//! Tiered cache combining multiple cache backends.
2//!
3//! This module implements a multi-level cache where:
4//! - L1 (local): Fast in-memory cache for hot data
5//! - L2 (distributed): Redis cache for shared state
6//!
7//! # Cache Flow
8//!
9//! ```text
10//! GET request:
11//! 1. Check L1 (memory) -> Hit? Return
12//! 2. Check L2 (Redis) -> Hit? Return (L1 is NOT auto-populated on an L2 hit;
13//! 3. Miss -> Fetch from source, populate L1 & L2
14//!
15//! SET request:
16//! 1. Write to L2 (Redis) first
17//! 2. Write to L1 (memory)
18//!
19//! INVALIDATE:
20//! 1. Invalidate L2 (Redis)
21//! 2. Invalidate L1 (memory)
22//! ```
23//!
24//! # Example
25//!
26//! ```rust,ignore
27//! use prax_query::data_cache::{TieredCache, MemoryCache, RedisCache};
28//!
29//! let memory = MemoryCache::builder()
30//!     .max_capacity(1000)
31//!     .time_to_live(Duration::from_secs(60))
32//!     .build();
33//!
34//! let redis = RedisCache::new(RedisCacheConfig::default()).await?;
35//!
36//! let cache = TieredCache::new(memory, redis);
37//! ```
38
39use std::time::Duration;
40
41use super::backend::{BackendStats, CacheBackend, CacheResult};
42use super::invalidation::EntityTag;
43use super::key::{CacheKey, KeyPattern};
44
/// Configuration for tiered cache.
///
/// Controls write-through behavior, per-tier TTLs, and whether a tier's
/// failure should fail the whole operation (by default both tiers are
/// best-effort: errors are swallowed and the other tier is still used).
#[derive(Debug, Clone)]
pub struct TieredCacheConfig {
    /// Whether to write to L1 on L2 hit (write-through to L1).
    pub write_through_l1: bool,
    /// Whether to write to L2 on L1 write (write-through to L2).
    pub write_through_l2: bool,
    /// L1 TTL (usually shorter than L2, since L1 is a local hot cache).
    pub l1_ttl: Option<Duration>,
    /// L2 TTL.
    pub l2_ttl: Option<Duration>,
    /// Whether L1 failures should fail the operation.
    pub l1_required: bool,
    /// Whether L2 failures should fail the operation.
    pub l2_required: bool,
}
61
62impl Default for TieredCacheConfig {
63    fn default() -> Self {
64        Self {
65            write_through_l1: true,
66            write_through_l2: true,
67            l1_ttl: Some(Duration::from_secs(60)),   // 1 minute L1
68            l2_ttl: Some(Duration::from_secs(300)),  // 5 minutes L2
69            l1_required: false,
70            l2_required: false,
71        }
72    }
73}
74
75impl TieredCacheConfig {
76    /// Set L1 TTL.
77    pub fn with_l1_ttl(mut self, ttl: Duration) -> Self {
78        self.l1_ttl = Some(ttl);
79        self
80    }
81
82    /// Set L2 TTL.
83    pub fn with_l2_ttl(mut self, ttl: Duration) -> Self {
84        self.l2_ttl = Some(ttl);
85        self
86    }
87
88    /// Make L1 required.
89    pub fn require_l1(mut self) -> Self {
90        self.l1_required = true;
91        self
92    }
93
94    /// Make L2 required.
95    pub fn require_l2(mut self) -> Self {
96        self.l2_required = true;
97        self
98    }
99
100    /// Disable write-through to L1.
101    pub fn no_write_l1(mut self) -> Self {
102        self.write_through_l1 = false;
103        self
104    }
105
106    /// Disable write-through to L2.
107    pub fn no_write_l2(mut self) -> Self {
108        self.write_through_l2 = false;
109        self
110    }
111}
112
113/// A tiered cache with L1 (local) and L2 (distributed) layers.
114pub struct TieredCache<L1, L2>
115where
116    L1: CacheBackend,
117    L2: CacheBackend,
118{
119    l1: L1,
120    l2: L2,
121    config: TieredCacheConfig,
122}
123
124impl<L1, L2> TieredCache<L1, L2>
125where
126    L1: CacheBackend,
127    L2: CacheBackend,
128{
129    /// Create a new tiered cache with default config.
130    pub fn new(l1: L1, l2: L2) -> Self {
131        Self {
132            l1,
133            l2,
134            config: TieredCacheConfig::default(),
135        }
136    }
137
138    /// Create with custom config.
139    pub fn with_config(l1: L1, l2: L2, config: TieredCacheConfig) -> Self {
140        Self { l1, l2, config }
141    }
142
143    /// Get the L1 cache.
144    pub fn l1(&self) -> &L1 {
145        &self.l1
146    }
147
148    /// Get the L2 cache.
149    pub fn l2(&self) -> &L2 {
150        &self.l2
151    }
152
153    /// Get the config.
154    pub fn config(&self) -> &TieredCacheConfig {
155        &self.config
156    }
157}
158
159impl<L1, L2> CacheBackend for TieredCache<L1, L2>
160where
161    L1: CacheBackend,
162    L2: CacheBackend,
163{
164    async fn get<T>(&self, key: &CacheKey) -> CacheResult<Option<T>>
165    where
166        T: serde::de::DeserializeOwned,
167    {
168        // Try L1 first
169        match self.l1.get::<T>(key).await {
170            Ok(Some(value)) => return Ok(Some(value)),
171            Ok(None) => {} // Continue to L2
172            Err(e) if self.config.l1_required => return Err(e),
173            Err(_) => {} // L1 error but not required, continue
174        }
175
176        // Try L2
177        match self.l2.get::<T>(key).await {
178            Ok(Some(value)) => {
179                // Note: We can't populate L1 here because T isn't guaranteed to be Serialize
180                // The caller should use get_and_populate if they want L1 population
181                Ok(Some(value))
182            }
183            Ok(None) => Ok(None),
184            Err(e) if self.config.l2_required => Err(e),
185            Err(_) => Ok(None),
186        }
187    }
188
189    async fn set<T>(
190        &self,
191        key: &CacheKey,
192        value: &T,
193        ttl: Option<Duration>,
194    ) -> CacheResult<()>
195    where
196        T: serde::Serialize + Sync,
197    {
198        // Write to L2 first (source of truth for distributed)
199        if self.config.write_through_l2 {
200            let l2_ttl = ttl.or(self.config.l2_ttl);
201            match self.l2.set(key, value, l2_ttl).await {
202                Ok(()) => {}
203                Err(e) if self.config.l2_required => return Err(e),
204                Err(_) => {} // Log but continue
205            }
206        }
207
208        // Write to L1
209        if self.config.write_through_l1 {
210            let l1_ttl = ttl
211                .map(|t| t.min(self.config.l1_ttl.unwrap_or(t)))
212                .or(self.config.l1_ttl);
213
214            match self.l1.set(key, value, l1_ttl).await {
215                Ok(()) => {}
216                Err(e) if self.config.l1_required => return Err(e),
217                Err(_) => {} // Log but continue
218            }
219        }
220
221        Ok(())
222    }
223
224    async fn delete(&self, key: &CacheKey) -> CacheResult<bool> {
225        // Delete from both layers
226        let l2_deleted = match self.l2.delete(key).await {
227            Ok(deleted) => deleted,
228            Err(e) if self.config.l2_required => return Err(e),
229            Err(_) => false,
230        };
231
232        let l1_deleted = match self.l1.delete(key).await {
233            Ok(deleted) => deleted,
234            Err(e) if self.config.l1_required => return Err(e),
235            Err(_) => false,
236        };
237
238        Ok(l1_deleted || l2_deleted)
239    }
240
241    async fn exists(&self, key: &CacheKey) -> CacheResult<bool> {
242        // Check L1 first
243        if let Ok(true) = self.l1.exists(key).await {
244            return Ok(true);
245        }
246
247        // Check L2
248        self.l2.exists(key).await
249    }
250
251    // Note: get_many uses the default sequential implementation
252    // A more optimized version would batch L1/L2 lookups but requires complex trait bounds
253
254    async fn invalidate_pattern(&self, pattern: &KeyPattern) -> CacheResult<u64> {
255        // Invalidate both layers
256        let l2_count = self.l2.invalidate_pattern(pattern).await.unwrap_or(0);
257        let l1_count = self.l1.invalidate_pattern(pattern).await.unwrap_or(0);
258
259        Ok(l1_count.max(l2_count))
260    }
261
262    async fn invalidate_tags(&self, tags: &[EntityTag]) -> CacheResult<u64> {
263        let l2_count = self.l2.invalidate_tags(tags).await.unwrap_or(0);
264        let l1_count = self.l1.invalidate_tags(tags).await.unwrap_or(0);
265
266        Ok(l1_count.max(l2_count))
267    }
268
269    async fn clear(&self) -> CacheResult<()> {
270        // Clear both layers
271        let l2_result = self.l2.clear().await;
272        let l1_result = self.l1.clear().await;
273
274        // Return first error if any layer is required
275        if self.config.l2_required {
276            l2_result?;
277        }
278        if self.config.l1_required {
279            l1_result?;
280        }
281
282        Ok(())
283    }
284
285    async fn len(&self) -> CacheResult<usize> {
286        // Return L2 size as it's the source of truth
287        self.l2.len().await
288    }
289
290    async fn stats(&self) -> CacheResult<BackendStats> {
291        let l1_stats = self.l1.stats().await.unwrap_or_default();
292        let l2_stats = self.l2.stats().await.unwrap_or_default();
293
294        Ok(BackendStats {
295            entries: l2_stats.entries, // L2 is source of truth
296            memory_bytes: l1_stats.memory_bytes, // L1 memory usage
297            connections: l2_stats.connections, // L2 connections
298            info: Some(format!(
299                "Tiered: L1={} entries, L2={} entries",
300                l1_stats.entries, l2_stats.entries
301            )),
302        })
303    }
304}
305
306/// Builder for tiered cache.
307pub struct TieredCacheBuilder<L1, L2>
308where
309    L1: CacheBackend,
310    L2: CacheBackend,
311{
312    l1: Option<L1>,
313    l2: Option<L2>,
314    config: TieredCacheConfig,
315}
316
317impl<L1, L2> Default for TieredCacheBuilder<L1, L2>
318where
319    L1: CacheBackend,
320    L2: CacheBackend,
321{
322    fn default() -> Self {
323        Self {
324            l1: None,
325            l2: None,
326            config: TieredCacheConfig::default(),
327        }
328    }
329}
330
331impl<L1, L2> TieredCacheBuilder<L1, L2>
332where
333    L1: CacheBackend,
334    L2: CacheBackend,
335{
336    /// Create a new builder.
337    pub fn new() -> Self {
338        Self::default()
339    }
340
341    /// Set the L1 cache.
342    pub fn l1(mut self, cache: L1) -> Self {
343        self.l1 = Some(cache);
344        self
345    }
346
347    /// Set the L2 cache.
348    pub fn l2(mut self, cache: L2) -> Self {
349        self.l2 = Some(cache);
350        self
351    }
352
353    /// Set the config.
354    pub fn config(mut self, config: TieredCacheConfig) -> Self {
355        self.config = config;
356        self
357    }
358
359    /// Set L1 TTL.
360    pub fn l1_ttl(mut self, ttl: Duration) -> Self {
361        self.config.l1_ttl = Some(ttl);
362        self
363    }
364
365    /// Set L2 TTL.
366    pub fn l2_ttl(mut self, ttl: Duration) -> Self {
367        self.config.l2_ttl = Some(ttl);
368        self
369    }
370
371    /// Build the tiered cache.
372    ///
373    /// # Panics
374    /// Panics if L1 or L2 is not set.
375    pub fn build(self) -> TieredCache<L1, L2> {
376        TieredCache {
377            l1: self.l1.expect("L1 cache must be set"),
378            l2: self.l2.expect("L2 cache must be set"),
379            config: self.config,
380        }
381    }
382}
383
#[cfg(test)]
mod tests {
    use super::super::backend::NoopCache;
    use super::super::memory::{MemoryCache, MemoryCacheConfig};
    use super::*;

    #[tokio::test]
    async fn test_tiered_cache_l1_hit() {
        let local = MemoryCache::new(MemoryCacheConfig::new(100));
        let distributed = MemoryCache::new(MemoryCacheConfig::new(100));

        let tiered = TieredCache::new(local, distributed);
        let key = CacheKey::new("test", "key1");

        // A write goes through both tiers...
        tiered.set(&key, &"hello", None).await.unwrap();

        // ...so the subsequent read is served from L1.
        let fetched: Option<String> = tiered.get(&key).await.unwrap();
        assert_eq!(fetched, Some(String::from("hello")));
    }

    #[tokio::test]
    async fn test_tiered_cache_l2_fallback() {
        let local = MemoryCache::new(MemoryCacheConfig::new(100));
        let distributed = MemoryCache::new(MemoryCacheConfig::new(100));

        // Seed only the distributed tier.
        let key = CacheKey::new("test", "key1");
        distributed.set(&key, &"from l2", None).await.unwrap();

        let config = TieredCacheConfig {
            write_through_l1: true,
            ..Default::default()
        };
        let tiered = TieredCache::with_config(local, distributed, config);

        // An L1 miss falls through to L2.
        let fetched: Option<String> = tiered.get(&key).await.unwrap();
        assert_eq!(fetched, Some(String::from("from l2")));

        // L1 population on an L2 hit would require T: Serialize; use `set`
        // to fill both tiers explicitly.
    }

    #[tokio::test]
    async fn test_tiered_cache_invalidation() {
        let tiered = TieredCache::new(
            MemoryCache::new(MemoryCacheConfig::new(100)),
            MemoryCache::new(MemoryCacheConfig::new(100)),
        );
        let key = CacheKey::new("User", "id:1");

        tiered.set(&key, &"user data", None).await.unwrap();

        // Pattern invalidation should remove the entry from both tiers.
        let removed = tiered
            .invalidate_pattern(&KeyPattern::entity("User"))
            .await
            .unwrap();
        assert!(removed >= 1);

        // The key is no longer reachable through either tier.
        let fetched: Option<String> = tiered.get(&key).await.unwrap();
        assert!(fetched.is_none());
    }

    #[tokio::test]
    async fn test_tiered_cache_with_noop_l2() {
        let tiered = TieredCache::new(
            MemoryCache::new(MemoryCacheConfig::new(100)),
            NoopCache,
        );
        let key = CacheKey::new("test", "key1");

        // With a no-op L2, writes still land in L1.
        tiered.set(&key, &"hello", None).await.unwrap();

        // Reads are served entirely from L1.
        let fetched: Option<String> = tiered.get(&key).await.unwrap();
        assert_eq!(fetched, Some(String::from("hello")));
    }
}
472