1use dashmap::DashMap;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::{Duration, SystemTime};
5use tokio::time::interval;
6
/// Point-in-time snapshot of the cache counters, as produced by `HotStore::get_stats`.
///
/// The snapshot is plain data, so it can be cloned and compared (handy for logging and tests).
#[derive(Debug, Clone, PartialEq)]
pub struct CacheStats {
    /// Number of entries stored when the snapshot was taken.
    pub item_count: usize,
    /// Tracked total size of the stored values, in bytes (keys are not counted).
    pub memory_usage: usize,
    /// Number of lookups that returned a value.
    pub hit_count: u64,
    /// Number of lookups that returned nothing (absent or expired key).
    pub miss_count: u64,
    /// `hit_count / (hit_count + miss_count)`, or `0.0` when no lookup has been recorded.
    pub hit_ratio: f64,
}
15
/// Strategy `HotStore::set` uses to make room when a new value would exceed the size budget.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EvictionPolicy {
    /// Evict the entries with the oldest `last_accessed` timestamp first.
    LRU,
    /// Evict the entries with the lowest access counter first.
    LFU,
    /// Drop expired entries first, then evict by access counter, and finally by
    /// `last_accessed` if the budget is still exceeded.
    Hybrid,
}
22
/// A cached value together with the bookkeeping needed for expiry and recency-based eviction.
struct ValueMetadata {
    /// The cached bytes; reference-counted so readers can share them without copying.
    data: Arc<Vec<u8>>,
    /// Instant after which the entry is treated as expired; `None` means it never expires.
    expires_at: Option<SystemTime>,
    /// Time of the most recent write or successful read of this entry.
    last_accessed: SystemTime,
}
28
29pub struct HotStore {
30 data: Arc<DashMap<String, ValueMetadata>>,
31 access_count: Arc<DashMap<String, u64>>,
32 hit_count: Arc<AtomicU64>,
33 miss_count: Arc<AtomicU64>,
34 max_size: usize,
35 current_size: Arc<AtomicUsize>,
36 eviction_policy: EvictionPolicy,
37 eviction_lock: Arc<Mutex<()>>,
38}
39
40impl Clone for HotStore {
41 fn clone(&self) -> Self {
42 Self {
43 data: Arc::clone(&self.data),
44 access_count: Arc::clone(&self.access_count),
45 hit_count: Arc::clone(&self.hit_count),
46 miss_count: Arc::clone(&self.miss_count),
47 max_size: self.max_size,
48 current_size: Arc::clone(&self.current_size),
49 eviction_policy: self.eviction_policy,
50 eviction_lock: Arc::clone(&self.eviction_lock),
51 }
52 }
53}
54
55impl HotStore {
56 pub fn new() -> Self {
57 Self::new_with_size_limit(128)
58 }
59
60 pub fn with_config(cache_size_mb: usize, _cleanup_interval_secs: u64) -> Self {
61 Self::with_config_and_eviction(
62 cache_size_mb,
63 _cleanup_interval_secs,
64 EvictionPolicy::Hybrid,
65 )
66 }
67
68 pub fn with_config_and_eviction(
69 cache_size_mb: usize,
70 _cleanup_interval_secs: u64,
71 eviction_policy: EvictionPolicy,
72 ) -> Self {
73 Self {
74 data: Arc::new(DashMap::new()),
75 access_count: Arc::new(DashMap::new()),
76 hit_count: Arc::new(AtomicU64::new(0)),
77 miss_count: Arc::new(AtomicU64::new(0)),
78 max_size: cache_size_mb.checked_mul(1024 * 1024).unwrap_or(usize::MAX),
79 current_size: Arc::new(AtomicUsize::new(0)),
80 eviction_policy,
81 eviction_lock: Arc::new(Mutex::new(())),
82 }
83 }
84
85 pub fn new_with_size_limit(max_size_mb: usize) -> Self {
86 Self {
87 data: Arc::new(DashMap::new()),
88 access_count: Arc::new(DashMap::new()),
89 hit_count: Arc::new(AtomicU64::new(0)),
90 miss_count: Arc::new(AtomicU64::new(0)),
91 max_size: max_size_mb.checked_mul(1024 * 1024).unwrap_or(usize::MAX),
92 current_size: Arc::new(AtomicUsize::new(0)),
93 eviction_policy: EvictionPolicy::Hybrid,
94 eviction_lock: Arc::new(Mutex::new(())),
95 }
96 }
97
98 pub fn get(&self, key: &str) -> Option<Vec<u8>> {
99 self.get_ref(key).map(|arc| (*arc).clone())
100 }
101
102 pub fn get_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
103 self.increment_access(key);
104
105 if let Some(mut entry) = self.data.get_mut(key) {
106 if let Some(expires_at) = entry.expires_at {
107 if SystemTime::now() > expires_at {
108 self.data.remove(key);
109 self.access_count.remove(key);
110 let size = entry.data.len();
111 self.current_size.fetch_sub(size, Ordering::Relaxed);
112 self.miss_count.fetch_add(1, Ordering::Relaxed);
113 return None;
114 }
115 }
116
117 entry.last_accessed = SystemTime::now();
118 self.hit_count.fetch_add(1, Ordering::Relaxed);
119 Some(Arc::clone(&entry.data))
120 } else {
121 self.miss_count.fetch_add(1, Ordering::Relaxed);
122 None
123 }
124 }
125
126 pub fn set(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) {
127 let value_size = value.len();
128
129 {
130 let _guard = self.eviction_lock.lock().unwrap();
131 let current_size = self.current_size.load(Ordering::Relaxed);
132 if current_size + value_size > self.max_size {
133 match self.eviction_policy {
134 EvictionPolicy::LRU => {
135 let bytes_to_free = (current_size + value_size) - self.max_size;
136 self.cleanup_by_lru(bytes_to_free);
137 }
138 EvictionPolicy::LFU => {
139 let bytes_to_free = (current_size + value_size) - self.max_size;
140 self.evict_least_frequently_used(bytes_to_free);
141 }
142 EvictionPolicy::Hybrid => self.evict_hybrid(value_size),
143 }
144 }
145 }
146
147 let expires_at = ttl.map(|duration| SystemTime::now() + duration);
148
149 let metadata = ValueMetadata {
150 data: Arc::new(value),
151 expires_at,
152 last_accessed: SystemTime::now(),
153 };
154
155 let old_value = self.data.insert(key, metadata);
156 let old_size = old_value.map_or(0, |v| v.data.len());
157
158 if value_size > old_size {
159 self.current_size
160 .fetch_add(value_size - old_size, Ordering::Relaxed);
161 } else {
162 self.current_size
163 .fetch_sub(old_size - value_size, Ordering::Relaxed);
164 }
165 }
166
167 fn increment_access(&self, key: &str) {
168 self.access_count
169 .entry(key.to_string())
170 .and_modify(|count| *count += 1)
171 .or_insert(1);
172 }
173
174 pub fn is_hot(&self, key: &str) -> bool {
175 self.access_count
176 .get(key)
177 .map(|count| *count > 10)
178 .unwrap_or(false)
179 }
180
181 fn cleanup_expired(&self) {
182 let now = SystemTime::now();
183 let mut total_freed = 0;
184
185 let expired_keys: Vec<String> = self
186 .data
187 .iter()
188 .filter_map(|entry| {
189 if let Some(expires_at) = entry.expires_at {
190 if expires_at <= now {
191 Some(entry.key().clone())
192 } else {
193 None
194 }
195 } else {
196 None
197 }
198 })
199 .collect();
200
201 for key in expired_keys {
202 if let Some(entry) = self.data.remove(&key) {
203 total_freed += entry.1.data.len();
204 }
205 }
206
207 if total_freed > 0 {
208 self.current_size.fetch_sub(total_freed, Ordering::Relaxed);
209 }
210 }
211
212 fn cleanup_by_lru(&self, bytes_to_free: usize) {
213 let mut entries: Vec<_> = self
214 .data
215 .iter()
216 .map(|entry| (entry.key().clone(), entry.last_accessed))
217 .collect();
218
219 entries.sort_by_key(|e| e.1);
220
221 let mut freed = 0;
222 for (key, _) in entries {
223 if freed >= bytes_to_free {
224 break;
225 }
226
227 if let Some(removed) = self.data.remove(&key) {
228 freed += removed.1.data.len();
229 self.access_count.remove(&key);
230 }
231 }
232
233 if freed > 0 {
234 self.current_size.fetch_sub(freed, Ordering::Relaxed);
235 }
236 }
237
238 fn evict_least_frequently_used(&self, bytes_to_free: usize) {
239 let mut entries: Vec<_> = self
240 .access_count
241 .iter()
242 .map(|e| (e.key().clone(), *e.value()))
243 .collect();
244
245 entries.sort_by_key(|e| e.1);
246
247 let mut freed = 0;
248 for (key, _) in entries {
249 if freed >= bytes_to_free {
250 break;
251 }
252
253 if let Some(value) = self.data.remove(&key) {
254 freed += value.1.data.len();
255 self.access_count.remove(&key);
256 }
257 }
258
259 if freed > 0 {
260 self.current_size.fetch_sub(freed, Ordering::Relaxed);
261 }
262 }
263
264 fn evict_hybrid(&self, item_size: usize) {
265 self.cleanup_expired();
266
267 let current = self.current_size.load(Ordering::Relaxed);
268 if current + item_size <= self.max_size {
269 return;
270 }
271
272 let bytes_to_free = (current + item_size) - self.max_size;
273 self.evict_least_frequently_used(bytes_to_free);
274
275 let current = self.current_size.load(Ordering::Relaxed);
276 if current + item_size > self.max_size {
277 let more_bytes_to_free = (current + item_size) - self.max_size;
278 self.cleanup_by_lru(more_bytes_to_free);
279 }
280 }
281
282 pub async fn start_cleanup_with_interval(self: Arc<Self>, interval_secs: u64) {
283 let mut interval = interval(Duration::from_secs(interval_secs));
284
285 tokio::spawn(async move {
286 loop {
287 interval.tick().await;
288 self.cleanup_expired();
289 }
290 });
291 }
292
293 pub fn delete(&self, key: &str) {
294 if let Some(entry) = self.data.remove(key) {
295 let size = entry.1.data.len();
296 self.current_size.fetch_sub(size, Ordering::Relaxed);
297 }
298 }
299
300 pub fn get_stats(&self) -> CacheStats {
301 let hits = self.hit_count.load(Ordering::Relaxed);
302 let misses = self.miss_count.load(Ordering::Relaxed);
303 let total = hits + misses;
304
305 let hit_ratio = if total == 0 {
306 0.0
307 } else {
308 hits as f64 / total as f64
309 };
310
311 CacheStats {
312 item_count: self.data.len(),
313 memory_usage: self.current_size.load(Ordering::Relaxed),
314 hit_count: hits,
315 miss_count: misses,
316 hit_ratio,
317 }
318 }
319
320 pub fn clear(&self) {
321 self.data.clear();
322 self.access_count.clear();
323 self.current_size.store(0, Ordering::Relaxed);
324 }
325
326 pub fn hit_ratio(&self) -> f64 {
327 let hits = self.hit_count.load(Ordering::Relaxed);
328 let misses = self.miss_count.load(Ordering::Relaxed);
329 let total = hits + misses;
330
331 if total == 0 {
332 0.0
333 } else {
334 hits as f64 / total as f64
335 }
336 }
337}