Skip to main content

oxirs_core/query/
result_cache.rs

1//! Advanced query result cache with TTL and LRU eviction
2//!
3//! This module provides a production-ready caching system for SPARQL query results
4//! with support for:
5//! - Time-To-Live (TTL) expiration
6//! - Least Recently Used (LRU) eviction
7//! - Memory-aware cache management
8//! - Concurrent access with minimal contention
9//! - Cache statistics and monitoring
10//!
11//! # Example
12//!
13//! ```
14//! use oxirs_core::query::result_cache::{QueryResultCache, CacheConfig};
15//! use std::time::Duration;
16//!
17//! let config = CacheConfig {
18//!     max_entries: 1000,
19//!     max_memory_bytes: 100 * 1024 * 1024, // 100 MB
20//!     default_ttl: Duration::from_secs(300), // 5 minutes
21//!     enable_lru: true,
22//! };
23//!
24//! let cache = QueryResultCache::new(config);
25//!
26//! // Cache a query result
27//! let query = "SELECT * WHERE { ?s ?p ?o }".to_string();
28//! let results = vec!["result1".to_string(), "result2".to_string()];
29//! cache.put(query.clone(), results.clone());
30//!
31//! // Retrieve from cache
32//! if let Some(cached) = cache.get(&query) {
33//!     println!("Cache hit! Results: {:?}", cached);
34//! }
35//! ```
36
37use scirs2_core::metrics::MetricsRegistry;
38use std::collections::{HashMap, VecDeque};
39use std::sync::atomic::{AtomicU64, Ordering};
40use std::sync::{Arc, RwLock};
41use std::time::{Duration, Instant};
42
/// Tunable limits and policies for a [`QueryResultCache`].
///
/// Use struct-update syntax on top of [`CacheConfig::default`] to override only the
/// fields you care about.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Upper bound on the number of entries held at once.
    pub max_entries: usize,
    /// Upper bound, in bytes, on the estimated memory used by cached values.
    pub max_memory_bytes: u64,
    /// Time-to-live applied to entries inserted without an explicit TTL.
    pub default_ttl: Duration,
    /// Whether eviction follows least-recently-used order.
    pub enable_lru: bool,
}

impl Default for CacheConfig {
    /// Defaults: 10 000 entries, 1 GiB of estimated memory, a 5-minute TTL, LRU enabled.
    fn default() -> Self {
        const ONE_GIB: u64 = 1 << 30;
        const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60);

        CacheConfig {
            max_entries: 10_000,
            max_memory_bytes: ONE_GIB,
            default_ttl: FIVE_MINUTES,
            enable_lru: true,
        }
    }
}
66
/// A cached value together with the bookkeeping needed for expiry and LRU tracking.
#[derive(Debug, Clone)]
struct CacheEntry<V> {
    /// The stored value.
    value: V,
    /// Estimated footprint of `value`, in bytes.
    size_bytes: u64,
    /// When the entry was inserted.
    #[allow(dead_code)]
    created_at: Instant,
    /// Instant from which the entry counts as expired.
    expires_at: Instant,
    /// When the entry was last read (refreshed by `touch`).
    last_accessed: Instant,
    /// How many times `touch` has been called on this entry.
    access_count: u64,
}

impl<V> CacheEntry<V> {
    /// Returns `true` once the current time has reached `expires_at`.
    fn is_expired(&self) -> bool {
        let now = Instant::now();
        self.expires_at <= now
    }

    /// Records an access: refreshes `last_accessed`, then bumps `access_count`.
    fn touch(&mut self) {
        let now = Instant::now();
        self.last_accessed = now;
        self.access_count += 1;
    }
}
95
96/// Query result cache with TTL and LRU eviction
97///
98/// Thread-safe cache implementation optimized for concurrent SPARQL query workloads.
99/// Uses read-write locks with minimal contention and efficient memory management.
100pub struct QueryResultCache<V: Clone> {
101    /// Cache configuration
102    config: CacheConfig,
103    /// Cache entries
104    entries: Arc<RwLock<HashMap<String, CacheEntry<V>>>>,
105    /// LRU queue (query keys in access order)
106    lru_queue: Arc<RwLock<VecDeque<String>>>,
107    /// Current memory usage
108    current_memory: Arc<AtomicU64>,
109    /// Cache statistics
110    stats: CacheStats,
111    /// Metrics registry (reserved for future monitoring features)
112    #[allow(dead_code)]
113    metrics: Arc<MetricsRegistry>,
114}
115
/// Cache statistics for monitoring
///
/// Every counter is an `Arc<AtomicU64>`, so cloning a `CacheStats` yields another handle
/// onto the same live counters rather than a point-in-time snapshot.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Total cache hits
    pub hits: Arc<AtomicU64>,
    /// Total cache misses
    pub misses: Arc<AtomicU64>,
    /// Total evictions (LRU)
    pub evictions: Arc<AtomicU64>,
    /// Total expirations (TTL)
    pub expirations: Arc<AtomicU64>,
    /// Total puts
    pub puts: Arc<AtomicU64>,
    /// Total invalidations
    pub invalidations: Arc<AtomicU64>,
}

impl CacheStats {
    /// Creates a fresh set of counters, all starting at zero.
    fn new() -> Self {
        let zero = || Arc::new(AtomicU64::new(0));
        Self {
            hits: zero(),
            misses: zero(),
            evictions: zero(),
            expirations: zero(),
            puts: zero(),
            invalidations: zero(),
        }
    }

    /// Get cache hit rate (0.0 to 1.0)
    ///
    /// Returns `0.0` when no lookup has been recorded yet. The two counters are read
    /// independently, so under concurrent updates the result is an approximation.
    pub fn hit_rate(&self) -> f64 {
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);
        // Add in u128: a u64 sum would panic (debug) or wrap (release) once the combined
        // count exceeds `u64::MAX`, and a wrapped total yields a nonsensical rate.
        let total = u128::from(hits) + u128::from(misses);
        if total == 0 {
            0.0
        } else {
            hits as f64 / total as f64
        }
    }

    /// Reset all statistics
    ///
    /// Each counter is zeroed with its own store, so concurrent readers may briefly observe
    /// a mix of reset and not-yet-reset counters.
    pub fn reset(&self) {
        self.hits.store(0, Ordering::Relaxed);
        self.misses.store(0, Ordering::Relaxed);
        self.evictions.store(0, Ordering::Relaxed);
        self.expirations.store(0, Ordering::Relaxed);
        self.puts.store(0, Ordering::Relaxed);
        self.invalidations.store(0, Ordering::Relaxed);
    }
}
167
168impl<V: Clone> QueryResultCache<V> {
169    /// Create a new query result cache with the given configuration
170    pub fn new(config: CacheConfig) -> Self {
171        let metrics = MetricsRegistry::new();
172
173        Self {
174            config,
175            entries: Arc::new(RwLock::new(HashMap::new())),
176            lru_queue: Arc::new(RwLock::new(VecDeque::new())),
177            current_memory: Arc::new(AtomicU64::new(0)),
178            stats: CacheStats::new(),
179            metrics: Arc::new(metrics),
180        }
181    }
182
183    /// Put a value in the cache with default TTL
184    pub fn put(&self, key: String, value: V) {
185        self.put_with_ttl(key, value, self.config.default_ttl);
186    }
187
188    /// Put a value in the cache with custom TTL
189    pub fn put_with_ttl(&self, key: String, value: V, ttl: Duration) {
190        let now = Instant::now();
191        let size_bytes = self.estimate_size(&value);
192
193        let entry = CacheEntry {
194            value,
195            size_bytes,
196            created_at: now,
197            expires_at: now + ttl,
198            last_accessed: now,
199            access_count: 0,
200        };
201
202        // Check if we need to evict entries
203        self.ensure_capacity(size_bytes);
204
205        {
206            let mut entries = self.entries.write().expect("entries lock poisoned");
207
208            // Remove old entry if exists
209            if let Some(old_entry) = entries.remove(&key) {
210                self.current_memory
211                    .fetch_sub(old_entry.size_bytes, Ordering::Relaxed);
212            }
213
214            // Insert new entry
215            entries.insert(key.clone(), entry);
216            self.current_memory.fetch_add(size_bytes, Ordering::Relaxed);
217        }
218
219        // Update LRU queue
220        if self.config.enable_lru {
221            let mut lru = self.lru_queue.write().expect("lru_queue lock poisoned");
222            lru.retain(|k| k != &key); // Remove if already exists
223            lru.push_back(key);
224        }
225
226        self.stats.puts.fetch_add(1, Ordering::Relaxed);
227    }
228
229    /// Get a value from the cache
230    pub fn get(&self, key: &str) -> Option<V> {
231        // Clean expired entries periodically
232        self.clean_expired();
233
234        let mut entries = self.entries.write().expect("entries lock poisoned");
235
236        if let Some(entry) = entries.get_mut(key) {
237            if entry.is_expired() {
238                // Entry expired
239                self.current_memory
240                    .fetch_sub(entry.size_bytes, Ordering::Relaxed);
241                entries.remove(key);
242                self.stats.expirations.fetch_add(1, Ordering::Relaxed);
243                self.stats.misses.fetch_add(1, Ordering::Relaxed);
244                return None;
245            }
246
247            // Update access metadata
248            entry.touch();
249
250            // Update LRU queue
251            if self.config.enable_lru {
252                let mut lru = self.lru_queue.write().expect("lru_queue lock poisoned");
253                lru.retain(|k| k != key);
254                lru.push_back(key.to_string());
255            }
256
257            self.stats.hits.fetch_add(1, Ordering::Relaxed);
258            Some(entry.value.clone())
259        } else {
260            self.stats.misses.fetch_add(1, Ordering::Relaxed);
261            None
262        }
263    }
264
265    /// Invalidate (remove) a cache entry
266    pub fn invalidate(&self, key: &str) -> bool {
267        let mut entries = self.entries.write().expect("entries lock poisoned");
268
269        if let Some(entry) = entries.remove(key) {
270            self.current_memory
271                .fetch_sub(entry.size_bytes, Ordering::Relaxed);
272
273            if self.config.enable_lru {
274                let mut lru = self.lru_queue.write().expect("lru_queue lock poisoned");
275                lru.retain(|k| k != key);
276            }
277
278            self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
279            true
280        } else {
281            false
282        }
283    }
284
285    /// Clear all cache entries
286    pub fn clear(&self) {
287        let mut entries = self.entries.write().expect("entries lock poisoned");
288        entries.clear();
289
290        if self.config.enable_lru {
291            let mut lru = self.lru_queue.write().expect("lru_queue lock poisoned");
292            lru.clear();
293        }
294
295        self.current_memory.store(0, Ordering::Relaxed);
296    }
297
298    /// Get current cache size (number of entries)
299    pub fn len(&self) -> usize {
300        self.entries.read().expect("entries lock poisoned").len()
301    }
302
303    /// Check if cache is empty
304    pub fn is_empty(&self) -> bool {
305        self.len() == 0
306    }
307
308    /// Get current memory usage in bytes
309    pub fn memory_usage(&self) -> u64 {
310        self.current_memory.load(Ordering::Relaxed)
311    }
312
313    /// Get cache statistics
314    pub fn stats(&self) -> CacheStats {
315        self.stats.clone()
316    }
317
318    /// Clean expired entries
319    fn clean_expired(&self) {
320        let mut entries = self.entries.write().expect("entries lock poisoned");
321        let mut to_remove = Vec::new();
322
323        for (key, entry) in entries.iter() {
324            if entry.is_expired() {
325                to_remove.push((key.clone(), entry.size_bytes));
326            }
327        }
328
329        for (key, size) in to_remove {
330            entries.remove(&key);
331            self.current_memory.fetch_sub(size, Ordering::Relaxed);
332            self.stats.expirations.fetch_add(1, Ordering::Relaxed);
333
334            if self.config.enable_lru {
335                let mut lru = self.lru_queue.write().expect("lru_queue lock poisoned");
336                lru.retain(|k| k != &key);
337            }
338        }
339    }
340
341    /// Ensure capacity for new entry by evicting if necessary
342    fn ensure_capacity(&self, new_entry_size: u64) {
343        // Check if we need to evict based on entry count
344        while self.len() >= self.config.max_entries {
345            self.evict_lru();
346        }
347
348        // Check if we need to evict based on memory
349        while self.memory_usage() + new_entry_size > self.config.max_memory_bytes {
350            self.evict_lru();
351        }
352    }
353
354    /// Evict the least recently used entry
355    fn evict_lru(&self) {
356        if !self.config.enable_lru {
357            // If LRU is disabled, evict a random entry
358            let key_to_evict = {
359                let entries = self.entries.read().expect("entries lock poisoned");
360                entries.keys().next().cloned()
361            };
362
363            if let Some(key) = key_to_evict {
364                let mut entries = self.entries.write().expect("entries lock poisoned");
365                if let Some(entry) = entries.remove(&key) {
366                    self.current_memory
367                        .fetch_sub(entry.size_bytes, Ordering::Relaxed);
368                    self.stats.evictions.fetch_add(1, Ordering::Relaxed);
369                }
370            }
371            return;
372        }
373
374        // Evict from LRU queue
375        let key_to_evict = {
376            let mut lru = self.lru_queue.write().expect("lru_queue lock poisoned");
377            lru.pop_front()
378        };
379
380        if let Some(key) = key_to_evict {
381            let mut entries = self.entries.write().expect("entries lock poisoned");
382            if let Some(entry) = entries.remove(&key) {
383                self.current_memory
384                    .fetch_sub(entry.size_bytes, Ordering::Relaxed);
385                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
386            }
387        }
388    }
389
390    /// Estimate size of a value in bytes
391    ///
392    /// This is a simple estimation. For more accurate sizing, consider
393    /// implementing a custom trait for your value types.
394    fn estimate_size(&self, _value: &V) -> u64 {
395        // Conservative estimate: 1KB per entry
396        // In production, you might want to implement actual size calculation
397        1024
398    }
399}
400
401impl<V: Clone> Default for QueryResultCache<V> {
402    fn default() -> Self {
403        Self::new(CacheConfig::default())
404    }
405}
406
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `String`-valued cache from `config`.
    fn cache_with(config: CacheConfig) -> QueryResultCache<String> {
        QueryResultCache::new(config)
    }

    /// Inserts `value` under `key`, converting both to owned strings.
    fn put_str(cache: &QueryResultCache<String>, key: &str, value: &str) {
        cache.put(key.to_string(), value.to_string());
    }

    /// A put followed by a get is a hit; an unknown key is a miss; both are counted.
    #[test]
    fn test_basic_cache_operations() {
        let cache = cache_with(CacheConfig::default());

        put_str(&cache, "key1", "value1");
        assert_eq!(cache.get("key1"), Some("value1".to_string()));

        assert_eq!(cache.get("key2"), None);

        let stats = cache.stats();
        assert_eq!(stats.hits.load(Ordering::Relaxed), 1);
        assert_eq!(stats.misses.load(Ordering::Relaxed), 1);
    }

    /// An entry is readable before its TTL elapses and gone (and counted) afterwards.
    #[test]
    fn test_ttl_expiration() {
        let cache = cache_with(CacheConfig {
            default_ttl: Duration::from_millis(100),
            ..Default::default()
        });

        put_str(&cache, "key1", "value1");
        assert_eq!(cache.get("key1"), Some("value1".to_string()));

        // Let the TTL elapse.
        std::thread::sleep(Duration::from_millis(150));

        assert_eq!(cache.get("key1"), None);
        assert_eq!(cache.stats().expirations.load(Ordering::Relaxed), 1);
    }

    /// With three slots, inserting a fourth key evicts the least recently used one.
    #[test]
    fn test_lru_eviction() {
        let cache = cache_with(CacheConfig {
            max_entries: 3,
            enable_lru: true,
            ..Default::default()
        });

        // Fill all three slots.
        for n in 1..=3 {
            put_str(&cache, &format!("key{}", n), &format!("value{}", n));
        }

        // Reading key1 makes key2 the least recently used entry.
        cache.get("key1");

        // The fourth key must push key2 out.
        put_str(&cache, "key4", "value4");

        assert_eq!(cache.get("key1"), Some("value1".to_string()));
        assert_eq!(cache.get("key2"), None); // Evicted
        assert_eq!(cache.get("key3"), Some("value3".to_string()));
        assert_eq!(cache.get("key4"), Some("value4".to_string()));
    }

    /// Invalidation removes the entry once and reports `false` the second time.
    #[test]
    fn test_cache_invalidation() {
        let cache = cache_with(CacheConfig::default());

        put_str(&cache, "key1", "value1");
        assert!(cache.invalidate("key1"));
        assert_eq!(cache.get("key1"), None);
        assert!(!cache.invalidate("key1")); // Already removed
    }

    /// `clear` empties the cache and zeroes the tracked memory.
    #[test]
    fn test_cache_clear() {
        let cache = cache_with(CacheConfig::default());

        put_str(&cache, "key1", "value1");
        put_str(&cache, "key2", "value2");

        assert_eq!(cache.len(), 2);
        cache.clear();
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.memory_usage(), 0);
    }

    /// Two hits and one miss give a hit rate of two thirds.
    #[test]
    fn test_hit_rate() {
        let cache = cache_with(CacheConfig::default());

        put_str(&cache, "key1", "value1");

        cache.get("key1"); // Hit
        cache.get("key2"); // Miss
        cache.get("key1"); // Hit

        assert_eq!(cache.stats().hit_rate(), 2.0 / 3.0);
    }

    /// Ten threads each put and get one hundred distinct keys without panicking.
    #[test]
    fn test_concurrent_access() {
        use std::thread;

        let cache = Arc::new(cache_with(CacheConfig::default()));

        // Spawn every worker before joining any of them.
        let workers: Vec<_> = (0..10)
            .map(|worker| {
                let cache = Arc::clone(&cache);
                thread::spawn(move || {
                    for step in 0..100 {
                        let id = worker * 100 + step;
                        let key = format!("key_{}", id);
                        let value = format!("value_{}", id);
                        cache.put(key.clone(), value);
                        cache.get(&key);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().expect("thread should not panic");
        }

        // The cache can never hold more than the number of distinct keys written.
        assert!(cache.len() <= 1000);
    }

    /// A 5 KiB budget with 1 KiB entries forces evictions when ten entries are inserted.
    #[test]
    fn test_memory_aware_eviction() {
        let max_memory: u64 = 5120; // 5KB (5 entries * 1KB each)
        let cache = cache_with(CacheConfig {
            max_entries: 1000,
            max_memory_bytes: max_memory,
            enable_lru: true,
            ..Default::default()
        });

        // Twice as many entries as the budget allows.
        for i in 0..10 {
            cache.put(format!("key{}", i), format!("value{}", i));
        }

        assert!(cache.memory_usage() <= max_memory);
        assert!(cache.stats().evictions.load(Ordering::Relaxed) > 0);
    }
}