multi_tier_cache/backends/
moka_cache.rs1use anyhow::Result;
6use moka::future::Cache;
7use serde_json;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tracing::{debug, info};
12
13#[derive(Debug, Clone)]
15struct CacheEntry {
16 value: serde_json::Value,
17 expires_at: Instant,
18}
19
20impl CacheEntry {
21 fn new(value: serde_json::Value, ttl: Duration) -> Self {
22 Self {
23 value,
24 expires_at: Instant::now() + ttl,
25 }
26 }
27
28 fn is_expired(&self) -> bool {
29 Instant::now() > self.expires_at
30 }
31}
32
/// Tuning knobs for the in-process Moka cache layer.
#[derive(Debug, Clone, Copy)]
pub struct MokaCacheConfig {
    /// Maximum number of entries the cache may hold.
    pub max_capacity: u64,
    /// Cache-wide upper bound on how long any entry may live.
    pub time_to_live: Duration,
    /// Evict entries that go unread for this long.
    pub time_to_idle: Duration,
}

impl Default for MokaCacheConfig {
    /// 2000 entries, one-hour lifetime, two-minute idle window.
    fn default() -> Self {
        MokaCacheConfig {
            max_capacity: 2000,
            time_to_live: Duration::from_secs(60 * 60),
            time_to_idle: Duration::from_secs(2 * 60),
        }
    }
}
53
54pub struct MokaCache {
62 cache: Cache<String, CacheEntry>,
64 hits: Arc<AtomicU64>,
66 misses: Arc<AtomicU64>,
68 sets: Arc<AtomicU64>,
70 #[allow(dead_code)]
72 coalesced_requests: Arc<AtomicU64>,
73}
74
75impl MokaCache {
76 pub fn new(config: MokaCacheConfig) -> Result<Self> {
81 info!("Initializing Moka Cache");
82
83 let cache = Cache::builder()
84 .max_capacity(config.max_capacity)
85 .time_to_live(config.time_to_live)
86 .time_to_idle(config.time_to_idle)
87 .build();
88
89 info!(
90 capacity = config.max_capacity,
91 "Moka Cache initialized with per-key TTL support"
92 );
93
94 Ok(Self {
95 cache,
96 hits: Arc::new(AtomicU64::new(0)),
97 misses: Arc::new(AtomicU64::new(0)),
98 sets: Arc::new(AtomicU64::new(0)),
99 coalesced_requests: Arc::new(AtomicU64::new(0)),
100 })
101 }
102}
103
104use crate::traits::CacheBackend;
107use async_trait::async_trait;
108
109#[async_trait]
113impl CacheBackend for MokaCache {
114 async fn get(&self, key: &str) -> Option<serde_json::Value> {
115 if let Some(entry) = self.cache.get(key).await {
116 if entry.is_expired() {
117 let _ = self.cache.remove(key).await;
119 self.misses.fetch_add(1, Ordering::Relaxed);
120 None
121 } else {
122 self.hits.fetch_add(1, Ordering::Relaxed);
123 Some(entry.value)
124 }
125 } else {
126 self.misses.fetch_add(1, Ordering::Relaxed);
127 None
128 }
129 }
130
131 async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
132 let entry = CacheEntry::new(value, ttl);
133 self.cache.insert(key.to_string(), entry).await;
134 self.sets.fetch_add(1, Ordering::Relaxed);
135 debug!(key = %key, ttl_secs = %ttl.as_secs(), "[Moka] Cached key with TTL");
136 Ok(())
137 }
138
139 async fn remove(&self, key: &str) -> Result<()> {
140 self.cache.remove(key).await;
141 Ok(())
142 }
143
144 async fn health_check(&self) -> bool {
145 let test_key = "health_check_moka";
147 let test_value = serde_json::json!({"test": true});
148
149 match self
150 .set_with_ttl(test_key, test_value.clone(), Duration::from_secs(60))
151 .await
152 {
153 Ok(()) => match self.get(test_key).await {
154 Some(retrieved) => {
155 let _ = self.remove(test_key).await;
156 retrieved == test_value
157 }
158 None => false,
159 },
160 Err(_) => false,
161 }
162 }
163
164 fn name(&self) -> &'static str {
165 "Moka"
166 }
167}
168
/// Point-in-time snapshot of the cache's instrumentation counters.
///
/// Not yet read anywhere visible in this file, hence `allow(dead_code)`;
/// presumably consumed by a stats/metrics endpoint — confirm against callers.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Lookups that returned a live entry.
    pub hits: u64,
    /// Lookups that found nothing or an expired entry.
    pub misses: u64,
    /// Inserts performed.
    pub sets: u64,
    /// Requests served by coalescing (counter exists but is unused here).
    pub coalesced_requests: u64,
    /// Current number of entries in the cache.
    pub size: u64,
}