1use scirs2_core::metrics::MetricsRegistry;
38use std::collections::{HashMap, VecDeque};
39use std::sync::atomic::{AtomicU64, Ordering};
40use std::sync::{Arc, RwLock};
41use std::time::{Duration, Instant};
42
/// Tuning knobs for [`QueryResultCache`].
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Maximum number of entries held at once.
    pub max_entries: usize,
    /// Upper bound on (estimated) cached-value memory, in bytes.
    pub max_memory_bytes: u64,
    /// Time-to-live applied by `put` when no explicit TTL is given.
    pub default_ttl: Duration,
    /// When true, evictions follow least-recently-used order.
    pub enable_lru: bool,
}
55
56impl Default for CacheConfig {
57 fn default() -> Self {
58 Self {
59 max_entries: 10000,
60 max_memory_bytes: 1024 * 1024 * 1024, default_ttl: Duration::from_secs(300), enable_lru: true,
63 }
64 }
65}
66
/// A cached value plus the bookkeeping needed for TTL and LRU handling.
#[derive(Debug, Clone)]
struct CacheEntry<V> {
    // The cached payload.
    value: V,
    // Estimated size used for memory accounting (see `estimate_size`).
    size_bytes: u64,
    // Insertion timestamp; currently unused but kept for diagnostics.
    #[allow(dead_code)]
    created_at: Instant,
    // Instant at or after which the entry counts as expired.
    expires_at: Instant,
    // Refreshed on every successful `get` (via `touch`).
    last_accessed: Instant,
    // Number of times the entry has been read.
    access_count: u64,
}
84
85impl<V> CacheEntry<V> {
86 fn is_expired(&self) -> bool {
87 Instant::now() >= self.expires_at
88 }
89
90 fn touch(&mut self) {
91 self.last_accessed = Instant::now();
92 self.access_count += 1;
93 }
94}
95
/// Thread-safe, TTL-aware query-result cache with optional LRU eviction.
pub struct QueryResultCache<V: Clone> {
    // Behavioural limits and switches.
    config: CacheConfig,
    // Key -> entry map guarded by a reader/writer lock.
    entries: Arc<RwLock<HashMap<String, CacheEntry<V>>>>,
    // Keys ordered least- to most-recently used; only maintained when LRU is on.
    lru_queue: Arc<RwLock<VecDeque<String>>>,
    // Running estimate of cached bytes; kept in sync with `entries`.
    current_memory: Arc<AtomicU64>,
    // Shared hit/miss/eviction counters.
    stats: CacheStats,
    // Metrics registry; currently unused beyond construction.
    #[allow(dead_code)]
    metrics: Arc<MetricsRegistry>,
}
115
/// Shared atomic counters describing cache activity.
///
/// Cloning is cheap: clones share the same underlying counters.
#[derive(Clone)]
pub struct CacheStats {
    /// Lookups served from cache.
    pub hits: Arc<AtomicU64>,
    /// Lookups that found nothing (or only an expired entry).
    pub misses: Arc<AtomicU64>,
    /// Entries removed to stay within capacity limits.
    pub evictions: Arc<AtomicU64>,
    /// Entries removed because their TTL elapsed.
    pub expirations: Arc<AtomicU64>,
    /// Inserts performed.
    pub puts: Arc<AtomicU64>,
    /// Explicit removals via `invalidate`.
    pub invalidations: Arc<AtomicU64>,
}
132
133impl CacheStats {
134 fn new() -> Self {
135 Self {
136 hits: Arc::new(AtomicU64::new(0)),
137 misses: Arc::new(AtomicU64::new(0)),
138 evictions: Arc::new(AtomicU64::new(0)),
139 expirations: Arc::new(AtomicU64::new(0)),
140 puts: Arc::new(AtomicU64::new(0)),
141 invalidations: Arc::new(AtomicU64::new(0)),
142 }
143 }
144
145 pub fn hit_rate(&self) -> f64 {
147 let hits = self.hits.load(Ordering::Relaxed);
148 let misses = self.misses.load(Ordering::Relaxed);
149 let total = hits + misses;
150 if total == 0 {
151 0.0
152 } else {
153 hits as f64 / total as f64
154 }
155 }
156
157 pub fn reset(&self) {
159 self.hits.store(0, Ordering::Relaxed);
160 self.misses.store(0, Ordering::Relaxed);
161 self.evictions.store(0, Ordering::Relaxed);
162 self.expirations.store(0, Ordering::Relaxed);
163 self.puts.store(0, Ordering::Relaxed);
164 self.invalidations.store(0, Ordering::Relaxed);
165 }
166}
167
168impl<V: Clone> QueryResultCache<V> {
169 pub fn new(config: CacheConfig) -> Self {
171 let metrics = MetricsRegistry::new();
172
173 Self {
174 config,
175 entries: Arc::new(RwLock::new(HashMap::new())),
176 lru_queue: Arc::new(RwLock::new(VecDeque::new())),
177 current_memory: Arc::new(AtomicU64::new(0)),
178 stats: CacheStats::new(),
179 metrics: Arc::new(metrics),
180 }
181 }
182
183 pub fn put(&self, key: String, value: V) {
185 self.put_with_ttl(key, value, self.config.default_ttl);
186 }
187
188 pub fn put_with_ttl(&self, key: String, value: V, ttl: Duration) {
190 let now = Instant::now();
191 let size_bytes = self.estimate_size(&value);
192
193 let entry = CacheEntry {
194 value,
195 size_bytes,
196 created_at: now,
197 expires_at: now + ttl,
198 last_accessed: now,
199 access_count: 0,
200 };
201
202 self.ensure_capacity(size_bytes);
204
205 {
206 let mut entries = self.entries.write().expect("entries lock poisoned");
207
208 if let Some(old_entry) = entries.remove(&key) {
210 self.current_memory
211 .fetch_sub(old_entry.size_bytes, Ordering::Relaxed);
212 }
213
214 entries.insert(key.clone(), entry);
216 self.current_memory.fetch_add(size_bytes, Ordering::Relaxed);
217 }
218
219 if self.config.enable_lru {
221 let mut lru = self.lru_queue.write().expect("lru_queue lock poisoned");
222 lru.retain(|k| k != &key); lru.push_back(key);
224 }
225
226 self.stats.puts.fetch_add(1, Ordering::Relaxed);
227 }
228
229 pub fn get(&self, key: &str) -> Option<V> {
231 self.clean_expired();
233
234 let mut entries = self.entries.write().expect("entries lock poisoned");
235
236 if let Some(entry) = entries.get_mut(key) {
237 if entry.is_expired() {
238 self.current_memory
240 .fetch_sub(entry.size_bytes, Ordering::Relaxed);
241 entries.remove(key);
242 self.stats.expirations.fetch_add(1, Ordering::Relaxed);
243 self.stats.misses.fetch_add(1, Ordering::Relaxed);
244 return None;
245 }
246
247 entry.touch();
249
250 if self.config.enable_lru {
252 let mut lru = self.lru_queue.write().expect("lru_queue lock poisoned");
253 lru.retain(|k| k != key);
254 lru.push_back(key.to_string());
255 }
256
257 self.stats.hits.fetch_add(1, Ordering::Relaxed);
258 Some(entry.value.clone())
259 } else {
260 self.stats.misses.fetch_add(1, Ordering::Relaxed);
261 None
262 }
263 }
264
265 pub fn invalidate(&self, key: &str) -> bool {
267 let mut entries = self.entries.write().expect("entries lock poisoned");
268
269 if let Some(entry) = entries.remove(key) {
270 self.current_memory
271 .fetch_sub(entry.size_bytes, Ordering::Relaxed);
272
273 if self.config.enable_lru {
274 let mut lru = self.lru_queue.write().expect("lru_queue lock poisoned");
275 lru.retain(|k| k != key);
276 }
277
278 self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
279 true
280 } else {
281 false
282 }
283 }
284
285 pub fn clear(&self) {
287 let mut entries = self.entries.write().expect("entries lock poisoned");
288 entries.clear();
289
290 if self.config.enable_lru {
291 let mut lru = self.lru_queue.write().expect("lru_queue lock poisoned");
292 lru.clear();
293 }
294
295 self.current_memory.store(0, Ordering::Relaxed);
296 }
297
298 pub fn len(&self) -> usize {
300 self.entries.read().expect("entries lock poisoned").len()
301 }
302
303 pub fn is_empty(&self) -> bool {
305 self.len() == 0
306 }
307
308 pub fn memory_usage(&self) -> u64 {
310 self.current_memory.load(Ordering::Relaxed)
311 }
312
313 pub fn stats(&self) -> CacheStats {
315 self.stats.clone()
316 }
317
318 fn clean_expired(&self) {
320 let mut entries = self.entries.write().expect("entries lock poisoned");
321 let mut to_remove = Vec::new();
322
323 for (key, entry) in entries.iter() {
324 if entry.is_expired() {
325 to_remove.push((key.clone(), entry.size_bytes));
326 }
327 }
328
329 for (key, size) in to_remove {
330 entries.remove(&key);
331 self.current_memory.fetch_sub(size, Ordering::Relaxed);
332 self.stats.expirations.fetch_add(1, Ordering::Relaxed);
333
334 if self.config.enable_lru {
335 let mut lru = self.lru_queue.write().expect("lru_queue lock poisoned");
336 lru.retain(|k| k != &key);
337 }
338 }
339 }
340
341 fn ensure_capacity(&self, new_entry_size: u64) {
343 while self.len() >= self.config.max_entries {
345 self.evict_lru();
346 }
347
348 while self.memory_usage() + new_entry_size > self.config.max_memory_bytes {
350 self.evict_lru();
351 }
352 }
353
354 fn evict_lru(&self) {
356 if !self.config.enable_lru {
357 let key_to_evict = {
359 let entries = self.entries.read().expect("entries lock poisoned");
360 entries.keys().next().cloned()
361 };
362
363 if let Some(key) = key_to_evict {
364 let mut entries = self.entries.write().expect("entries lock poisoned");
365 if let Some(entry) = entries.remove(&key) {
366 self.current_memory
367 .fetch_sub(entry.size_bytes, Ordering::Relaxed);
368 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
369 }
370 }
371 return;
372 }
373
374 let key_to_evict = {
376 let mut lru = self.lru_queue.write().expect("lru_queue lock poisoned");
377 lru.pop_front()
378 };
379
380 if let Some(key) = key_to_evict {
381 let mut entries = self.entries.write().expect("entries lock poisoned");
382 if let Some(entry) = entries.remove(&key) {
383 self.current_memory
384 .fetch_sub(entry.size_bytes, Ordering::Relaxed);
385 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
386 }
387 }
388 }
389
390 fn estimate_size(&self, _value: &V) -> u64 {
395 1024
398 }
399}
400
401impl<V: Clone> Default for QueryResultCache<V> {
402 fn default() -> Self {
403 Self::new(CacheConfig::default())
404 }
405}
406
#[cfg(test)]
mod tests {
    use super::*;

    // Basic put/get round-trip plus hit/miss accounting.
    #[test]
    fn test_basic_cache_operations() {
        let cache = QueryResultCache::<String>::new(CacheConfig::default());

        cache.put("key1".to_string(), "value1".to_string());
        assert_eq!(cache.get("key1"), Some("value1".to_string()));

        // Absent key counts as a miss.
        assert_eq!(cache.get("key2"), None);

        assert_eq!(cache.stats().hits.load(Ordering::Relaxed), 1);
        assert_eq!(cache.stats().misses.load(Ordering::Relaxed), 1);
    }

    // Entries become unavailable once their TTL elapses.
    // NOTE(review): sleep-based timing; could flake on a heavily loaded machine.
    #[test]
    fn test_ttl_expiration() {
        let config = CacheConfig {
            default_ttl: Duration::from_millis(100),
            ..Default::default()
        };
        let cache = QueryResultCache::<String>::new(config);

        cache.put("key1".to_string(), "value1".to_string());
        assert_eq!(cache.get("key1"), Some("value1".to_string()));

        // Wait past the 100 ms TTL.
        std::thread::sleep(Duration::from_millis(150));

        assert_eq!(cache.get("key1"), None);
        assert_eq!(cache.stats().expirations.load(Ordering::Relaxed), 1);
    }

    // With max_entries = 3, inserting a fourth entry evicts the least
    // recently used key.
    #[test]
    fn test_lru_eviction() {
        let config = CacheConfig {
            max_entries: 3,
            enable_lru: true,
            ..Default::default()
        };
        let cache = QueryResultCache::<String>::new(config);

        cache.put("key1".to_string(), "value1".to_string());
        cache.put("key2".to_string(), "value2".to_string());
        cache.put("key3".to_string(), "value3".to_string());

        // Touch key1 so key2 becomes the least recently used.
        cache.get("key1");

        // Fourth insert must evict key2.
        cache.put("key4".to_string(), "value4".to_string());

        assert_eq!(cache.get("key1"), Some("value1".to_string()));
        assert_eq!(cache.get("key2"), None); // evicted
        assert_eq!(cache.get("key3"), Some("value3".to_string()));
        assert_eq!(cache.get("key4"), Some("value4".to_string()));
    }

    // invalidate() reports whether the key existed.
    #[test]
    fn test_cache_invalidation() {
        let cache = QueryResultCache::<String>::new(CacheConfig::default());

        cache.put("key1".to_string(), "value1".to_string());
        assert!(cache.invalidate("key1"));
        assert_eq!(cache.get("key1"), None);
        assert!(!cache.invalidate("key1")); // already gone
    }

    // clear() empties the cache and zeroes memory accounting.
    #[test]
    fn test_cache_clear() {
        let cache = QueryResultCache::<String>::new(CacheConfig::default());

        cache.put("key1".to_string(), "value1".to_string());
        cache.put("key2".to_string(), "value2".to_string());

        assert_eq!(cache.len(), 2);
        cache.clear();
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.memory_usage(), 0);
    }

    // Two hits and one miss yield a hit rate of exactly 2/3.
    #[test]
    fn test_hit_rate() {
        let cache = QueryResultCache::<String>::new(CacheConfig::default());

        cache.put("key1".to_string(), "value1".to_string());

        cache.get("key1");
        cache.get("key2");
        cache.get("key1");
        assert_eq!(cache.stats().hit_rate(), 2.0 / 3.0);
    }

    // Ten threads each writing and reading 100 distinct keys must not
    // panic or poison any lock.
    #[test]
    fn test_concurrent_access() {
        use std::sync::Arc;
        use std::thread;

        let cache = Arc::new(QueryResultCache::<String>::new(CacheConfig::default()));
        let mut handles = vec![];

        for i in 0..10 {
            let cache_clone = Arc::clone(&cache);
            let handle = thread::spawn(move || {
                for j in 0..100 {
                    let key = format!("key_{}", i * 100 + j);
                    let value = format!("value_{}", i * 100 + j);
                    cache_clone.put(key.clone(), value.clone());
                    cache_clone.get(&key);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().expect("thread should not panic");
        }

        // At most the 1000 distinct keys survive.
        assert!(cache.len() <= 1000);
    }

    // With a 5 KiB budget and ~1 KiB per entry (see estimate_size), ten
    // inserts must trigger memory-based evictions.
    #[test]
    fn test_memory_aware_eviction() {
        let config = CacheConfig {
            max_entries: 1000,
            max_memory_bytes: 5120, // room for five 1 KiB entries
            enable_lru: true,
            ..Default::default()
        };
        let max_memory = config.max_memory_bytes;
        let cache = QueryResultCache::<String>::new(config);

        for i in 0..10 {
            cache.put(format!("key{}", i), format!("value{}", i));
        }

        assert!(cache.memory_usage() <= max_memory);
        assert!(cache.stats().evictions.load(Ordering::Relaxed) > 0);
    }
}