Skip to main content

multi_tier_cache/backends/
moka_cache.rs

1use anyhow::Result;
2use bytes::Bytes;
3use futures_util::future::BoxFuture;
4use moka::future::Cache;
5use std::any::Any;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9use tracing::{debug, info};
10
11/// Cache entry with TTL information
12#[derive(Debug, Clone)]
13struct CacheEntry {
14    value: Bytes,
15    expires_at: Instant,
16}
17
18impl CacheEntry {
19    fn new(value: Bytes, ttl: Duration) -> Self {
20        Self {
21            value,
22            expires_at: Instant::now() + ttl,
23        }
24    }
25
26    fn is_expired(&self) -> bool {
27        Instant::now() > self.expires_at
28    }
29}
30
/// Specialized entry for zero-cost deserialization on L1
///
/// Holds the value as a type-erased `Arc<dyn Any + Send + Sync>` together
/// with the instant after which the entry is treated as expired.
/// NOTE(review): presumably callers downcast `value` back to the concrete
/// type instead of deserializing bytes — confirm against the call sites.
#[derive(Clone)]
pub struct TypedCacheEntry {
    /// Type-erased cached value.
    pub value: Arc<dyn Any + Send + Sync>,
    /// Instant after which [`TypedCacheEntry::is_expired`] reports `true`.
    pub expires_at: Instant,
}

impl TypedCacheEntry {
    /// Lifetime used when `now + ttl` cannot be represented as an `Instant`
    /// (roughly one hundred years).
    const FAR_FUTURE_TTL: Duration = Duration::from_secs(100 * 365 * 24 * 60 * 60);

    /// Builds an entry that expires `ttl` after the moment of the call.
    ///
    /// `Instant + Duration` panics on overflow, so an oversized `ttl` (for
    /// example `Duration::MAX` used as "never expire") is clamped to
    /// [`Self::FAR_FUTURE_TTL`] instead of aborting the caller.
    pub fn new(value: Arc<dyn Any + Send + Sync>, ttl: Duration) -> Self {
        let now = Instant::now();
        let expires_at = now
            .checked_add(ttl)
            .or_else(|| now.checked_add(Self::FAR_FUTURE_TTL))
            // Not expected to be reached on supported platforms; kept so that
            // construction has no panic path at all.
            .unwrap_or(now);

        Self { value, expires_at }
    }

    /// Returns `true` once the current time is strictly later than `expires_at`.
    #[must_use]
    pub fn is_expired(&self) -> bool {
        Instant::now() > self.expires_at
    }
}
51
/// Configuration for `MokaCache`
///
/// All three settings are handed to the Moka cache builder when a
/// `MokaCache` is constructed.
#[derive(Debug, Clone, Copy)]
pub struct MokaCacheConfig {
    /// Max capacity of the cache
    pub max_capacity: u64,
    /// Time to live for cache entries
    pub time_to_live: Duration,
    /// Time to idle for cache entries
    pub time_to_idle: Duration,
}

impl Default for MokaCacheConfig {
    /// Defaults: capacity 5000, one hour time-to-live, two minutes time-to-idle.
    fn default() -> Self {
        const DEFAULT_MAX_CAPACITY: u64 = 5_000;
        const DEFAULT_TIME_TO_LIVE: Duration = Duration::from_secs(60 * 60);
        const DEFAULT_TIME_TO_IDLE: Duration = Duration::from_secs(2 * 60);

        MokaCacheConfig {
            max_capacity: DEFAULT_MAX_CAPACITY,
            time_to_live: DEFAULT_TIME_TO_LIVE,
            time_to_idle: DEFAULT_TIME_TO_IDLE,
        }
    }
}
72
73/// Moka in-memory cache with per-key TTL support
74pub struct MokaCache {
75    /// Moka cache instance for raw bytes
76    cache: Cache<String, CacheEntry>,
77    /// Moka cache instance for typed objects (Zero-cost optimization)
78    typed_cache: Cache<String, TypedCacheEntry>,
79    /// Hit counter
80    hits: Arc<AtomicU64>,
81    /// Miss counter
82    misses: Arc<AtomicU64>,
83    /// Set counter
84    sets: Arc<AtomicU64>,
85    /// Coalesced requests counter
86    #[allow(dead_code)]
87    coalesced_requests: Arc<AtomicU64>,
88}
89
90impl MokaCache {
91    /// Create new Moka cache
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if cache configuration is invalid.
96    pub fn new(config: MokaCacheConfig) -> Result<Self> {
97        info!("Initializing Moka Cache");
98
99        let cache = Cache::builder()
100            .max_capacity(config.max_capacity)
101            .time_to_live(config.time_to_live)
102            .time_to_idle(config.time_to_idle)
103            .build();
104
105        let typed_cache = Cache::builder()
106            .max_capacity(config.max_capacity)
107            .time_to_live(config.time_to_live)
108            .time_to_idle(config.time_to_idle)
109            .build();
110
111        info!(
112            capacity = config.max_capacity,
113            "Moka Cache initialized with Byte and Typed storage"
114        );
115
116        Ok(Self {
117            cache,
118            typed_cache,
119            hits: Arc::new(AtomicU64::new(0)),
120            misses: Arc::new(AtomicU64::new(0)),
121            sets: Arc::new(AtomicU64::new(0)),
122            coalesced_requests: Arc::new(AtomicU64::new(0)),
123        })
124    }
125
126    /// Set a typed value in the L1 cache (zero-cost optimization)
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if the key or value cannot be inserted into the cache.
131    pub async fn set_typed(
132        &self,
133        key: &str,
134        value: Arc<dyn Any + Send + Sync>,
135        ttl: Duration,
136    ) -> Result<()> {
137        let entry = TypedCacheEntry::new(value, ttl);
138        self.typed_cache.insert(key.to_string(), entry).await;
139        self.sets.fetch_add(1, Ordering::Relaxed);
140        Ok(())
141    }
142
143    /// Get a typed value from the L1 cache
144    pub async fn get_typed(&self, key: &str) -> Option<Arc<dyn Any + Send + Sync>> {
145        match self.typed_cache.get(key).await {
146            Some(entry) => {
147                if entry.is_expired() {
148                    let _ = self.typed_cache.remove(key).await;
149                    self.misses.fetch_add(1, Ordering::Relaxed);
150                    None
151                } else {
152                    self.hits.fetch_add(1, Ordering::Relaxed);
153                    Some(entry.value)
154                }
155            }
156            _ => None,
157        }
158    }
159}
160
161// ===== Trait Implementations =====
162
163use crate::traits::{CacheBackend, L2CacheBackend};
164
165/// Implement `CacheBackend` trait for `MokaCache`
166impl CacheBackend for MokaCache {
167    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
168        Box::pin(async move {
169            if let Some(entry) = self.cache.get(key).await {
170                if entry.is_expired() {
171                    let _ = self.cache.remove(key).await;
172                    self.misses.fetch_add(1, Ordering::Relaxed);
173                    None
174                } else {
175                    self.hits.fetch_add(1, Ordering::Relaxed);
176                    Some(entry.value)
177                }
178            } else {
179                self.misses.fetch_add(1, Ordering::Relaxed);
180                None
181            }
182        })
183    }
184
185    fn set_with_ttl<'a>(
186        &'a self,
187        key: &'a str,
188        value: Bytes,
189        ttl: Duration,
190    ) -> BoxFuture<'a, Result<()>> {
191        Box::pin(async move {
192            let entry = CacheEntry::new(value, ttl);
193            self.cache.insert(key.to_string(), entry).await;
194            self.sets.fetch_add(1, Ordering::Relaxed);
195            debug!(key = %key, ttl_secs = %ttl.as_secs(), "[Moka] Cached key bytes with TTL");
196            Ok(())
197        })
198    }
199
200    fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Result<()>> {
201        Box::pin(async move {
202            self.cache.invalidate(key).await;
203            self.typed_cache.invalidate(key).await;
204            Ok(())
205        })
206    }
207
208    fn health_check(&self) -> BoxFuture<'_, bool> {
209        Box::pin(async move {
210            let test_key = "health_check_moka";
211            let test_value = Bytes::from("health_check");
212
213            match self
214                .set_with_ttl(test_key, test_value.clone(), Duration::from_secs(60))
215                .await
216            {
217                Ok(()) => match self.get(test_key).await {
218                    Some(retrieved) => {
219                        let _ = self.remove(test_key).await;
220                        retrieved == test_value
221                    }
222                    None => false,
223                },
224                Err(_) => false,
225            }
226        })
227    }
228
229    fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, Result<()>> {
230        Box::pin(async move {
231            // For Moka (L1), we'll do a full invalidation for pattern requests to ensure consistency.
232            // Pattern invalidation is usually relatively rare compared to single-key lookups,
233            // so clearing L1 is a safe and robust fallback to ensure no stale data remains.
234            self.cache.invalidate_all();
235            self.typed_cache.invalidate_all();
236
237            // Ensure background invalidation tasks are processed
238            self.cache.run_pending_tasks().await;
239            self.typed_cache.run_pending_tasks().await;
240
241            debug!(pattern = %pattern, "[Moka] Invalidated all entries due to pattern '{}' request", pattern);
242            Ok(())
243        })
244    }
245
246    fn name(&self) -> &'static str {
247        "Moka"
248    }
249}
250
251impl L2CacheBackend for MokaCache {
252    fn get_with_ttl<'a>(
253        &'a self,
254        key: &'a str,
255    ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
256        Box::pin(async move {
257            // Moka doesn't easily expose remaining TTL for an entry
258            if let Some(entry) = self.cache.get(key).await {
259                if entry.is_expired() {
260                    let _ = self.cache.remove(key).await;
261                    self.misses.fetch_add(1, Ordering::Relaxed);
262                    None
263                } else {
264                    self.hits.fetch_add(1, Ordering::Relaxed);
265                    let now = Instant::now();
266                    let remaining = if entry.expires_at > now {
267                        Some(entry.expires_at.duration_since(now))
268                    } else {
269                        None
270                    };
271                    Some((entry.value, remaining))
272                }
273            } else {
274                self.misses.fetch_add(1, Ordering::Relaxed);
275                None
276            }
277        })
278    }
279}
280
/// Cache statistics
///
/// Plain snapshot of counter values; nothing in this part of the file
/// constructs it, which is why the dead-code lint is silenced.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct CacheStats {
    /// Number of lookups recorded as hits.
    pub hits: u64,
    /// Number of lookups recorded as misses.
    pub misses: u64,
    /// Number of set operations.
    pub sets: u64,
    /// Number of coalesced requests.
    pub coalesced_requests: u64,
    /// Cache size. NOTE(review): unit (entries vs. bytes) is not visible here — confirm.
    pub size: u64,
}