// grafeo_engine/query/cache.rs
1//! Query cache for parsed and planned queries.
2//!
3//! This module provides an LRU cache for query plans to avoid repeated
4//! parsing and optimization of frequently executed queries.
5//!
6//! ## Cache Levels
7//!
8//! - **Parsed cache**: Caches logical plans after translation (language-specific parsing)
9//! - **Optimized cache**: Caches logical plans after optimization
10//!
11//! ## Usage
12//!
13//! ```no_run
14//! use grafeo_engine::query::cache::{QueryCache, CacheKey};
15//! use grafeo_engine::query::processor::QueryLanguage;
16//! use grafeo_engine::query::plan::{LogicalPlan, LogicalOperator};
17//!
18//! let cache = QueryCache::new(1000);
19//! let cache_key = CacheKey::new("MATCH (n) RETURN n", QueryLanguage::Gql);
20//!
21//! // Check cache first
22//! if let Some(plan) = cache.get_optimized(&cache_key) {
23//!     // use cached plan
24//! }
25//!
26//! // Parse and optimize, then cache
27//! let plan = LogicalPlan::new(LogicalOperator::Empty);
28//! cache.put_optimized(cache_key, plan);
29//! ```
30
31use parking_lot::Mutex;
32use std::collections::HashMap;
33use std::hash::Hash;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::time::Instant;
36
37use crate::query::plan::LogicalPlan;
38use crate::query::processor::QueryLanguage;
39
/// Cache key combining query text and language.
///
/// The query string is stored in normalized form (whitespace collapsed by
/// `normalize_query` in the constructor), so keys built from spellings of
/// the same query that differ only in whitespace compare equal. Derives
/// `Eq`/`Hash` so it can be used directly as a `HashMap` key.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct CacheKey {
    /// The query string (normalized at construction).
    query: String,
    /// The query language.
    language: QueryLanguage,
}
48
49impl CacheKey {
50    /// Creates a new cache key.
51    #[must_use]
52    pub fn new(query: impl Into<String>, language: QueryLanguage) -> Self {
53        Self {
54            query: normalize_query(&query.into()),
55            language,
56        }
57    }
58
59    /// Returns the query string.
60    #[must_use]
61    pub fn query(&self) -> &str {
62        &self.query
63    }
64
65    /// Returns the query language.
66    #[must_use]
67    pub fn language(&self) -> QueryLanguage {
68        self.language
69    }
70}
71
/// Normalizes a query string for use as a cache key.
///
/// Collapses every run of whitespace (spaces, tabs, newlines) into a
/// single space and drops leading/trailing whitespace. The text remains
/// case-sensitive, so queries differing only in keyword case map to
/// distinct cache keys.
fn normalize_query(query: &str) -> String {
    let mut normalized = String::with_capacity(query.len());
    for token in query.split_whitespace() {
        if !normalized.is_empty() {
            normalized.push(' ');
        }
        normalized.push_str(token);
    }
    normalized
}
79
/// Entry in the cache, pairing a value with access metadata.
struct CacheEntry<T> {
    /// The cached value.
    value: T,
    /// Number of times this entry was returned via `access`.
    access_count: u64,
    /// Last access time, for diagnostics (`Instant` is unavailable on WASM).
    #[cfg(not(target_arch = "wasm32"))]
    last_accessed: Instant,
}

impl<T> CacheEntry<T> {
    /// Wraps `value` in a fresh entry with zeroed access metadata.
    ///
    /// No `T: Clone` bound here — only `access` needs to clone, so the
    /// bound lives on that impl block instead of constraining construction.
    fn new(value: T) -> Self {
        Self {
            value,
            access_count: 0,
            #[cfg(not(target_arch = "wasm32"))]
            last_accessed: Instant::now(),
        }
    }
}

impl<T: Clone> CacheEntry<T> {
    /// Records an access (bumps the counter and, off-WASM, the timestamp)
    /// and returns a clone of the cached value.
    fn access(&mut self) -> T {
        self.access_count += 1;
        #[cfg(not(target_arch = "wasm32"))]
        {
            self.last_accessed = Instant::now();
        }
        self.value.clone()
    }
}
110
/// LRU cache implementation.
///
/// Minimal design: a `HashMap` for storage plus a `Vec` of keys ordered
/// from least to most recently used. Maintaining recency costs an O(n)
/// scan of `access_order` per `get`/`put`, which is acceptable at the
/// small capacities this module uses.
struct LruCache<K, V> {
    /// The cache storage.
    entries: HashMap<K, CacheEntry<V>>,
    /// Maximum number of entries.
    capacity: usize,
    /// Order of access (for LRU eviction); front is least recently used.
    /// Invariant: each key in `entries` appears here exactly once.
    access_order: Vec<K>,
}
120
121impl<K: Clone + Eq + Hash, V: Clone> LruCache<K, V> {
122    fn new(capacity: usize) -> Self {
123        Self {
124            entries: HashMap::with_capacity(capacity),
125            capacity,
126            access_order: Vec::with_capacity(capacity),
127        }
128    }
129
130    fn get(&mut self, key: &K) -> Option<V> {
131        if let Some(entry) = self.entries.get_mut(key) {
132            // Move to end of access order (most recently used)
133            if let Some(pos) = self.access_order.iter().position(|k| k == key) {
134                self.access_order.remove(pos);
135            }
136            self.access_order.push(key.clone());
137            Some(entry.access())
138        } else {
139            None
140        }
141    }
142
143    fn put(&mut self, key: K, value: V) {
144        // Evict if at capacity
145        if self.entries.len() >= self.capacity && !self.entries.contains_key(&key) {
146            self.evict_lru();
147        }
148
149        // Remove from current position in access order
150        if let Some(pos) = self.access_order.iter().position(|k| k == &key) {
151            self.access_order.remove(pos);
152        }
153
154        // Add to end (most recently used)
155        self.access_order.push(key.clone());
156        self.entries.insert(key, CacheEntry::new(value));
157    }
158
159    fn evict_lru(&mut self) {
160        if let Some(key) = self.access_order.first().cloned() {
161            self.access_order.remove(0);
162            self.entries.remove(&key);
163        }
164    }
165
166    fn clear(&mut self) {
167        self.entries.clear();
168        self.access_order.clear();
169    }
170
171    fn len(&self) -> usize {
172        self.entries.len()
173    }
174
175    fn remove(&mut self, key: &K) -> Option<V> {
176        if let Some(pos) = self.access_order.iter().position(|k| k == key) {
177            self.access_order.remove(pos);
178        }
179        self.entries.remove(key).map(|e| e.value)
180    }
181}
182
/// Query cache for parsed and optimized plans.
///
/// Keeps two independent LRU levels: plans straight out of translation
/// (`parsed_cache`) and plans after optimization (`optimized_cache`).
/// Hit/miss counters are plain atomics so statistics can be read without
/// taking the cache locks.
pub struct QueryCache {
    /// Cache for parsed (translated) logical plans.
    parsed_cache: Mutex<LruCache<CacheKey, LogicalPlan>>,
    /// Cache for optimized logical plans.
    optimized_cache: Mutex<LruCache<CacheKey, LogicalPlan>>,
    /// Cache hit count for parsed plans.
    parsed_hits: AtomicU64,
    /// Cache miss count for parsed plans.
    parsed_misses: AtomicU64,
    /// Cache hit count for optimized plans.
    optimized_hits: AtomicU64,
    /// Cache miss count for optimized plans.
    optimized_misses: AtomicU64,
    /// Number of times the cache was invalidated (cleared due to DDL).
    invalidations: AtomicU64,
    /// Whether caching is enabled; when false every get/put is a no-op.
    enabled: bool,
}
202
203impl QueryCache {
204    /// Creates a new query cache with the specified capacity.
205    ///
206    /// The capacity is shared between parsed and optimized caches
207    /// (each gets half the capacity).
208    #[must_use]
209    pub fn new(capacity: usize) -> Self {
210        let half_capacity = capacity / 2;
211        Self {
212            parsed_cache: Mutex::new(LruCache::new(half_capacity.max(1))),
213            optimized_cache: Mutex::new(LruCache::new(half_capacity.max(1))),
214            parsed_hits: AtomicU64::new(0),
215            parsed_misses: AtomicU64::new(0),
216            optimized_hits: AtomicU64::new(0),
217            optimized_misses: AtomicU64::new(0),
218            invalidations: AtomicU64::new(0),
219            enabled: true,
220        }
221    }
222
223    /// Creates a disabled cache (for testing or when caching is not desired).
224    #[must_use]
225    pub fn disabled() -> Self {
226        Self {
227            parsed_cache: Mutex::new(LruCache::new(0)),
228            optimized_cache: Mutex::new(LruCache::new(0)),
229            parsed_hits: AtomicU64::new(0),
230            parsed_misses: AtomicU64::new(0),
231            optimized_hits: AtomicU64::new(0),
232            optimized_misses: AtomicU64::new(0),
233            invalidations: AtomicU64::new(0),
234            enabled: false,
235        }
236    }
237
238    /// Returns whether caching is enabled.
239    #[must_use]
240    pub fn is_enabled(&self) -> bool {
241        self.enabled
242    }
243
244    /// Gets a parsed plan from the cache.
245    pub fn get_parsed(&self, key: &CacheKey) -> Option<LogicalPlan> {
246        if !self.enabled {
247            return None;
248        }
249
250        let result = self.parsed_cache.lock().get(key);
251        if result.is_some() {
252            self.parsed_hits.fetch_add(1, Ordering::Relaxed);
253        } else {
254            self.parsed_misses.fetch_add(1, Ordering::Relaxed);
255        }
256        result
257    }
258
259    /// Puts a parsed plan into the cache.
260    pub fn put_parsed(&self, key: CacheKey, plan: LogicalPlan) {
261        if !self.enabled {
262            return;
263        }
264        self.parsed_cache.lock().put(key, plan);
265    }
266
267    /// Gets an optimized plan from the cache.
268    pub fn get_optimized(&self, key: &CacheKey) -> Option<LogicalPlan> {
269        if !self.enabled {
270            return None;
271        }
272
273        let result = self.optimized_cache.lock().get(key);
274        if result.is_some() {
275            self.optimized_hits.fetch_add(1, Ordering::Relaxed);
276        } else {
277            self.optimized_misses.fetch_add(1, Ordering::Relaxed);
278        }
279        result
280    }
281
282    /// Puts an optimized plan into the cache.
283    pub fn put_optimized(&self, key: CacheKey, plan: LogicalPlan) {
284        if !self.enabled {
285            return;
286        }
287        self.optimized_cache.lock().put(key, plan);
288    }
289
290    /// Invalidates a specific query from both caches.
291    pub fn invalidate(&self, key: &CacheKey) {
292        self.parsed_cache.lock().remove(key);
293        self.optimized_cache.lock().remove(key);
294    }
295
296    /// Clears all cached entries and increments the invalidation counter
297    /// (only when the cache was non-empty).
298    pub fn clear(&self) {
299        let had_entries =
300            self.parsed_cache.lock().len() > 0 || self.optimized_cache.lock().len() > 0;
301        self.parsed_cache.lock().clear();
302        self.optimized_cache.lock().clear();
303        if had_entries {
304            self.invalidations.fetch_add(1, Ordering::Relaxed);
305        }
306    }
307
308    /// Returns cache statistics.
309    #[must_use]
310    pub fn stats(&self) -> CacheStats {
311        CacheStats {
312            parsed_size: self.parsed_cache.lock().len(),
313            optimized_size: self.optimized_cache.lock().len(),
314            parsed_hits: self.parsed_hits.load(Ordering::Relaxed),
315            parsed_misses: self.parsed_misses.load(Ordering::Relaxed),
316            optimized_hits: self.optimized_hits.load(Ordering::Relaxed),
317            optimized_misses: self.optimized_misses.load(Ordering::Relaxed),
318            invalidations: self.invalidations.load(Ordering::Relaxed),
319        }
320    }
321
322    /// Resets hit/miss counters and invalidation counter.
323    pub fn reset_stats(&self) {
324        self.parsed_hits.store(0, Ordering::Relaxed);
325        self.parsed_misses.store(0, Ordering::Relaxed);
326        self.optimized_hits.store(0, Ordering::Relaxed);
327        self.optimized_misses.store(0, Ordering::Relaxed);
328        self.invalidations.store(0, Ordering::Relaxed);
329    }
330}
331
332impl Default for QueryCache {
333    fn default() -> Self {
334        // Default capacity of 1000 queries
335        Self::new(1000)
336    }
337}
338
/// Cache statistics.
///
/// A plain-data snapshot produced by `QueryCache::stats`; all derived
/// rates are computed on demand from the counters below.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of entries in parsed cache.
    pub parsed_size: usize,
    /// Number of entries in optimized cache.
    pub optimized_size: usize,
    /// Number of parsed cache hits.
    pub parsed_hits: u64,
    /// Number of parsed cache misses.
    pub parsed_misses: u64,
    /// Number of optimized cache hits.
    pub optimized_hits: u64,
    /// Number of optimized cache misses.
    pub optimized_misses: u64,
    /// Number of times the cache was invalidated (cleared due to DDL).
    pub invalidations: u64,
}

impl CacheStats {
    /// Computes `hits / (hits + misses)`, returning 0.0 when there has
    /// been no traffic (avoids 0/0 producing NaN).
    ///
    /// Shared by the three public rate accessors below, which previously
    /// each duplicated this division.
    fn hit_rate(hits: u64, misses: u64) -> f64 {
        let total = hits + misses;
        if total == 0 {
            0.0
        } else {
            hits as f64 / total as f64
        }
    }

    /// Returns the hit rate for parsed cache (0.0 to 1.0).
    #[must_use]
    pub fn parsed_hit_rate(&self) -> f64 {
        Self::hit_rate(self.parsed_hits, self.parsed_misses)
    }

    /// Returns the hit rate for optimized cache (0.0 to 1.0).
    #[must_use]
    pub fn optimized_hit_rate(&self) -> f64 {
        Self::hit_rate(self.optimized_hits, self.optimized_misses)
    }

    /// Returns the total cache size across both levels.
    #[must_use]
    pub fn total_size(&self) -> usize {
        self.parsed_size + self.optimized_size
    }

    /// Returns the total hit rate across both levels (0.0 to 1.0).
    #[must_use]
    pub fn total_hit_rate(&self) -> f64 {
        Self::hit_rate(
            self.parsed_hits + self.optimized_hits,
            self.parsed_misses + self.optimized_misses,
        )
    }
}
400
/// A caching wrapper for the query processor.
///
/// This type wraps a query processor and adds caching capabilities.
/// Use this for production deployments where query caching is beneficial.
///
/// NOTE(review): this struct only stores the processor and cache side by
/// side and exposes accessors; presumably the cache-consulting execution
/// path lives elsewhere — confirm against the processor integration.
pub struct CachingQueryProcessor<P> {
    /// The underlying processor.
    processor: P,
    /// The query cache, exposed via `cache()` and `stats()`.
    cache: QueryCache,
}
411
412impl<P> CachingQueryProcessor<P> {
413    /// Creates a new caching processor.
414    pub fn new(processor: P, cache: QueryCache) -> Self {
415        Self { processor, cache }
416    }
417
418    /// Creates a new caching processor with default cache settings.
419    pub fn with_default_cache(processor: P) -> Self {
420        Self::new(processor, QueryCache::default())
421    }
422
423    /// Returns a reference to the cache.
424    #[must_use]
425    pub fn cache(&self) -> &QueryCache {
426        &self.cache
427    }
428
429    /// Returns a reference to the underlying processor.
430    #[must_use]
431    pub fn processor(&self) -> &P {
432        &self.processor
433    }
434
435    /// Returns cache statistics.
436    #[must_use]
437    pub fn stats(&self) -> CacheStats {
438        self.cache.stats()
439    }
440
441    /// Clears the cache.
442    pub fn clear_cache(&self) {
443        self.cache.clear();
444    }
445}
446
#[cfg(test)]
mod tests {
    use super::*;

    /// Picks a query language available under the current feature set.
    #[cfg(feature = "gql")]
    fn test_language() -> QueryLanguage {
        QueryLanguage::Gql
    }

    /// Picks a query language available under the current feature set.
    // NOTE(review): if neither `cypher` nor `sparql` is enabled either,
    // this body has no return expression and will fail to compile —
    // presumably at least one language feature is always on in test
    // builds; TODO confirm.
    #[cfg(not(feature = "gql"))]
    fn test_language() -> QueryLanguage {
        // Fallback for tests without gql feature
        #[cfg(feature = "cypher")]
        return QueryLanguage::Cypher;
        #[cfg(feature = "sparql")]
        return QueryLanguage::Sparql;
    }

    // Whitespace-variant queries must map to the same normalized key.
    #[test]
    fn test_cache_key_normalization() {
        let key1 = CacheKey::new("MATCH  (n)  RETURN n", test_language());
        let key2 = CacheKey::new("MATCH (n) RETURN n", test_language());

        // Both should normalize to the same key
        assert_eq!(key1.query(), key2.query());
    }

    // Miss, then put, then hit — and the stats counters track each step.
    #[test]
    fn test_cache_basic_operations() {
        let cache = QueryCache::new(10);
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());

        // Create a simple logical plan for testing
        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        // Initially empty
        assert!(cache.get_parsed(&key).is_none());

        // Put and get
        cache.put_parsed(key.clone(), plan.clone());
        assert!(cache.get_parsed(&key).is_some());

        // Stats
        let stats = cache.stats();
        assert_eq!(stats.parsed_size, 1);
        assert_eq!(stats.parsed_hits, 1);
        assert_eq!(stats.parsed_misses, 1);
    }

    // Overfilling one level evicts the least-recently-used entry first.
    #[test]
    fn test_cache_lru_eviction() {
        let cache = QueryCache::new(4); // 2 entries per cache level

        use crate::query::plan::{LogicalOperator, LogicalPlan};

        // Add 3 entries to parsed cache (capacity is 2)
        for i in 0..3 {
            let key = CacheKey::new(format!("QUERY {}", i), test_language());
            cache.put_parsed(key, LogicalPlan::new(LogicalOperator::Empty));
        }

        // First entry should be evicted
        let key0 = CacheKey::new("QUERY 0", test_language());
        assert!(cache.get_parsed(&key0).is_none());

        // Entry 1 and 2 should still be present
        let key1 = CacheKey::new("QUERY 1", test_language());
        let key2 = CacheKey::new("QUERY 2", test_language());
        assert!(cache.get_parsed(&key1).is_some());
        assert!(cache.get_parsed(&key2).is_some());
    }

    // `invalidate` removes the key from both cache levels.
    #[test]
    fn test_cache_invalidation() {
        let cache = QueryCache::new(10);
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());

        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        cache.put_parsed(key.clone(), plan.clone());
        cache.put_optimized(key.clone(), plan);

        assert!(cache.get_parsed(&key).is_some());
        assert!(cache.get_optimized(&key).is_some());

        // Invalidate
        cache.invalidate(&key);

        // Clear stats from previous gets
        cache.reset_stats();

        assert!(cache.get_parsed(&key).is_none());
        assert!(cache.get_optimized(&key).is_none());
    }

    // A disabled cache ignores puts and never reports entries.
    #[test]
    fn test_cache_disabled() {
        let cache = QueryCache::disabled();
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());

        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        // Should not store anything
        cache.put_parsed(key.clone(), plan);
        assert!(cache.get_parsed(&key).is_none());

        // Stats should be zero
        let stats = cache.stats();
        assert_eq!(stats.parsed_size, 0);
    }

    // Two hits and two misses yield a 0.5 optimized hit rate.
    #[test]
    fn test_cache_stats() {
        let cache = QueryCache::new(10);

        use crate::query::plan::{LogicalOperator, LogicalPlan};

        let key1 = CacheKey::new("QUERY 1", test_language());
        let key2 = CacheKey::new("QUERY 2", test_language());
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        // Miss
        cache.get_optimized(&key1);

        // Put and hit
        cache.put_optimized(key1.clone(), plan);
        cache.get_optimized(&key1);
        cache.get_optimized(&key1);

        // Another miss
        cache.get_optimized(&key2);

        let stats = cache.stats();
        assert_eq!(stats.optimized_hits, 2);
        assert_eq!(stats.optimized_misses, 2);
        assert!((stats.optimized_hit_rate() - 0.5).abs() < 0.01);
    }
}