Skip to main content

oxirs_arq/
query_plan_cache.rs

//! Query plan caching.
//!
//! Caches optimized SPARQL query plans so that frequently executed queries do
//! not have to be re-optimized on every execution.
//!
//! ## Features
//! - LRU-based cache eviction
//! - Cache invalidation based on statistics changes
//! - Parameterized query support (string and numeric literals are normalized away)
//! - Cache hit/miss tracking
//! - TTL-based expiration
12
13use crate::algebra::Algebra;
14use crate::cache::CacheCoordinator;
15use crate::optimizer::Statistics;
16use dashmap::DashMap;
17use serde::{Deserialize, Serialize};
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
21use std::sync::Arc;
22use std::time::Instant;
23
24/// Query plan cache for avoiding redundant optimization
25pub struct QueryPlanCache {
26    /// Cache storage (query signature -> cached plan)
27    cache: Arc<DashMap<QuerySignature, CachedPlan>>,
28    /// Configuration
29    config: CachingConfig,
30    /// Cache statistics
31    stats: Arc<CacheStatistics>,
32    /// LRU tracking (access order)
33    access_counter: Arc<AtomicU64>,
34    /// Invalidation coordinator (optional for backward compatibility)
35    invalidation_coordinator: Option<Arc<CacheCoordinator>>,
36    /// Invalidation flags (tracks which entries have been invalidated)
37    invalidated_entries: Arc<dashmap::DashSet<QuerySignature>>,
38}
39
40/// Configuration for query plan caching
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct CachingConfig {
43    /// Enable caching
44    pub enabled: bool,
45    /// Maximum number of cached plans
46    pub max_cache_size: usize,
47    /// Time-to-live for cached plans (in seconds)
48    pub ttl_seconds: u64,
49    /// Enable parameterized query support
50    pub parameterized_queries: bool,
51    /// Invalidate cache when statistics change significantly
52    pub invalidate_on_stats_change: bool,
53    /// Threshold for statistics change (0.0 to 1.0)
54    pub stats_change_threshold: f64,
55}
56
57impl Default for CachingConfig {
58    fn default() -> Self {
59        Self {
60            enabled: true,
61            max_cache_size: 10000,
62            ttl_seconds: 3600, // 1 hour
63            parameterized_queries: true,
64            invalidate_on_stats_change: true,
65            stats_change_threshold: 0.2, // 20% change
66        }
67    }
68}
69
/// Cache key for a query: its normalized text, its parameter types, and a hash
/// of the statistics it was optimized against.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct QuerySignature {
    /// Query text after literal masking and whitespace normalization.
    normalized_query: String,
    /// Types of the query's parameters.
    parameter_types: Vec<String>,
    /// Hash of the statistics; a statistics change therefore changes the key.
    stats_hash: u64,
}
80
81impl QuerySignature {
82    /// Create a new query signature
83    pub fn new(query: &str, params: Vec<String>, stats: &Statistics) -> Self {
84        Self {
85            normalized_query: Self::normalize_query(query),
86            parameter_types: params,
87            stats_hash: Self::hash_statistics(stats),
88        }
89    }
90
91    /// Normalize query by replacing literals with placeholders
92    fn normalize_query(query: &str) -> String {
93        // Simplified normalization - replace numeric literals and strings with placeholders
94        let mut normalized = query.to_string();
95
96        // Replace string literals: "..." -> "?"
97        let re_string = regex::Regex::new(r#""[^"]*""#).expect("regex pattern should be valid");
98        normalized = re_string.replace_all(&normalized, "\"?\"").to_string();
99
100        // Replace numeric literals: 123 -> ?
101        let re_number =
102            regex::Regex::new(r"\b\d+(\.\d+)?\b").expect("regex pattern should be valid");
103        normalized = re_number.replace_all(&normalized, "?").to_string();
104
105        // Collapse whitespace
106        let re_whitespace = regex::Regex::new(r"\s+").expect("regex pattern should be valid");
107        re_whitespace.replace_all(&normalized, " ").to_string()
108    }
109
110    /// Hash statistics to detect changes
111    fn hash_statistics(stats: &Statistics) -> u64 {
112        use std::collections::hash_map::DefaultHasher;
113
114        let mut hasher = DefaultHasher::new();
115
116        // Hash cardinalities
117        for (pattern, card) in &stats.cardinalities {
118            pattern.hash(&mut hasher);
119            card.hash(&mut hasher);
120        }
121
122        // Hash predicate frequencies
123        for (pred, freq) in &stats.predicate_frequency {
124            pred.hash(&mut hasher);
125            freq.hash(&mut hasher);
126        }
127
128        hasher.finish()
129    }
130}
131
132/// Cached query plan with metadata
133#[derive(Debug, Clone)]
134pub struct CachedPlan {
135    /// Optimized query plan
136    pub plan: Algebra,
137    /// When the plan was cached
138    pub cached_at: Instant,
139    /// How many times this plan was used
140    pub hit_count: Arc<AtomicUsize>,
141    /// Last access timestamp (for LRU)
142    pub last_accessed: Arc<AtomicU64>,
143    /// Estimated cost of the plan
144    pub estimated_cost: f64,
145    /// Statistics snapshot at cache time
146    pub stats_snapshot: StatisticsSnapshot,
147}
148
149/// Snapshot of statistics for cache invalidation
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct StatisticsSnapshot {
152    /// Cardinalities at cache time
153    pub cardinalities: BTreeMap<String, usize>,
154    /// Predicate frequencies at cache time
155    pub predicate_frequency: BTreeMap<String, usize>,
156    /// Timestamp when snapshot was taken
157    pub snapshot_time: u64,
158}
159
160impl StatisticsSnapshot {
161    /// Create from Statistics
162    pub fn from_statistics(stats: &Statistics) -> Self {
163        Self {
164            cardinalities: stats
165                .cardinalities
166                .iter()
167                .map(|(k, v)| (k.clone(), *v))
168                .collect(),
169            predicate_frequency: stats
170                .predicate_frequency
171                .iter()
172                .map(|(k, v)| (k.clone(), *v))
173                .collect(),
174            snapshot_time: std::time::SystemTime::now()
175                .duration_since(std::time::UNIX_EPOCH)
176                .expect("SystemTime should be after UNIX_EPOCH")
177                .as_secs(),
178        }
179    }
180
181    /// Check if statistics have changed significantly
182    pub fn has_changed_significantly(&self, current_stats: &Statistics, threshold: f64) -> bool {
183        // Check cardinality changes
184        for (pattern, old_card) in &self.cardinalities {
185            let current_card = current_stats
186                .cardinalities
187                .get(pattern)
188                .copied()
189                .unwrap_or(0);
190
191            if *old_card == 0 && current_card > 0 {
192                return true; // New pattern appeared
193            }
194
195            if *old_card > 0 {
196                let change_ratio =
197                    (current_card as f64 - *old_card as f64).abs() / *old_card as f64;
198                if change_ratio > threshold {
199                    return true;
200                }
201            }
202        }
203
204        // Check predicate frequency changes
205        for (pred, old_freq) in &self.predicate_frequency {
206            let current_freq = current_stats
207                .predicate_frequency
208                .get(pred)
209                .copied()
210                .unwrap_or(0);
211
212            if *old_freq > 0 {
213                let change_ratio =
214                    (current_freq as f64 - *old_freq as f64).abs() / *old_freq as f64;
215                if change_ratio > threshold {
216                    return true;
217                }
218            }
219        }
220
221        false
222    }
223}
224
/// Atomic counters describing how effective the cache is.
#[derive(Debug, Default)]
pub struct CacheStatistics {
    /// Lookups answered from the cache.
    pub hits: AtomicU64,
    /// Lookups that found no usable plan.
    pub misses: AtomicU64,
    /// Entries removed to make room for new ones.
    pub evictions: AtomicU64,
    /// Entries discarded as no longer valid.
    pub invalidations: AtomicU64,
    /// Approximate size in bytes.
    /// NOTE(review): not updated anywhere in this file.
    pub size_bytes: AtomicU64,
}

impl CacheStatistics {
    /// Fraction of lookups that were hits, or `0.0` if no lookups were recorded.
    pub fn hit_rate(&self) -> f64 {
        let (hit_count, miss_count) = (
            self.hits.load(Ordering::Relaxed),
            self.misses.load(Ordering::Relaxed),
        );
        match hit_count + miss_count {
            0 => 0.0,
            lookups => hit_count as f64 / lookups as f64,
        }
    }

    /// Number of lookups recorded so far (hits plus misses).
    pub fn total_requests(&self) -> u64 {
        [&self.hits, &self.misses]
            .iter()
            .map(|counter| counter.load(Ordering::Relaxed))
            .sum()
    }
}
259
260impl QueryPlanCache {
261    /// Create a new query plan cache
262    pub fn new() -> Self {
263        Self::with_config(CachingConfig::default())
264    }
265
266    /// Create with custom configuration
267    pub fn with_config(config: CachingConfig) -> Self {
268        Self {
269            cache: Arc::new(DashMap::new()),
270            config,
271            stats: Arc::new(CacheStatistics::default()),
272            access_counter: Arc::new(AtomicU64::new(0)),
273            invalidation_coordinator: None,
274            invalidated_entries: Arc::new(dashmap::DashSet::new()),
275        }
276    }
277
278    /// Create with invalidation coordinator
279    pub fn with_invalidation_coordinator(
280        config: CachingConfig,
281        coordinator: Arc<CacheCoordinator>,
282    ) -> Self {
283        Self {
284            cache: Arc::new(DashMap::new()),
285            config,
286            stats: Arc::new(CacheStatistics::default()),
287            access_counter: Arc::new(AtomicU64::new(0)),
288            invalidation_coordinator: Some(coordinator),
289            invalidated_entries: Arc::new(dashmap::DashSet::new()),
290        }
291    }
292
293    /// Attach invalidation coordinator
294    pub fn attach_coordinator(&mut self, coordinator: Arc<CacheCoordinator>) {
295        self.invalidation_coordinator = Some(coordinator);
296    }
297
298    /// Get a cached plan if available
299    pub fn get(
300        &self,
301        query: &str,
302        params: Vec<String>,
303        current_stats: &Statistics,
304    ) -> Option<Algebra> {
305        if !self.config.enabled {
306            return None;
307        }
308
309        let signature = QuerySignature::new(query, params, current_stats);
310
311        // Check if entry has been invalidated
312        if self.invalidated_entries.contains(&signature) {
313            self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
314            self.stats.misses.fetch_add(1, Ordering::Relaxed);
315            return None;
316        }
317
318        if let Some(entry) = self.cache.get_mut(&signature) {
319            // Check TTL
320            let elapsed = entry.cached_at.elapsed();
321            if elapsed.as_secs() > self.config.ttl_seconds {
322                drop(entry); // Release lock before removing
323                self.cache.remove(&signature);
324                self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
325                self.stats.misses.fetch_add(1, Ordering::Relaxed);
326                return None;
327            }
328
329            // Check if statistics have changed significantly
330            if self.config.invalidate_on_stats_change
331                && entry
332                    .stats_snapshot
333                    .has_changed_significantly(current_stats, self.config.stats_change_threshold)
334            {
335                drop(entry); // Release lock before removing
336                self.cache.remove(&signature);
337                self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
338                self.stats.misses.fetch_add(1, Ordering::Relaxed);
339                return None;
340            }
341
342            // Update access tracking
343            entry.hit_count.fetch_add(1, Ordering::Relaxed);
344            let access_time = self.access_counter.fetch_add(1, Ordering::Relaxed);
345            entry.last_accessed.store(access_time, Ordering::Relaxed);
346
347            self.stats.hits.fetch_add(1, Ordering::Relaxed);
348            return Some(entry.plan.clone());
349        }
350
351        self.stats.misses.fetch_add(1, Ordering::Relaxed);
352        None
353    }
354
355    /// Cache a query plan
356    pub fn insert(
357        &self,
358        query: &str,
359        params: Vec<String>,
360        plan: Algebra,
361        estimated_cost: f64,
362        current_stats: &Statistics,
363    ) {
364        if !self.config.enabled {
365            return;
366        }
367
368        // Evict entries if cache is full
369        if self.cache.len() >= self.config.max_cache_size {
370            self.evict_lru();
371        }
372
373        let signature = QuerySignature::new(query, params, current_stats);
374
375        let cached_plan = CachedPlan {
376            plan,
377            cached_at: Instant::now(),
378            hit_count: Arc::new(AtomicUsize::new(0)),
379            last_accessed: Arc::new(AtomicU64::new(self.access_counter.load(Ordering::Relaxed))),
380            estimated_cost,
381            stats_snapshot: StatisticsSnapshot::from_statistics(current_stats),
382        };
383
384        self.cache.insert(signature, cached_plan);
385    }
386
387    /// Evict least recently used entry
388    fn evict_lru(&self) {
389        // Find entry with oldest access time
390        let mut oldest_key = None;
391        let mut oldest_access = u64::MAX;
392
393        for entry in self.cache.iter() {
394            let access_time = entry.last_accessed.load(Ordering::Relaxed);
395            if access_time < oldest_access {
396                oldest_access = access_time;
397                oldest_key = Some(entry.key().clone());
398            }
399        }
400
401        if let Some(key) = oldest_key {
402            self.cache.remove(&key);
403            self.stats.evictions.fetch_add(1, Ordering::Relaxed);
404        }
405    }
406
407    /// Clear the entire cache
408    pub fn clear(&self) {
409        let count = self.cache.len();
410        self.cache.clear();
411        self.stats
412            .invalidations
413            .fetch_add(count as u64, Ordering::Relaxed);
414    }
415
416    /// Invalidate entries that reference a specific pattern
417    pub fn invalidate_pattern(&self, pattern: &str) {
418        let keys_to_remove: Vec<_> = self
419            .cache
420            .iter()
421            .filter(|entry| entry.stats_snapshot.cardinalities.contains_key(pattern))
422            .map(|entry| entry.key().clone())
423            .collect();
424
425        for key in keys_to_remove {
426            self.invalidated_entries.insert(key.clone());
427            self.cache.remove(&key);
428            self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
429        }
430    }
431
432    /// Mark entry as invalidated without removing (for batched invalidation)
433    pub fn mark_invalidated(&self, signature: QuerySignature) {
434        self.invalidated_entries.insert(signature);
435        self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
436    }
437
438    /// Invalidate by signature (for coordinator integration)
439    pub fn invalidate_signature(&self, signature: &QuerySignature) {
440        self.invalidated_entries.insert(signature.clone());
441        self.cache.remove(signature);
442        self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
443    }
444
445    /// Get cache statistics
446    pub fn statistics(&self) -> CacheStats {
447        CacheStats {
448            hits: self.stats.hits.load(Ordering::Relaxed),
449            misses: self.stats.misses.load(Ordering::Relaxed),
450            evictions: self.stats.evictions.load(Ordering::Relaxed),
451            invalidations: self.stats.invalidations.load(Ordering::Relaxed),
452            size: self.cache.len(),
453            capacity: self.config.max_cache_size,
454            hit_rate: self.stats.hit_rate(),
455        }
456    }
457
458    /// Get configuration
459    pub fn config(&self) -> &CachingConfig {
460        &self.config
461    }
462
463    /// Update configuration
464    pub fn update_config(&mut self, config: CachingConfig) {
465        self.config = config;
466    }
467}
468
469impl Default for QueryPlanCache {
470    fn default() -> Self {
471        Self::new()
472    }
473}
474
475/// Cache statistics summary
476#[derive(Debug, Clone, Serialize, Deserialize)]
477pub struct CacheStats {
478    /// Total cache hits
479    pub hits: u64,
480    /// Total cache misses
481    pub misses: u64,
482    /// Total evictions
483    pub evictions: u64,
484    /// Total invalidations
485    pub invalidations: u64,
486    /// Current cache size
487    pub size: usize,
488    /// Maximum capacity
489    pub capacity: usize,
490    /// Hit rate (0.0 to 1.0)
491    pub hit_rate: f64,
492}
493
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// The trivial plan used throughout these tests.
    fn empty_plan() -> Algebra {
        Algebra::Bgp(vec![])
    }

    #[test]
    fn test_query_plan_cache_basic() {
        let cache = QueryPlanCache::new();
        let query = "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10";
        let db_stats = Statistics::new();

        // Nothing cached yet.
        assert!(cache.get(query, vec![], &db_stats).is_none());

        cache.insert(query, vec![], empty_plan(), 100.0, &db_stats);

        // Now served from the cache.
        assert!(cache.get(query, vec![], &db_stats).is_some());

        let summary = cache.statistics();
        assert_eq!(summary.hits, 1);
        assert_eq!(summary.misses, 1);
    }

    #[test]
    fn test_cache_normalization() {
        let db_stats = Statistics::new();
        let signature_of = |text: &str| QuerySignature::new(text, vec![], &db_stats);

        let alice = signature_of("SELECT ?s WHERE { ?s <http://example.org/p> \"Alice\" }");
        let bob = signature_of("SELECT ?s WHERE { ?s <http://example.org/p> \"Bob\" }");

        // Only the string literal differs, so the normalized text must match.
        assert_eq!(alice.normalized_query, bob.normalized_query);
    }

    #[test]
    #[ignore = "inherently slow: requires wall-clock TTL expiry (use nextest --ignored to run)"]
    fn test_cache_ttl() {
        let cache = QueryPlanCache::with_config(CachingConfig {
            ttl_seconds: 1, // expire after one second
            ..Default::default()
        });
        let query = "SELECT ?s WHERE { ?s ?p ?o }";
        let db_stats = Statistics::new();

        cache.insert(query, vec![], empty_plan(), 100.0, &db_stats);
        assert!(cache.get(query, vec![], &db_stats).is_some());

        // Outlive the TTL.
        std::thread::sleep(Duration::from_secs(2));

        assert!(cache.get(query, vec![], &db_stats).is_none());
    }

    #[test]
    fn test_cache_eviction() {
        let cache = QueryPlanCache::with_config(CachingConfig {
            max_cache_size: 2,
            ..Default::default()
        });
        let db_stats = Statistics::new();

        // Three distinct queries into a two-slot cache.
        for query in ["query1", "query2", "query3"] {
            cache.insert(query, vec![], empty_plan(), 100.0, &db_stats);
        }

        // Exactly one entry had to go to stay within capacity.
        assert_eq!(cache.cache.len(), 2);
        assert_eq!(cache.statistics().evictions, 1);
    }

    #[test]
    fn test_cache_clear() {
        let cache = QueryPlanCache::new();
        let db_stats = Statistics::new();

        // Structurally different queries (variable names differ, not just numbers).
        for i in 0..10 {
            let query = format!("SELECT ?s ?var{} WHERE {{ ?s ?p{} ?o{} }}", i, i, i);
            cache.insert(&query, vec![], empty_plan(), 100.0, &db_stats);
        }

        let populated = cache.cache.len();
        assert!(populated > 0, "Cache should have entries");

        cache.clear();
        assert_eq!(cache.cache.len(), 0);

        // Every cleared entry is reported as an invalidation.
        assert_eq!(cache.statistics().invalidations, populated as u64);
    }

    #[test]
    fn test_statistics_snapshot() {
        let db_stats = Statistics::new();
        let snapshot = StatisticsSnapshot::from_statistics(&db_stats);

        // Identical statistics are never a significant change.
        assert!(!snapshot.has_changed_significantly(&db_stats, 0.2));
    }

    #[test]
    fn test_cache_disabled() {
        let cache = QueryPlanCache::with_config(CachingConfig {
            enabled: false,
            ..Default::default()
        });
        let db_stats = Statistics::new();

        // Insert is a no-op and lookups always miss while disabled.
        cache.insert("query", vec![], empty_plan(), 100.0, &db_stats);
        assert!(cache.get("query", vec![], &db_stats).is_none());
    }

    #[test]
    fn test_hit_rate_calculation() {
        let cache = QueryPlanCache::new();
        let db_stats = Statistics::new();

        // No lookups yet.
        assert_eq!(cache.statistics().hit_rate, 0.0);

        let seeded = "SELECT ?s WHERE { ?s ?p ?o }";
        cache.insert(seeded, vec![], empty_plan(), 100.0, &db_stats);
        cache.get(seeded, vec![], &db_stats); // hit
        cache.get("SELECT ?x WHERE { ?x ?y ?z }", vec![], &db_stats); // miss

        let summary = cache.statistics();
        assert_eq!(summary.hits, 1);
        assert_eq!(summary.misses, 1);
        // One hit out of two lookups.
        assert!((summary.hit_rate - 0.5).abs() < 0.01);
    }
}