Skip to main content

aurora_db/storage/
hot.rs

1//! Hot Store - Moka-based cache (Sync Version)
2//! Optimized for high-concurrency database workloads.
3
4use moka::Expiry;
5use moka::sync::Cache;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9
/// Point-in-time snapshot of the cache counters, as assembled by `HotStore::get_stats`.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of entries the underlying cache reports holding.
    pub item_count: u64,
    /// Total weight of the cached entries; filled from the same source as `weighted_size`.
    pub memory_usage: u64,
    /// Lookups that found an entry.
    pub hit_count: u64,
    /// Lookups that found nothing.
    pub miss_count: u64,
    /// `hit_count / (hit_count + miss_count)`, or `0.0` when no lookups were recorded.
    pub hit_ratio: f64,
    /// Total weight of the cached entries as reported by the cache.
    pub weighted_size: u64,
}
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
21pub enum EvictionPolicy {
22    LRU,    // 5min default TTL
23    LFU,    // 15min default TTL
24    Hybrid, // 30min default TTL
25}
26
27impl std::fmt::Display for EvictionPolicy {
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        match self {
30            EvictionPolicy::LRU => write!(f, "LRU"),
31            EvictionPolicy::LFU => write!(f, "LFU"),
32            EvictionPolicy::Hybrid => write!(f, "Hybrid"),
33        }
34    }
35}
36
37impl EvictionPolicy {
38    pub fn default_ttl(&self) -> Duration {
39        match self {
40            EvictionPolicy::LRU => Duration::from_secs(5 * 60),
41            EvictionPolicy::LFU => Duration::from_secs(15 * 60),
42            EvictionPolicy::Hybrid => Duration::from_secs(30 * 60),
43        }
44    }
45}
46
/// A cache entry: the payload bytes plus an optional per-entry time-to-live.
///
/// Cloning copies the `Arc` handle, not the bytes.
#[derive(Clone)]
pub struct CachedValue {
    /// Shared, immutable payload.
    pub data: Arc<Vec<u8>>,
    /// Entry-specific TTL; `None` means the store's default TTL is used.
    pub ttl: Option<Duration>,
}
53
/// Expiration policy handed to the cache builder: each entry lives for its own
/// TTL when it has one, otherwise for `default_ttl`.
pub struct HotStoreExpiry {
    /// Fallback TTL for entries whose `CachedValue::ttl` is `None`.
    default_ttl: Duration,
}
58
59// Note: moka::Expiry trait is the same for sync and future caches
60impl Expiry<String, CachedValue> for HotStoreExpiry {
61    fn expire_after_create(
62        &self,
63        _key: &String,
64        value: &CachedValue,
65        _current_time: Instant,
66    ) -> Option<Duration> {
67        Some(value.ttl.unwrap_or(self.default_ttl))
68    }
69
70    fn expire_after_update(
71        &self,
72        _key: &String,
73        value: &CachedValue,
74        _current_time: Instant,
75        _current_duration: Option<Duration>,
76    ) -> Option<Duration> {
77        Some(value.ttl.unwrap_or(self.default_ttl))
78    }
79}
80
81pub struct HotStore {
82    // Changed: Using moka::sync::Cache instead of moka::future::Cache
83    cache: Cache<String, CachedValue>,
84    hit_count: Arc<AtomicU64>,
85    miss_count: Arc<AtomicU64>,
86    max_size: u64,
87    eviction_policy: EvictionPolicy,
88    default_ttl: Duration,
89}
90
91impl Clone for HotStore {
92    fn clone(&self) -> Self {
93        Self {
94            cache: self.cache.clone(),
95            hit_count: Arc::clone(&self.hit_count),
96            miss_count: Arc::clone(&self.miss_count),
97            max_size: self.max_size,
98            eviction_policy: self.eviction_policy,
99            default_ttl: self.default_ttl,
100        }
101    }
102}
103
104impl Default for HotStore {
105    fn default() -> Self {
106        Self::new()
107    }
108}
109
110impl HotStore {
111    pub fn new() -> Self {
112        Self::new_with_size_limit(128)
113    }
114
115    pub fn new_with_size_limit(max_size_mb: usize) -> Self {
116        Self::with_config_and_eviction(max_size_mb, 0, EvictionPolicy::Hybrid)
117    }
118
119    pub fn with_config(cache_size_mb: usize, _cleanup_interval_secs: u64) -> Self {
120        Self::with_config_and_eviction(
121            cache_size_mb,
122            _cleanup_interval_secs,
123            EvictionPolicy::Hybrid,
124        )
125    }
126
127    pub fn with_config_and_eviction(
128        cache_size_mb: usize,
129        _cleanup_interval_secs: u64,
130        eviction_policy: EvictionPolicy,
131    ) -> Self {
132        let max_size = (cache_size_mb as u64).saturating_mul(1024 * 1024);
133        let default_ttl = eviction_policy.default_ttl();
134
135        let cache = Cache::builder()
136            .max_capacity(max_size)
137            .weigher(|_key: &String, value: &CachedValue| -> u32 {
138                value.data.len().min(u32::MAX as usize) as u32
139            })
140            .expire_after(HotStoreExpiry { default_ttl })
141            .build();
142
143        Self {
144            cache,
145            hit_count: Arc::new(AtomicU64::new(0)),
146            miss_count: Arc::new(AtomicU64::new(0)),
147            max_size,
148            eviction_policy,
149            default_ttl,
150        }
151    }
152
153    // Synchronous API (Fast path)
154
155    pub fn get(&self, key: &str) -> Option<Vec<u8>> {
156        self.get_ref(key).map(|arc| (*arc).clone())
157    }
158
159    pub fn get_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
160        match self.cache.get(key) {
161            Some(value) => {
162                self.hit_count.fetch_add(1, Ordering::Relaxed);
163                Some(value.data)
164            }
165            None => {
166                self.miss_count.fetch_add(1, Ordering::Relaxed);
167                None
168            }
169        }
170    }
171
172    pub fn set(&self, key: Arc<String>, value: Arc<Vec<u8>>, ttl: Option<Duration>) {
173        let cached = CachedValue { data: value, ttl };
174        // Moka requires String keys, so we deref the Arc<String>.
175        // This is a small clone (just the key string), unavoidable with Moka, but cheap.
176        self.cache.insert(key.to_string(), cached);
177    }
178
179    // Async API Compatibility
180    // Since moka::sync is thread-safe and fast (memory only),
181    // we can call it directly in async blocks without spawn_blocking
182    // unless you have massive eviction callbacks.
183
184    pub async fn get_async(&self, key: &str) -> Option<Arc<Vec<u8>>> {
185        // Direct call to underlying sync cache is safe here
186        self.get_ref(key)
187    }
188
189    pub async fn set_async(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) {
190        self.set(Arc::new(key), Arc::new(value), ttl)
191    }
192
193    // Maintenance & Stats
194
195    pub fn is_hot(&self, key: &str) -> bool {
196        self.cache.contains_key(key)
197    }
198
199    pub fn delete(&self, key: &str) {
200        self.cache.invalidate(key);
201    }
202
203    pub fn get_stats(&self) -> CacheStats {
204        let hits = self.hit_count.load(Ordering::Relaxed);
205        let misses = self.miss_count.load(Ordering::Relaxed);
206        let total = hits + misses;
207
208        // Note: sync cache uses run_pending_tasks too for exact stats,
209        // but it also does maintenance on reads/writes automatically.
210        CacheStats {
211            item_count: self.cache.entry_count(),
212            memory_usage: self.cache.weighted_size(),
213            hit_count: hits,
214            miss_count: misses,
215            hit_ratio: if total == 0 {
216                0.0
217            } else {
218                hits as f64 / total as f64
219            },
220            weighted_size: self.cache.weighted_size(),
221        }
222    }
223
224    pub fn clear(&self) {
225        self.cache.invalidate_all();
226    }
227
228    pub fn hit_ratio(&self) -> f64 {
229        let hits = self.hit_count.load(Ordering::Relaxed);
230        let misses = self.miss_count.load(Ordering::Relaxed);
231        let total = hits + misses;
232        if total == 0 {
233            0.0
234        } else {
235            hits as f64 / total as f64
236        }
237    }
238
239    /// Explicitly run maintenance tasks (eviction, etc).
240    /// Moka sync does this automatically on access, but this can be called
241    /// by a background thread if the cache is idle.
242    pub fn sync(&self) {
243        self.cache.run_pending_tasks();
244    }
245
246    pub fn max_size(&self) -> u64 {
247        self.max_size
248    }
249
250    pub fn eviction_policy(&self) -> EvictionPolicy {
251        self.eviction_policy
252    }
253
254    pub fn default_ttl(&self) -> Duration {
255        self.default_ttl
256    }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `Arc<String>` key form that `HotStore::set` takes.
    fn key(name: &str) -> Arc<String> {
        Arc::new(name.to_string())
    }

    /// A value written with `set` is returned unchanged by `get`.
    #[test]
    fn test_basic_get_set() {
        let store = HotStore::new_with_size_limit(1);
        store.set(key("key1"), Arc::new(vec![1, 2, 3, 4]), None);

        assert_eq!(store.get("key1"), Some(vec![1, 2, 3, 4]));
    }

    /// Looking up an absent key yields `None` and records exactly one miss.
    #[test]
    fn test_cache_miss() {
        let store = HotStore::new();

        assert_eq!(store.get("nonexistent"), None);
        assert_eq!(store.get_stats().miss_count, 1);
    }

    /// The async wrappers read and write the same underlying sync cache.
    #[tokio::test]
    async fn test_async_operations() {
        let store = HotStore::new();
        store
            .set_async("key1".to_string(), vec![1, 2, 3], None)
            .await;

        let fetched = store.get_async("key1").await;
        assert_eq!(fetched.as_deref(), Some(&vec![1u8, 2, 3]));
    }

    /// `is_hot` is true only for keys that have been inserted.
    #[test]
    fn test_is_hot() {
        let store = HotStore::new();
        store.set(key("key1"), Arc::new(vec![1]), None);

        assert!(store.is_hot("key1"));
        assert!(!store.is_hot("key2"));
    }
}
305}