multi_tier_cache/backends/
moka_cache.rs1use crate::error::CacheResult;
2use bytes::Bytes;
3use futures_util::future::BoxFuture;
4use moka::future::Cache;
5use std::any::Any;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9use tracing::{debug, info};
10
11#[derive(Debug, Clone)]
13struct CacheEntry {
14 value: Bytes,
15 expires_at: Instant,
16}
17
18impl CacheEntry {
19 fn new(value: Bytes, ttl: Duration) -> Self {
20 Self {
21 value,
22 expires_at: Instant::now() + ttl,
23 }
24 }
25
26 fn is_expired(&self) -> bool {
27 Instant::now() > self.expires_at
28 }
29}
30
31#[derive(Clone)]
33pub struct TypedCacheEntry {
34 pub value: Arc<dyn Any + Send + Sync>,
35 pub expires_at: Instant,
36}
37
38impl TypedCacheEntry {
39 pub fn new(value: Arc<dyn Any + Send + Sync>, ttl: Duration) -> Self {
40 Self {
41 value,
42 expires_at: Instant::now() + ttl,
43 }
44 }
45
46 #[must_use]
47 pub fn is_expired(&self) -> bool {
48 Instant::now() > self.expires_at
49 }
50}
51
52#[derive(Debug, Clone, Copy)]
54pub struct MokaCacheConfig {
55 pub max_capacity: u64,
57 pub time_to_live: Duration,
59 pub time_to_idle: Duration,
61}
62
63impl Default for MokaCacheConfig {
64 fn default() -> Self {
65 Self {
66 max_capacity: 5000,
67 time_to_live: Duration::from_secs(3600),
68 time_to_idle: Duration::from_secs(120),
69 }
70 }
71}
72
73pub struct MokaCache {
75 cache: Cache<String, CacheEntry>,
77 typed_cache: Cache<String, TypedCacheEntry>,
79 hits: Arc<AtomicU64>,
81 misses: Arc<AtomicU64>,
83 sets: Arc<AtomicU64>,
85 #[allow(dead_code)]
87 coalesced_requests: Arc<AtomicU64>,
88}
89
90impl MokaCache {
91 pub fn new(config: MokaCacheConfig) -> CacheResult<Self> {
97 info!("Initializing Moka Cache");
98
99 let cache = Cache::builder()
100 .max_capacity(config.max_capacity)
101 .time_to_live(config.time_to_live)
102 .time_to_idle(config.time_to_idle)
103 .build();
104
105 let typed_cache = Cache::builder()
106 .max_capacity(config.max_capacity)
107 .time_to_live(config.time_to_live)
108 .time_to_idle(config.time_to_idle)
109 .build();
110
111 info!(
112 capacity = config.max_capacity,
113 "Moka Cache initialized with Byte and Typed storage"
114 );
115
116 Ok(Self {
117 cache,
118 typed_cache,
119 hits: Arc::new(AtomicU64::new(0)),
120 misses: Arc::new(AtomicU64::new(0)),
121 sets: Arc::new(AtomicU64::new(0)),
122 coalesced_requests: Arc::new(AtomicU64::new(0)),
123 })
124 }
125
126 pub async fn set_typed(
132 &self,
133 key: &str,
134 value: Arc<dyn Any + Send + Sync>,
135 ttl: Duration,
136 ) -> CacheResult<()> {
137 let entry = TypedCacheEntry::new(value, ttl);
138 self.typed_cache.insert(key.to_string(), entry).await;
139 self.sets.fetch_add(1, Ordering::Relaxed);
140 Ok(())
141 }
142
143 pub async fn get_typed(&self, key: &str) -> Option<Arc<dyn Any + Send + Sync>> {
145 match self.typed_cache.get(key).await {
146 Some(entry) => {
147 if entry.is_expired() {
148 let _ = self.typed_cache.remove(key).await;
149 self.misses.fetch_add(1, Ordering::Relaxed);
150 None
151 } else {
152 self.hits.fetch_add(1, Ordering::Relaxed);
153 Some(entry.value)
154 }
155 }
156 _ => None,
157 }
158 }
159}
160
161use crate::traits::{CacheBackend, L2CacheBackend};
164
165impl CacheBackend for MokaCache {
167 fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
168 Box::pin(async move {
169 if let Some(entry) = self.cache.get(key).await {
170 if entry.is_expired() {
171 let _ = self.cache.remove(key).await;
172 self.misses.fetch_add(1, Ordering::Relaxed);
173 None
174 } else {
175 self.hits.fetch_add(1, Ordering::Relaxed);
176 Some(entry.value)
177 }
178 } else {
179 self.misses.fetch_add(1, Ordering::Relaxed);
180 None
181 }
182 })
183 }
184
185 fn set_with_ttl<'a>(
186 &'a self,
187 key: &'a str,
188 value: Bytes,
189 ttl: Duration,
190 ) -> BoxFuture<'a, CacheResult<()>> {
191 Box::pin(async move {
192 let entry = CacheEntry::new(value, ttl);
193 self.cache.insert(key.to_string(), entry).await;
194 self.sets.fetch_add(1, Ordering::Relaxed);
195 debug!(key = %key, ttl_secs = %ttl.as_secs(), "[Moka] Cached key bytes with TTL");
196 Ok(())
197 })
198 }
199
200 fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, CacheResult<()>> {
201 Box::pin(async move {
202 self.cache.invalidate(key).await;
203 self.typed_cache.invalidate(key).await;
204 Ok(())
205 })
206 }
207
208 fn health_check(&self) -> BoxFuture<'_, bool> {
209 Box::pin(async move {
210 let test_key = "health_check_moka";
211 let test_value = Bytes::from("health_check");
212
213 match self
214 .set_with_ttl(test_key, test_value.clone(), Duration::from_secs(60))
215 .await
216 {
217 Ok(()) => match self.get(test_key).await {
218 Some(retrieved) => {
219 let _ = self.remove(test_key).await;
220 retrieved == test_value
221 }
222 None => false,
223 },
224 Err(_) => false,
225 }
226 })
227 }
228
229 fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, CacheResult<()>> {
230 Box::pin(async move {
231 self.cache.invalidate_all();
235 self.typed_cache.invalidate_all();
236
237 self.cache.run_pending_tasks().await;
239 self.typed_cache.run_pending_tasks().await;
240
241 debug!(pattern = %pattern, "[Moka] Invalidated all entries due to pattern '{}' request", pattern);
242 Ok(())
243 })
244 }
245
246 fn name(&self) -> &'static str {
247 "Moka"
248 }
249}
250
251impl L2CacheBackend for MokaCache {
252 fn get_with_ttl<'a>(
253 &'a self,
254 key: &'a str,
255 ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
256 Box::pin(async move {
257 if let Some(entry) = self.cache.get(key).await {
259 if entry.is_expired() {
260 let _ = self.cache.remove(key).await;
261 self.misses.fetch_add(1, Ordering::Relaxed);
262 None
263 } else {
264 self.hits.fetch_add(1, Ordering::Relaxed);
265 let now = Instant::now();
266 let remaining = if entry.expires_at > now {
267 Some(entry.expires_at.duration_since(now))
268 } else {
269 None
270 };
271 Some((entry.value, remaining))
272 }
273 } else {
274 self.misses.fetch_add(1, Ordering::Relaxed);
275 None
276 }
277 })
278 }
279}
280
281#[allow(dead_code)]
283#[derive(Debug, Clone)]
284pub struct CacheStats {
285 pub hits: u64,
286 pub misses: u64,
287 pub sets: u64,
288 pub coalesced_requests: u64,
289 pub size: u64,
290}