aurora_db/storage/
hot.rs

1use dashmap::DashMap;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::{Duration, SystemTime};
5use tokio::time::interval;
6
/// Point-in-time snapshot of a [`HotStore`]'s counters.
///
/// The fields are read individually from atomics, so under concurrent use
/// they may not describe exactly the same instant.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct CacheStats {
    /// Number of entries currently held in the store.
    pub item_count: usize,
    /// Sum of the byte lengths of all stored values (keys are not counted).
    pub memory_usage: usize,
    /// Number of lookups that returned a value.
    pub hit_count: u64,
    /// Number of lookups that found nothing or found an expired entry.
    pub miss_count: u64,
    /// `hit_count / (hit_count + miss_count)`, or `0.0` when no lookups happened.
    pub hit_ratio: f64,
}
15
/// Strategy used to make room when inserting a value would exceed the
/// store's size limit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvictionPolicy {
    /// Evict the entries whose last access is oldest first.
    LRU,
    /// Evict the entries with the lowest recorded access count first.
    LFU,
    /// Drop expired entries first, then evict by access count, and finally
    /// by recency if the store is still over its limit.
    Hybrid,
}
22
/// A stored value together with the bookkeeping needed for expiry and
/// recency-based eviction.
struct ValueMetadata {
    /// The cached bytes, reference-counted so readers can share them
    /// without copying.
    data: Arc<Vec<u8>>,
    /// Moment after which the entry is considered expired; `None` means the
    /// entry never expires.
    expires_at: Option<SystemTime>,
    /// Time of the most recent successful read, or of insertion if the
    /// entry has not been read yet.
    last_accessed: SystemTime,
}
28
29pub struct HotStore {
30    data: Arc<DashMap<String, ValueMetadata>>,
31    access_count: Arc<DashMap<String, u64>>,
32    hit_count: Arc<AtomicU64>,
33    miss_count: Arc<AtomicU64>,
34    max_size: usize,
35    current_size: Arc<AtomicUsize>,
36    eviction_policy: EvictionPolicy,
37    eviction_lock: Arc<Mutex<()>>,
38}
39
40impl Clone for HotStore {
41    fn clone(&self) -> Self {
42        Self {
43            data: Arc::clone(&self.data),
44            access_count: Arc::clone(&self.access_count),
45            hit_count: Arc::clone(&self.hit_count),
46            miss_count: Arc::clone(&self.miss_count),
47            max_size: self.max_size,
48            current_size: Arc::clone(&self.current_size),
49            eviction_policy: self.eviction_policy,
50            eviction_lock: Arc::clone(&self.eviction_lock),
51        }
52    }
53}
54
55impl HotStore {
56    pub fn new() -> Self {
57        Self::new_with_size_limit(128)
58    }
59
60    pub fn with_config(cache_size_mb: usize, _cleanup_interval_secs: u64) -> Self {
61        Self::with_config_and_eviction(
62            cache_size_mb,
63            _cleanup_interval_secs,
64            EvictionPolicy::Hybrid,
65        )
66    }
67
68    pub fn with_config_and_eviction(
69        cache_size_mb: usize,
70        _cleanup_interval_secs: u64,
71        eviction_policy: EvictionPolicy,
72    ) -> Self {
73        Self {
74            data: Arc::new(DashMap::new()),
75            access_count: Arc::new(DashMap::new()),
76            hit_count: Arc::new(AtomicU64::new(0)),
77            miss_count: Arc::new(AtomicU64::new(0)),
78            max_size: cache_size_mb.checked_mul(1024 * 1024).unwrap_or(usize::MAX),
79            current_size: Arc::new(AtomicUsize::new(0)),
80            eviction_policy,
81            eviction_lock: Arc::new(Mutex::new(())),
82        }
83    }
84
85    pub fn new_with_size_limit(max_size_mb: usize) -> Self {
86        Self {
87            data: Arc::new(DashMap::new()),
88            access_count: Arc::new(DashMap::new()),
89            hit_count: Arc::new(AtomicU64::new(0)),
90            miss_count: Arc::new(AtomicU64::new(0)),
91            max_size: max_size_mb.checked_mul(1024 * 1024).unwrap_or(usize::MAX),
92            current_size: Arc::new(AtomicUsize::new(0)),
93            eviction_policy: EvictionPolicy::Hybrid,
94            eviction_lock: Arc::new(Mutex::new(())),
95        }
96    }
97
98    pub fn get(&self, key: &str) -> Option<Vec<u8>> {
99        self.get_ref(key).map(|arc| (*arc).clone())
100    }
101
102    pub fn get_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
103        self.increment_access(key);
104
105        if let Some(mut entry) = self.data.get_mut(key) {
106            if let Some(expires_at) = entry.expires_at {
107                if SystemTime::now() > expires_at {
108                    self.data.remove(key);
109                    self.access_count.remove(key);
110                    let size = entry.data.len();
111                    self.current_size.fetch_sub(size, Ordering::Relaxed);
112                    self.miss_count.fetch_add(1, Ordering::Relaxed);
113                    return None;
114                }
115            }
116
117            entry.last_accessed = SystemTime::now();
118            self.hit_count.fetch_add(1, Ordering::Relaxed);
119            Some(Arc::clone(&entry.data))
120        } else {
121            self.miss_count.fetch_add(1, Ordering::Relaxed);
122            None
123        }
124    }
125
126    pub fn set(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) {
127        let value_size = value.len();
128
129        {
130            let _guard = self.eviction_lock.lock().unwrap();
131            let current_size = self.current_size.load(Ordering::Relaxed);
132            if current_size + value_size > self.max_size {
133                match self.eviction_policy {
134                    EvictionPolicy::LRU => {
135                        let bytes_to_free = (current_size + value_size) - self.max_size;
136                        self.cleanup_by_lru(bytes_to_free);
137                    }
138                    EvictionPolicy::LFU => {
139                        let bytes_to_free = (current_size + value_size) - self.max_size;
140                        self.evict_least_frequently_used(bytes_to_free);
141                    }
142                    EvictionPolicy::Hybrid => self.evict_hybrid(value_size),
143                }
144            }
145        }
146
147        let expires_at = ttl.map(|duration| SystemTime::now() + duration);
148
149        let metadata = ValueMetadata {
150            data: Arc::new(value),
151            expires_at,
152            last_accessed: SystemTime::now(),
153        };
154
155        let old_value = self.data.insert(key, metadata);
156        let old_size = old_value.map_or(0, |v| v.data.len());
157
158        if value_size > old_size {
159            self.current_size
160                .fetch_add(value_size - old_size, Ordering::Relaxed);
161        } else {
162            self.current_size
163                .fetch_sub(old_size - value_size, Ordering::Relaxed);
164        }
165    }
166
167    fn increment_access(&self, key: &str) {
168        self.access_count
169            .entry(key.to_string())
170            .and_modify(|count| *count += 1)
171            .or_insert(1);
172    }
173
174    pub fn is_hot(&self, key: &str) -> bool {
175        self.access_count
176            .get(key)
177            .map(|count| *count > 10)
178            .unwrap_or(false)
179    }
180
181    fn cleanup_expired(&self) {
182        let now = SystemTime::now();
183        let mut total_freed = 0;
184
185        let expired_keys: Vec<String> = self
186            .data
187            .iter()
188            .filter_map(|entry| {
189                if let Some(expires_at) = entry.expires_at {
190                    if expires_at <= now {
191                        Some(entry.key().clone())
192                    } else {
193                        None
194                    }
195                } else {
196                    None
197                }
198            })
199            .collect();
200
201        for key in expired_keys {
202            if let Some(entry) = self.data.remove(&key) {
203                total_freed += entry.1.data.len();
204            }
205        }
206
207        if total_freed > 0 {
208            self.current_size.fetch_sub(total_freed, Ordering::Relaxed);
209        }
210    }
211
212    fn cleanup_by_lru(&self, bytes_to_free: usize) {
213        let mut entries: Vec<_> = self
214            .data
215            .iter()
216            .map(|entry| (entry.key().clone(), entry.last_accessed))
217            .collect();
218
219        entries.sort_by_key(|e| e.1);
220
221        let mut freed = 0;
222        for (key, _) in entries {
223            if freed >= bytes_to_free {
224                break;
225            }
226
227            if let Some(removed) = self.data.remove(&key) {
228                freed += removed.1.data.len();
229                self.access_count.remove(&key);
230            }
231        }
232
233        if freed > 0 {
234            self.current_size.fetch_sub(freed, Ordering::Relaxed);
235        }
236    }
237
238    fn evict_least_frequently_used(&self, bytes_to_free: usize) {
239        let mut entries: Vec<_> = self
240            .access_count
241            .iter()
242            .map(|e| (e.key().clone(), *e.value()))
243            .collect();
244
245        entries.sort_by_key(|e| e.1);
246
247        let mut freed = 0;
248        for (key, _) in entries {
249            if freed >= bytes_to_free {
250                break;
251            }
252
253            if let Some(value) = self.data.remove(&key) {
254                freed += value.1.data.len();
255                self.access_count.remove(&key);
256            }
257        }
258
259        if freed > 0 {
260            self.current_size.fetch_sub(freed, Ordering::Relaxed);
261        }
262    }
263
264    fn evict_hybrid(&self, item_size: usize) {
265        self.cleanup_expired();
266
267        let current = self.current_size.load(Ordering::Relaxed);
268        if current + item_size <= self.max_size {
269            return;
270        }
271
272        let bytes_to_free = (current + item_size) - self.max_size;
273        self.evict_least_frequently_used(bytes_to_free);
274
275        let current = self.current_size.load(Ordering::Relaxed);
276        if current + item_size > self.max_size {
277            let more_bytes_to_free = (current + item_size) - self.max_size;
278            self.cleanup_by_lru(more_bytes_to_free);
279        }
280    }
281
282    pub async fn start_cleanup_with_interval(self: Arc<Self>, interval_secs: u64) {
283        let mut interval = interval(Duration::from_secs(interval_secs));
284
285        tokio::spawn(async move {
286            loop {
287                interval.tick().await;
288                self.cleanup_expired();
289            }
290        });
291    }
292
293    pub fn delete(&self, key: &str) {
294        if let Some(entry) = self.data.remove(key) {
295            let size = entry.1.data.len();
296            self.current_size.fetch_sub(size, Ordering::Relaxed);
297        }
298    }
299
300    pub fn get_stats(&self) -> CacheStats {
301        let hits = self.hit_count.load(Ordering::Relaxed);
302        let misses = self.miss_count.load(Ordering::Relaxed);
303        let total = hits + misses;
304
305        let hit_ratio = if total == 0 {
306            0.0
307        } else {
308            hits as f64 / total as f64
309        };
310
311        CacheStats {
312            item_count: self.data.len(),
313            memory_usage: self.current_size.load(Ordering::Relaxed),
314            hit_count: hits,
315            miss_count: misses,
316            hit_ratio,
317        }
318    }
319
320    pub fn clear(&self) {
321        self.data.clear();
322        self.access_count.clear();
323        self.current_size.store(0, Ordering::Relaxed);
324    }
325
326    pub fn hit_ratio(&self) -> f64 {
327        let hits = self.hit_count.load(Ordering::Relaxed);
328        let misses = self.miss_count.load(Ordering::Relaxed);
329        let total = hits + misses;
330
331        if total == 0 {
332            0.0
333        } else {
334            hits as f64 / total as f64
335        }
336    }
337}