// aurora_db/storage/hot.rs

1use dashmap::DashMap;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::{Duration, SystemTime};
5use tokio::time::interval;
6
/// Point-in-time snapshot of cache counters.
///
/// The value is a plain copy of the counters at the moment it was taken; it
/// does not update afterwards. `Clone` and `PartialEq` are derived so that
/// snapshots can be stored and compared.
#[derive(Debug, Clone, PartialEq)]
pub struct CacheStats {
    /// Number of entries currently stored.
    pub item_count: usize,
    /// Tracked size of the stored values, in bytes.
    pub memory_usage: usize,
    /// Number of lookups that returned a value.
    pub hit_count: u64,
    /// Number of lookups that returned nothing.
    pub miss_count: u64,
    /// `hit_count / (hit_count + miss_count)`, or `0.0` when no lookups were made.
    pub hit_ratio: f64,
}
15
/// Strategy used to free space when an insert would exceed the size limit.
///
/// `Hybrid` is the default, matching the policy the size-only constructors use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum EvictionPolicy {
    /// Evict the entries with the oldest last-access time first.
    LRU,
    /// Evict the entries with the lowest access count first.
    LFU,
    /// Drop expired entries first, then evict by access count, then by
    /// last-access time if more space is still needed.
    #[default]
    Hybrid,
}
22
/// A cached value together with the bookkeeping needed for expiry and
/// recency-based eviction.
struct ValueMetadata {
    /// The stored bytes, shared so readers can hold them without copying.
    data: Arc<Vec<u8>>,
    /// Instant after which the entry is considered expired; `None` means the
    /// entry has no time-to-live.
    expires_at: Option<SystemTime>,
    /// Time of insertion or of the most recent successful read.
    last_accessed: SystemTime,
}
28
29pub struct HotStore {
30    data: Arc<DashMap<String, ValueMetadata>>,
31    access_count: Arc<DashMap<String, u64>>,
32    hit_count: Arc<AtomicU64>,
33    miss_count: Arc<AtomicU64>,
34    max_size: usize,
35    current_size: Arc<AtomicUsize>,
36    eviction_policy: EvictionPolicy,
37    eviction_lock: Arc<Mutex<()>>,
38}
39
40impl Clone for HotStore {
41    fn clone(&self) -> Self {
42        Self {
43            data: Arc::clone(&self.data),
44            access_count: Arc::clone(&self.access_count),
45            hit_count: Arc::clone(&self.hit_count),
46            miss_count: Arc::clone(&self.miss_count),
47            max_size: self.max_size,
48            current_size: Arc::clone(&self.current_size),
49            eviction_policy: self.eviction_policy,
50            eviction_lock: Arc::clone(&self.eviction_lock),
51        }
52    }
53}
54
55impl Default for HotStore {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61impl HotStore {
62    pub fn new() -> Self {
63        Self::new_with_size_limit(128)
64    }
65
66    pub fn with_config(cache_size_mb: usize, _cleanup_interval_secs: u64) -> Self {
67        Self::with_config_and_eviction(
68            cache_size_mb,
69            _cleanup_interval_secs,
70            EvictionPolicy::Hybrid,
71        )
72    }
73
74    pub fn with_config_and_eviction(
75        cache_size_mb: usize,
76        _cleanup_interval_secs: u64,
77        eviction_policy: EvictionPolicy,
78    ) -> Self {
79        Self {
80            data: Arc::new(DashMap::new()),
81            access_count: Arc::new(DashMap::new()),
82            hit_count: Arc::new(AtomicU64::new(0)),
83            miss_count: Arc::new(AtomicU64::new(0)),
84            max_size: cache_size_mb.saturating_mul(1024 * 1024),
85            current_size: Arc::new(AtomicUsize::new(0)),
86            eviction_policy,
87            eviction_lock: Arc::new(Mutex::new(())),
88        }
89    }
90
91    pub fn new_with_size_limit(max_size_mb: usize) -> Self {
92        Self {
93            data: Arc::new(DashMap::new()),
94            access_count: Arc::new(DashMap::new()),
95            hit_count: Arc::new(AtomicU64::new(0)),
96            miss_count: Arc::new(AtomicU64::new(0)),
97            max_size: max_size_mb.saturating_mul(1024 * 1024),
98            current_size: Arc::new(AtomicUsize::new(0)),
99            eviction_policy: EvictionPolicy::Hybrid,
100            eviction_lock: Arc::new(Mutex::new(())),
101        }
102    }
103
104    pub fn get(&self, key: &str) -> Option<Vec<u8>> {
105        self.get_ref(key).map(|arc| (*arc).clone())
106    }
107
108    pub fn get_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
109        self.increment_access(key);
110
111        if let Some(mut entry) = self.data.get_mut(key) {
112            if let Some(expires_at) = entry.expires_at
113                && SystemTime::now() > expires_at {
114                    self.data.remove(key);
115                    self.access_count.remove(key);
116                    let size = entry.data.len();
117                    self.current_size.fetch_sub(size, Ordering::Relaxed);
118                    self.miss_count.fetch_add(1, Ordering::Relaxed);
119                    return None;
120                }
121
122            entry.last_accessed = SystemTime::now();
123            self.hit_count.fetch_add(1, Ordering::Relaxed);
124            Some(Arc::clone(&entry.data))
125        } else {
126            self.miss_count.fetch_add(1, Ordering::Relaxed);
127            None
128        }
129    }
130
131    pub fn set(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) {
132        let value_size = value.len();
133
134        {
135            let _guard = self.eviction_lock.lock().unwrap();
136            let current_size = self.current_size.load(Ordering::Relaxed);
137            if current_size + value_size > self.max_size {
138                match self.eviction_policy {
139                    EvictionPolicy::LRU => {
140                        let bytes_to_free = (current_size + value_size) - self.max_size;
141                        self.cleanup_by_lru(bytes_to_free);
142                    }
143                    EvictionPolicy::LFU => {
144                        let bytes_to_free = (current_size + value_size) - self.max_size;
145                        self.evict_least_frequently_used(bytes_to_free);
146                    }
147                    EvictionPolicy::Hybrid => self.evict_hybrid(value_size),
148                }
149            }
150        }
151
152        let expires_at = ttl.map(|duration| SystemTime::now() + duration);
153
154        let metadata = ValueMetadata {
155            data: Arc::new(value),
156            expires_at,
157            last_accessed: SystemTime::now(),
158        };
159
160        let old_value = self.data.insert(key, metadata);
161        let old_size = old_value.map_or(0, |v| v.data.len());
162
163        if value_size > old_size {
164            self.current_size
165                .fetch_add(value_size - old_size, Ordering::Relaxed);
166        } else {
167            self.current_size
168                .fetch_sub(old_size - value_size, Ordering::Relaxed);
169        }
170    }
171
172    fn increment_access(&self, key: &str) {
173        self.access_count
174            .entry(key.to_string())
175            .and_modify(|count| *count += 1)
176            .or_insert(1);
177    }
178
179    pub fn is_hot(&self, key: &str) -> bool {
180        self.access_count
181            .get(key)
182            .map(|count| *count > 10)
183            .unwrap_or(false)
184    }
185
186    fn cleanup_expired(&self) {
187        let now = SystemTime::now();
188        let mut total_freed = 0;
189
190        let expired_keys: Vec<String> = self
191            .data
192            .iter()
193            .filter_map(|entry| {
194                if let Some(expires_at) = entry.expires_at {
195                    if expires_at <= now {
196                        Some(entry.key().clone())
197                    } else {
198                        None
199                    }
200                } else {
201                    None
202                }
203            })
204            .collect();
205
206        for key in expired_keys {
207            if let Some(entry) = self.data.remove(&key) {
208                total_freed += entry.1.data.len();
209            }
210        }
211
212        if total_freed > 0 {
213            self.current_size.fetch_sub(total_freed, Ordering::Relaxed);
214        }
215    }
216
217    fn cleanup_by_lru(&self, bytes_to_free: usize) {
218        let mut entries: Vec<_> = self
219            .data
220            .iter()
221            .map(|entry| (entry.key().clone(), entry.last_accessed))
222            .collect();
223
224        entries.sort_by_key(|e| e.1);
225
226        let mut freed = 0;
227        for (key, _) in entries {
228            if freed >= bytes_to_free {
229                break;
230            }
231
232            if let Some(removed) = self.data.remove(&key) {
233                freed += removed.1.data.len();
234                self.access_count.remove(&key);
235            }
236        }
237
238        if freed > 0 {
239            self.current_size.fetch_sub(freed, Ordering::Relaxed);
240        }
241    }
242
243    fn evict_least_frequently_used(&self, bytes_to_free: usize) {
244        let mut entries: Vec<_> = self
245            .access_count
246            .iter()
247            .map(|e| (e.key().clone(), *e.value()))
248            .collect();
249
250        entries.sort_by_key(|e| e.1);
251
252        let mut freed = 0;
253        for (key, _) in entries {
254            if freed >= bytes_to_free {
255                break;
256            }
257
258            if let Some(value) = self.data.remove(&key) {
259                freed += value.1.data.len();
260                self.access_count.remove(&key);
261            }
262        }
263
264        if freed > 0 {
265            self.current_size.fetch_sub(freed, Ordering::Relaxed);
266        }
267    }
268
269    fn evict_hybrid(&self, item_size: usize) {
270        self.cleanup_expired();
271
272        let current = self.current_size.load(Ordering::Relaxed);
273        if current + item_size <= self.max_size {
274            return;
275        }
276
277        let bytes_to_free = (current + item_size) - self.max_size;
278        self.evict_least_frequently_used(bytes_to_free);
279
280        let current = self.current_size.load(Ordering::Relaxed);
281        if current + item_size > self.max_size {
282            let more_bytes_to_free = (current + item_size) - self.max_size;
283            self.cleanup_by_lru(more_bytes_to_free);
284        }
285    }
286
287    pub async fn start_cleanup_with_interval(self: Arc<Self>, interval_secs: u64) {
288        let mut interval = interval(Duration::from_secs(interval_secs));
289
290        tokio::spawn(async move {
291            loop {
292                interval.tick().await;
293                self.cleanup_expired();
294            }
295        });
296    }
297
298    pub fn delete(&self, key: &str) {
299        if let Some(entry) = self.data.remove(key) {
300            let size = entry.1.data.len();
301            self.current_size.fetch_sub(size, Ordering::Relaxed);
302        }
303    }
304
305    pub fn get_stats(&self) -> CacheStats {
306        let hits = self.hit_count.load(Ordering::Relaxed);
307        let misses = self.miss_count.load(Ordering::Relaxed);
308        let total = hits + misses;
309
310        let hit_ratio = if total == 0 {
311            0.0
312        } else {
313            hits as f64 / total as f64
314        };
315
316        CacheStats {
317            item_count: self.data.len(),
318            memory_usage: self.current_size.load(Ordering::Relaxed),
319            hit_count: hits,
320            miss_count: misses,
321            hit_ratio,
322        }
323    }
324
325    pub fn clear(&self) {
326        self.data.clear();
327        self.access_count.clear();
328        self.current_size.store(0, Ordering::Relaxed);
329    }
330
331    pub fn hit_ratio(&self) -> f64 {
332        let hits = self.hit_count.load(Ordering::Relaxed);
333        let misses = self.miss_count.load(Ordering::Relaxed);
334        let total = hits + misses;
335
336        if total == 0 {
337            0.0
338        } else {
339            hits as f64 / total as f64
340        }
341    }
342}