Skip to main content

oxirs_core/cache/
result_cache.rs

1//! Core-level SPARQL query result cache with LRU eviction and delta-based invalidation.
2//!
3//! This module provides a thread-safe, dataset-scoped cache for SPARQL query results.  It is
4//! designed to sit at the **core layer**, below the individual query engine implementations, so
5//! that all engines (ARQ, rule-based, federated, …) can share a single cache.
6//!
7//! # Features
8//!
//! - **FNV-1a fingerprinting**: Fast, non-cryptographic 64-bit query hashing; collisions are unlikely but possible.
10//! - **LRU eviction**: Least-recently-used entries are evicted when the capacity is reached.
11//! - **TTL expiration**: Entries expire after a configurable time-to-live.
12//! - **Dataset-scoped invalidation**: All entries belonging to a dataset can be invalidated in
13//!   one call, e.g. after bulk updates.
14//! - **Predicate-scoped invalidation**: Only entries that accessed a specific predicate are
15//!   invalidated — allowing fine-grained cache management.
16//! - **Delta-driven invalidation**: A list of [`TripleDelta`] events is used to determine exactly
17//!   which entries are affected by a set of changes.
18//! - **Metrics**: Hit/miss counters accessible via atomic loads.
19//!
20//! # Thread safety
21//!
22//! All public methods take `&self`; internal state is protected by a `Mutex`.  The design avoids
23//! `RwLock` because writes (TTL/access-time updates on cache hit) are almost as frequent as reads.
24
25use std::collections::HashMap;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::{Arc, Mutex};
28use std::time::{Duration, Instant};
29
30pub use crate::view::incremental::TripleDelta;
31
32// ---------------------------------------------------------------------------
33// Cache key
34// ---------------------------------------------------------------------------
35
/// Composite key identifying one cached query result.
///
/// A key pairs the dataset identifier with a 64-bit fingerprint of the query text, so the same
/// query issued against two different datasets occupies two distinct cache slots.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CoreCacheKey {
    /// Identifier of the dataset the query targets (e.g. a URL or a name).
    pub dataset_id: String,
    /// FNV-1a hash of the query text exactly as it was passed to [`CoreCacheKey::new`].
    pub query_fingerprint: u64,
}

impl CoreCacheKey {
    /// Build a key from a dataset identifier and the SPARQL query text.
    ///
    /// The query text is hashed byte-for-byte; queries that differ only in whitespace or case
    /// therefore produce different keys.
    pub fn new(dataset_id: &str, query: &str) -> Self {
        let query_fingerprint = Self::fingerprint(query);
        Self {
            dataset_id: String::from(dataset_id),
            query_fingerprint,
        }
    }

    /// Compute the 64-bit FNV-1a hash of the UTF-8 bytes of `s`.
    ///
    /// Each byte is XOR-ed into the running hash, which is then multiplied by the FNV prime
    /// using wrapping arithmetic.
    fn fingerprint(s: &str) -> u64 {
        // Standard 64-bit FNV offset basis and prime.
        const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
        const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

        s.as_bytes().iter().fold(FNV_OFFSET, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        })
    }
}
72
73// ---------------------------------------------------------------------------
74// Cache entry
75// ---------------------------------------------------------------------------
76
77/// An individual entry in the result cache.
78#[derive(Debug, Clone)]
79pub struct CoreCacheEntry {
80    /// Composite key that identifies this entry.
81    pub key: CoreCacheKey,
82    /// Cached result rows (variable bindings).
83    pub result_rows: Vec<HashMap<String, String>>,
84    /// Predicate IRIs that the cached query accessed.  Used for targeted invalidation.
85    pub accessed_predicates: Vec<String>,
86    /// When this entry was stored.
87    pub created_at: Instant,
88    /// When this entry was last read.
89    pub last_accessed: Instant,
90    /// Entry becomes invalid after this instant.
91    pub expires_at: Instant,
92    /// Number of times this entry has been served from the cache.
93    pub hit_count: u64,
94}
95
96impl CoreCacheEntry {
97    /// Return `true` if this entry has passed its TTL.
98    pub fn is_expired(&self) -> bool {
99        Instant::now() >= self.expires_at
100    }
101
102    /// Record a cache hit: bump the hit counter and update `last_accessed`.
103    fn touch(&mut self) {
104        self.hit_count += 1;
105        self.last_accessed = Instant::now();
106    }
107}
108
109// ---------------------------------------------------------------------------
110// LRU node
111// ---------------------------------------------------------------------------
112
113/// Intrusive doubly-linked LRU list node stored alongside the entry map.
114///
115/// We use a `Vec<String>` as an ordered access log rather than a true linked list to avoid
116/// unsafe pointer arithmetic.  For cache sizes up to a few thousand entries this is fast enough;
117/// large-scale deployments can opt for a more sophisticated structure.
118struct LruList {
119    /// Ordered by access time: front = least recently used, back = most recently used.
120    order: Vec<CoreCacheKey>,
121}
122
123impl LruList {
124    fn new(capacity: usize) -> Self {
125        Self {
126            order: Vec::with_capacity(capacity),
127        }
128    }
129
130    /// Record an access for `key`, moving it to the back of the list.
131    fn touch(&mut self, key: &CoreCacheKey) {
132        if let Some(pos) = self.order.iter().position(|k| k == key) {
133            self.order.remove(pos);
134        }
135        self.order.push(key.clone());
136    }
137
138    /// Remove `key` from the list (called on explicit removal/invalidation).
139    fn remove(&mut self, key: &CoreCacheKey) {
140        self.order.retain(|k| k != key);
141    }
142
143    /// Return the least-recently-used key, or `None` if the list is empty.
144    fn pop_lru(&mut self) -> Option<CoreCacheKey> {
145        if self.order.is_empty() {
146            None
147        } else {
148            Some(self.order.remove(0))
149        }
150    }
151
152    fn len(&self) -> usize {
153        self.order.len()
154    }
155}
156
157// ---------------------------------------------------------------------------
158// Cache internals (held under a single Mutex)
159// ---------------------------------------------------------------------------
160
161struct CacheInner {
162    entries: HashMap<CoreCacheKey, CoreCacheEntry>,
163    lru: LruList,
164    capacity: usize,
165}
166
167impl CacheInner {
168    fn new(capacity: usize) -> Self {
169        Self {
170            entries: HashMap::with_capacity(capacity),
171            lru: LruList::new(capacity),
172            capacity,
173        }
174    }
175
176    /// Evict entries until `entries.len() < capacity`.  Returns the number evicted.
177    fn evict_to_capacity(&mut self) -> usize {
178        let mut evicted = 0;
179        while self.entries.len() >= self.capacity {
180            if let Some(lru_key) = self.lru.pop_lru() {
181                self.entries.remove(&lru_key);
182                evicted += 1;
183            } else {
184                break;
185            }
186        }
187        evicted
188    }
189
190    /// Remove all expired entries and return the count removed.
191    fn purge_expired(&mut self) -> usize {
192        let expired: Vec<CoreCacheKey> = self
193            .entries
194            .iter()
195            .filter(|(_, e)| e.is_expired())
196            .map(|(k, _)| k.clone())
197            .collect();
198
199        let count = expired.len();
200        for key in &expired {
201            self.entries.remove(key);
202            self.lru.remove(key);
203        }
204        count
205    }
206}
207
208// ---------------------------------------------------------------------------
209// Public cache type
210// ---------------------------------------------------------------------------
211
212/// Thread-safe core-level SPARQL query result cache.
213///
214/// Construct with [`CoreResultCache::new`] and use `put` / `get` for caching.
215///
216/// # Example
217///
218/// ```
219/// use oxirs_core::cache::result_cache::{CoreResultCache, CoreCacheKey};
220/// use std::time::Duration;
221///
222/// let cache = CoreResultCache::new(1000, Duration::from_secs(300));
223/// let key   = CoreCacheKey::new("my_dataset", "SELECT * WHERE { ?s ?p ?o }");
224/// cache.put(key.clone(), vec![], vec!["http://p/name".to_string()]);
225/// assert!(cache.get(&key).is_some());
226/// ```
227pub struct CoreResultCache {
228    inner: Arc<Mutex<CacheInner>>,
229    default_ttl: Duration,
230    hits: Arc<AtomicU64>,
231    misses: Arc<AtomicU64>,
232}
233
234impl CoreResultCache {
235    /// Create a new cache with `capacity` entries and a default TTL.
236    pub fn new(capacity: usize, ttl: Duration) -> Self {
237        Self {
238            inner: Arc::new(Mutex::new(CacheInner::new(capacity.max(1)))),
239            default_ttl: ttl,
240            hits: Arc::new(AtomicU64::new(0)),
241            misses: Arc::new(AtomicU64::new(0)),
242        }
243    }
244
245    /// Retrieve a cached result.
246    ///
247    /// Returns `None` on a miss (including expired entries, which are then removed).
248    /// On a hit, `last_accessed` and `hit_count` are updated and the entry is moved to the
249    /// MRU position.
250    pub fn get(&self, key: &CoreCacheKey) -> Option<Vec<HashMap<String, String>>> {
251        let mut inner = self.inner.lock().expect("cache lock poisoned");
252
253        if let Some(entry) = inner.entries.get_mut(key) {
254            if entry.is_expired() {
255                // Remove expired entry and count as a miss.
256                let key_clone = key.clone();
257                inner.entries.remove(&key_clone);
258                inner.lru.remove(&key_clone);
259                self.misses.fetch_add(1, Ordering::Relaxed);
260                return None;
261            }
262            entry.touch();
263            let result = entry.result_rows.clone();
264            inner.lru.touch(key);
265            self.hits.fetch_add(1, Ordering::Relaxed);
266            Some(result)
267        } else {
268            self.misses.fetch_add(1, Ordering::Relaxed);
269            None
270        }
271    }
272
273    /// Store a query result in the cache.
274    ///
275    /// If the cache is at capacity, the least-recently-used entry is evicted first.  Expired
276    /// entries are also purged opportunistically on every `put`.
277    pub fn put(
278        &self,
279        key: CoreCacheKey,
280        rows: Vec<HashMap<String, String>>,
281        predicates: Vec<String>,
282    ) {
283        self.put_with_ttl(key, rows, predicates, self.default_ttl);
284    }
285
286    /// Like [`CoreResultCache::put`] but with a custom TTL for this entry.
287    pub fn put_with_ttl(
288        &self,
289        key: CoreCacheKey,
290        rows: Vec<HashMap<String, String>>,
291        predicates: Vec<String>,
292        ttl: Duration,
293    ) {
294        let now = Instant::now();
295        let entry = CoreCacheEntry {
296            key: key.clone(),
297            result_rows: rows,
298            accessed_predicates: predicates,
299            created_at: now,
300            last_accessed: now,
301            expires_at: now + ttl,
302            hit_count: 0,
303        };
304
305        let mut inner = self.inner.lock().expect("cache lock poisoned");
306
307        // Opportunistically purge expired entries.
308        inner.purge_expired();
309
310        // Make room if needed.
311        inner.evict_to_capacity();
312
313        // If key already exists, remove it from the LRU list first.
314        if inner.entries.contains_key(&key) {
315            inner.lru.remove(&key);
316        }
317
318        inner.lru.touch(&key);
319        inner.entries.insert(key, entry);
320    }
321
322    /// Invalidate all entries for a given dataset.
323    ///
324    /// Returns the number of entries removed.
325    pub fn invalidate_dataset(&self, dataset_id: &str) -> usize {
326        let mut inner = self.inner.lock().expect("cache lock poisoned");
327
328        let to_remove: Vec<CoreCacheKey> = inner
329            .entries
330            .keys()
331            .filter(|k| k.dataset_id == dataset_id)
332            .cloned()
333            .collect();
334
335        let count = to_remove.len();
336        for key in &to_remove {
337            inner.entries.remove(key);
338            inner.lru.remove(key);
339        }
340        count
341    }
342
343    /// Invalidate all entries for a dataset that accessed a specific predicate.
344    ///
345    /// Returns the number of entries removed.
346    pub fn invalidate_predicate(&self, dataset_id: &str, predicate: &str) -> usize {
347        let mut inner = self.inner.lock().expect("cache lock poisoned");
348
349        let to_remove: Vec<CoreCacheKey> = inner
350            .entries
351            .iter()
352            .filter(|(k, e)| {
353                k.dataset_id == dataset_id
354                    && (e.accessed_predicates.is_empty()
355                        || e.accessed_predicates.iter().any(|p| p == predicate))
356            })
357            .map(|(k, _)| k.clone())
358            .collect();
359
360        let count = to_remove.len();
361        for key in &to_remove {
362            inner.entries.remove(key);
363            inner.lru.remove(key);
364        }
365        count
366    }
367
368    /// Invalidate cache entries affected by the given triple deltas.
369    ///
370    /// An entry is considered affected if:
371    /// - Its `accessed_predicates` list is empty (wildcard), **or**
372    /// - Any of the delta predicates appears in the entry's `accessed_predicates`.
373    ///
374    /// Returns the number of entries removed.
375    pub fn invalidate_on_delta(&self, dataset_id: &str, deltas: &[TripleDelta]) -> usize {
376        if deltas.is_empty() {
377            return 0;
378        }
379
380        // Collect the set of affected predicates from deltas.
381        let changed_predicates: std::collections::HashSet<&str> =
382            deltas.iter().map(|d| d.predicate()).collect();
383
384        let mut inner = self.inner.lock().expect("cache lock poisoned");
385
386        let to_remove: Vec<CoreCacheKey> = inner
387            .entries
388            .iter()
389            .filter(|(k, e)| {
390                if k.dataset_id != dataset_id {
391                    return false;
392                }
393                if e.accessed_predicates.is_empty() {
394                    return true; // wildcard: invalidate always
395                }
396                e.accessed_predicates
397                    .iter()
398                    .any(|p| changed_predicates.contains(p.as_str()))
399            })
400            .map(|(k, _)| k.clone())
401            .collect();
402
403        let count = to_remove.len();
404        for key in &to_remove {
405            inner.entries.remove(key);
406            inner.lru.remove(key);
407        }
408        count
409    }
410
411    /// Return the cache hit rate in `[0, 1]`.
412    ///
413    /// Returns `0.0` if no requests have been made yet.
414    pub fn hit_rate(&self) -> f64 {
415        let hits = self.hits.load(Ordering::Relaxed);
416        let misses = self.misses.load(Ordering::Relaxed);
417        let total = hits + misses;
418        if total == 0 {
419            0.0
420        } else {
421            hits as f64 / total as f64
422        }
423    }
424
425    /// Return the number of entries currently in the cache (including potentially expired ones
426    /// that have not yet been purged).
427    pub fn size(&self) -> usize {
428        self.inner.lock().expect("cache lock poisoned").lru.len()
429    }
430
431    /// Forcibly remove all entries from the cache.
432    pub fn clear(&self) {
433        let mut inner = self.inner.lock().expect("cache lock poisoned");
434        inner.entries.clear();
435        inner.lru.order.clear();
436    }
437
438    /// Return the raw hit count.
439    pub fn hit_count(&self) -> u64 {
440        self.hits.load(Ordering::Relaxed)
441    }
442
443    /// Return the raw miss count.
444    pub fn miss_count(&self) -> u64 {
445        self.misses.load(Ordering::Relaxed)
446    }
447}
448
449// ---------------------------------------------------------------------------
450// Tests
451// ---------------------------------------------------------------------------
452
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Build a cache key for `dataset` / `query` via the public constructor.
    fn make_key(dataset: &str, query: &str) -> CoreCacheKey {
        CoreCacheKey::new(dataset, query)
    }

    /// Build `count` result rows, each binding `s` and `o` to index-dependent values.
    fn make_rows(count: usize) -> Vec<HashMap<String, String>> {
        (0..count)
            .map(|i| {
                let mut m = HashMap::new();
                m.insert("s".to_string(), format!("subject{}", i));
                m.insert("o".to_string(), format!("object{}", i));
                m
            })
            .collect()
    }

    // --- CoreCacheKey tests ---

    #[test]
    fn test_cache_key_same_input_same_fingerprint() {
        let k1 = make_key("ds1", "SELECT * WHERE { ?s ?p ?o }");
        let k2 = make_key("ds1", "SELECT * WHERE { ?s ?p ?o }");
        assert_eq!(k1, k2);
        assert_eq!(k1.query_fingerprint, k2.query_fingerprint);
    }

    #[test]
    fn test_cache_key_different_datasets_different_key() {
        let k1 = make_key("ds1", "SELECT * WHERE { ?s ?p ?o }");
        let k2 = make_key("ds2", "SELECT * WHERE { ?s ?p ?o }");
        assert_ne!(k1, k2);
    }

    #[test]
    fn test_cache_key_different_queries_different_fingerprint() {
        let k1 = make_key("ds", "SELECT ?s WHERE { ?s ?p ?o }");
        let k2 = make_key("ds", "SELECT ?o WHERE { ?s ?p ?o }");
        assert_ne!(k1.query_fingerprint, k2.query_fingerprint);
    }

    #[test]
    fn test_cache_key_hash_stable() {
        // Pin the fingerprint to published 64-bit FNV-1a test vectors.  Comparing two values
        // computed in the same process cannot detect a change of algorithm or constants;
        // fixed expected values can.
        assert_eq!(
            make_key("myds", "").query_fingerprint,
            0xcbf2_9ce4_8422_2325
        );
        assert_eq!(
            make_key("myds", "a").query_fingerprint,
            0xaf63_dc4c_8601_ec8c
        );

        // Repeated construction from the same input must still agree.
        let k = make_key("myds", "ASK { <s> <p> <o> }");
        let k2 = make_key("myds", "ASK { <s> <p> <o> }");
        assert_eq!(k.query_fingerprint, k2.query_fingerprint);
    }

    // --- CoreCacheEntry tests ---

    #[test]
    fn test_cache_entry_is_not_expired_initially() {
        let now = Instant::now();
        let entry = CoreCacheEntry {
            key: make_key("ds", "q"),
            result_rows: vec![],
            accessed_predicates: vec![],
            created_at: now,
            last_accessed: now,
            expires_at: now + Duration::from_secs(60),
            hit_count: 0,
        };
        assert!(!entry.is_expired());
    }

    #[test]
    fn test_cache_entry_is_expired_past_deadline() {
        let past = Instant::now() - Duration::from_secs(1);
        let entry = CoreCacheEntry {
            key: make_key("ds", "q"),
            result_rows: vec![],
            accessed_predicates: vec![],
            created_at: past,
            last_accessed: past,
            expires_at: past,
            hit_count: 0,
        };
        assert!(entry.is_expired());
    }

    // --- CoreResultCache basic tests ---

    #[test]
    fn test_cache_miss_on_empty() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        let key = make_key("ds", "SELECT * WHERE { ?s ?p ?o }");
        assert!(cache.get(&key).is_none());
        assert_eq!(cache.miss_count(), 1);
    }

    #[test]
    fn test_cache_hit_after_put() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        let key = make_key("ds", "SELECT * WHERE { ?s ?p ?o }");
        let rows = make_rows(3);
        cache.put(key.clone(), rows.clone(), vec!["http://p".to_string()]);

        let result = cache.get(&key).expect("cache hit expected");
        assert_eq!(result.len(), 3);
        assert_eq!(cache.hit_count(), 1);
        assert_eq!(cache.miss_count(), 0);
    }

    #[test]
    fn test_cache_size_increases_on_put() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        assert_eq!(cache.size(), 0);
        cache.put(make_key("ds", "q1"), make_rows(1), vec![]);
        assert_eq!(cache.size(), 1);
        cache.put(make_key("ds", "q2"), make_rows(2), vec![]);
        assert_eq!(cache.size(), 2);
    }

    #[test]
    fn test_cache_hit_rate_pure_hits() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        let key = make_key("ds", "q");
        cache.put(key.clone(), make_rows(1), vec![]);
        cache.get(&key);
        cache.get(&key);
        assert!((cache.hit_rate() - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_cache_hit_rate_mixed() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        let key = make_key("ds", "q");
        cache.put(key.clone(), make_rows(1), vec![]);
        cache.get(&key); // hit
        cache.get(&make_key("ds", "other")); // miss
        // 1 hit, 1 miss → 0.5
        assert!((cache.hit_rate() - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_cache_ttl_expiration() {
        let cache = CoreResultCache::new(100, Duration::from_millis(50));
        let key = make_key("ds", "q");
        cache.put(key.clone(), make_rows(1), vec![]);
        assert!(cache.get(&key).is_some());

        // Sleep past the 50 ms TTL so the next lookup sees an expired entry.
        thread::sleep(Duration::from_millis(100));
        assert!(cache.get(&key).is_none());
    }

    #[test]
    fn test_cache_put_with_custom_ttl_expires() {
        let cache = CoreResultCache::new(100, Duration::from_secs(300));
        let key = make_key("ds", "custom_ttl");
        // Override with very short TTL.
        cache.put_with_ttl(key.clone(), make_rows(1), vec![], Duration::from_millis(30));
        assert!(cache.get(&key).is_some());
        thread::sleep(Duration::from_millis(60));
        assert!(cache.get(&key).is_none());
    }

    #[test]
    fn test_cache_lru_eviction() {
        // Capacity 3 → inserting a 4th should evict the LRU entry.
        let cache = CoreResultCache::new(3, Duration::from_secs(60));
        cache.put(make_key("ds", "q1"), make_rows(1), vec![]);
        cache.put(make_key("ds", "q2"), make_rows(1), vec![]);
        cache.put(make_key("ds", "q3"), make_rows(1), vec![]);

        // Access q1 to make it MRU.
        cache.get(&make_key("ds", "q1"));

        // Insert q4 — q2 should be the LRU and get evicted.
        cache.put(make_key("ds", "q4"), make_rows(1), vec![]);

        assert!(cache.get(&make_key("ds", "q1")).is_some());
        assert!(cache.get(&make_key("ds", "q2")).is_none()); // evicted
        assert!(cache.get(&make_key("ds", "q3")).is_some());
        assert!(cache.get(&make_key("ds", "q4")).is_some());
    }

    #[test]
    fn test_cache_clear() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(make_key("ds", "q1"), make_rows(1), vec![]);
        cache.put(make_key("ds", "q2"), make_rows(1), vec![]);
        assert_eq!(cache.size(), 2);
        cache.clear();
        assert_eq!(cache.size(), 0);
    }

    // --- Invalidation tests ---

    #[test]
    fn test_invalidate_dataset() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(make_key("dsA", "q1"), make_rows(1), vec!["p1".to_string()]);
        cache.put(make_key("dsA", "q2"), make_rows(1), vec!["p2".to_string()]);
        cache.put(make_key("dsB", "q3"), make_rows(1), vec!["p1".to_string()]);

        let removed = cache.invalidate_dataset("dsA");
        assert_eq!(removed, 2);
        assert!(cache.get(&make_key("dsA", "q1")).is_none());
        assert!(cache.get(&make_key("dsA", "q2")).is_none());
        // dsB entry untouched
        assert!(cache.get(&make_key("dsB", "q3")).is_some());
    }

    #[test]
    fn test_invalidate_predicate_specific() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(
            make_key("ds", "q1"),
            make_rows(1),
            vec!["http://p/age".to_string()],
        );
        cache.put(
            make_key("ds", "q2"),
            make_rows(1),
            vec!["http://p/name".to_string()],
        );
        cache.put(
            make_key("ds", "q3"),
            make_rows(1),
            vec!["http://p/age".to_string(), "http://p/name".to_string()],
        );

        let removed = cache.invalidate_predicate("ds", "http://p/age");
        // q1 and q3 both access age; q2 only accesses name.
        assert_eq!(removed, 2);
        assert!(cache.get(&make_key("ds", "q1")).is_none());
        assert!(cache.get(&make_key("ds", "q2")).is_some());
        assert!(cache.get(&make_key("ds", "q3")).is_none());
    }

    #[test]
    fn test_invalidate_predicate_wildcard_entry() {
        // An entry with no accessed_predicates is a wildcard and must be invalidated.
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(make_key("ds", "q_wildcard"), make_rows(1), vec![]); // no predicates

        let removed = cache.invalidate_predicate("ds", "http://p/anything");
        assert_eq!(removed, 1);
        assert!(cache.get(&make_key("ds", "q_wildcard")).is_none());
    }

    #[test]
    fn test_invalidate_on_delta_affects_matching_entries() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(
            make_key("ds", "q_age"),
            make_rows(1),
            vec!["http://p/age".to_string()],
        );
        cache.put(
            make_key("ds", "q_name"),
            make_rows(1),
            vec!["http://p/name".to_string()],
        );

        let deltas = vec![TripleDelta::Insert(
            "s".into(),
            "http://p/age".into(),
            "30".into(),
        )];
        let removed = cache.invalidate_on_delta("ds", &deltas);
        assert_eq!(removed, 1);
        assert!(cache.get(&make_key("ds", "q_age")).is_none());
        assert!(cache.get(&make_key("ds", "q_name")).is_some()); // unaffected
    }

    #[test]
    fn test_invalidate_on_delta_wildcard_entry() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(make_key("ds", "q_all"), make_rows(1), vec![]); // wildcard

        let deltas = vec![TripleDelta::Delete(
            "s".into(),
            "http://p/whatever".into(),
            "o".into(),
        )];
        let removed = cache.invalidate_on_delta("ds", &deltas);
        assert_eq!(removed, 1);
    }

    #[test]
    fn test_invalidate_on_delta_empty_deltas_removes_nothing() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(make_key("ds", "q1"), make_rows(1), vec!["p".to_string()]);
        let removed = cache.invalidate_on_delta("ds", &[]);
        assert_eq!(removed, 0);
        assert!(cache.get(&make_key("ds", "q1")).is_some());
    }

    #[test]
    fn test_invalidate_on_delta_different_dataset_unaffected() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        cache.put(
            make_key("dsA", "q1"),
            make_rows(1),
            vec!["http://p/age".to_string()],
        );
        cache.put(
            make_key("dsB", "q2"),
            make_rows(1),
            vec!["http://p/age".to_string()],
        );

        let deltas = vec![TripleDelta::Insert(
            "s".into(),
            "http://p/age".into(),
            "5".into(),
        )];
        let removed = cache.invalidate_on_delta("dsA", &deltas);
        assert_eq!(removed, 1);
        // dsB's entry should be untouched.
        assert!(cache.get(&make_key("dsB", "q2")).is_some());
    }

    // --- Concurrent access ---

    #[test]
    fn test_concurrent_put_and_get() {
        let cache = Arc::new(CoreResultCache::new(200, Duration::from_secs(60)));
        let mut handles = vec![];

        // 8 threads × 25 distinct keys = 200 insertions, matching the cache capacity.
        for i in 0..8 {
            let c = Arc::clone(&cache);
            handles.push(thread::spawn(move || {
                for j in 0..25 {
                    let key = make_key("ds", &format!("query_{}_{}", i, j));
                    c.put(key.clone(), make_rows(2), vec![]);
                    let _ = c.get(&key);
                }
            }));
        }
        for h in handles {
            h.join().expect("thread panicked");
        }
        // After all threads finish the cache should be internally consistent.
        assert!(cache.size() <= 200);
    }

    #[test]
    fn test_put_overwrites_existing_key() {
        let cache = CoreResultCache::new(100, Duration::from_secs(60));
        let key = make_key("ds", "q");
        cache.put(key.clone(), make_rows(1), vec![]);
        cache.put(key.clone(), make_rows(5), vec![]);
        // Should return the most recently stored value.
        let result = cache.get(&key).expect("hit expected");
        assert_eq!(result.len(), 5);
        // Size should remain 1 (not 2).
        assert_eq!(cache.size(), 1);
    }
}