1use dashmap::DashMap;
2use std::sync::Arc;
3use std::time::{Duration, SystemTime};
4use tokio::time::interval;
5use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
6
7#[derive(Debug)]
9pub struct CacheStats {
10 pub item_count: usize,
11 pub memory_usage: usize,
12 pub hit_count: u64,
13 pub miss_count: u64,
14 pub hit_ratio: f64,
15}
16
17struct ValueMetadata {
19 data: Vec<u8>,
20 expires_at: Option<SystemTime>,
21 last_accessed: SystemTime,
22}
23
24pub struct HotStore {
25 data: Arc<DashMap<String, ValueMetadata>>,
26 access_count: Arc<DashMap<String, u64>>,
27 hit_count: Arc<AtomicU64>,
28 miss_count: Arc<AtomicU64>,
29 max_size: usize, current_size: Arc<AtomicUsize>,
31}
32
33impl Clone for HotStore {
35 fn clone(&self) -> Self {
36 Self {
37 data: Arc::clone(&self.data),
38 access_count: Arc::clone(&self.access_count),
39 hit_count: Arc::clone(&self.hit_count),
40 miss_count: Arc::clone(&self.miss_count),
41 max_size: self.max_size,
42 current_size: Arc::clone(&self.current_size),
43 }
44 }
45}
46
47impl HotStore {
48 pub fn new() -> Self {
49 Self::new_with_size_limit(128) }
51
52 pub fn with_config(
53 cache_size_mb: usize,
54 _cleanup_interval_secs: u64,
55 ) -> Self {
56 Self {
57 data: Arc::new(DashMap::new()),
58 access_count: Arc::new(DashMap::new()),
59 hit_count: Arc::new(AtomicU64::new(0)),
60 miss_count: Arc::new(AtomicU64::new(0)),
61 max_size: cache_size_mb * 1024 * 1024,
62 current_size: Arc::new(AtomicUsize::new(0)),
63 }
64 }
65
66 pub fn new_with_size_limit(max_size_mb: usize) -> Self {
67 Self {
68 data: Arc::new(DashMap::new()),
69 access_count: Arc::new(DashMap::new()),
70 hit_count: Arc::new(AtomicU64::new(0)),
71 miss_count: Arc::new(AtomicU64::new(0)),
72 max_size: max_size_mb * 1024 * 1024,
73 current_size: Arc::new(AtomicUsize::new(0)),
74 }
75 }
76
77 pub fn get(&self, key: &str) -> Option<Vec<u8>> {
78 self.increment_access(key);
79
80 if let Some(mut entry) = self.data.get_mut(key) {
81 if let Some(expires_at) = entry.expires_at {
83 if SystemTime::now() > expires_at {
84 self.data.remove(key);
85 let size = entry.data.len();
86 self.current_size.fetch_sub(size, Ordering::Relaxed);
87 self.miss_count.fetch_add(1, Ordering::Relaxed);
88 return None;
89 }
90 }
91
92 entry.last_accessed = SystemTime::now();
94 self.hit_count.fetch_add(1, Ordering::Relaxed);
95 Some(entry.data.clone())
96 } else {
97 self.miss_count.fetch_add(1, Ordering::Relaxed);
98 None
99 }
100 }
101
102 pub fn get_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
104 self.increment_access(key);
105
106 if let Some(mut entry) = self.data.get_mut(key) {
107 if let Some(expires_at) = entry.expires_at {
109 if SystemTime::now() > expires_at {
110 self.data.remove(key);
111 let size = entry.data.len();
112 self.current_size.fetch_sub(size, Ordering::Relaxed);
113 self.miss_count.fetch_add(1, Ordering::Relaxed);
114 return None;
115 }
116 }
117
118 entry.last_accessed = SystemTime::now();
120 self.hit_count.fetch_add(1, Ordering::Relaxed);
121 Some(Arc::new(entry.data.clone()))
122 } else {
123 self.miss_count.fetch_add(1, Ordering::Relaxed);
124 None
125 }
126 }
127
128 pub fn set(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) {
129 let value_size = value.len();
130
131 let new_size = self.current_size.load(Ordering::Relaxed)
133 .saturating_add(value_size);
134
135 if new_size > self.max_size {
136 self.evict_least_used(new_size.saturating_sub(self.max_size));
137 }
138
139 let expires_at = ttl.map(|duration| SystemTime::now() + duration);
140
141 if let Some(old_value) = self.data.get(&key) {
143 let old_size = old_value.data.len();
144 self.current_size.fetch_sub(old_size, Ordering::Relaxed);
145 }
146
147 let metadata = ValueMetadata {
148 data: value,
149 expires_at,
150 last_accessed: SystemTime::now(),
151 };
152
153 self.data.insert(key, metadata);
154 self.current_size.fetch_add(value_size, Ordering::Relaxed);
155 }
156
157 fn increment_access(&self, key: &str) {
158 self.access_count
159 .entry(key.to_string())
160 .and_modify(|count| *count += 1)
161 .or_insert(1);
162 }
163
164 pub fn is_hot(&self, key: &str) -> bool {
165 self.access_count
166 .get(key)
167 .map(|count| *count > 10)
168 .unwrap_or(false)
169 }
170
171 fn cleanup_expired(&self) {
173 let now = SystemTime::now();
174 let mut total_freed = 0;
175
176 let expired_keys: Vec<String> = self.data
178 .iter()
179 .filter_map(|entry| {
180 if let Some(expires_at) = entry.expires_at {
181 if expires_at <= now {
182 Some(entry.key().clone())
183 } else {
184 None
185 }
186 } else {
187 None
188 }
189 })
190 .collect();
191
192 for key in expired_keys {
193 if let Some(entry) = self.data.remove(&key) {
194 total_freed += entry.1.data.len();
195 }
196 }
197
198 if total_freed > 0 {
199 self.current_size.fetch_sub(total_freed, Ordering::Relaxed);
200 }
201 }
202
203 fn cleanup_by_lru(&self, bytes_to_free: usize) {
205 let mut entries: Vec<_> = self.data
206 .iter()
207 .map(|entry| (entry.key().clone(), entry.last_accessed))
208 .collect();
209
210 entries.sort_by_key(|e| e.1);
212
213 let mut freed = 0;
214 for (key, _) in entries {
215 if freed >= bytes_to_free {
216 break;
217 }
218
219 if let Some(removed) = self.data.remove(&key) {
220 freed += removed.1.data.len();
221 }
222 }
223
224 if freed > 0 {
225 self.current_size.fetch_sub(freed, Ordering::Relaxed);
226 }
227 }
228
229 fn evict_least_used(&self, bytes_to_free: usize) {
231 self.cleanup_expired();
233
234 if self.current_size.load(Ordering::Relaxed) + bytes_to_free <= self.max_size {
236 return;
237 }
238
239 let mut entries: Vec<_> = self.access_count
241 .iter()
242 .map(|e| (e.key().clone(), *e.value()))
243 .collect();
244
245 entries.sort_by_key(|e| e.1);
246
247 let mut freed = 0;
248 for (key, _) in entries {
249 if freed >= bytes_to_free {
250 break;
251 }
252
253 if let Some(value) = self.data.remove(&key) {
254 freed += value.1.data.len();
255 }
256 }
257
258 if freed > 0 {
259 self.current_size.fetch_sub(freed, Ordering::Relaxed);
260 }
261
262 if self.current_size.load(Ordering::Relaxed) + bytes_to_free > self.max_size {
264 self.cleanup_by_lru(bytes_to_free.saturating_sub(freed));
265 }
266 }
267
268 pub async fn start_cleanup_with_interval(self: Arc<Self>, interval_secs: u64) {
269 let mut interval = interval(Duration::from_secs(interval_secs));
270
271 tokio::spawn(async move {
272 loop {
273 interval.tick().await;
274 self.cleanup_expired();
275 }
276 });
277 }
278
279 pub fn delete(&self, key: &str) {
280 if let Some(entry) = self.data.remove(key) {
281 let size = entry.1.data.len();
282 self.current_size.fetch_sub(size, Ordering::Relaxed);
283 }
284 }
285
286 pub fn get_stats(&self) -> CacheStats {
288 let hits = self.hit_count.load(Ordering::Relaxed);
289 let misses = self.miss_count.load(Ordering::Relaxed);
290 let total = hits + misses;
291
292 let hit_ratio = if total == 0 {
293 0.0
294 } else {
295 hits as f64 / total as f64
296 };
297
298 CacheStats {
299 item_count: self.data.len(),
300 memory_usage: self.current_size.load(Ordering::Relaxed),
301 hit_count: hits,
302 miss_count: misses,
303 hit_ratio,
304 }
305 }
306
307 pub fn clear(&self) {
309 self.data.clear();
310 self.access_count.clear();
311 self.current_size.store(0, Ordering::Relaxed);
312 }
313
314 pub fn hit_ratio(&self) -> f64 {
316 let hits = self.hit_count.load(Ordering::Relaxed);
317 let misses = self.miss_count.load(Ordering::Relaxed);
318 let total = hits + misses;
319
320 if total == 0 {
321 0.0
322 } else {
323 hits as f64 / total as f64
324 }
325 }
326}