// prax_query/data_cache/tiered.rs
//! Tiered cache combining multiple cache backends.
//!
//! This module implements a multi-level cache where:
//! - L1 (local): Fast in-memory cache for hot data
//! - L2 (distributed): Redis cache for shared state
//!
//! # Cache Flow
//!
//! ```text
//! GET request:
//! 1. Check L1 (memory) -> Hit? Return
//! 2. Check L2 (Redis) -> Hit? Populate L1, Return
//! 3. Miss -> Fetch from source, populate L1 & L2
//!
//! SET request:
//! 1. Write to L2 (Redis) first
//! 2. Write to L1 (memory)
//!
//! INVALIDATE:
//! 1. Invalidate L2 (Redis)
//! 2. Invalidate L1 (memory)
//! ```
//!
//! # Example
//!
//! ```rust,ignore
//! use prax_query::data_cache::{TieredCache, MemoryCache, RedisCache};
//!
//! let memory = MemoryCache::builder()
//!     .max_capacity(1000)
//!     .time_to_live(Duration::from_secs(60))
//!     .build();
//!
//! let redis = RedisCache::new(RedisCacheConfig::default()).await?;
//!
//! let cache = TieredCache::new(memory, redis);
//! ```

use std::time::Duration;

use super::backend::{BackendStats, CacheBackend, CacheResult};
use super::invalidation::EntityTag;
use super::key::{CacheKey, KeyPattern};
44
/// Configuration for tiered cache.
///
/// Controls per-layer write-through, per-layer TTLs, and whether a
/// layer's failure should fail the whole operation (by default both
/// layers are best-effort and errors are swallowed).
#[derive(Debug, Clone)]
pub struct TieredCacheConfig {
    /// Whether `set` writes to L1. (Despite the name, `get` never
    /// populates L1 on an L2 hit — see the note in the `CacheBackend`
    /// impl; `T` is not guaranteed to be `Serialize` there.)
    pub write_through_l1: bool,
    /// Whether `set` writes to L2 (L2 is written first, as the
    /// distributed source of truth).
    pub write_through_l2: bool,
    /// L1 TTL (usually shorter than L2). Also caps an explicit
    /// per-call TTL passed to `set`.
    pub l1_ttl: Option<Duration>,
    /// L2 TTL, used when `set` is called without an explicit TTL.
    pub l2_ttl: Option<Duration>,
    /// Whether L1 failures should fail the operation.
    pub l1_required: bool,
    /// Whether L2 failures should fail the operation.
    pub l2_required: bool,
}
61
62impl Default for TieredCacheConfig {
63    fn default() -> Self {
64        Self {
65            write_through_l1: true,
66            write_through_l2: true,
67            l1_ttl: Some(Duration::from_secs(60)), // 1 minute L1
68            l2_ttl: Some(Duration::from_secs(300)), // 5 minutes L2
69            l1_required: false,
70            l2_required: false,
71        }
72    }
73}
74
75impl TieredCacheConfig {
76    /// Set L1 TTL.
77    pub fn with_l1_ttl(mut self, ttl: Duration) -> Self {
78        self.l1_ttl = Some(ttl);
79        self
80    }
81
82    /// Set L2 TTL.
83    pub fn with_l2_ttl(mut self, ttl: Duration) -> Self {
84        self.l2_ttl = Some(ttl);
85        self
86    }
87
88    /// Make L1 required.
89    pub fn require_l1(mut self) -> Self {
90        self.l1_required = true;
91        self
92    }
93
94    /// Make L2 required.
95    pub fn require_l2(mut self) -> Self {
96        self.l2_required = true;
97        self
98    }
99
100    /// Disable write-through to L1.
101    pub fn no_write_l1(mut self) -> Self {
102        self.write_through_l1 = false;
103        self
104    }
105
106    /// Disable write-through to L2.
107    pub fn no_write_l2(mut self) -> Self {
108        self.write_through_l2 = false;
109        self
110    }
111}
112
/// A tiered cache with L1 (local) and L2 (distributed) layers.
///
/// Reads check L1 first and fall back to L2; writes go to L2 first,
/// then L1 (see the `CacheBackend` impl below). Layer failures are
/// swallowed unless the matching `*_required` flag is set in `config`.
pub struct TieredCache<L1, L2>
where
    L1: CacheBackend,
    L2: CacheBackend,
{
    // Fast local layer, checked first on reads.
    l1: L1,
    // Distributed layer, treated as the source of truth for size/stats.
    l2: L2,
    // Write-through, TTL, and required-layer behavior.
    config: TieredCacheConfig,
}
123
124impl<L1, L2> TieredCache<L1, L2>
125where
126    L1: CacheBackend,
127    L2: CacheBackend,
128{
129    /// Create a new tiered cache with default config.
130    pub fn new(l1: L1, l2: L2) -> Self {
131        Self {
132            l1,
133            l2,
134            config: TieredCacheConfig::default(),
135        }
136    }
137
138    /// Create with custom config.
139    pub fn with_config(l1: L1, l2: L2, config: TieredCacheConfig) -> Self {
140        Self { l1, l2, config }
141    }
142
143    /// Get the L1 cache.
144    pub fn l1(&self) -> &L1 {
145        &self.l1
146    }
147
148    /// Get the L2 cache.
149    pub fn l2(&self) -> &L2 {
150        &self.l2
151    }
152
153    /// Get the config.
154    pub fn config(&self) -> &TieredCacheConfig {
155        &self.config
156    }
157}
158
159impl<L1, L2> CacheBackend for TieredCache<L1, L2>
160where
161    L1: CacheBackend,
162    L2: CacheBackend,
163{
164    async fn get<T>(&self, key: &CacheKey) -> CacheResult<Option<T>>
165    where
166        T: serde::de::DeserializeOwned,
167    {
168        // Try L1 first
169        match self.l1.get::<T>(key).await {
170            Ok(Some(value)) => return Ok(Some(value)),
171            Ok(None) => {} // Continue to L2
172            Err(e) if self.config.l1_required => return Err(e),
173            Err(_) => {} // L1 error but not required, continue
174        }
175
176        // Try L2
177        match self.l2.get::<T>(key).await {
178            Ok(Some(value)) => {
179                // Note: We can't populate L1 here because T isn't guaranteed to be Serialize
180                // The caller should use get_and_populate if they want L1 population
181                Ok(Some(value))
182            }
183            Ok(None) => Ok(None),
184            Err(e) if self.config.l2_required => Err(e),
185            Err(_) => Ok(None),
186        }
187    }
188
189    async fn set<T>(&self, key: &CacheKey, value: &T, ttl: Option<Duration>) -> CacheResult<()>
190    where
191        T: serde::Serialize + Sync,
192    {
193        // Write to L2 first (source of truth for distributed)
194        if self.config.write_through_l2 {
195            let l2_ttl = ttl.or(self.config.l2_ttl);
196            match self.l2.set(key, value, l2_ttl).await {
197                Ok(()) => {}
198                Err(e) if self.config.l2_required => return Err(e),
199                Err(_) => {} // Log but continue
200            }
201        }
202
203        // Write to L1
204        if self.config.write_through_l1 {
205            let l1_ttl = ttl
206                .map(|t| t.min(self.config.l1_ttl.unwrap_or(t)))
207                .or(self.config.l1_ttl);
208
209            match self.l1.set(key, value, l1_ttl).await {
210                Ok(()) => {}
211                Err(e) if self.config.l1_required => return Err(e),
212                Err(_) => {} // Log but continue
213            }
214        }
215
216        Ok(())
217    }
218
219    async fn delete(&self, key: &CacheKey) -> CacheResult<bool> {
220        // Delete from both layers
221        let l2_deleted = match self.l2.delete(key).await {
222            Ok(deleted) => deleted,
223            Err(e) if self.config.l2_required => return Err(e),
224            Err(_) => false,
225        };
226
227        let l1_deleted = match self.l1.delete(key).await {
228            Ok(deleted) => deleted,
229            Err(e) if self.config.l1_required => return Err(e),
230            Err(_) => false,
231        };
232
233        Ok(l1_deleted || l2_deleted)
234    }
235
236    async fn exists(&self, key: &CacheKey) -> CacheResult<bool> {
237        // Check L1 first
238        if let Ok(true) = self.l1.exists(key).await {
239            return Ok(true);
240        }
241
242        // Check L2
243        self.l2.exists(key).await
244    }
245
246    // Note: get_many uses the default sequential implementation
247    // A more optimized version would batch L1/L2 lookups but requires complex trait bounds
248
249    async fn invalidate_pattern(&self, pattern: &KeyPattern) -> CacheResult<u64> {
250        // Invalidate both layers
251        let l2_count = self.l2.invalidate_pattern(pattern).await.unwrap_or(0);
252        let l1_count = self.l1.invalidate_pattern(pattern).await.unwrap_or(0);
253
254        Ok(l1_count.max(l2_count))
255    }
256
257    async fn invalidate_tags(&self, tags: &[EntityTag]) -> CacheResult<u64> {
258        let l2_count = self.l2.invalidate_tags(tags).await.unwrap_or(0);
259        let l1_count = self.l1.invalidate_tags(tags).await.unwrap_or(0);
260
261        Ok(l1_count.max(l2_count))
262    }
263
264    async fn clear(&self) -> CacheResult<()> {
265        // Clear both layers
266        let l2_result = self.l2.clear().await;
267        let l1_result = self.l1.clear().await;
268
269        // Return first error if any layer is required
270        if self.config.l2_required {
271            l2_result?;
272        }
273        if self.config.l1_required {
274            l1_result?;
275        }
276
277        Ok(())
278    }
279
280    async fn len(&self) -> CacheResult<usize> {
281        // Return L2 size as it's the source of truth
282        self.l2.len().await
283    }
284
285    async fn stats(&self) -> CacheResult<BackendStats> {
286        let l1_stats = self.l1.stats().await.unwrap_or_default();
287        let l2_stats = self.l2.stats().await.unwrap_or_default();
288
289        Ok(BackendStats {
290            entries: l2_stats.entries,           // L2 is source of truth
291            memory_bytes: l1_stats.memory_bytes, // L1 memory usage
292            connections: l2_stats.connections,   // L2 connections
293            info: Some(format!(
294                "Tiered: L1={} entries, L2={} entries",
295                l1_stats.entries, l2_stats.entries
296            )),
297        })
298    }
299}
300
/// Builder for tiered cache.
///
/// Both layers must be supplied before calling `build`, which panics
/// otherwise (see `TieredCacheBuilder::build`).
pub struct TieredCacheBuilder<L1, L2>
where
    L1: CacheBackend,
    L2: CacheBackend,
{
    // L1 (local) layer; `build` panics if left unset.
    l1: Option<L1>,
    // L2 (distributed) layer; `build` panics if left unset.
    l2: Option<L2>,
    // Starts at TieredCacheConfig::default(); mutated by the setters.
    config: TieredCacheConfig,
}
311
312impl<L1, L2> Default for TieredCacheBuilder<L1, L2>
313where
314    L1: CacheBackend,
315    L2: CacheBackend,
316{
317    fn default() -> Self {
318        Self {
319            l1: None,
320            l2: None,
321            config: TieredCacheConfig::default(),
322        }
323    }
324}
325
326impl<L1, L2> TieredCacheBuilder<L1, L2>
327where
328    L1: CacheBackend,
329    L2: CacheBackend,
330{
331    /// Create a new builder.
332    pub fn new() -> Self {
333        Self::default()
334    }
335
336    /// Set the L1 cache.
337    pub fn l1(mut self, cache: L1) -> Self {
338        self.l1 = Some(cache);
339        self
340    }
341
342    /// Set the L2 cache.
343    pub fn l2(mut self, cache: L2) -> Self {
344        self.l2 = Some(cache);
345        self
346    }
347
348    /// Set the config.
349    pub fn config(mut self, config: TieredCacheConfig) -> Self {
350        self.config = config;
351        self
352    }
353
354    /// Set L1 TTL.
355    pub fn l1_ttl(mut self, ttl: Duration) -> Self {
356        self.config.l1_ttl = Some(ttl);
357        self
358    }
359
360    /// Set L2 TTL.
361    pub fn l2_ttl(mut self, ttl: Duration) -> Self {
362        self.config.l2_ttl = Some(ttl);
363        self
364    }
365
366    /// Build the tiered cache.
367    ///
368    /// # Panics
369    /// Panics if L1 or L2 is not set.
370    pub fn build(self) -> TieredCache<L1, L2> {
371        TieredCache {
372            l1: self.l1.expect("L1 cache must be set"),
373            l2: self.l2.expect("L2 cache must be set"),
374            config: self.config,
375        }
376    }
377}
378
#[cfg(test)]
mod tests {
    use super::super::backend::NoopCache;
    use super::super::memory::{MemoryCache, MemoryCacheConfig};
    use super::*;

    // Helper: a fresh 100-entry in-memory layer.
    fn mem() -> MemoryCache {
        MemoryCache::new(MemoryCacheConfig::new(100))
    }

    #[tokio::test]
    async fn test_tiered_cache_l1_hit() {
        let cache = TieredCache::new(mem(), mem());
        let key = CacheKey::new("test", "key1");

        // Write through both layers, then read back (served from L1).
        cache.set(&key, &"hello", None).await.unwrap();
        let fetched: Option<String> = cache.get(&key).await.unwrap();
        assert_eq!(fetched, Some("hello".to_string()));
    }

    #[tokio::test]
    async fn test_tiered_cache_l2_fallback() {
        let local = mem();
        let remote = mem();

        // Seed only the L2 layer before assembling the tiered cache.
        let key = CacheKey::new("test", "key1");
        remote.set(&key, &"from l2", None).await.unwrap();

        let config = TieredCacheConfig {
            write_through_l1: true,
            ..Default::default()
        };
        let cache = TieredCache::with_config(local, remote, config);

        // L1 misses, so the value must come from L2.
        let fetched: Option<String> = cache.get(&key).await.unwrap();
        assert_eq!(fetched, Some("from l2".to_string()));

        // Note: L1 population on L2 hit would require T: Serialize
        // Use the set method to populate both caches explicitly
    }

    #[tokio::test]
    async fn test_tiered_cache_invalidation() {
        let cache = TieredCache::new(mem(), mem());
        let key = CacheKey::new("User", "id:1");

        cache.set(&key, &"user data", None).await.unwrap();

        // Pattern invalidation must remove the entry from both layers.
        let removed = cache
            .invalidate_pattern(&KeyPattern::entity("User"))
            .await
            .unwrap();
        assert!(removed >= 1);

        let fetched: Option<String> = cache.get(&key).await.unwrap();
        assert!(fetched.is_none());
    }

    #[tokio::test]
    async fn test_tiered_cache_with_noop_l2() {
        // With a no-op L2, the cache degrades to L1-only behavior.
        let cache = TieredCache::new(mem(), NoopCache);
        let key = CacheKey::new("test", "key1");

        cache.set(&key, &"hello", None).await.unwrap();

        let fetched: Option<String> = cache.get(&key).await.unwrap();
        assert_eq!(fetched, Some("hello".to_string()));
    }
}