1use std::collections::HashMap;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::{Arc, Mutex};
28use std::time::{Duration, Instant};
29
30pub use crate::view::incremental::TripleDelta;
31
/// Identifies one cached query result: the dataset it was computed against
/// plus a 64-bit fingerprint of the query text.
///
/// Equality and hashing are derived from both fields. The query text itself
/// is not stored, so two different queries whose fingerprints collide are
/// indistinguishable as keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CoreCacheKey {
    /// Identifier of the dataset the query ran against.
    pub dataset_id: String,
    /// FNV-1a (64-bit) hash of the query string's UTF-8 bytes.
    pub query_fingerprint: u64,
}

impl CoreCacheKey {
    /// Builds a key from a dataset id and the raw query text.
    ///
    /// The query is hashed byte-for-byte; no normalisation (whitespace,
    /// case, ...) is applied before fingerprinting.
    pub fn new(dataset_id: &str, query: &str) -> Self {
        let query_fingerprint = Self::fingerprint(query);
        Self {
            dataset_id: dataset_id.to_owned(),
            query_fingerprint,
        }
    }

    /// Computes the 64-bit FNV-1a hash of `s`: starting from the offset
    /// basis, each byte is XOR-ed in and the state is multiplied by the FNV
    /// prime with wrapping arithmetic.
    fn fingerprint(s: &str) -> u64 {
        // Same values as the decimal literals 14_695_981_039_346_656_037 and
        // 1_099_511_628_211.
        const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
        const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

        s.bytes().fold(FNV_OFFSET, |state, byte| {
            (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        })
    }
}
72
73#[derive(Debug, Clone)]
79pub struct CoreCacheEntry {
80 pub key: CoreCacheKey,
82 pub result_rows: Vec<HashMap<String, String>>,
84 pub accessed_predicates: Vec<String>,
86 pub created_at: Instant,
88 pub last_accessed: Instant,
90 pub expires_at: Instant,
92 pub hit_count: u64,
94}
95
96impl CoreCacheEntry {
97 pub fn is_expired(&self) -> bool {
99 Instant::now() >= self.expires_at
100 }
101
102 fn touch(&mut self) {
104 self.hit_count += 1;
105 self.last_accessed = Instant::now();
106 }
107}
108
109struct LruList {
119 order: Vec<CoreCacheKey>,
121}
122
123impl LruList {
124 fn new(capacity: usize) -> Self {
125 Self {
126 order: Vec::with_capacity(capacity),
127 }
128 }
129
130 fn touch(&mut self, key: &CoreCacheKey) {
132 if let Some(pos) = self.order.iter().position(|k| k == key) {
133 self.order.remove(pos);
134 }
135 self.order.push(key.clone());
136 }
137
138 fn remove(&mut self, key: &CoreCacheKey) {
140 self.order.retain(|k| k != key);
141 }
142
143 fn pop_lru(&mut self) -> Option<CoreCacheKey> {
145 if self.order.is_empty() {
146 None
147 } else {
148 Some(self.order.remove(0))
149 }
150 }
151
152 fn len(&self) -> usize {
153 self.order.len()
154 }
155}
156
157struct CacheInner {
162 entries: HashMap<CoreCacheKey, CoreCacheEntry>,
163 lru: LruList,
164 capacity: usize,
165}
166
167impl CacheInner {
168 fn new(capacity: usize) -> Self {
169 Self {
170 entries: HashMap::with_capacity(capacity),
171 lru: LruList::new(capacity),
172 capacity,
173 }
174 }
175
176 fn evict_to_capacity(&mut self) -> usize {
178 let mut evicted = 0;
179 while self.entries.len() >= self.capacity {
180 if let Some(lru_key) = self.lru.pop_lru() {
181 self.entries.remove(&lru_key);
182 evicted += 1;
183 } else {
184 break;
185 }
186 }
187 evicted
188 }
189
190 fn purge_expired(&mut self) -> usize {
192 let expired: Vec<CoreCacheKey> = self
193 .entries
194 .iter()
195 .filter(|(_, e)| e.is_expired())
196 .map(|(k, _)| k.clone())
197 .collect();
198
199 let count = expired.len();
200 for key in &expired {
201 self.entries.remove(key);
202 self.lru.remove(key);
203 }
204 count
205 }
206}
207
208pub struct CoreResultCache {
228 inner: Arc<Mutex<CacheInner>>,
229 default_ttl: Duration,
230 hits: Arc<AtomicU64>,
231 misses: Arc<AtomicU64>,
232}
233
234impl CoreResultCache {
235 pub fn new(capacity: usize, ttl: Duration) -> Self {
237 Self {
238 inner: Arc::new(Mutex::new(CacheInner::new(capacity.max(1)))),
239 default_ttl: ttl,
240 hits: Arc::new(AtomicU64::new(0)),
241 misses: Arc::new(AtomicU64::new(0)),
242 }
243 }
244
245 pub fn get(&self, key: &CoreCacheKey) -> Option<Vec<HashMap<String, String>>> {
251 let mut inner = self.inner.lock().expect("cache lock poisoned");
252
253 if let Some(entry) = inner.entries.get_mut(key) {
254 if entry.is_expired() {
255 let key_clone = key.clone();
257 inner.entries.remove(&key_clone);
258 inner.lru.remove(&key_clone);
259 self.misses.fetch_add(1, Ordering::Relaxed);
260 return None;
261 }
262 entry.touch();
263 let result = entry.result_rows.clone();
264 inner.lru.touch(key);
265 self.hits.fetch_add(1, Ordering::Relaxed);
266 Some(result)
267 } else {
268 self.misses.fetch_add(1, Ordering::Relaxed);
269 None
270 }
271 }
272
273 pub fn put(
278 &self,
279 key: CoreCacheKey,
280 rows: Vec<HashMap<String, String>>,
281 predicates: Vec<String>,
282 ) {
283 self.put_with_ttl(key, rows, predicates, self.default_ttl);
284 }
285
286 pub fn put_with_ttl(
288 &self,
289 key: CoreCacheKey,
290 rows: Vec<HashMap<String, String>>,
291 predicates: Vec<String>,
292 ttl: Duration,
293 ) {
294 let now = Instant::now();
295 let entry = CoreCacheEntry {
296 key: key.clone(),
297 result_rows: rows,
298 accessed_predicates: predicates,
299 created_at: now,
300 last_accessed: now,
301 expires_at: now + ttl,
302 hit_count: 0,
303 };
304
305 let mut inner = self.inner.lock().expect("cache lock poisoned");
306
307 inner.purge_expired();
309
310 inner.evict_to_capacity();
312
313 if inner.entries.contains_key(&key) {
315 inner.lru.remove(&key);
316 }
317
318 inner.lru.touch(&key);
319 inner.entries.insert(key, entry);
320 }
321
322 pub fn invalidate_dataset(&self, dataset_id: &str) -> usize {
326 let mut inner = self.inner.lock().expect("cache lock poisoned");
327
328 let to_remove: Vec<CoreCacheKey> = inner
329 .entries
330 .keys()
331 .filter(|k| k.dataset_id == dataset_id)
332 .cloned()
333 .collect();
334
335 let count = to_remove.len();
336 for key in &to_remove {
337 inner.entries.remove(key);
338 inner.lru.remove(key);
339 }
340 count
341 }
342
343 pub fn invalidate_predicate(&self, dataset_id: &str, predicate: &str) -> usize {
347 let mut inner = self.inner.lock().expect("cache lock poisoned");
348
349 let to_remove: Vec<CoreCacheKey> = inner
350 .entries
351 .iter()
352 .filter(|(k, e)| {
353 k.dataset_id == dataset_id
354 && (e.accessed_predicates.is_empty()
355 || e.accessed_predicates.iter().any(|p| p == predicate))
356 })
357 .map(|(k, _)| k.clone())
358 .collect();
359
360 let count = to_remove.len();
361 for key in &to_remove {
362 inner.entries.remove(key);
363 inner.lru.remove(key);
364 }
365 count
366 }
367
368 pub fn invalidate_on_delta(&self, dataset_id: &str, deltas: &[TripleDelta]) -> usize {
376 if deltas.is_empty() {
377 return 0;
378 }
379
380 let changed_predicates: std::collections::HashSet<&str> =
382 deltas.iter().map(|d| d.predicate()).collect();
383
384 let mut inner = self.inner.lock().expect("cache lock poisoned");
385
386 let to_remove: Vec<CoreCacheKey> = inner
387 .entries
388 .iter()
389 .filter(|(k, e)| {
390 if k.dataset_id != dataset_id {
391 return false;
392 }
393 if e.accessed_predicates.is_empty() {
394 return true; }
396 e.accessed_predicates
397 .iter()
398 .any(|p| changed_predicates.contains(p.as_str()))
399 })
400 .map(|(k, _)| k.clone())
401 .collect();
402
403 let count = to_remove.len();
404 for key in &to_remove {
405 inner.entries.remove(key);
406 inner.lru.remove(key);
407 }
408 count
409 }
410
411 pub fn hit_rate(&self) -> f64 {
415 let hits = self.hits.load(Ordering::Relaxed);
416 let misses = self.misses.load(Ordering::Relaxed);
417 let total = hits + misses;
418 if total == 0 {
419 0.0
420 } else {
421 hits as f64 / total as f64
422 }
423 }
424
425 pub fn size(&self) -> usize {
428 self.inner.lock().expect("cache lock poisoned").lru.len()
429 }
430
431 pub fn clear(&self) {
433 let mut inner = self.inner.lock().expect("cache lock poisoned");
434 inner.entries.clear();
435 inner.lru.order.clear();
436 }
437
438 pub fn hit_count(&self) -> u64 {
440 self.hits.load(Ordering::Relaxed)
441 }
442
443 pub fn miss_count(&self) -> u64 {
445 self.misses.load(Ordering::Relaxed)
446 }
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Builds a cache key from a dataset id and query text.
    fn make_key(dataset: &str, query: &str) -> CoreCacheKey {
        CoreCacheKey::new(dataset, query)
    }

    /// Produces `count` rows, each with an `s` and an `o` column whose
    /// values carry the row index.
    fn make_rows(count: usize) -> Vec<HashMap<String, String>> {
        let mut rows = Vec::with_capacity(count);
        for i in 0..count {
            let mut row = HashMap::new();
            row.insert("s".to_string(), format!("subject{}", i));
            row.insert("o".to_string(), format!("object{}", i));
            rows.push(row);
        }
        rows
    }

    /// Cache with the given capacity and a 60-second default TTL.
    fn minute_cache(capacity: usize) -> CoreResultCache {
        CoreResultCache::new(capacity, Duration::from_secs(60))
    }

    /// Owned predicate list from string literals.
    fn predicates(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Entry with no rows and no predicates, stamped at `stamp` and expiring
    /// at `expires_at`.
    fn bare_entry(stamp: Instant, expires_at: Instant) -> CoreCacheEntry {
        CoreCacheEntry {
            key: make_key("ds", "q"),
            result_rows: vec![],
            accessed_predicates: vec![],
            created_at: stamp,
            last_accessed: stamp,
            expires_at,
            hit_count: 0,
        }
    }

    // ---- CoreCacheKey ----

    #[test]
    fn test_cache_key_same_input_same_fingerprint() {
        let query = "SELECT * WHERE { ?s ?p ?o }";
        let first = make_key("ds1", query);
        let second = make_key("ds1", query);
        assert_eq!(first, second);
        assert_eq!(first.query_fingerprint, second.query_fingerprint);
    }

    #[test]
    fn test_cache_key_different_datasets_different_key() {
        let query = "SELECT * WHERE { ?s ?p ?o }";
        let in_ds1 = make_key("ds1", query);
        let in_ds2 = make_key("ds2", query);
        assert_ne!(in_ds1, in_ds2);
    }

    #[test]
    fn test_cache_key_different_queries_different_fingerprint() {
        let subjects = make_key("ds", "SELECT ?s WHERE { ?s ?p ?o }");
        let objects = make_key("ds", "SELECT ?o WHERE { ?s ?p ?o }");
        assert_ne!(subjects.query_fingerprint, objects.query_fingerprint);
    }

    #[test]
    fn test_cache_key_hash_stable() {
        let first = make_key("myds", "ASK { <s> <p> <o> }");
        let second = make_key("myds", "ASK { <s> <p> <o> }");
        assert_eq!(first.query_fingerprint, second.query_fingerprint);
    }

    // ---- CoreCacheEntry ----

    #[test]
    fn test_cache_entry_is_not_expired_initially() {
        let now = Instant::now();
        let entry = bare_entry(now, now + Duration::from_secs(60));
        assert!(!entry.is_expired());
    }

    #[test]
    fn test_cache_entry_is_expired_past_deadline() {
        let past = Instant::now() - Duration::from_secs(1);
        let entry = bare_entry(past, past);
        assert!(entry.is_expired());
    }

    // ---- Basic get / put ----

    #[test]
    fn test_cache_miss_on_empty() {
        let cache = minute_cache(100);
        let key = make_key("ds", "SELECT * WHERE { ?s ?p ?o }");
        assert!(cache.get(&key).is_none());
        assert_eq!(cache.miss_count(), 1);
    }

    #[test]
    fn test_cache_hit_after_put() {
        let cache = minute_cache(100);
        let key = make_key("ds", "SELECT * WHERE { ?s ?p ?o }");
        cache.put(key.clone(), make_rows(3), predicates(&["http://p"]));

        let fetched = cache.get(&key).expect("cache hit expected");
        assert_eq!(fetched.len(), 3);
        assert_eq!(cache.hit_count(), 1);
        assert_eq!(cache.miss_count(), 0);
    }

    #[test]
    fn test_cache_size_increases_on_put() {
        let cache = minute_cache(100);
        assert_eq!(cache.size(), 0);
        cache.put(make_key("ds", "q1"), make_rows(1), vec![]);
        assert_eq!(cache.size(), 1);
        cache.put(make_key("ds", "q2"), make_rows(2), vec![]);
        assert_eq!(cache.size(), 2);
    }

    #[test]
    fn test_cache_hit_rate_pure_hits() {
        let cache = minute_cache(100);
        let key = make_key("ds", "q");
        cache.put(key.clone(), make_rows(1), vec![]);
        for _ in 0..2 {
            let _ = cache.get(&key);
        }
        assert!((cache.hit_rate() - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_cache_hit_rate_mixed() {
        let cache = minute_cache(100);
        let key = make_key("ds", "q");
        cache.put(key.clone(), make_rows(1), vec![]);
        // One hit followed by one miss gives a 50% hit rate.
        let _ = cache.get(&key);
        let _ = cache.get(&make_key("ds", "other"));
        assert!((cache.hit_rate() - 0.5).abs() < f64::EPSILON);
    }

    // ---- TTL ----

    #[test]
    fn test_cache_ttl_expiration() {
        let cache = CoreResultCache::new(100, Duration::from_millis(50));
        let key = make_key("ds", "q");
        cache.put(key.clone(), make_rows(1), vec![]);
        assert!(cache.get(&key).is_some());

        thread::sleep(Duration::from_millis(100));
        assert!(cache.get(&key).is_none());
    }

    #[test]
    fn test_cache_put_with_custom_ttl_expires() {
        let cache = CoreResultCache::new(100, Duration::from_secs(300));
        let key = make_key("ds", "custom_ttl");
        let short_ttl = Duration::from_millis(30);
        cache.put_with_ttl(key.clone(), make_rows(1), vec![], short_ttl);
        assert!(cache.get(&key).is_some());
        thread::sleep(Duration::from_millis(60));
        assert!(cache.get(&key).is_none());
    }

    // ---- Eviction / clear ----

    #[test]
    fn test_cache_lru_eviction() {
        let cache = minute_cache(3);
        for query in &["q1", "q2", "q3"] {
            cache.put(make_key("ds", query), make_rows(1), vec![]);
        }

        // Reading q1 leaves q2 as the least recently used key.
        let _ = cache.get(&make_key("ds", "q1"));

        // The cache is full, so inserting q4 evicts q2.
        cache.put(make_key("ds", "q4"), make_rows(1), vec![]);

        assert!(cache.get(&make_key("ds", "q1")).is_some());
        assert!(cache.get(&make_key("ds", "q2")).is_none());
        assert!(cache.get(&make_key("ds", "q3")).is_some());
        assert!(cache.get(&make_key("ds", "q4")).is_some());
    }

    #[test]
    fn test_cache_clear() {
        let cache = minute_cache(100);
        cache.put(make_key("ds", "q1"), make_rows(1), vec![]);
        cache.put(make_key("ds", "q2"), make_rows(1), vec![]);
        assert_eq!(cache.size(), 2);
        cache.clear();
        assert_eq!(cache.size(), 0);
    }

    // ---- Invalidation ----

    #[test]
    fn test_invalidate_dataset() {
        let cache = minute_cache(100);
        cache.put(make_key("dsA", "q1"), make_rows(1), predicates(&["p1"]));
        cache.put(make_key("dsA", "q2"), make_rows(1), predicates(&["p2"]));
        cache.put(make_key("dsB", "q3"), make_rows(1), predicates(&["p1"]));

        let removed = cache.invalidate_dataset("dsA");
        assert_eq!(removed, 2);
        assert!(cache.get(&make_key("dsA", "q1")).is_none());
        assert!(cache.get(&make_key("dsA", "q2")).is_none());
        assert!(cache.get(&make_key("dsB", "q3")).is_some());
    }

    #[test]
    fn test_invalidate_predicate_specific() {
        let cache = minute_cache(100);
        cache.put(
            make_key("ds", "q1"),
            make_rows(1),
            predicates(&["http://p/age"]),
        );
        cache.put(
            make_key("ds", "q2"),
            make_rows(1),
            predicates(&["http://p/name"]),
        );
        cache.put(
            make_key("ds", "q3"),
            make_rows(1),
            predicates(&["http://p/age", "http://p/name"]),
        );

        let removed = cache.invalidate_predicate("ds", "http://p/age");
        assert_eq!(removed, 2);
        assert!(cache.get(&make_key("ds", "q1")).is_none());
        assert!(cache.get(&make_key("ds", "q2")).is_some());
        assert!(cache.get(&make_key("ds", "q3")).is_none());
    }

    #[test]
    fn test_invalidate_predicate_wildcard_entry() {
        let cache = minute_cache(100);
        // An entry stored without predicates matches any predicate.
        cache.put(make_key("ds", "q_wildcard"), make_rows(1), vec![]);
        let removed = cache.invalidate_predicate("ds", "http://p/anything");
        assert_eq!(removed, 1);
        assert!(cache.get(&make_key("ds", "q_wildcard")).is_none());
    }

    #[test]
    fn test_invalidate_on_delta_affects_matching_entries() {
        let cache = minute_cache(100);
        cache.put(
            make_key("ds", "q_age"),
            make_rows(1),
            predicates(&["http://p/age"]),
        );
        cache.put(
            make_key("ds", "q_name"),
            make_rows(1),
            predicates(&["http://p/name"]),
        );

        let deltas = vec![TripleDelta::Insert(
            "s".into(),
            "http://p/age".into(),
            "30".into(),
        )];
        let removed = cache.invalidate_on_delta("ds", &deltas);
        assert_eq!(removed, 1);
        assert!(cache.get(&make_key("ds", "q_age")).is_none());
        assert!(cache.get(&make_key("ds", "q_name")).is_some());
    }

    #[test]
    fn test_invalidate_on_delta_wildcard_entry() {
        let cache = minute_cache(100);
        // An entry stored without predicates is affected by any delta.
        cache.put(make_key("ds", "q_all"), make_rows(1), vec![]);
        let deltas = vec![TripleDelta::Delete(
            "s".into(),
            "http://p/whatever".into(),
            "o".into(),
        )];
        let removed = cache.invalidate_on_delta("ds", &deltas);
        assert_eq!(removed, 1);
    }

    #[test]
    fn test_invalidate_on_delta_empty_deltas_removes_nothing() {
        let cache = minute_cache(100);
        cache.put(make_key("ds", "q1"), make_rows(1), predicates(&["p"]));
        let removed = cache.invalidate_on_delta("ds", &[]);
        assert_eq!(removed, 0);
        assert!(cache.get(&make_key("ds", "q1")).is_some());
    }

    #[test]
    fn test_invalidate_on_delta_different_dataset_unaffected() {
        let cache = minute_cache(100);
        cache.put(
            make_key("dsA", "q1"),
            make_rows(1),
            predicates(&["http://p/age"]),
        );
        cache.put(
            make_key("dsB", "q2"),
            make_rows(1),
            predicates(&["http://p/age"]),
        );

        let deltas = vec![TripleDelta::Insert(
            "s".into(),
            "http://p/age".into(),
            "5".into(),
        )];
        let removed = cache.invalidate_on_delta("dsA", &deltas);
        assert_eq!(removed, 1);
        assert!(cache.get(&make_key("dsB", "q2")).is_some());
    }

    // ---- Concurrency / overwrite ----

    #[test]
    fn test_concurrent_put_and_get() {
        let cache = Arc::new(minute_cache(200));

        let workers: Vec<_> = (0..8)
            .map(|worker| {
                let shared = Arc::clone(&cache);
                thread::spawn(move || {
                    for round in 0..25 {
                        let key = make_key("ds", &format!("query_{}_{}", worker, round));
                        shared.put(key.clone(), make_rows(2), vec![]);
                        let _ = shared.get(&key);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().expect("thread panicked");
        }
        assert!(cache.size() <= 200);
    }

    #[test]
    fn test_put_overwrites_existing_key() {
        let cache = minute_cache(100);
        let key = make_key("ds", "q");
        cache.put(key.clone(), make_rows(1), vec![]);
        cache.put(key.clone(), make_rows(5), vec![]);
        let fetched = cache.get(&key).expect("hit expected");
        assert_eq!(fetched.len(), 5);
        assert_eq!(cache.size(), 1);
    }
}