1use parking_lot::RwLock;
25use std::collections::HashMap;
26use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
27use std::time::{Duration, Instant};
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
35pub struct CacheKey([u8; 32]);
36
37impl CacheKey {
38 pub fn from_bytes(data: &[u8]) -> Self {
40 let hash = blake3::hash(data);
41 Self(*hash.as_bytes())
42 }
43
44 pub fn from_query(query_type: &str, params: &[u8]) -> Self {
48 let mut hasher = blake3::Hasher::new();
49 hasher.update(query_type.as_bytes());
50 hasher.update(b":");
51 hasher.update(params);
52 let hash = hasher.finalize();
53 Self(*hash.as_bytes())
54 }
55
56 pub fn as_bytes(&self) -> &[u8; 32] {
58 &self.0
59 }
60}
61
62struct CacheEntry {
68 result: Vec<u8>,
70 created_at: Instant,
72 ttl: Duration,
74 access_count: AtomicU64,
76 last_accessed: Instant,
78 size_bytes: usize,
80 collection: Option<String>,
82}
83
84impl CacheEntry {
85 fn is_expired(&self) -> bool {
87 self.created_at.elapsed() > self.ttl
88 }
89}
90
91pub struct CacheStats {
100 hits: AtomicU64,
101 misses: AtomicU64,
102 evictions: AtomicU64,
103 insertions: AtomicU64,
104}
105
106impl CacheStats {
107 fn new() -> Self {
108 Self {
109 hits: AtomicU64::new(0),
110 misses: AtomicU64::new(0),
111 evictions: AtomicU64::new(0),
112 insertions: AtomicU64::new(0),
113 }
114 }
115
116 pub fn snapshot(&self) -> CacheStatsSnapshot {
118 CacheStatsSnapshot {
119 hits: self.hits.load(Ordering::Relaxed),
120 misses: self.misses.load(Ordering::Relaxed),
121 evictions: self.evictions.load(Ordering::Relaxed),
122 insertions: self.insertions.load(Ordering::Relaxed),
123 }
124 }
125}
126
127#[derive(Debug, Clone)]
129pub struct CacheStatsSnapshot {
130 pub hits: u64,
132 pub misses: u64,
134 pub evictions: u64,
136 pub insertions: u64,
138}
139
140impl CacheStatsSnapshot {
141 pub fn hit_rate(&self) -> f64 {
145 let total = self.hits + self.misses;
146 if total == 0 {
147 0.0
148 } else {
149 self.hits as f64 / total as f64
150 }
151 }
152
153 pub fn approx_size(&self) -> u64 {
158 self.insertions.saturating_sub(self.evictions)
159 }
160}
161
162struct CacheInner {
168 entries: HashMap<CacheKey, CacheEntry>,
170 lru_order: Vec<CacheKey>,
172 collection_index: HashMap<String, Vec<CacheKey>>,
174}
175
176impl CacheInner {
177 fn new() -> Self {
178 Self {
179 entries: HashMap::new(),
180 lru_order: Vec::new(),
181 collection_index: HashMap::new(),
182 }
183 }
184
185 fn touch(&mut self, key: &CacheKey) {
187 if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
188 self.lru_order.remove(pos);
189 }
190 self.lru_order.push(*key);
191 }
192
193 fn evict_lru(&mut self) -> Option<CacheKey> {
195 if self.lru_order.is_empty() {
196 return None;
197 }
198 let key = self.lru_order.remove(0);
199 self.remove_entry_inner(&key);
200 Some(key)
201 }
202
203 fn remove_entry(&mut self, key: &CacheKey) {
205 if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
206 self.lru_order.remove(pos);
207 }
208 self.remove_entry_inner(key);
209 }
210
211 fn remove_entry_inner(&mut self, key: &CacheKey) {
214 if let Some(entry) = self.entries.remove(key) {
215 if let Some(ref coll) = entry.collection {
216 if let Some(keys) = self.collection_index.get_mut(coll) {
217 keys.retain(|k| k != key);
218 if keys.is_empty() {
219 self.collection_index.remove(coll);
220 }
221 }
222 }
223 }
224 }
225}
226
227pub struct QueryCache {
233 inner: RwLock<CacheInner>,
234 max_entries: AtomicUsize,
235 default_ttl: Duration,
236 max_value_size: usize,
237 stats: CacheStats,
238}
239
240impl QueryCache {
241 pub fn new(max_entries: usize, default_ttl: Duration, max_value_size: usize) -> Self {
249 Self {
250 inner: RwLock::new(CacheInner::new()),
251 max_entries: AtomicUsize::new(max_entries),
252 default_ttl,
253 max_value_size,
254 stats: CacheStats::new(),
255 }
256 }
257
258 pub fn get(&self, key: &CacheKey) -> Option<Vec<u8>> {
264 let mut inner = self.inner.write();
265
266 if let Some(entry) = inner.entries.get(key) {
267 if entry.is_expired() {
268 inner.remove_entry(key);
270 self.stats.misses.fetch_add(1, Ordering::Relaxed);
271 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
272 return None;
273 }
274
275 let result = entry.result.clone();
277 if let Some(entry) = inner.entries.get_mut(key) {
279 entry.access_count.fetch_add(1, Ordering::Relaxed);
280 entry.last_accessed = Instant::now();
281 }
282 inner.touch(key);
283 self.stats.hits.fetch_add(1, Ordering::Relaxed);
284 Some(result)
285 } else {
286 self.stats.misses.fetch_add(1, Ordering::Relaxed);
287 None
288 }
289 }
290
291 pub fn put(&self, key: CacheKey, result: Vec<u8>) {
296 self.put_with_options(key, result, self.default_ttl, None);
297 }
298
299 pub fn put_with_ttl(&self, key: CacheKey, result: Vec<u8>, ttl: Duration) {
301 self.put_with_options(key, result, ttl, None);
302 }
303
304 pub fn put_with_options(
306 &self,
307 key: CacheKey,
308 result: Vec<u8>,
309 ttl: Duration,
310 collection: Option<&str>,
311 ) {
312 if result.len() > self.max_value_size {
313 return; }
315
316 let size_bytes = result.len();
317 let now = Instant::now();
318
319 let entry = CacheEntry {
320 result,
321 created_at: now,
322 ttl,
323 access_count: AtomicU64::new(0),
324 last_accessed: now,
325 size_bytes,
326 collection: collection.map(String::from),
327 };
328
329 let mut inner = self.inner.write();
330
331 if inner.entries.contains_key(&key) {
333 inner.remove_entry(&key);
334 }
335
336 let max = self.max_entries.load(Ordering::Relaxed);
338 while inner.entries.len() >= max {
339 if inner.evict_lru().is_some() {
340 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
341 } else {
342 break;
343 }
344 }
345
346 if let Some(ref coll) = entry.collection {
348 inner
349 .collection_index
350 .entry(coll.clone())
351 .or_default()
352 .push(key);
353 }
354
355 inner.entries.insert(key, entry);
356 inner.lru_order.push(key);
357 self.stats.insertions.fetch_add(1, Ordering::Relaxed);
358 }
359
360 pub fn invalidate(&self, collection: &str) {
366 let mut inner = self.inner.write();
367 if let Some(keys) = inner.collection_index.remove(collection) {
368 let evicted = keys.len() as u64;
369 for key in &keys {
370 if let Some(pos) = inner.lru_order.iter().position(|k| k == key) {
371 inner.lru_order.remove(pos);
372 }
373 inner.entries.remove(key);
374 }
375 self.stats.evictions.fetch_add(evicted, Ordering::Relaxed);
376 }
377 }
378
379 pub fn invalidate_all(&self) {
381 let mut inner = self.inner.write();
382 let evicted = inner.entries.len() as u64;
383 inner.entries.clear();
384 inner.lru_order.clear();
385 inner.collection_index.clear();
386 self.stats.evictions.fetch_add(evicted, Ordering::Relaxed);
387 }
388
389 pub fn stats(&self) -> CacheStatsSnapshot {
391 self.stats.snapshot()
392 }
393
394 pub fn resize(&self, new_max: usize) {
399 let mut inner = self.inner.write();
409 while inner.entries.len() > new_max {
410 if inner.evict_lru().is_some() {
411 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
412 } else {
413 break;
414 }
415 }
416 drop(inner);
417
418 self.max_entries.store(new_max, Ordering::SeqCst);
420 }
421
422 pub fn len(&self) -> usize {
424 let inner = self.inner.read();
425 inner.entries.len()
426 }
427
428 pub fn is_empty(&self) -> bool {
430 self.len() == 0
431 }
432
433 pub fn total_size_bytes(&self) -> usize {
435 let inner = self.inner.read();
436 inner.entries.values().map(|e| e.size_bytes).sum()
437 }
438}
439
440impl std::fmt::Debug for QueryCache {
441 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
442 let snap = self.stats.snapshot();
443 f.debug_struct("QueryCache")
444 .field("max_entries", &self.max_entries)
445 .field("default_ttl", &self.default_ttl)
446 .field("max_value_size", &self.max_value_size)
447 .field("len", &self.len())
448 .field("stats", &snap)
449 .finish()
450 }
451}
452
453#[cfg(test)]
458mod tests {
459 use super::*;
460 use std::sync::Arc;
461 use std::thread;
462 use std::time::Duration;
463
464 fn test_cache(max_entries: usize) -> QueryCache {
466 QueryCache::new(max_entries, Duration::from_secs(60), 1024 * 1024)
467 }
468
469 #[test]
471 fn test_cache_put_get() {
472 let cache = test_cache(100);
473 let key = CacheKey::from_bytes(b"select * from users");
474 cache.put(key, vec![1, 2, 3, 4]);
475
476 let result = cache.get(&key);
477 assert!(result.is_some());
478 assert_eq!(result.expect("should have value"), vec![1, 2, 3, 4]);
479 }
480
481 #[test]
483 fn test_cache_miss() {
484 let cache = test_cache(100);
485 let key = CacheKey::from_bytes(b"nonexistent query");
486
487 let result = cache.get(&key);
488 assert!(result.is_none());
489
490 let snap = cache.stats();
491 assert_eq!(snap.hits, 0);
492 assert_eq!(snap.misses, 1);
493 }
494
495 #[test]
497 fn test_cache_ttl_expiry() {
498 let cache = QueryCache::new(100, Duration::from_millis(50), 1024 * 1024);
499 let key = CacheKey::from_bytes(b"expiring query");
500 cache.put(key, vec![10, 20]);
501
502 assert!(cache.get(&key).is_some());
504
505 thread::sleep(Duration::from_millis(100));
507
508 assert!(cache.get(&key).is_none());
510
511 let snap = cache.stats();
512 assert_eq!(snap.hits, 1);
513 assert_eq!(snap.misses, 1);
514 assert_eq!(snap.evictions, 1); }
516
517 #[test]
519 fn test_cache_hit_updates_stats() {
520 let cache = test_cache(100);
521 let key = CacheKey::from_bytes(b"stats query");
522 cache.put(key, vec![1]);
523
524 for _ in 0..5 {
525 let _ = cache.get(&key);
526 }
527
528 let snap = cache.stats();
529 assert_eq!(snap.hits, 5);
530 assert_eq!(snap.misses, 0);
531 }
532
533 #[test]
535 fn test_cache_miss_updates_stats() {
536 let cache = test_cache(100);
537
538 for i in 0..3u8 {
539 let key = CacheKey::from_bytes(&[i]);
540 let _ = cache.get(&key);
541 }
542
543 let snap = cache.stats();
544 assert_eq!(snap.hits, 0);
545 assert_eq!(snap.misses, 3);
546 }
547
548 #[test]
550 fn test_cache_lru_eviction() {
551 let cache = test_cache(3);
552
553 let keys: Vec<CacheKey> = (0..3u8).map(|i| CacheKey::from_bytes(&[i])).collect();
554
555 for (i, key) in keys.iter().enumerate() {
556 cache.put(*key, vec![i as u8]);
557 }
558
559 assert_eq!(cache.len(), 3);
560
561 let _ = cache.get(&keys[0]);
563
564 let key3 = CacheKey::from_bytes(&[3u8]);
566 cache.put(key3, vec![3]);
567
568 assert_eq!(cache.len(), 3);
569 assert!(
570 cache.get(&keys[0]).is_some(),
571 "key[0] was accessed and should survive"
572 );
573 assert!(
574 cache.get(&keys[1]).is_none(),
575 "key[1] should have been evicted"
576 );
577 assert!(
578 cache.get(&keys[2]).is_some(),
579 "key[2] should still be present"
580 );
581 assert!(cache.get(&key3).is_some(), "key[3] was just inserted");
582
583 let snap = cache.stats();
584 assert!(snap.evictions >= 1);
585 }
586
587 #[test]
589 fn test_cache_invalidate_collection() {
590 let cache = test_cache(100);
591
592 let k1 = CacheKey::from_query("filter", b"users:age>18");
593 let k2 = CacheKey::from_query("get", b"users:id=1");
594 let k3 = CacheKey::from_query("filter", b"orders:total>100");
595
596 cache.put_with_options(k1, vec![1], Duration::from_secs(60), Some("users"));
597 cache.put_with_options(k2, vec![2], Duration::from_secs(60), Some("users"));
598 cache.put_with_options(k3, vec![3], Duration::from_secs(60), Some("orders"));
599
600 assert_eq!(cache.len(), 3);
601
602 cache.invalidate("users");
603
604 assert_eq!(cache.len(), 1);
605 assert!(cache.get(&k1).is_none());
606 assert!(cache.get(&k2).is_none());
607 assert!(cache.get(&k3).is_some(), "orders entry should remain");
608 }
609
610 #[test]
612 fn test_cache_invalidate_all() {
613 let cache = test_cache(100);
614
615 for i in 0..10u8 {
616 let key = CacheKey::from_bytes(&[i]);
617 cache.put(key, vec![i]);
618 }
619
620 assert_eq!(cache.len(), 10);
621
622 cache.invalidate_all();
623
624 assert_eq!(cache.len(), 0);
625 assert!(cache.is_empty());
626
627 let snap = cache.stats();
628 assert_eq!(snap.evictions, 10);
629 }
630
631 #[test]
633 fn test_cache_hit_rate() {
634 let cache = test_cache(100);
635 let key = CacheKey::from_bytes(b"rate query");
636 cache.put(key, vec![1]);
637
638 for _ in 0..3 {
640 let _ = cache.get(&key);
641 }
642 let missing = CacheKey::from_bytes(b"no such key");
644 let _ = cache.get(&missing);
645
646 let snap = cache.stats();
647 assert!((snap.hit_rate() - 0.75).abs() < 1e-9);
649
650 let empty_cache = test_cache(10);
652 let snap = empty_cache.stats();
653 assert!((snap.hit_rate() - 0.0).abs() < f64::EPSILON);
654 }
655
656 #[test]
658 fn test_cache_concurrent_access() {
659 let cache = Arc::new(test_cache(500));
660 let mut handles = Vec::new();
661
662 for t in 0..4 {
664 let c = Arc::clone(&cache);
665 handles.push(thread::spawn(move || {
666 for i in 0..200u64 {
667 let key_bytes = format!("thread-{}-key-{}", t, i);
668 let key = CacheKey::from_bytes(key_bytes.as_bytes());
669 c.put(key, vec![t as u8; 64]);
670 }
671 }));
672 }
673
674 for t in 0..4 {
676 let c = Arc::clone(&cache);
677 handles.push(thread::spawn(move || {
678 for i in 0..200u64 {
679 let key_bytes = format!("thread-{}-key-{}", t, i);
680 let key = CacheKey::from_bytes(key_bytes.as_bytes());
681 let _ = c.get(&key);
682 }
683 }));
684 }
685
686 for h in handles {
687 h.join().expect("thread should not panic");
688 }
689
690 let snap = cache.stats();
691 assert!(snap.insertions > 0);
692 assert!(cache.len() <= 500);
693 }
694
695 #[test]
697 fn test_cache_max_value_size() {
698 let cache = QueryCache::new(100, Duration::from_secs(60), 100);
699
700 let k1 = CacheKey::from_bytes(b"small");
702 cache.put(k1, vec![0u8; 100]);
703 assert!(cache.get(&k1).is_some());
704
705 let k2 = CacheKey::from_bytes(b"big");
707 cache.put(k2, vec![0u8; 101]);
708 assert!(cache.get(&k2).is_none());
709
710 let snap = cache.stats();
711 assert_eq!(snap.insertions, 1); }
713
714 #[test]
716 fn test_cache_resize() {
717 let cache = test_cache(10);
718
719 for i in 0..10u8 {
720 let key = CacheKey::from_bytes(&[i]);
721 cache.put(key, vec![i]);
722 }
723 assert_eq!(cache.len(), 10);
724
725 cache.resize(5);
727 assert_eq!(cache.len(), 5);
728
729 let snap = cache.stats();
730 assert_eq!(snap.evictions, 5);
731
732 for i in 100..106u8 {
734 let key = CacheKey::from_bytes(&[i]);
735 cache.put(key, vec![i]);
736 }
737 assert!(cache.len() <= 5);
738 }
739
740 #[test]
742 fn test_cache_key_generation() {
743 let k1 = CacheKey::from_query("filter", b"users:age>18");
744 let k2 = CacheKey::from_query("filter", b"users:age>18");
745 assert_eq!(k1, k2, "same query should produce the same key");
746
747 let k3 = CacheKey::from_bytes(b"hello world");
748 let k4 = CacheKey::from_bytes(b"hello world");
749 assert_eq!(k3, k4);
750 }
751
752 #[test]
754 fn test_cache_different_queries() {
755 let k1 = CacheKey::from_query("filter", b"users:age>18");
756 let k2 = CacheKey::from_query("filter", b"users:age>21");
757 assert_ne!(k1, k2, "different params should produce different keys");
758
759 let k3 = CacheKey::from_query("filter", b"users:age>18");
760 let k4 = CacheKey::from_query("get", b"users:age>18");
761 assert_ne!(
762 k3, k4,
763 "different query types should produce different keys"
764 );
765 }
766
767 #[test]
769 fn test_total_size_bytes() {
770 let cache = test_cache(100);
771 let k1 = CacheKey::from_bytes(b"a");
772 let k2 = CacheKey::from_bytes(b"b");
773 cache.put(k1, vec![0u8; 100]);
774 cache.put(k2, vec![0u8; 200]);
775 assert_eq!(cache.total_size_bytes(), 300);
776 }
777
778 #[test]
780 fn test_put_with_custom_ttl() {
781 let cache = QueryCache::new(100, Duration::from_secs(300), 1024 * 1024);
782 let key = CacheKey::from_bytes(b"short lived");
783 cache.put_with_ttl(key, vec![1, 2], Duration::from_millis(50));
784
785 assert!(cache.get(&key).is_some());
786 thread::sleep(Duration::from_millis(100));
787 assert!(cache.get(&key).is_none());
788 }
789
790 #[test]
792 fn test_debug_format() {
793 let cache = test_cache(10);
794 let dbg = format!("{:?}", cache);
795 assert!(dbg.contains("QueryCache"));
796 assert!(dbg.contains("max_entries"));
797 }
798}