// grafeo_engine/query/cache.rs

//! Query cache for parsed and planned queries.
//!
//! This module provides an LRU cache for query plans to avoid repeated
//! parsing and optimization of frequently executed queries.
//!
//! ## Cache Levels
//!
//! - **Parsed cache**: Caches logical plans after translation (language-specific parsing)
//! - **Optimized cache**: Caches logical plans after optimization
//!
//! ## Usage
//!
//! ```ignore
//! let cache = QueryCache::new(1000);
//!
//! // Check cache first
//! if let Some(plan) = cache.get_optimized(&cache_key) {
//!     return execute(plan);
//! }
//!
//! // Parse and optimize
//! let plan = parse_and_optimize(query)?;
//! cache.put_optimized(cache_key, plan.clone());
//! execute(plan)
//! ```
26
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use parking_lot::Mutex;

use crate::query::plan::LogicalPlan;
use crate::query::processor::QueryLanguage;
35
/// Cache key combining query text and language.
///
/// The query text is whitespace-normalized by [`CacheKey::new`], so
/// trivially different spellings of the same query map to the same key.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct CacheKey {
    /// The query string (normalized).
    query: String,
    /// The query language.
    language: QueryLanguage,
}
44
45impl CacheKey {
46    /// Creates a new cache key.
47    #[must_use]
48    pub fn new(query: impl Into<String>, language: QueryLanguage) -> Self {
49        Self {
50            query: normalize_query(&query.into()),
51            language,
52        }
53    }
54
55    /// Returns the query string.
56    #[must_use]
57    pub fn query(&self) -> &str {
58        &self.query
59    }
60
61    /// Returns the query language.
62    #[must_use]
63    pub fn language(&self) -> QueryLanguage {
64        self.language
65    }
66}
67
/// Normalizes a query string for caching.
///
/// Collapses runs of whitespace (including newlines and tabs) into single
/// spaces and trims leading/trailing whitespace. No case normalization is
/// performed, so queries differing only in keyword case get distinct keys.
fn normalize_query(query: &str) -> String {
    let mut normalized = String::with_capacity(query.len());
    for token in query.split_whitespace() {
        if !normalized.is_empty() {
            normalized.push(' ');
        }
        normalized.push_str(token);
    }
    normalized
}
75
/// Entry in the cache with access metadata.
struct CacheEntry<T> {
    /// The cached value.
    value: T,
    /// Number of times this entry was accessed.
    access_count: u64,
    /// Last access time (refreshed on every `access`).
    last_accessed: Instant,
}

impl<T: Clone> CacheEntry<T> {
    /// Wraps `value` with zeroed access metadata.
    fn new(value: T) -> Self {
        Self {
            value,
            access_count: 0,
            last_accessed: Instant::now(),
        }
    }

    /// Records an access (count and timestamp) and returns a clone of the value.
    fn access(&mut self) -> T {
        self.access_count += 1;
        self.last_accessed = Instant::now();
        self.value.clone()
    }
}

/// LRU cache implementation.
///
/// Recency is tracked in a `Vec`, so `get`/`put` are O(capacity); this is
/// acceptable for the modest capacities used by the query cache.
struct LruCache<K, V> {
    /// The cache storage.
    entries: HashMap<K, CacheEntry<V>>,
    /// Maximum number of entries. A capacity of 0 stores nothing.
    capacity: usize,
    /// Keys ordered least- to most-recently used (front is evicted first).
    access_order: Vec<K>,
}

impl<K: Clone + Eq + Hash, V: Clone> LruCache<K, V> {
    /// Creates a cache holding at most `capacity` entries.
    fn new(capacity: usize) -> Self {
        Self {
            entries: HashMap::with_capacity(capacity),
            capacity,
            access_order: Vec::with_capacity(capacity),
        }
    }

    /// Returns a clone of the cached value and marks the key most recently used.
    fn get(&mut self, key: &K) -> Option<V> {
        let entry = self.entries.get_mut(key)?;
        // Move to end of access order (most recently used).
        if let Some(pos) = self.access_order.iter().position(|k| k == key) {
            let k = self.access_order.remove(pos);
            self.access_order.push(k);
        }
        Some(entry.access())
    }

    /// Inserts (or replaces) a value, evicting the LRU entry if at capacity.
    fn put(&mut self, key: K, value: V) {
        // BUG FIX: a zero-capacity cache must store nothing. Previously the
        // eviction check ran but the insert below was unconditional, so a
        // capacity-0 cache permanently held one entry.
        if self.capacity == 0 {
            return;
        }

        // Evict until there is room; only needed when inserting a new key
        // (replacing an existing key does not change the entry count).
        if !self.entries.contains_key(&key) {
            while self.entries.len() >= self.capacity {
                self.evict_lru();
            }
        }

        // Remove any stale position in the access order.
        if let Some(pos) = self.access_order.iter().position(|k| k == &key) {
            self.access_order.remove(pos);
        }

        // Add to end (most recently used).
        self.access_order.push(key.clone());
        self.entries.insert(key, CacheEntry::new(value));
    }

    /// Removes the least-recently-used entry (front of `access_order`).
    fn evict_lru(&mut self) {
        if !self.access_order.is_empty() {
            let key = self.access_order.remove(0);
            self.entries.remove(&key);
        }
    }

    /// Drops all entries and recency information.
    fn clear(&mut self) {
        self.entries.clear();
        self.access_order.clear();
    }

    /// Returns the number of cached entries.
    fn len(&self) -> usize {
        self.entries.len()
    }

    /// Removes a specific key, returning its value if it was present.
    fn remove(&mut self, key: &K) -> Option<V> {
        if let Some(pos) = self.access_order.iter().position(|k| k == key) {
            self.access_order.remove(pos);
        }
        self.entries.remove(key).map(|e| e.value)
    }
}
173
/// Query cache for parsed and optimized plans.
///
/// Two independent LRU levels are kept: plans straight out of translation
/// ("parsed") and plans after optimization. Hit/miss counters are atomics,
/// so recording statistics does not require holding a cache lock.
pub struct QueryCache {
    /// Cache for parsed (translated) logical plans.
    parsed_cache: Mutex<LruCache<CacheKey, LogicalPlan>>,
    /// Cache for optimized logical plans.
    optimized_cache: Mutex<LruCache<CacheKey, LogicalPlan>>,
    /// Cache hit count for parsed plans.
    parsed_hits: AtomicU64,
    /// Cache miss count for parsed plans.
    parsed_misses: AtomicU64,
    /// Cache hit count for optimized plans.
    optimized_hits: AtomicU64,
    /// Cache miss count for optimized plans.
    optimized_misses: AtomicU64,
    /// Whether caching is enabled; when false, all gets/puts are no-ops.
    enabled: bool,
}
191
192impl QueryCache {
193    /// Creates a new query cache with the specified capacity.
194    ///
195    /// The capacity is shared between parsed and optimized caches
196    /// (each gets half the capacity).
197    #[must_use]
198    pub fn new(capacity: usize) -> Self {
199        let half_capacity = capacity / 2;
200        Self {
201            parsed_cache: Mutex::new(LruCache::new(half_capacity.max(1))),
202            optimized_cache: Mutex::new(LruCache::new(half_capacity.max(1))),
203            parsed_hits: AtomicU64::new(0),
204            parsed_misses: AtomicU64::new(0),
205            optimized_hits: AtomicU64::new(0),
206            optimized_misses: AtomicU64::new(0),
207            enabled: true,
208        }
209    }
210
211    /// Creates a disabled cache (for testing or when caching is not desired).
212    #[must_use]
213    pub fn disabled() -> Self {
214        Self {
215            parsed_cache: Mutex::new(LruCache::new(0)),
216            optimized_cache: Mutex::new(LruCache::new(0)),
217            parsed_hits: AtomicU64::new(0),
218            parsed_misses: AtomicU64::new(0),
219            optimized_hits: AtomicU64::new(0),
220            optimized_misses: AtomicU64::new(0),
221            enabled: false,
222        }
223    }
224
225    /// Returns whether caching is enabled.
226    #[must_use]
227    pub fn is_enabled(&self) -> bool {
228        self.enabled
229    }
230
231    /// Gets a parsed plan from the cache.
232    pub fn get_parsed(&self, key: &CacheKey) -> Option<LogicalPlan> {
233        if !self.enabled {
234            return None;
235        }
236
237        let result = self.parsed_cache.lock().get(key);
238        if result.is_some() {
239            self.parsed_hits.fetch_add(1, Ordering::Relaxed);
240        } else {
241            self.parsed_misses.fetch_add(1, Ordering::Relaxed);
242        }
243        result
244    }
245
246    /// Puts a parsed plan into the cache.
247    pub fn put_parsed(&self, key: CacheKey, plan: LogicalPlan) {
248        if !self.enabled {
249            return;
250        }
251        self.parsed_cache.lock().put(key, plan);
252    }
253
254    /// Gets an optimized plan from the cache.
255    pub fn get_optimized(&self, key: &CacheKey) -> Option<LogicalPlan> {
256        if !self.enabled {
257            return None;
258        }
259
260        let result = self.optimized_cache.lock().get(key);
261        if result.is_some() {
262            self.optimized_hits.fetch_add(1, Ordering::Relaxed);
263        } else {
264            self.optimized_misses.fetch_add(1, Ordering::Relaxed);
265        }
266        result
267    }
268
269    /// Puts an optimized plan into the cache.
270    pub fn put_optimized(&self, key: CacheKey, plan: LogicalPlan) {
271        if !self.enabled {
272            return;
273        }
274        self.optimized_cache.lock().put(key, plan);
275    }
276
277    /// Invalidates a specific query from both caches.
278    pub fn invalidate(&self, key: &CacheKey) {
279        self.parsed_cache.lock().remove(key);
280        self.optimized_cache.lock().remove(key);
281    }
282
283    /// Clears all cached entries.
284    pub fn clear(&self) {
285        self.parsed_cache.lock().clear();
286        self.optimized_cache.lock().clear();
287    }
288
289    /// Returns cache statistics.
290    #[must_use]
291    pub fn stats(&self) -> CacheStats {
292        CacheStats {
293            parsed_size: self.parsed_cache.lock().len(),
294            optimized_size: self.optimized_cache.lock().len(),
295            parsed_hits: self.parsed_hits.load(Ordering::Relaxed),
296            parsed_misses: self.parsed_misses.load(Ordering::Relaxed),
297            optimized_hits: self.optimized_hits.load(Ordering::Relaxed),
298            optimized_misses: self.optimized_misses.load(Ordering::Relaxed),
299        }
300    }
301
302    /// Resets hit/miss counters.
303    pub fn reset_stats(&self) {
304        self.parsed_hits.store(0, Ordering::Relaxed);
305        self.parsed_misses.store(0, Ordering::Relaxed);
306        self.optimized_hits.store(0, Ordering::Relaxed);
307        self.optimized_misses.store(0, Ordering::Relaxed);
308    }
309}
310
311impl Default for QueryCache {
312    fn default() -> Self {
313        // Default capacity of 1000 queries
314        Self::new(1000)
315    }
316}
317
/// Cache statistics.
///
/// A point-in-time snapshot of sizes and hit/miss counters for both
/// cache levels, with convenience methods for derived rates.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of entries in parsed cache.
    pub parsed_size: usize,
    /// Number of entries in optimized cache.
    pub optimized_size: usize,
    /// Number of parsed cache hits.
    pub parsed_hits: u64,
    /// Number of parsed cache misses.
    pub parsed_misses: u64,
    /// Number of optimized cache hits.
    pub optimized_hits: u64,
    /// Number of optimized cache misses.
    pub optimized_misses: u64,
}

impl CacheStats {
    /// Returns the hit rate for the parsed cache (0.0 to 1.0).
    #[must_use]
    pub fn parsed_hit_rate(&self) -> f64 {
        Self::hit_rate(self.parsed_hits, self.parsed_misses)
    }

    /// Returns the hit rate for the optimized cache (0.0 to 1.0).
    #[must_use]
    pub fn optimized_hit_rate(&self) -> f64 {
        Self::hit_rate(self.optimized_hits, self.optimized_misses)
    }

    /// Returns the combined size of both cache levels.
    #[must_use]
    pub fn total_size(&self) -> usize {
        self.parsed_size + self.optimized_size
    }

    /// Returns the hit rate across both cache levels combined (0.0 to 1.0).
    #[must_use]
    pub fn total_hit_rate(&self) -> f64 {
        Self::hit_rate(
            self.parsed_hits + self.optimized_hits,
            self.parsed_misses + self.optimized_misses,
        )
    }

    /// Computes hits / (hits + misses), returning 0.0 when there were no lookups.
    fn hit_rate(hits: u64, misses: u64) -> f64 {
        match hits + misses {
            0 => 0.0,
            total => hits as f64 / total as f64,
        }
    }
}
377
/// A caching wrapper for the query processor.
///
/// This type wraps a query processor and adds caching capabilities.
/// Use this for production deployments where query caching is beneficial.
pub struct CachingQueryProcessor<P> {
    /// The underlying processor being wrapped.
    processor: P,
    /// The query cache used for parsed and optimized plans.
    cache: QueryCache,
}
388
389impl<P> CachingQueryProcessor<P> {
390    /// Creates a new caching processor.
391    pub fn new(processor: P, cache: QueryCache) -> Self {
392        Self { processor, cache }
393    }
394
395    /// Creates a new caching processor with default cache settings.
396    pub fn with_default_cache(processor: P) -> Self {
397        Self::new(processor, QueryCache::default())
398    }
399
400    /// Returns a reference to the cache.
401    #[must_use]
402    pub fn cache(&self) -> &QueryCache {
403        &self.cache
404    }
405
406    /// Returns a reference to the underlying processor.
407    #[must_use]
408    pub fn processor(&self) -> &P {
409        &self.processor
410    }
411
412    /// Returns cache statistics.
413    #[must_use]
414    pub fn stats(&self) -> CacheStats {
415        self.cache.stats()
416    }
417
418    /// Clears the cache.
419    pub fn clear_cache(&self) {
420        self.cache.clear();
421    }
422}
423
#[cfg(test)]
mod tests {
    use super::*;

    // Picks a query language for tests based on enabled features; GQL wins
    // when available.
    #[cfg(feature = "gql")]
    fn test_language() -> QueryLanguage {
        QueryLanguage::Gql
    }

    #[cfg(not(feature = "gql"))]
    fn test_language() -> QueryLanguage {
        // Fallback for tests without gql feature
        // NOTE(review): if neither "cypher" nor "sparql" is enabled either,
        // this body has no return expression and will not compile — confirm
        // the crate guarantees at least one query-language feature in tests.
        #[cfg(feature = "cypher")]
        return QueryLanguage::Cypher;
        #[cfg(feature = "sparql")]
        return QueryLanguage::Sparql;
    }

    // Whitespace-variant spellings of the same query should collapse to
    // identical normalized key text.
    #[test]
    fn test_cache_key_normalization() {
        let key1 = CacheKey::new("MATCH  (n)  RETURN n", test_language());
        let key2 = CacheKey::new("MATCH (n) RETURN n", test_language());

        // Both should normalize to the same key
        assert_eq!(key1.query(), key2.query());
    }

    // Covers miss-then-hit on the parsed level and the resulting counters.
    #[test]
    fn test_cache_basic_operations() {
        let cache = QueryCache::new(10);
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());

        // Create a simple logical plan for testing
        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        // Initially empty
        assert!(cache.get_parsed(&key).is_none());

        // Put and get
        cache.put_parsed(key.clone(), plan.clone());
        assert!(cache.get_parsed(&key).is_some());

        // Stats: one miss from the initial lookup, one hit after the put
        let stats = cache.stats();
        assert_eq!(stats.parsed_size, 1);
        assert_eq!(stats.parsed_hits, 1);
        assert_eq!(stats.parsed_misses, 1);
    }

    // Inserting one entry past capacity should evict the oldest key.
    #[test]
    fn test_cache_lru_eviction() {
        let cache = QueryCache::new(4); // 2 entries per cache level

        use crate::query::plan::{LogicalOperator, LogicalPlan};

        // Add 3 entries to parsed cache (capacity is 2)
        for i in 0..3 {
            let key = CacheKey::new(format!("QUERY {}", i), test_language());
            cache.put_parsed(key, LogicalPlan::new(LogicalOperator::Empty));
        }

        // First entry should be evicted
        let key0 = CacheKey::new("QUERY 0", test_language());
        assert!(cache.get_parsed(&key0).is_none());

        // Entry 1 and 2 should still be present
        let key1 = CacheKey::new("QUERY 1", test_language());
        let key2 = CacheKey::new("QUERY 2", test_language());
        assert!(cache.get_parsed(&key1).is_some());
        assert!(cache.get_parsed(&key2).is_some());
    }

    // Invalidation must clear a key from both cache levels at once.
    #[test]
    fn test_cache_invalidation() {
        let cache = QueryCache::new(10);
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());

        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        cache.put_parsed(key.clone(), plan.clone());
        cache.put_optimized(key.clone(), plan);

        assert!(cache.get_parsed(&key).is_some());
        assert!(cache.get_optimized(&key).is_some());

        // Invalidate
        cache.invalidate(&key);

        // Clear stats from previous gets
        cache.reset_stats();

        assert!(cache.get_parsed(&key).is_none());
        assert!(cache.get_optimized(&key).is_none());
    }

    // A disabled cache must ignore puts and report zero sizes.
    #[test]
    fn test_cache_disabled() {
        let cache = QueryCache::disabled();
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());

        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        // Should not store anything
        cache.put_parsed(key.clone(), plan);
        assert!(cache.get_parsed(&key).is_none());

        // Stats should be zero
        let stats = cache.stats();
        assert_eq!(stats.parsed_size, 0);
    }

    // Exercises hit/miss bookkeeping on the optimized level: two misses
    // (key1 before insert, key2 never inserted) and two hits on key1.
    #[test]
    fn test_cache_stats() {
        let cache = QueryCache::new(10);

        use crate::query::plan::{LogicalOperator, LogicalPlan};

        let key1 = CacheKey::new("QUERY 1", test_language());
        let key2 = CacheKey::new("QUERY 2", test_language());
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        // Miss
        cache.get_optimized(&key1);

        // Put and hit
        cache.put_optimized(key1.clone(), plan);
        cache.get_optimized(&key1);
        cache.get_optimized(&key1);

        // Another miss
        cache.get_optimized(&key2);

        let stats = cache.stats();
        assert_eq!(stats.optimized_hits, 2);
        assert_eq!(stats.optimized_misses, 2);
        assert!((stats.optimized_hit_rate() - 0.5).abs() < 0.01);
    }
}