1use crate::algebra::Algebra;
14use crate::cache::CacheCoordinator;
15use crate::optimizer::Statistics;
16use dashmap::DashMap;
17use serde::{Deserialize, Serialize};
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
21use std::sync::Arc;
22use std::time::Instant;
23
24pub struct QueryPlanCache {
26 cache: Arc<DashMap<QuerySignature, CachedPlan>>,
28 config: CachingConfig,
30 stats: Arc<CacheStatistics>,
32 access_counter: Arc<AtomicU64>,
34 invalidation_coordinator: Option<Arc<CacheCoordinator>>,
36 invalidated_entries: Arc<dashmap::DashSet<QuerySignature>>,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct CachingConfig {
43 pub enabled: bool,
45 pub max_cache_size: usize,
47 pub ttl_seconds: u64,
49 pub parameterized_queries: bool,
51 pub invalidate_on_stats_change: bool,
53 pub stats_change_threshold: f64,
55}
56
57impl Default for CachingConfig {
58 fn default() -> Self {
59 Self {
60 enabled: true,
61 max_cache_size: 10000,
62 ttl_seconds: 3600, parameterized_queries: true,
64 invalidate_on_stats_change: true,
65 stats_change_threshold: 0.2, }
67 }
68}
69
70#[derive(Debug, Clone, Hash, PartialEq, Eq)]
72pub struct QuerySignature {
73 normalized_query: String,
75 parameter_types: Vec<String>,
77 stats_hash: u64,
79}
80
81impl QuerySignature {
82 pub fn new(query: &str, params: Vec<String>, stats: &Statistics) -> Self {
84 Self {
85 normalized_query: Self::normalize_query(query),
86 parameter_types: params,
87 stats_hash: Self::hash_statistics(stats),
88 }
89 }
90
91 fn normalize_query(query: &str) -> String {
93 let mut normalized = query.to_string();
95
96 let re_string = regex::Regex::new(r#""[^"]*""#).expect("regex pattern should be valid");
98 normalized = re_string.replace_all(&normalized, "\"?\"").to_string();
99
100 let re_number =
102 regex::Regex::new(r"\b\d+(\.\d+)?\b").expect("regex pattern should be valid");
103 normalized = re_number.replace_all(&normalized, "?").to_string();
104
105 let re_whitespace = regex::Regex::new(r"\s+").expect("regex pattern should be valid");
107 re_whitespace.replace_all(&normalized, " ").to_string()
108 }
109
110 fn hash_statistics(stats: &Statistics) -> u64 {
112 use std::collections::hash_map::DefaultHasher;
113
114 let mut hasher = DefaultHasher::new();
115
116 for (pattern, card) in &stats.cardinalities {
118 pattern.hash(&mut hasher);
119 card.hash(&mut hasher);
120 }
121
122 for (pred, freq) in &stats.predicate_frequency {
124 pred.hash(&mut hasher);
125 freq.hash(&mut hasher);
126 }
127
128 hasher.finish()
129 }
130}
131
132#[derive(Debug, Clone)]
134pub struct CachedPlan {
135 pub plan: Algebra,
137 pub cached_at: Instant,
139 pub hit_count: Arc<AtomicUsize>,
141 pub last_accessed: Arc<AtomicU64>,
143 pub estimated_cost: f64,
145 pub stats_snapshot: StatisticsSnapshot,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct StatisticsSnapshot {
152 pub cardinalities: BTreeMap<String, usize>,
154 pub predicate_frequency: BTreeMap<String, usize>,
156 pub snapshot_time: u64,
158}
159
160impl StatisticsSnapshot {
161 pub fn from_statistics(stats: &Statistics) -> Self {
163 Self {
164 cardinalities: stats
165 .cardinalities
166 .iter()
167 .map(|(k, v)| (k.clone(), *v))
168 .collect(),
169 predicate_frequency: stats
170 .predicate_frequency
171 .iter()
172 .map(|(k, v)| (k.clone(), *v))
173 .collect(),
174 snapshot_time: std::time::SystemTime::now()
175 .duration_since(std::time::UNIX_EPOCH)
176 .expect("SystemTime should be after UNIX_EPOCH")
177 .as_secs(),
178 }
179 }
180
181 pub fn has_changed_significantly(&self, current_stats: &Statistics, threshold: f64) -> bool {
183 for (pattern, old_card) in &self.cardinalities {
185 let current_card = current_stats
186 .cardinalities
187 .get(pattern)
188 .copied()
189 .unwrap_or(0);
190
191 if *old_card == 0 && current_card > 0 {
192 return true; }
194
195 if *old_card > 0 {
196 let change_ratio =
197 (current_card as f64 - *old_card as f64).abs() / *old_card as f64;
198 if change_ratio > threshold {
199 return true;
200 }
201 }
202 }
203
204 for (pred, old_freq) in &self.predicate_frequency {
206 let current_freq = current_stats
207 .predicate_frequency
208 .get(pred)
209 .copied()
210 .unwrap_or(0);
211
212 if *old_freq > 0 {
213 let change_ratio =
214 (current_freq as f64 - *old_freq as f64).abs() / *old_freq as f64;
215 if change_ratio > threshold {
216 return true;
217 }
218 }
219 }
220
221 false
222 }
223}
224
/// Lock-free counters describing cache activity.
#[derive(Debug, Default)]
pub struct CacheStatistics {
    /// Lookups answered from the cache.
    pub hits: AtomicU64,
    /// Lookups that found no usable entry.
    pub misses: AtomicU64,
    /// Entries removed to make room for new ones.
    pub evictions: AtomicU64,
    /// Entries discarded or flagged as no longer valid.
    pub invalidations: AtomicU64,
    /// Byte-size counter; nothing in this part of the file updates it.
    pub size_bytes: AtomicU64,
}

impl CacheStatistics {
    /// Fraction of lookups that were hits, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when no lookup has been recorded. The sum is formed in
    /// floating point so that extreme counter values cannot overflow.
    pub fn hit_rate(&self) -> f64 {
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);

        if hits == 0 && misses == 0 {
            return 0.0;
        }

        let hits = hits as f64;
        hits / (hits + misses as f64)
    }

    /// Total number of recorded lookups (hits plus misses), saturating at
    /// `u64::MAX` instead of overflowing.
    pub fn total_requests(&self) -> u64 {
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);
        hits.saturating_add(misses)
    }
}
259
260impl QueryPlanCache {
261 pub fn new() -> Self {
263 Self::with_config(CachingConfig::default())
264 }
265
266 pub fn with_config(config: CachingConfig) -> Self {
268 Self {
269 cache: Arc::new(DashMap::new()),
270 config,
271 stats: Arc::new(CacheStatistics::default()),
272 access_counter: Arc::new(AtomicU64::new(0)),
273 invalidation_coordinator: None,
274 invalidated_entries: Arc::new(dashmap::DashSet::new()),
275 }
276 }
277
278 pub fn with_invalidation_coordinator(
280 config: CachingConfig,
281 coordinator: Arc<CacheCoordinator>,
282 ) -> Self {
283 Self {
284 cache: Arc::new(DashMap::new()),
285 config,
286 stats: Arc::new(CacheStatistics::default()),
287 access_counter: Arc::new(AtomicU64::new(0)),
288 invalidation_coordinator: Some(coordinator),
289 invalidated_entries: Arc::new(dashmap::DashSet::new()),
290 }
291 }
292
293 pub fn attach_coordinator(&mut self, coordinator: Arc<CacheCoordinator>) {
295 self.invalidation_coordinator = Some(coordinator);
296 }
297
298 pub fn get(
300 &self,
301 query: &str,
302 params: Vec<String>,
303 current_stats: &Statistics,
304 ) -> Option<Algebra> {
305 if !self.config.enabled {
306 return None;
307 }
308
309 let signature = QuerySignature::new(query, params, current_stats);
310
311 if self.invalidated_entries.contains(&signature) {
313 self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
314 self.stats.misses.fetch_add(1, Ordering::Relaxed);
315 return None;
316 }
317
318 if let Some(entry) = self.cache.get_mut(&signature) {
319 let elapsed = entry.cached_at.elapsed();
321 if elapsed.as_secs() > self.config.ttl_seconds {
322 drop(entry); self.cache.remove(&signature);
324 self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
325 self.stats.misses.fetch_add(1, Ordering::Relaxed);
326 return None;
327 }
328
329 if self.config.invalidate_on_stats_change
331 && entry
332 .stats_snapshot
333 .has_changed_significantly(current_stats, self.config.stats_change_threshold)
334 {
335 drop(entry); self.cache.remove(&signature);
337 self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
338 self.stats.misses.fetch_add(1, Ordering::Relaxed);
339 return None;
340 }
341
342 entry.hit_count.fetch_add(1, Ordering::Relaxed);
344 let access_time = self.access_counter.fetch_add(1, Ordering::Relaxed);
345 entry.last_accessed.store(access_time, Ordering::Relaxed);
346
347 self.stats.hits.fetch_add(1, Ordering::Relaxed);
348 return Some(entry.plan.clone());
349 }
350
351 self.stats.misses.fetch_add(1, Ordering::Relaxed);
352 None
353 }
354
355 pub fn insert(
357 &self,
358 query: &str,
359 params: Vec<String>,
360 plan: Algebra,
361 estimated_cost: f64,
362 current_stats: &Statistics,
363 ) {
364 if !self.config.enabled {
365 return;
366 }
367
368 if self.cache.len() >= self.config.max_cache_size {
370 self.evict_lru();
371 }
372
373 let signature = QuerySignature::new(query, params, current_stats);
374
375 let cached_plan = CachedPlan {
376 plan,
377 cached_at: Instant::now(),
378 hit_count: Arc::new(AtomicUsize::new(0)),
379 last_accessed: Arc::new(AtomicU64::new(self.access_counter.load(Ordering::Relaxed))),
380 estimated_cost,
381 stats_snapshot: StatisticsSnapshot::from_statistics(current_stats),
382 };
383
384 self.cache.insert(signature, cached_plan);
385 }
386
387 fn evict_lru(&self) {
389 let mut oldest_key = None;
391 let mut oldest_access = u64::MAX;
392
393 for entry in self.cache.iter() {
394 let access_time = entry.last_accessed.load(Ordering::Relaxed);
395 if access_time < oldest_access {
396 oldest_access = access_time;
397 oldest_key = Some(entry.key().clone());
398 }
399 }
400
401 if let Some(key) = oldest_key {
402 self.cache.remove(&key);
403 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
404 }
405 }
406
407 pub fn clear(&self) {
409 let count = self.cache.len();
410 self.cache.clear();
411 self.stats
412 .invalidations
413 .fetch_add(count as u64, Ordering::Relaxed);
414 }
415
416 pub fn invalidate_pattern(&self, pattern: &str) {
418 let keys_to_remove: Vec<_> = self
419 .cache
420 .iter()
421 .filter(|entry| entry.stats_snapshot.cardinalities.contains_key(pattern))
422 .map(|entry| entry.key().clone())
423 .collect();
424
425 for key in keys_to_remove {
426 self.invalidated_entries.insert(key.clone());
427 self.cache.remove(&key);
428 self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
429 }
430 }
431
432 pub fn mark_invalidated(&self, signature: QuerySignature) {
434 self.invalidated_entries.insert(signature);
435 self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
436 }
437
438 pub fn invalidate_signature(&self, signature: &QuerySignature) {
440 self.invalidated_entries.insert(signature.clone());
441 self.cache.remove(signature);
442 self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
443 }
444
445 pub fn statistics(&self) -> CacheStats {
447 CacheStats {
448 hits: self.stats.hits.load(Ordering::Relaxed),
449 misses: self.stats.misses.load(Ordering::Relaxed),
450 evictions: self.stats.evictions.load(Ordering::Relaxed),
451 invalidations: self.stats.invalidations.load(Ordering::Relaxed),
452 size: self.cache.len(),
453 capacity: self.config.max_cache_size,
454 hit_rate: self.stats.hit_rate(),
455 }
456 }
457
458 pub fn config(&self) -> &CachingConfig {
460 &self.config
461 }
462
463 pub fn update_config(&mut self, config: CachingConfig) {
465 self.config = config;
466 }
467}
468
469impl Default for QueryPlanCache {
470 fn default() -> Self {
471 Self::new()
472 }
473}
474
475#[derive(Debug, Clone, Serialize, Deserialize)]
477pub struct CacheStats {
478 pub hits: u64,
480 pub misses: u64,
482 pub evictions: u64,
484 pub invalidations: u64,
486 pub size: usize,
488 pub capacity: usize,
490 pub hit_rate: f64,
492}
493
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A miss followed by an insert and a hit is reflected in the counters.
    #[test]
    fn test_query_plan_cache_basic() {
        let plan_cache = QueryPlanCache::new();
        let sparql = "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10";
        let statistics = Statistics::new();

        // Nothing cached yet.
        assert!(plan_cache.get(sparql, vec![], &statistics).is_none());

        let plan = Algebra::Bgp(vec![]);
        plan_cache.insert(sparql, vec![], plan.clone(), 100.0, &statistics);

        let cached = plan_cache.get(sparql, vec![], &statistics);
        assert!(cached.is_some());

        let counters = plan_cache.statistics();
        assert_eq!(counters.hits, 1);
        assert_eq!(counters.misses, 1);
    }

    /// Queries differing only in a string literal normalize identically.
    #[test]
    fn test_cache_normalization() {
        let statistics = Statistics::new();

        let alice_query = "SELECT ?s WHERE { ?s <http://example.org/p> \"Alice\" }";
        let bob_query = "SELECT ?s WHERE { ?s <http://example.org/p> \"Bob\" }";

        let alice_signature = QuerySignature::new(alice_query, vec![], &statistics);
        let bob_signature = QuerySignature::new(bob_query, vec![], &statistics);

        assert_eq!(
            alice_signature.normalized_query,
            bob_signature.normalized_query
        );
    }

    /// An entry is served before its TTL elapses and dropped afterwards.
    #[test]
    #[ignore = "inherently slow: requires wall-clock TTL expiry (use nextest --ignored to run)"]
    fn test_cache_ttl() {
        let plan_cache = QueryPlanCache::with_config(CachingConfig {
            ttl_seconds: 1,
            ..Default::default()
        });
        let sparql = "SELECT ?s WHERE { ?s ?p ?o }";
        let statistics = Statistics::new();

        plan_cache.insert(sparql, vec![], Algebra::Bgp(vec![]), 100.0, &statistics);

        assert!(plan_cache.get(sparql, vec![], &statistics).is_some());

        std::thread::sleep(Duration::from_secs(2));

        assert!(plan_cache.get(sparql, vec![], &statistics).is_none());
    }

    /// Inserting past the capacity evicts exactly one entry.
    #[test]
    fn test_cache_eviction() {
        let plan_cache = QueryPlanCache::with_config(CachingConfig {
            max_cache_size: 2,
            ..Default::default()
        });
        let statistics = Statistics::new();

        for sparql in ["query1", "query2", "query3"] {
            plan_cache.insert(sparql, vec![], Algebra::Bgp(vec![]), 100.0, &statistics);
        }

        assert_eq!(plan_cache.cache.len(), 2);

        let counters = plan_cache.statistics();
        assert_eq!(counters.evictions, 1);
    }

    /// `clear` empties the cache and counts every removed entry.
    #[test]
    fn test_cache_clear() {
        let plan_cache = QueryPlanCache::new();
        let statistics = Statistics::new();

        // Variable names (not bare numbers) carry the index so the queries
        // stay distinct after literal normalization.
        for i in 0..10 {
            let sparql = format!("SELECT ?s ?var{} WHERE {{ ?s ?p{} ?o{} }}", i, i, i);
            plan_cache.insert(&sparql, vec![], Algebra::Bgp(vec![]), 100.0, &statistics);
        }

        let entries_before = plan_cache.cache.len();
        assert!(entries_before > 0, "Cache should have entries");

        plan_cache.clear();
        assert_eq!(plan_cache.cache.len(), 0);

        let counters = plan_cache.statistics();
        assert_eq!(counters.invalidations, entries_before as u64);
    }

    /// A snapshot compared against its own source reports no change.
    #[test]
    fn test_statistics_snapshot() {
        let statistics = Statistics::new();
        let snapshot = StatisticsSnapshot::from_statistics(&statistics);

        assert!(!snapshot.has_changed_significantly(&statistics, 0.2));
    }

    /// A disabled cache ignores inserts and never returns a plan.
    #[test]
    fn test_cache_disabled() {
        let plan_cache = QueryPlanCache::with_config(CachingConfig {
            enabled: false,
            ..Default::default()
        });
        let statistics = Statistics::new();

        plan_cache.insert("query", vec![], Algebra::Bgp(vec![]), 100.0, &statistics);

        assert!(plan_cache.get("query", vec![], &statistics).is_none());
    }

    /// One hit and one miss yield a 50 % hit rate.
    #[test]
    fn test_hit_rate_calculation() {
        let plan_cache = QueryPlanCache::new();
        let statistics = Statistics::new();

        assert_eq!(plan_cache.statistics().hit_rate, 0.0);

        plan_cache.insert(
            "SELECT ?s WHERE { ?s ?p ?o }",
            vec![],
            Algebra::Bgp(vec![]),
            100.0,
            &statistics,
        );

        // First lookup hits the inserted entry, second one is unknown.
        plan_cache.get("SELECT ?s WHERE { ?s ?p ?o }", vec![], &statistics);
        plan_cache.get("SELECT ?x WHERE { ?x ?y ?z }", vec![], &statistics);

        let counters = plan_cache.statistics();
        assert_eq!(counters.hits, 1);
        assert_eq!(counters.misses, 1);
        assert!((counters.hit_rate - 0.5).abs() < 0.01);
    }
}