// grafeo_engine/query/cache.rs
1//! Query cache for parsed and planned queries.
2//!
3//! This module provides an LRU cache for query plans to avoid repeated
4//! parsing and optimization of frequently executed queries.
5//!
6//! ## Cache Levels
7//!
8//! - **Parsed cache**: Caches logical plans after translation (language-specific parsing)
9//! - **Optimized cache**: Caches logical plans after optimization
10//!
11//! ## Usage
12//!
13//! ```ignore
14//! let cache = QueryCache::new(1000);
15//!
16//! // Check cache first
17//! if let Some(plan) = cache.get_optimized(&cache_key) {
18//!     return execute(plan);
19//! }
20//!
21//! // Parse and optimize
22//! let plan = parse_and_optimize(query)?;
23//! cache.put_optimized(cache_key, plan.clone());
24//! execute(plan)
25//! ```
26
27use parking_lot::Mutex;
28use std::collections::HashMap;
29use std::hash::Hash;
30use std::sync::atomic::{AtomicU64, Ordering};
31use std::time::Instant;
32
33use crate::query::plan::LogicalPlan;
34use crate::query::processor::QueryLanguage;
35
/// Cache key combining query text and language.
///
/// Two queries that differ only in whitespace map to the same key, because
/// the text is normalized (whitespace collapsed) in [`CacheKey::new`].
/// Equality and hashing cover both the normalized text and the language.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct CacheKey {
    /// The query string (normalized: runs of whitespace collapsed to single spaces).
    query: String,
    /// The query language.
    language: QueryLanguage,
}
44
45impl CacheKey {
46    /// Creates a new cache key.
47    #[must_use]
48    pub fn new(query: impl Into<String>, language: QueryLanguage) -> Self {
49        Self {
50            query: normalize_query(&query.into()),
51            language,
52        }
53    }
54
55    /// Returns the query string.
56    #[must_use]
57    pub fn query(&self) -> &str {
58        &self.query
59    }
60
61    /// Returns the query language.
62    #[must_use]
63    pub fn language(&self) -> QueryLanguage {
64        self.language
65    }
66}
67
/// Normalizes a query string for caching.
///
/// Collapses runs of whitespace (spaces, tabs, newlines) into single spaces
/// and trims leading/trailing whitespace, so formatting differences do not
/// produce distinct cache keys.
///
/// Note: keyword case is NOT normalized — `MATCH` and `match` yield
/// different keys.
fn normalize_query(query: &str) -> String {
    // Build the result in one pass. The output is never longer than the
    // input, so preallocating the input length avoids reallocations
    // (the old version collected into an intermediate Vec first).
    let mut normalized = String::with_capacity(query.len());
    for word in query.split_whitespace() {
        if !normalized.is_empty() {
            normalized.push(' ');
        }
        normalized.push_str(word);
    }
    normalized
}
75
/// Entry in the cache with metadata.
struct CacheEntry<T> {
    /// The cached value.
    value: T,
    /// When the entry was created (unused today; kept for future TTL/metrics).
    #[allow(dead_code)]
    created_at: Instant,
    /// Number of times this entry was accessed via `get`.
    access_count: u64,
    /// Last access time.
    last_accessed: Instant,
}

impl<T: Clone> CacheEntry<T> {
    /// Creates a fresh entry with zeroed access statistics.
    fn new(value: T) -> Self {
        let now = Instant::now();
        Self {
            value,
            created_at: now,
            access_count: 0,
            last_accessed: now,
        }
    }

    /// Records an access and returns a clone of the cached value.
    fn access(&mut self) -> T {
        self.access_count += 1;
        self.last_accessed = Instant::now();
        self.value.clone()
    }
}

/// LRU cache implementation.
///
/// Access order is tracked in a `Vec` whose front is the least recently
/// used key, which makes `get`/`put` O(n) in the entry count. That is
/// acceptable for the modest capacities used by the query cache; switch to
/// a linked-hash-map structure if capacities grow large.
struct LruCache<K, V> {
    /// The cache storage.
    entries: HashMap<K, CacheEntry<V>>,
    /// Maximum number of entries. A capacity of 0 stores nothing.
    capacity: usize,
    /// Order of access (for LRU eviction); front = least recently used.
    access_order: Vec<K>,
}

impl<K: Clone + Eq + Hash, V: Clone> LruCache<K, V> {
    /// Creates an empty cache that holds at most `capacity` entries.
    fn new(capacity: usize) -> Self {
        Self {
            entries: HashMap::with_capacity(capacity),
            capacity,
            access_order: Vec::with_capacity(capacity),
        }
    }

    /// Returns a clone of the cached value and marks the key as most
    /// recently used. Returns `None` on a miss.
    fn get(&mut self, key: &K) -> Option<V> {
        if let Some(entry) = self.entries.get_mut(key) {
            // Move to end of access order (most recently used)
            if let Some(pos) = self.access_order.iter().position(|k| k == key) {
                self.access_order.remove(pos);
            }
            self.access_order.push(key.clone());
            Some(entry.access())
        } else {
            None
        }
    }

    /// Inserts a value (resetting its access metadata), evicting the least
    /// recently used entry if the cache is full.
    fn put(&mut self, key: K, value: V) {
        // BUGFIX: with capacity 0 the previous code tried to evict (a no-op,
        // since the access order was empty) and then inserted anyway, so a
        // "zero capacity" cache could grow without bound. Store nothing.
        if self.capacity == 0 {
            return;
        }

        // Evict if at capacity and the key is new (replacing an existing
        // key does not change the entry count).
        if self.entries.len() >= self.capacity && !self.entries.contains_key(&key) {
            self.evict_lru();
        }

        // Remove from current position in access order
        if let Some(pos) = self.access_order.iter().position(|k| k == &key) {
            self.access_order.remove(pos);
        }

        // Add to end (most recently used)
        self.access_order.push(key.clone());
        self.entries.insert(key, CacheEntry::new(value));
    }

    /// Removes the least recently used entry (front of the access order).
    fn evict_lru(&mut self) {
        if let Some(key) = self.access_order.first().cloned() {
            self.access_order.remove(0);
            self.entries.remove(&key);
        }
    }

    /// Removes all entries and access-order bookkeeping.
    fn clear(&mut self) {
        self.entries.clear();
        self.access_order.clear();
    }

    /// Current number of cached entries.
    fn len(&self) -> usize {
        self.entries.len()
    }

    /// Removes a single key, returning its value if present.
    fn remove(&mut self, key: &K) -> Option<V> {
        if let Some(pos) = self.access_order.iter().position(|k| k == key) {
            self.access_order.remove(pos);
        }
        self.entries.remove(key).map(|e| e.value)
    }
}
178
/// Query cache for parsed and optimized plans.
///
/// Thread-safe: each LRU level is guarded by its own `parking_lot::Mutex`,
/// and hit/miss counters are lock-free atomics, so statistics can be read
/// without blocking lookups.
pub struct QueryCache {
    /// Cache for parsed (translated) logical plans.
    parsed_cache: Mutex<LruCache<CacheKey, LogicalPlan>>,
    /// Cache for optimized logical plans.
    optimized_cache: Mutex<LruCache<CacheKey, LogicalPlan>>,
    /// Cache hit count for parsed plans.
    parsed_hits: AtomicU64,
    /// Cache miss count for parsed plans.
    parsed_misses: AtomicU64,
    /// Cache hit count for optimized plans.
    optimized_hits: AtomicU64,
    /// Cache miss count for optimized plans.
    optimized_misses: AtomicU64,
    /// Whether caching is enabled; when false all gets/puts are no-ops
    /// and counters are never incremented.
    enabled: bool,
}
196
197impl QueryCache {
198    /// Creates a new query cache with the specified capacity.
199    ///
200    /// The capacity is shared between parsed and optimized caches
201    /// (each gets half the capacity).
202    #[must_use]
203    pub fn new(capacity: usize) -> Self {
204        let half_capacity = capacity / 2;
205        Self {
206            parsed_cache: Mutex::new(LruCache::new(half_capacity.max(1))),
207            optimized_cache: Mutex::new(LruCache::new(half_capacity.max(1))),
208            parsed_hits: AtomicU64::new(0),
209            parsed_misses: AtomicU64::new(0),
210            optimized_hits: AtomicU64::new(0),
211            optimized_misses: AtomicU64::new(0),
212            enabled: true,
213        }
214    }
215
216    /// Creates a disabled cache (for testing or when caching is not desired).
217    #[must_use]
218    pub fn disabled() -> Self {
219        Self {
220            parsed_cache: Mutex::new(LruCache::new(0)),
221            optimized_cache: Mutex::new(LruCache::new(0)),
222            parsed_hits: AtomicU64::new(0),
223            parsed_misses: AtomicU64::new(0),
224            optimized_hits: AtomicU64::new(0),
225            optimized_misses: AtomicU64::new(0),
226            enabled: false,
227        }
228    }
229
230    /// Returns whether caching is enabled.
231    #[must_use]
232    pub fn is_enabled(&self) -> bool {
233        self.enabled
234    }
235
236    /// Gets a parsed plan from the cache.
237    pub fn get_parsed(&self, key: &CacheKey) -> Option<LogicalPlan> {
238        if !self.enabled {
239            return None;
240        }
241
242        let result = self.parsed_cache.lock().get(key);
243        if result.is_some() {
244            self.parsed_hits.fetch_add(1, Ordering::Relaxed);
245        } else {
246            self.parsed_misses.fetch_add(1, Ordering::Relaxed);
247        }
248        result
249    }
250
251    /// Puts a parsed plan into the cache.
252    pub fn put_parsed(&self, key: CacheKey, plan: LogicalPlan) {
253        if !self.enabled {
254            return;
255        }
256        self.parsed_cache.lock().put(key, plan);
257    }
258
259    /// Gets an optimized plan from the cache.
260    pub fn get_optimized(&self, key: &CacheKey) -> Option<LogicalPlan> {
261        if !self.enabled {
262            return None;
263        }
264
265        let result = self.optimized_cache.lock().get(key);
266        if result.is_some() {
267            self.optimized_hits.fetch_add(1, Ordering::Relaxed);
268        } else {
269            self.optimized_misses.fetch_add(1, Ordering::Relaxed);
270        }
271        result
272    }
273
274    /// Puts an optimized plan into the cache.
275    pub fn put_optimized(&self, key: CacheKey, plan: LogicalPlan) {
276        if !self.enabled {
277            return;
278        }
279        self.optimized_cache.lock().put(key, plan);
280    }
281
282    /// Invalidates a specific query from both caches.
283    pub fn invalidate(&self, key: &CacheKey) {
284        self.parsed_cache.lock().remove(key);
285        self.optimized_cache.lock().remove(key);
286    }
287
288    /// Clears all cached entries.
289    pub fn clear(&self) {
290        self.parsed_cache.lock().clear();
291        self.optimized_cache.lock().clear();
292    }
293
294    /// Returns cache statistics.
295    #[must_use]
296    pub fn stats(&self) -> CacheStats {
297        CacheStats {
298            parsed_size: self.parsed_cache.lock().len(),
299            optimized_size: self.optimized_cache.lock().len(),
300            parsed_hits: self.parsed_hits.load(Ordering::Relaxed),
301            parsed_misses: self.parsed_misses.load(Ordering::Relaxed),
302            optimized_hits: self.optimized_hits.load(Ordering::Relaxed),
303            optimized_misses: self.optimized_misses.load(Ordering::Relaxed),
304        }
305    }
306
307    /// Resets hit/miss counters.
308    pub fn reset_stats(&self) {
309        self.parsed_hits.store(0, Ordering::Relaxed);
310        self.parsed_misses.store(0, Ordering::Relaxed);
311        self.optimized_hits.store(0, Ordering::Relaxed);
312        self.optimized_misses.store(0, Ordering::Relaxed);
313    }
314}
315
316impl Default for QueryCache {
317    fn default() -> Self {
318        // Default capacity of 1000 queries
319        Self::new(1000)
320    }
321}
322
/// Cache statistics.
///
/// A point-in-time snapshot of sizes and hit/miss counters for both cache
/// levels, as returned by `QueryCache::stats`.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of entries in parsed cache.
    pub parsed_size: usize,
    /// Number of entries in optimized cache.
    pub optimized_size: usize,
    /// Number of parsed cache hits.
    pub parsed_hits: u64,
    /// Number of parsed cache misses.
    pub parsed_misses: u64,
    /// Number of optimized cache hits.
    pub optimized_hits: u64,
    /// Number of optimized cache misses.
    pub optimized_misses: u64,
}

/// Computes `hits / (hits + misses)`, defining the rate as 0.0 when there
/// has been no traffic at all.
fn hit_rate(hits: u64, misses: u64) -> f64 {
    let total = hits + misses;
    if total == 0 {
        0.0
    } else {
        hits as f64 / total as f64
    }
}

impl CacheStats {
    /// Returns the hit rate for parsed cache (0.0 to 1.0).
    #[must_use]
    pub fn parsed_hit_rate(&self) -> f64 {
        hit_rate(self.parsed_hits, self.parsed_misses)
    }

    /// Returns the hit rate for optimized cache (0.0 to 1.0).
    #[must_use]
    pub fn optimized_hit_rate(&self) -> f64 {
        hit_rate(self.optimized_hits, self.optimized_misses)
    }

    /// Returns the total number of entries across both cache levels.
    #[must_use]
    pub fn total_size(&self) -> usize {
        self.parsed_size + self.optimized_size
    }

    /// Returns the combined hit rate across both cache levels (0.0 to 1.0).
    #[must_use]
    pub fn total_hit_rate(&self) -> f64 {
        hit_rate(
            self.parsed_hits + self.optimized_hits,
            self.parsed_misses + self.optimized_misses,
        )
    }
}
382
/// A caching wrapper for the query processor.
///
/// This type wraps a query processor and adds caching capabilities.
/// Use this for production deployments where query caching is beneficial.
///
/// NOTE(review): the methods visible here only expose the cache and the
/// processor; the actual cache-lookup-before-processing wiring appears to
/// live elsewhere — confirm against the processor integration code.
pub struct CachingQueryProcessor<P> {
    /// The underlying processor.
    processor: P,
    /// The query cache.
    cache: QueryCache,
}
393
394impl<P> CachingQueryProcessor<P> {
395    /// Creates a new caching processor.
396    pub fn new(processor: P, cache: QueryCache) -> Self {
397        Self { processor, cache }
398    }
399
400    /// Creates a new caching processor with default cache settings.
401    pub fn with_default_cache(processor: P) -> Self {
402        Self::new(processor, QueryCache::default())
403    }
404
405    /// Returns a reference to the cache.
406    #[must_use]
407    pub fn cache(&self) -> &QueryCache {
408        &self.cache
409    }
410
411    /// Returns a reference to the underlying processor.
412    #[must_use]
413    pub fn processor(&self) -> &P {
414        &self.processor
415    }
416
417    /// Returns cache statistics.
418    #[must_use]
419    pub fn stats(&self) -> CacheStats {
420        self.cache.stats()
421    }
422
423    /// Clears the cache.
424    pub fn clear_cache(&self) {
425        self.cache.clear();
426    }
427}
428
#[cfg(test)]
mod tests {
    use super::*;

    // Picks a concrete query language for tests based on enabled features.
    #[cfg(feature = "gql")]
    fn test_language() -> QueryLanguage {
        QueryLanguage::Gql
    }

    #[cfg(not(feature = "gql"))]
    fn test_language() -> QueryLanguage {
        // Fallback for tests without gql feature
        // NOTE(review): if neither `cypher` nor `sparql` is enabled either,
        // this body has no return expression and will not compile — confirm
        // the crate guarantees at least one language feature for tests.
        #[cfg(feature = "cypher")]
        return QueryLanguage::Cypher;
        #[cfg(feature = "sparql")]
        return QueryLanguage::Sparql;
    }

    // Whitespace-only differences must normalize to identical keys.
    #[test]
    fn test_cache_key_normalization() {
        let key1 = CacheKey::new("MATCH  (n)  RETURN n", test_language());
        let key2 = CacheKey::new("MATCH (n) RETURN n", test_language());

        // Both should normalize to the same key
        assert_eq!(key1.query(), key2.query());
    }

    // Miss, then put, then hit — and the counters must reflect each step.
    #[test]
    fn test_cache_basic_operations() {
        let cache = QueryCache::new(10);
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());

        // Create a simple logical plan for testing
        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        // Initially empty
        assert!(cache.get_parsed(&key).is_none());

        // Put and get
        cache.put_parsed(key.clone(), plan.clone());
        assert!(cache.get_parsed(&key).is_some());

        // Stats
        let stats = cache.stats();
        assert_eq!(stats.parsed_size, 1);
        assert_eq!(stats.parsed_hits, 1);
        assert_eq!(stats.parsed_misses, 1);
    }

    // Filling past per-level capacity must evict the oldest entry first.
    #[test]
    fn test_cache_lru_eviction() {
        let cache = QueryCache::new(4); // 2 entries per cache level

        use crate::query::plan::{LogicalOperator, LogicalPlan};

        // Add 3 entries to parsed cache (capacity is 2)
        for i in 0..3 {
            let key = CacheKey::new(format!("QUERY {}", i), test_language());
            cache.put_parsed(key, LogicalPlan::new(LogicalOperator::Empty));
        }

        // First entry should be evicted
        let key0 = CacheKey::new("QUERY 0", test_language());
        assert!(cache.get_parsed(&key0).is_none());

        // Entry 1 and 2 should still be present
        let key1 = CacheKey::new("QUERY 1", test_language());
        let key2 = CacheKey::new("QUERY 2", test_language());
        assert!(cache.get_parsed(&key1).is_some());
        assert!(cache.get_parsed(&key2).is_some());
    }

    // invalidate() must drop the key from both cache levels.
    #[test]
    fn test_cache_invalidation() {
        let cache = QueryCache::new(10);
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());

        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        cache.put_parsed(key.clone(), plan.clone());
        cache.put_optimized(key.clone(), plan);

        assert!(cache.get_parsed(&key).is_some());
        assert!(cache.get_optimized(&key).is_some());

        // Invalidate
        cache.invalidate(&key);

        // Clear stats from previous gets
        cache.reset_stats();

        assert!(cache.get_parsed(&key).is_none());
        assert!(cache.get_optimized(&key).is_none());
    }

    // A disabled cache must store nothing and report zero sizes.
    #[test]
    fn test_cache_disabled() {
        let cache = QueryCache::disabled();
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());

        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        // Should not store anything
        cache.put_parsed(key.clone(), plan);
        assert!(cache.get_parsed(&key).is_none());

        // Stats should be zero
        let stats = cache.stats();
        assert_eq!(stats.parsed_size, 0);
    }

    // Two hits out of four lookups must yield a 0.5 hit rate.
    #[test]
    fn test_cache_stats() {
        let cache = QueryCache::new(10);

        use crate::query::plan::{LogicalOperator, LogicalPlan};

        let key1 = CacheKey::new("QUERY 1", test_language());
        let key2 = CacheKey::new("QUERY 2", test_language());
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        // Miss
        cache.get_optimized(&key1);

        // Put and hit
        cache.put_optimized(key1.clone(), plan);
        cache.get_optimized(&key1);
        cache.get_optimized(&key1);

        // Another miss
        cache.get_optimized(&key2);

        let stats = cache.stats();
        assert_eq!(stats.optimized_hits, 2);
        assert_eq!(stats.optimized_misses, 2);
        assert!((stats.optimized_hit_rate() - 0.5).abs() < 0.01);
    }
}