Skip to main content

aurora_db/storage/
hot.rs

1//! Hot Store - Moka-based cache (Sync Version)
2//! Optimized for high-concurrency database workloads.
3
4use moka::Expiry;
5use moka::sync::Cache;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9
/// Point-in-time snapshot of the hot store's counters, as returned by
/// `HotStore::get_stats`.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of entries, as reported by the underlying cache's `entry_count()`.
    pub item_count: u64,
    /// Total weight reported by the cache's `weighted_size()`. The weigher
    /// configured in this file counts only each value's byte length.
    pub memory_usage: u64,
    /// Lookups through `get` / `get_ref` / `get_async` that found an entry.
    pub hit_count: u64,
    /// Lookups through `get` / `get_ref` / `get_async` that found nothing.
    pub miss_count: u64,
    /// `hit_count / (hit_count + miss_count)`, or `0.0` when no lookup has
    /// been recorded yet.
    pub hit_ratio: f64,
    /// Filled from the cache's `weighted_size()`, the same source as
    /// `memory_usage`.
    pub weighted_size: u64,
}
19
/// Named cache policy. Each variant carries its own default time-to-live,
/// available through [`EvictionPolicy::default_ttl`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvictionPolicy {
    /// Default TTL: 5 minutes.
    LRU,
    /// Default TTL: 15 minutes.
    LFU,
    /// Default TTL: 30 minutes.
    Hybrid,
}

impl EvictionPolicy {
    /// Returns the default time-to-live associated with this policy.
    pub fn default_ttl(&self) -> Duration {
        const SECS_PER_MINUTE: u64 = 60;

        // Keep the table in minutes so the per-variant values read the same
        // way as the variant documentation above.
        let minutes: u64 = match *self {
            Self::LRU => 5,
            Self::LFU => 15,
            Self::Hybrid => 30,
        };

        Duration::from_secs(minutes * SECS_PER_MINUTE)
    }
}
36
/// Value stored in the cache: the payload bytes plus an optional per-entry TTL.
#[derive(Clone)]
pub struct CachedValue {
    /// Payload bytes, reference-counted so readers can share them without
    /// copying the buffer.
    pub data: Arc<Vec<u8>>,
    /// Entry-specific time-to-live. When `None`, `HotStoreExpiry` falls back
    /// to its `default_ttl`.
    pub ttl: Option<Duration>,
}
43
/// Per-entry expiration policy: an entry's own `ttl` wins, otherwise
/// `default_ttl` is used.
pub struct HotStoreExpiry {
    /// TTL applied to entries whose `CachedValue::ttl` is `None`.
    default_ttl: Duration,
}
48
49// Note: moka::Expiry trait is the same for sync and future caches
50impl Expiry<String, CachedValue> for HotStoreExpiry {
51    fn expire_after_create(
52        &self,
53        _key: &String,
54        value: &CachedValue,
55        _current_time: Instant,
56    ) -> Option<Duration> {
57        Some(value.ttl.unwrap_or(self.default_ttl))
58    }
59
60    fn expire_after_update(
61        &self,
62        _key: &String,
63        value: &CachedValue,
64        _current_time: Instant,
65        _current_duration: Option<Duration>,
66    ) -> Option<Duration> {
67        Some(value.ttl.unwrap_or(self.default_ttl))
68    }
69}
70
/// In-memory hot tier: a size-bounded, TTL-aware byte cache with hit/miss
/// counters.
pub struct HotStore {
    // Uses moka::sync::Cache rather than moka::future::Cache, so lookups and
    // inserts are plain (non-async) calls.
    cache: Cache<String, CachedValue>,
    // Number of lookups that found an entry; behind an `Arc` so clones of the
    // store update the same counter.
    hit_count: Arc<AtomicU64>,
    // Number of lookups that found nothing; shared between clones like
    // `hit_count`.
    miss_count: Arc<AtomicU64>,
    // Capacity handed to the cache builder, in bytes (megabytes * 1024 * 1024).
    max_size: u64,
    // Policy chosen at construction; in this file it only selects `default_ttl`.
    eviction_policy: EvictionPolicy,
    // TTL used for entries inserted without an explicit TTL.
    default_ttl: Duration,
}
80
81impl Clone for HotStore {
82    fn clone(&self) -> Self {
83        Self {
84            cache: self.cache.clone(),
85            hit_count: Arc::clone(&self.hit_count),
86            miss_count: Arc::clone(&self.miss_count),
87            max_size: self.max_size,
88            eviction_policy: self.eviction_policy,
89            default_ttl: self.default_ttl,
90        }
91    }
92}
93
94impl Default for HotStore {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
100impl HotStore {
101    pub fn new() -> Self {
102        Self::new_with_size_limit(128)
103    }
104
105    pub fn new_with_size_limit(max_size_mb: usize) -> Self {
106        Self::with_config_and_eviction(max_size_mb, 0, EvictionPolicy::Hybrid)
107    }
108
109    pub fn with_config(cache_size_mb: usize, _cleanup_interval_secs: u64) -> Self {
110        Self::with_config_and_eviction(
111            cache_size_mb,
112            _cleanup_interval_secs,
113            EvictionPolicy::Hybrid,
114        )
115    }
116
117    pub fn with_config_and_eviction(
118        cache_size_mb: usize,
119        _cleanup_interval_secs: u64,
120        eviction_policy: EvictionPolicy,
121    ) -> Self {
122        let max_size = (cache_size_mb as u64).saturating_mul(1024 * 1024);
123        let default_ttl = eviction_policy.default_ttl();
124
125        let cache = Cache::builder()
126            .max_capacity(max_size)
127            .weigher(|_key: &String, value: &CachedValue| -> u32 {
128                value.data.len().min(u32::MAX as usize) as u32
129            })
130            .expire_after(HotStoreExpiry { default_ttl })
131            .build();
132
133        Self {
134            cache,
135            hit_count: Arc::new(AtomicU64::new(0)),
136            miss_count: Arc::new(AtomicU64::new(0)),
137            max_size,
138            eviction_policy,
139            default_ttl,
140        }
141    }
142
143    // --- Synchronous API (Fast path) ---
144
145    pub fn get(&self, key: &str) -> Option<Vec<u8>> {
146        self.get_ref(key).map(|arc| (*arc).clone())
147    }
148
149    pub fn get_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
150        match self.cache.get(key) {
151            Some(value) => {
152                self.hit_count.fetch_add(1, Ordering::Relaxed);
153                Some(value.data)
154            }
155            None => {
156                self.miss_count.fetch_add(1, Ordering::Relaxed);
157                None
158            }
159        }
160    }
161
162    pub fn set(&self, key: Arc<String>, value: Arc<Vec<u8>>, ttl: Option<Duration>) {
163        let cached = CachedValue {
164            data: value,
165            ttl,
166        };
167        // Moka requires String keys, so we deref the Arc<String>. 
168        // This is a small clone (just the key string), unavoidable with Moka, but cheap.
169        self.cache.insert(key.to_string(), cached);
170    }
171
172    // --- Async API Compatibility ---
173    // Since moka::sync is thread-safe and fast (memory only),
174    // we can call it directly in async blocks without spawn_blocking
175    // unless you have massive eviction callbacks.
176
177    pub async fn get_async(&self, key: &str) -> Option<Arc<Vec<u8>>> {
178        // Direct call to underlying sync cache is safe here
179        self.get_ref(key)
180    }
181
182    pub async fn set_async(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) {
183        self.set(Arc::new(key), Arc::new(value), ttl)
184    }
185
186    // --- Maintenance & Stats ---
187
188    pub fn is_hot(&self, key: &str) -> bool {
189        self.cache.contains_key(key)
190    }
191
192    pub fn delete(&self, key: &str) {
193        self.cache.invalidate(key);
194    }
195
196    pub fn get_stats(&self) -> CacheStats {
197        let hits = self.hit_count.load(Ordering::Relaxed);
198        let misses = self.miss_count.load(Ordering::Relaxed);
199        let total = hits + misses;
200
201        // Note: sync cache uses run_pending_tasks too for exact stats,
202        // but it also does maintenance on reads/writes automatically.
203        CacheStats {
204            item_count: self.cache.entry_count(),
205            memory_usage: self.cache.weighted_size(),
206            hit_count: hits,
207            miss_count: misses,
208            hit_ratio: if total == 0 {
209                0.0
210            } else {
211                hits as f64 / total as f64
212            },
213            weighted_size: self.cache.weighted_size(),
214        }
215    }
216
217    pub fn clear(&self) {
218        self.cache.invalidate_all();
219    }
220
221    pub fn hit_ratio(&self) -> f64 {
222        let hits = self.hit_count.load(Ordering::Relaxed);
223        let misses = self.miss_count.load(Ordering::Relaxed);
224        let total = hits + misses;
225        if total == 0 {
226            0.0
227        } else {
228            hits as f64 / total as f64
229        }
230    }
231
232    /// Explicitly run maintenance tasks (eviction, etc).
233    /// Moka sync does this automatically on access, but this can be called
234    /// by a background thread if the cache is idle.
235    pub fn sync(&self) {
236        self.cache.run_pending_tasks();
237    }
238
239    pub fn max_size(&self) -> u64 {
240        self.max_size
241    }
242
243    pub fn eviction_policy(&self) -> EvictionPolicy {
244        self.eviction_policy
245    }
246
247    pub fn default_ttl(&self) -> Duration {
248        self.default_ttl
249    }
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// A value written with `set` is returned unchanged by `get`.
    #[test]
    fn test_basic_get_set() {
        let store = HotStore::new_with_size_limit(1);
        let payload: Vec<u8> = vec![1, 2, 3, 4];
        store.set(
            Arc::new(String::from("key1")),
            Arc::new(payload.clone()),
            None,
        );

        assert_eq!(store.get("key1"), Some(payload));
    }

    /// Looking up an absent key yields `None` and records exactly one miss.
    #[test]
    fn test_cache_miss() {
        let store = HotStore::new();

        assert!(store.get("nonexistent").is_none());

        let stats = store.get_stats();
        assert_eq!(stats.miss_count, 1);
    }

    /// The async wrappers read and write the same underlying sync cache.
    #[tokio::test]
    async fn test_async_operations() {
        let store = HotStore::new();
        store
            .set_async(String::from("key1"), vec![1, 2, 3], None)
            .await;

        let bytes = store
            .get_async("key1")
            .await
            .expect("value inserted above should be present");
        assert_eq!(*bytes, vec![1, 2, 3]);
    }

    /// `is_hot` reports presence for stored keys only.
    #[test]
    fn test_is_hot() {
        let store = HotStore::new();
        store.set(Arc::new(String::from("key1")), Arc::new(vec![1]), None);

        let (present, absent) = (store.is_hot("key1"), store.is_hot("key2"));
        assert!(present);
        assert!(!absent);
    }
}
294}