aurora_db/storage/
hot.rs

1use dashmap::DashMap;
2use std::sync::Arc;
3use std::time::{Duration, SystemTime};
4use tokio::time::interval;
5use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
6
/// Point-in-time snapshot of cache statistics.
///
/// The values are read one after another from independent counters, so under
/// concurrent use they may not describe exactly the same instant.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct CacheStats {
    /// Number of entries held in the cache.
    pub item_count: usize,
    /// Tracked size of the cached value payloads, in bytes.
    pub memory_usage: usize,
    /// Number of lookups that returned a value.
    pub hit_count: u64,
    /// Number of lookups that returned nothing (absent or expired key).
    pub miss_count: u64,
    /// `hit_count / (hit_count + miss_count)`, or `0.0` when no lookup was recorded.
    pub hit_ratio: f64,
}
16
/// A cached value together with the bookkeeping the store keeps for it.
struct ValueMetadata {
    /// Raw value bytes; `data.len()` is what the store counts against its size limit.
    data: Vec<u8>,
    /// Instant after which the entry is treated as expired; `None` means it never expires.
    expires_at: Option<SystemTime>,
    /// Time of the most recent successful read or write of this entry.
    last_accessed: SystemTime,
}
23
24pub struct HotStore {
25    data: Arc<DashMap<String, ValueMetadata>>,
26    access_count: Arc<DashMap<String, u64>>,
27    hit_count: Arc<AtomicU64>,
28    miss_count: Arc<AtomicU64>,
29    max_size: usize,  // Maximum size in bytes
30    current_size: Arc<AtomicUsize>,
31}
32
33// Implement Clone for HotStore
34impl Clone for HotStore {
35    fn clone(&self) -> Self {
36        Self {
37            data: Arc::clone(&self.data),
38            access_count: Arc::clone(&self.access_count),
39            hit_count: Arc::clone(&self.hit_count),
40            miss_count: Arc::clone(&self.miss_count),
41            max_size: self.max_size,
42            current_size: Arc::clone(&self.current_size),
43        }
44    }
45}
46
47impl HotStore {
48    pub fn new() -> Self {
49        Self::new_with_size_limit(128) // Default to 128MB
50    }
51
52    pub fn with_config(
53        cache_size_mb: usize,
54        _cleanup_interval_secs: u64,
55    ) -> Self {
56        Self {
57            data: Arc::new(DashMap::new()),
58            access_count: Arc::new(DashMap::new()),
59            hit_count: Arc::new(AtomicU64::new(0)),
60            miss_count: Arc::new(AtomicU64::new(0)),
61            max_size: cache_size_mb * 1024 * 1024,
62            current_size: Arc::new(AtomicUsize::new(0)),
63        }
64    }
65
66    pub fn new_with_size_limit(max_size_mb: usize) -> Self {
67        Self {
68            data: Arc::new(DashMap::new()),
69            access_count: Arc::new(DashMap::new()),
70            hit_count: Arc::new(AtomicU64::new(0)),
71            miss_count: Arc::new(AtomicU64::new(0)),
72            max_size: max_size_mb * 1024 * 1024,
73            current_size: Arc::new(AtomicUsize::new(0)),
74        }
75    }
76
77    pub fn get(&self, key: &str) -> Option<Vec<u8>> {
78        self.increment_access(key);
79        
80        if let Some(mut entry) = self.data.get_mut(key) {
81            // Check if expired
82            if let Some(expires_at) = entry.expires_at {
83                if SystemTime::now() > expires_at {
84                    self.data.remove(key);
85                    let size = entry.data.len();
86                    self.current_size.fetch_sub(size, Ordering::Relaxed);
87                    self.miss_count.fetch_add(1, Ordering::Relaxed);
88                    return None;
89                }
90            }
91            
92            // Update last accessed time
93            entry.last_accessed = SystemTime::now();
94            self.hit_count.fetch_add(1, Ordering::Relaxed);
95            Some(entry.data.clone())
96        } else {
97            self.miss_count.fetch_add(1, Ordering::Relaxed);
98            None
99        }
100    }
101
102    // Get with Arc to allow zero-copy when possible
103    pub fn get_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
104        self.increment_access(key);
105        
106        if let Some(mut entry) = self.data.get_mut(key) {
107            // Check if expired
108            if let Some(expires_at) = entry.expires_at {
109                if SystemTime::now() > expires_at {
110                    self.data.remove(key);
111                    let size = entry.data.len();
112                    self.current_size.fetch_sub(size, Ordering::Relaxed);
113                    self.miss_count.fetch_add(1, Ordering::Relaxed);
114                    return None;
115                }
116            }
117            
118            // Update last accessed time
119            entry.last_accessed = SystemTime::now();
120            self.hit_count.fetch_add(1, Ordering::Relaxed);
121            Some(Arc::new(entry.data.clone()))
122        } else {
123            self.miss_count.fetch_add(1, Ordering::Relaxed);
124            None
125        }
126    }
127
128    pub fn set(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) {
129        let value_size = value.len();
130        
131        // Check if we need to evict items
132        let new_size = self.current_size.load(Ordering::Relaxed)
133            .saturating_add(value_size);
134            
135        if new_size > self.max_size {
136            self.evict_least_used(new_size.saturating_sub(self.max_size));
137        }
138        
139        let expires_at = ttl.map(|duration| SystemTime::now() + duration);
140        
141        // Update size tracking
142        if let Some(old_value) = self.data.get(&key) {
143            let old_size = old_value.data.len();
144            self.current_size.fetch_sub(old_size, Ordering::Relaxed);
145        }
146        
147        let metadata = ValueMetadata {
148            data: value,
149            expires_at,
150            last_accessed: SystemTime::now(),
151        };
152        
153        self.data.insert(key, metadata);
154        self.current_size.fetch_add(value_size, Ordering::Relaxed);
155    }
156
157    fn increment_access(&self, key: &str) {
158        self.access_count
159            .entry(key.to_string())
160            .and_modify(|count| *count += 1)
161            .or_insert(1);
162    }
163
164    pub fn is_hot(&self, key: &str) -> bool {
165        self.access_count
166            .get(key)
167            .map(|count| *count > 10)
168            .unwrap_or(false)
169    }
170
171    // Clean up expired items
172    fn cleanup_expired(&self) {
173        let now = SystemTime::now();
174        let mut total_freed = 0;
175        
176        // Clean up expired items
177        let expired_keys: Vec<String> = self.data
178            .iter()
179            .filter_map(|entry| {
180                if let Some(expires_at) = entry.expires_at {
181                    if expires_at <= now {
182                        Some(entry.key().clone())
183                    } else {
184                        None
185                    }
186                } else {
187                    None
188                }
189            })
190            .collect();
191            
192        for key in expired_keys {
193            if let Some(entry) = self.data.remove(&key) {
194                total_freed += entry.1.data.len();
195            }
196        }
197        
198        if total_freed > 0 {
199            self.current_size.fetch_sub(total_freed, Ordering::Relaxed);
200        }
201    }
202
203    // Clean up using LRU when cache is full
204    fn cleanup_by_lru(&self, bytes_to_free: usize) {
205        let mut entries: Vec<_> = self.data
206            .iter()
207            .map(|entry| (entry.key().clone(), entry.last_accessed))
208            .collect();
209        
210        // Sort by last accessed time (oldest first)
211        entries.sort_by_key(|e| e.1);
212        
213        let mut freed = 0;
214        for (key, _) in entries {
215            if freed >= bytes_to_free {
216                break;
217            }
218            
219            if let Some(removed) = self.data.remove(&key) {
220                freed += removed.1.data.len();
221            }
222        }
223        
224        if freed > 0 {
225            self.current_size.fetch_sub(freed, Ordering::Relaxed);
226        }
227    }
228
229    // Evict least frequently used items
230    fn evict_least_used(&self, bytes_to_free: usize) {
231        // First try to clean up expired items
232        self.cleanup_expired();
233        
234        // If we've freed enough, we're done
235        if self.current_size.load(Ordering::Relaxed) + bytes_to_free <= self.max_size {
236            return;
237        }
238        
239        // Otherwise, evict by access count
240        let mut entries: Vec<_> = self.access_count
241            .iter()
242            .map(|e| (e.key().clone(), *e.value()))
243            .collect();
244        
245        entries.sort_by_key(|e| e.1);
246        
247        let mut freed = 0;
248        for (key, _) in entries {
249            if freed >= bytes_to_free {
250                break;
251            }
252            
253            if let Some(value) = self.data.remove(&key) {
254                freed += value.1.data.len();
255            }
256        }
257        
258        if freed > 0 {
259            self.current_size.fetch_sub(freed, Ordering::Relaxed);
260        }
261        
262        // If we still need to free more, use LRU approach
263        if self.current_size.load(Ordering::Relaxed) + bytes_to_free > self.max_size {
264            self.cleanup_by_lru(bytes_to_free.saturating_sub(freed));
265        }
266    }
267
268    pub async fn start_cleanup_with_interval(self: Arc<Self>, interval_secs: u64) {
269        let mut interval = interval(Duration::from_secs(interval_secs));
270        
271        tokio::spawn(async move {
272            loop {
273                interval.tick().await;
274                self.cleanup_expired();
275            }
276        });
277    }
278
279    pub fn delete(&self, key: &str) {
280        if let Some(entry) = self.data.remove(key) {
281            let size = entry.1.data.len();
282            self.current_size.fetch_sub(size, Ordering::Relaxed);
283        }
284    }
285
286    // Get statistics about the cache
287    pub fn get_stats(&self) -> CacheStats {
288        let hits = self.hit_count.load(Ordering::Relaxed);
289        let misses = self.miss_count.load(Ordering::Relaxed);
290        let total = hits + misses;
291        
292        let hit_ratio = if total == 0 {
293            0.0
294        } else {
295            hits as f64 / total as f64
296        };
297        
298        CacheStats {
299            item_count: self.data.len(),
300            memory_usage: self.current_size.load(Ordering::Relaxed),
301            hit_count: hits,
302            miss_count: misses,
303            hit_ratio,
304        }
305    }
306
307    // Clear the cache
308    pub fn clear(&self) {
309        self.data.clear();
310        self.access_count.clear();
311        self.current_size.store(0, Ordering::Relaxed);
312    }
313
314    // Get hit ratio
315    pub fn hit_ratio(&self) -> f64 {
316        let hits = self.hit_count.load(Ordering::Relaxed);
317        let misses = self.miss_count.load(Ordering::Relaxed);
318        let total = hits + misses;
319        
320        if total == 0 {
321            0.0
322        } else {
323            hits as f64 / total as f64
324        }
325    }
326}