1use dashmap::DashMap;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::{Duration, SystemTime};
5use tokio::time::interval;
6
/// Point-in-time snapshot of the cache's counters, as produced by
/// `HotStore::get_stats`.
#[derive(Debug)]
pub struct CacheStats {
    /// Number of entries currently stored.
    pub item_count: usize,
    /// Sum of the payload lengths (in bytes) currently accounted for.
    pub memory_usage: usize,
    /// Lookups that returned a value.
    pub hit_count: u64,
    /// Lookups that returned nothing (missing or expired key).
    pub miss_count: u64,
    /// `hit_count / (hit_count + miss_count)`, or `0.0` when no lookup has
    /// been recorded yet.
    pub hit_ratio: f64,
}
15
/// Strategy used to pick victims when inserting a value would push the cache
/// past its size limit.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EvictionPolicy {
    /// Evict the entries whose last access is oldest.
    LRU,
    /// Evict the entries with the lowest access count.
    LFU,
    /// Drop expired entries first, then evict by access count, then by
    /// last-access time if more room is still needed.
    Hybrid,
}
22
/// A cached payload together with the bookkeeping needed for expiry and
/// recency-based eviction.
struct ValueMetadata {
    /// The stored bytes; reference-counted so readers can share them without
    /// copying.
    data: Arc<Vec<u8>>,
    /// Instant after which the entry is considered expired; `None` means the
    /// entry never expires.
    expires_at: Option<SystemTime>,
    /// Time of the most recent write or successful read of this entry.
    last_accessed: SystemTime,
}
28
29pub struct HotStore {
30 data: Arc<DashMap<String, ValueMetadata>>,
31 access_count: Arc<DashMap<String, u64>>,
32 hit_count: Arc<AtomicU64>,
33 miss_count: Arc<AtomicU64>,
34 max_size: usize,
35 current_size: Arc<AtomicUsize>,
36 eviction_policy: EvictionPolicy,
37 eviction_lock: Arc<Mutex<()>>,
38}
39
40impl Clone for HotStore {
41 fn clone(&self) -> Self {
42 Self {
43 data: Arc::clone(&self.data),
44 access_count: Arc::clone(&self.access_count),
45 hit_count: Arc::clone(&self.hit_count),
46 miss_count: Arc::clone(&self.miss_count),
47 max_size: self.max_size,
48 current_size: Arc::clone(&self.current_size),
49 eviction_policy: self.eviction_policy,
50 eviction_lock: Arc::clone(&self.eviction_lock),
51 }
52 }
53}
54
55impl Default for HotStore {
56 fn default() -> Self {
57 Self::new()
58 }
59}
60
61impl HotStore {
62 pub fn new() -> Self {
63 Self::new_with_size_limit(128)
64 }
65
66 pub fn with_config(cache_size_mb: usize, _cleanup_interval_secs: u64) -> Self {
67 Self::with_config_and_eviction(
68 cache_size_mb,
69 _cleanup_interval_secs,
70 EvictionPolicy::Hybrid,
71 )
72 }
73
74 pub fn with_config_and_eviction(
75 cache_size_mb: usize,
76 _cleanup_interval_secs: u64,
77 eviction_policy: EvictionPolicy,
78 ) -> Self {
79 Self {
80 data: Arc::new(DashMap::new()),
81 access_count: Arc::new(DashMap::new()),
82 hit_count: Arc::new(AtomicU64::new(0)),
83 miss_count: Arc::new(AtomicU64::new(0)),
84 max_size: cache_size_mb.saturating_mul(1024 * 1024),
85 current_size: Arc::new(AtomicUsize::new(0)),
86 eviction_policy,
87 eviction_lock: Arc::new(Mutex::new(())),
88 }
89 }
90
91 pub fn new_with_size_limit(max_size_mb: usize) -> Self {
92 Self {
93 data: Arc::new(DashMap::new()),
94 access_count: Arc::new(DashMap::new()),
95 hit_count: Arc::new(AtomicU64::new(0)),
96 miss_count: Arc::new(AtomicU64::new(0)),
97 max_size: max_size_mb.saturating_mul(1024 * 1024),
98 current_size: Arc::new(AtomicUsize::new(0)),
99 eviction_policy: EvictionPolicy::Hybrid,
100 eviction_lock: Arc::new(Mutex::new(())),
101 }
102 }
103
104 pub fn get(&self, key: &str) -> Option<Vec<u8>> {
105 self.get_ref(key).map(|arc| (*arc).clone())
106 }
107
108 pub fn get_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
109 self.increment_access(key);
110
111 if let Some(mut entry) = self.data.get_mut(key) {
112 if let Some(expires_at) = entry.expires_at
113 && SystemTime::now() > expires_at {
114 self.data.remove(key);
115 self.access_count.remove(key);
116 let size = entry.data.len();
117 self.current_size.fetch_sub(size, Ordering::Relaxed);
118 self.miss_count.fetch_add(1, Ordering::Relaxed);
119 return None;
120 }
121
122 entry.last_accessed = SystemTime::now();
123 self.hit_count.fetch_add(1, Ordering::Relaxed);
124 Some(Arc::clone(&entry.data))
125 } else {
126 self.miss_count.fetch_add(1, Ordering::Relaxed);
127 None
128 }
129 }
130
131 pub fn set(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) {
132 let value_size = value.len();
133
134 {
135 let _guard = self.eviction_lock.lock().unwrap();
136 let current_size = self.current_size.load(Ordering::Relaxed);
137 if current_size + value_size > self.max_size {
138 match self.eviction_policy {
139 EvictionPolicy::LRU => {
140 let bytes_to_free = (current_size + value_size) - self.max_size;
141 self.cleanup_by_lru(bytes_to_free);
142 }
143 EvictionPolicy::LFU => {
144 let bytes_to_free = (current_size + value_size) - self.max_size;
145 self.evict_least_frequently_used(bytes_to_free);
146 }
147 EvictionPolicy::Hybrid => self.evict_hybrid(value_size),
148 }
149 }
150 }
151
152 let expires_at = ttl.map(|duration| SystemTime::now() + duration);
153
154 let metadata = ValueMetadata {
155 data: Arc::new(value),
156 expires_at,
157 last_accessed: SystemTime::now(),
158 };
159
160 let old_value = self.data.insert(key, metadata);
161 let old_size = old_value.map_or(0, |v| v.data.len());
162
163 if value_size > old_size {
164 self.current_size
165 .fetch_add(value_size - old_size, Ordering::Relaxed);
166 } else {
167 self.current_size
168 .fetch_sub(old_size - value_size, Ordering::Relaxed);
169 }
170 }
171
172 fn increment_access(&self, key: &str) {
173 self.access_count
174 .entry(key.to_string())
175 .and_modify(|count| *count += 1)
176 .or_insert(1);
177 }
178
179 pub fn is_hot(&self, key: &str) -> bool {
180 self.access_count
181 .get(key)
182 .map(|count| *count > 10)
183 .unwrap_or(false)
184 }
185
186 fn cleanup_expired(&self) {
187 let now = SystemTime::now();
188 let mut total_freed = 0;
189
190 let expired_keys: Vec<String> = self
191 .data
192 .iter()
193 .filter_map(|entry| {
194 if let Some(expires_at) = entry.expires_at {
195 if expires_at <= now {
196 Some(entry.key().clone())
197 } else {
198 None
199 }
200 } else {
201 None
202 }
203 })
204 .collect();
205
206 for key in expired_keys {
207 if let Some(entry) = self.data.remove(&key) {
208 total_freed += entry.1.data.len();
209 }
210 }
211
212 if total_freed > 0 {
213 self.current_size.fetch_sub(total_freed, Ordering::Relaxed);
214 }
215 }
216
217 fn cleanup_by_lru(&self, bytes_to_free: usize) {
218 let mut entries: Vec<_> = self
219 .data
220 .iter()
221 .map(|entry| (entry.key().clone(), entry.last_accessed))
222 .collect();
223
224 entries.sort_by_key(|e| e.1);
225
226 let mut freed = 0;
227 for (key, _) in entries {
228 if freed >= bytes_to_free {
229 break;
230 }
231
232 if let Some(removed) = self.data.remove(&key) {
233 freed += removed.1.data.len();
234 self.access_count.remove(&key);
235 }
236 }
237
238 if freed > 0 {
239 self.current_size.fetch_sub(freed, Ordering::Relaxed);
240 }
241 }
242
243 fn evict_least_frequently_used(&self, bytes_to_free: usize) {
244 let mut entries: Vec<_> = self
245 .access_count
246 .iter()
247 .map(|e| (e.key().clone(), *e.value()))
248 .collect();
249
250 entries.sort_by_key(|e| e.1);
251
252 let mut freed = 0;
253 for (key, _) in entries {
254 if freed >= bytes_to_free {
255 break;
256 }
257
258 if let Some(value) = self.data.remove(&key) {
259 freed += value.1.data.len();
260 self.access_count.remove(&key);
261 }
262 }
263
264 if freed > 0 {
265 self.current_size.fetch_sub(freed, Ordering::Relaxed);
266 }
267 }
268
269 fn evict_hybrid(&self, item_size: usize) {
270 self.cleanup_expired();
271
272 let current = self.current_size.load(Ordering::Relaxed);
273 if current + item_size <= self.max_size {
274 return;
275 }
276
277 let bytes_to_free = (current + item_size) - self.max_size;
278 self.evict_least_frequently_used(bytes_to_free);
279
280 let current = self.current_size.load(Ordering::Relaxed);
281 if current + item_size > self.max_size {
282 let more_bytes_to_free = (current + item_size) - self.max_size;
283 self.cleanup_by_lru(more_bytes_to_free);
284 }
285 }
286
287 pub async fn start_cleanup_with_interval(self: Arc<Self>, interval_secs: u64) {
288 let mut interval = interval(Duration::from_secs(interval_secs));
289
290 tokio::spawn(async move {
291 loop {
292 interval.tick().await;
293 self.cleanup_expired();
294 }
295 });
296 }
297
298 pub fn delete(&self, key: &str) {
299 if let Some(entry) = self.data.remove(key) {
300 let size = entry.1.data.len();
301 self.current_size.fetch_sub(size, Ordering::Relaxed);
302 }
303 }
304
305 pub fn get_stats(&self) -> CacheStats {
306 let hits = self.hit_count.load(Ordering::Relaxed);
307 let misses = self.miss_count.load(Ordering::Relaxed);
308 let total = hits + misses;
309
310 let hit_ratio = if total == 0 {
311 0.0
312 } else {
313 hits as f64 / total as f64
314 };
315
316 CacheStats {
317 item_count: self.data.len(),
318 memory_usage: self.current_size.load(Ordering::Relaxed),
319 hit_count: hits,
320 miss_count: misses,
321 hit_ratio,
322 }
323 }
324
325 pub fn clear(&self) {
326 self.data.clear();
327 self.access_count.clear();
328 self.current_size.store(0, Ordering::Relaxed);
329 }
330
331 pub fn hit_ratio(&self) -> f64 {
332 let hits = self.hit_count.load(Ordering::Relaxed);
333 let misses = self.miss_count.load(Ordering::Relaxed);
334 let total = hits + misses;
335
336 if total == 0 {
337 0.0
338 } else {
339 hits as f64 / total as f64
340 }
341 }
342}