// grafeo_engine/query/cache.rs
//! Query cache for parsed and planned queries.
//!
//! This module provides an LRU cache for query plans to avoid repeated
//! parsing and optimization of frequently executed queries.
//!
//! ## Cache Levels
//!
//! - **Parsed cache**: Caches logical plans after translation (language-specific parsing)
//! - **Optimized cache**: Caches logical plans after optimization
//!
//! ## Usage
//!
//! ```no_run
//! use grafeo_engine::query::cache::{QueryCache, CacheKey};
//! use grafeo_engine::query::processor::QueryLanguage;
//! use grafeo_engine::query::plan::{LogicalPlan, LogicalOperator};
//!
//! let cache = QueryCache::new(1000);
//! let cache_key = CacheKey::new("MATCH (n) RETURN n", QueryLanguage::Gql);
//!
//! // Check cache first
//! if let Some(plan) = cache.get_optimized(&cache_key) {
//!     // use cached plan
//! }
//!
//! // Parse and optimize, then cache
//! let plan = LogicalPlan::new(LogicalOperator::Empty);
//! cache.put_optimized(cache_key, plan);
//! ```
31use parking_lot::Mutex;
32use std::collections::HashMap;
33use std::hash::Hash;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::time::Instant;
36
37use crate::query::plan::LogicalPlan;
38use crate::query::processor::QueryLanguage;
39
/// Cache key combining query text, language, and active graph.
///
/// The query text is stored in normalized form (both constructors run it
/// through [`normalize_query`]), so keys built from whitespace-variant but
/// otherwise identical query strings compare equal and hash identically.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct CacheKey {
    /// The query string (normalized).
    query: String,
    /// The query language.
    language: QueryLanguage,
    /// Active graph name (`None` = default graph).
    graph: Option<String>,
}
50
51impl CacheKey {
52    /// Creates a new cache key for the default graph.
53    #[must_use]
54    pub fn new(query: impl Into<String>, language: QueryLanguage) -> Self {
55        Self {
56            query: normalize_query(&query.into()),
57            language,
58            graph: None,
59        }
60    }
61
62    /// Creates a cache key scoped to a specific graph.
63    #[must_use]
64    pub fn with_graph(
65        query: impl Into<String>,
66        language: QueryLanguage,
67        graph: Option<String>,
68    ) -> Self {
69        Self {
70            query: normalize_query(&query.into()),
71            language,
72            graph,
73        }
74    }
75
76    /// Returns the query string.
77    #[must_use]
78    pub fn query(&self) -> &str {
79        &self.query
80    }
81
82    /// Returns the query language.
83    #[must_use]
84    pub fn language(&self) -> QueryLanguage {
85        self.language
86    }
87}
88
/// Normalizes a query string for caching.
///
/// Collapses every run of whitespace (spaces, tabs, newlines) into a single
/// space and strips leading/trailing whitespace, so formatting-only variants
/// of the same query share one cache entry.
fn normalize_query(query: &str) -> String {
    let mut normalized = String::with_capacity(query.len());
    for (index, token) in query.split_whitespace().enumerate() {
        if index > 0 {
            normalized.push(' ');
        }
        normalized.push_str(token);
    }
    normalized
}
96
/// Entry in the cache with access metadata.
struct CacheEntry<T> {
    /// The cached value.
    value: T,
    /// Number of times this entry was accessed.
    access_count: u64,
    /// Last access time (not available on WASM).
    #[cfg(not(target_arch = "wasm32"))]
    last_accessed: Instant,
}

impl<T: Clone> CacheEntry<T> {
    /// Wraps a value in a fresh entry with zeroed access statistics.
    fn new(value: T) -> Self {
        Self {
            value,
            access_count: 0,
            #[cfg(not(target_arch = "wasm32"))]
            last_accessed: Instant::now(),
        }
    }

    /// Records an access (count and, off-WASM, timestamp) and returns a
    /// clone of the cached value.
    fn access(&mut self) -> T {
        self.access_count += 1;
        #[cfg(not(target_arch = "wasm32"))]
        {
            self.last_accessed = Instant::now();
        }
        self.value.clone()
    }
}

/// Simple LRU cache.
///
/// Recency is tracked in `access_order`: index 0 is the least recently used
/// key, the last element the most recently used. Updating recency is O(n) in
/// the entry count, which is acceptable at the small capacities the query
/// cache uses.
struct LruCache<K, V> {
    /// The cache storage.
    entries: HashMap<K, CacheEntry<V>>,
    /// Maximum number of entries; a capacity of 0 disables storage entirely.
    capacity: usize,
    /// Keys ordered from least (front) to most (back) recently used.
    access_order: Vec<K>,
}

impl<K: Clone + Eq + Hash, V: Clone> LruCache<K, V> {
    /// Creates a cache holding at most `capacity` entries.
    fn new(capacity: usize) -> Self {
        Self {
            entries: HashMap::with_capacity(capacity),
            capacity,
            access_order: Vec::with_capacity(capacity),
        }
    }

    /// Returns a clone of the cached value and marks the key as most
    /// recently used. Returns `None` on a miss.
    fn get(&mut self, key: &K) -> Option<V> {
        let entry = self.entries.get_mut(key)?;
        // Move the key to the back of the recency list (most recently used).
        if let Some(pos) = self.access_order.iter().position(|k| k == key) {
            self.access_order.remove(pos);
        }
        self.access_order.push(key.clone());
        Some(entry.access())
    }

    /// Inserts (or replaces) a value, evicting the least recently used entry
    /// when inserting a new key into a full cache.
    fn put(&mut self, key: K, value: V) {
        // BUGFIX: a zero-capacity cache must never retain entries. The old
        // code evicted once (a no-op when the cache is empty) and then
        // inserted anyway, so `LruCache::new(0)` would still store values.
        if self.capacity == 0 {
            return;
        }

        // Evict if at capacity and this is a genuinely new key (replacing an
        // existing key does not change the entry count).
        if self.entries.len() >= self.capacity && !self.entries.contains_key(&key) {
            self.evict_lru();
        }

        // Drop any stale position in the recency list, then append the key
        // as most recently used.
        if let Some(pos) = self.access_order.iter().position(|k| k == &key) {
            self.access_order.remove(pos);
        }
        self.access_order.push(key.clone());
        self.entries.insert(key, CacheEntry::new(value));
    }

    /// Evicts the least recently used entry (front of `access_order`).
    fn evict_lru(&mut self) {
        if let Some(key) = self.access_order.first().cloned() {
            self.access_order.remove(0);
            self.entries.remove(&key);
        }
    }

    /// Removes all entries, keeping allocated capacity.
    fn clear(&mut self) {
        self.entries.clear();
        self.access_order.clear();
    }

    /// Returns the number of cached entries.
    fn len(&self) -> usize {
        self.entries.len()
    }

    /// Removes a single entry, returning its value if it was present.
    fn remove(&mut self, key: &K) -> Option<V> {
        if let Some(pos) = self.access_order.iter().position(|k| k == key) {
            self.access_order.remove(pos);
        }
        self.entries.remove(key).map(|e| e.value)
    }

    /// Estimates heap memory used by this cache (map buckets + access order vec).
    ///
    /// This is a rough structural estimate based on container capacities; it
    /// does not count heap data owned by the keys or values themselves.
    fn heap_memory_bytes(&self) -> usize {
        let entry_size = std::mem::size_of::<K>() + std::mem::size_of::<CacheEntry<V>>() + 1;
        let map_bytes = self.entries.capacity() * entry_size;
        let vec_bytes = self.access_order.capacity() * std::mem::size_of::<K>();
        map_bytes + vec_bytes
    }
}
207
/// Query cache for parsed and optimized plans.
///
/// Holds two LRU levels behind mutexes so the cache can be used through a
/// shared reference: one for plans produced by language translation
/// ("parsed") and one for plans that have additionally been optimized.
/// Hit/miss/invalidation counters are atomics updated with relaxed ordering
/// (they are statistics, not synchronization).
pub struct QueryCache {
    /// Cache for parsed (translated) logical plans.
    parsed_cache: Mutex<LruCache<CacheKey, LogicalPlan>>,
    /// Cache for optimized logical plans.
    optimized_cache: Mutex<LruCache<CacheKey, LogicalPlan>>,
    /// Cache hit count for parsed plans.
    parsed_hits: AtomicU64,
    /// Cache miss count for parsed plans.
    parsed_misses: AtomicU64,
    /// Cache hit count for optimized plans.
    optimized_hits: AtomicU64,
    /// Cache miss count for optimized plans.
    optimized_misses: AtomicU64,
    /// Number of times the cache was invalidated (cleared due to DDL).
    invalidations: AtomicU64,
    /// Whether caching is enabled; when `false`, gets always miss (without
    /// counting) and puts are no-ops.
    enabled: bool,
}
227
228impl QueryCache {
229    /// Creates a new query cache with the specified capacity.
230    ///
231    /// The capacity is shared between parsed and optimized caches
232    /// (each gets half the capacity).
233    #[must_use]
234    pub fn new(capacity: usize) -> Self {
235        let half_capacity = capacity / 2;
236        Self {
237            parsed_cache: Mutex::new(LruCache::new(half_capacity.max(1))),
238            optimized_cache: Mutex::new(LruCache::new(half_capacity.max(1))),
239            parsed_hits: AtomicU64::new(0),
240            parsed_misses: AtomicU64::new(0),
241            optimized_hits: AtomicU64::new(0),
242            optimized_misses: AtomicU64::new(0),
243            invalidations: AtomicU64::new(0),
244            enabled: true,
245        }
246    }
247
248    /// Creates a disabled cache (for testing or when caching is not desired).
249    #[must_use]
250    pub fn disabled() -> Self {
251        Self {
252            parsed_cache: Mutex::new(LruCache::new(0)),
253            optimized_cache: Mutex::new(LruCache::new(0)),
254            parsed_hits: AtomicU64::new(0),
255            parsed_misses: AtomicU64::new(0),
256            optimized_hits: AtomicU64::new(0),
257            optimized_misses: AtomicU64::new(0),
258            invalidations: AtomicU64::new(0),
259            enabled: false,
260        }
261    }
262
263    /// Returns whether caching is enabled.
264    #[must_use]
265    pub fn is_enabled(&self) -> bool {
266        self.enabled
267    }
268
269    /// Gets a parsed plan from the cache.
270    pub fn get_parsed(&self, key: &CacheKey) -> Option<LogicalPlan> {
271        if !self.enabled {
272            return None;
273        }
274
275        let result = self.parsed_cache.lock().get(key);
276        if result.is_some() {
277            self.parsed_hits.fetch_add(1, Ordering::Relaxed);
278        } else {
279            self.parsed_misses.fetch_add(1, Ordering::Relaxed);
280        }
281        result
282    }
283
284    /// Puts a parsed plan into the cache.
285    pub fn put_parsed(&self, key: CacheKey, plan: LogicalPlan) {
286        if !self.enabled {
287            return;
288        }
289        self.parsed_cache.lock().put(key, plan);
290    }
291
292    /// Gets an optimized plan from the cache.
293    pub fn get_optimized(&self, key: &CacheKey) -> Option<LogicalPlan> {
294        if !self.enabled {
295            return None;
296        }
297
298        let result = self.optimized_cache.lock().get(key);
299        if result.is_some() {
300            self.optimized_hits.fetch_add(1, Ordering::Relaxed);
301        } else {
302            self.optimized_misses.fetch_add(1, Ordering::Relaxed);
303        }
304        result
305    }
306
307    /// Puts an optimized plan into the cache.
308    pub fn put_optimized(&self, key: CacheKey, plan: LogicalPlan) {
309        if !self.enabled {
310            return;
311        }
312        self.optimized_cache.lock().put(key, plan);
313    }
314
315    /// Invalidates a specific query from both caches.
316    pub fn invalidate(&self, key: &CacheKey) {
317        self.parsed_cache.lock().remove(key);
318        self.optimized_cache.lock().remove(key);
319    }
320
321    /// Clears all cached entries and increments the invalidation counter
322    /// (only when the cache was non-empty).
323    pub fn clear(&self) {
324        let had_entries =
325            self.parsed_cache.lock().len() > 0 || self.optimized_cache.lock().len() > 0;
326        self.parsed_cache.lock().clear();
327        self.optimized_cache.lock().clear();
328        if had_entries {
329            self.invalidations.fetch_add(1, Ordering::Relaxed);
330        }
331    }
332
333    /// Returns cache statistics.
334    #[must_use]
335    pub fn stats(&self) -> CacheStats {
336        CacheStats {
337            parsed_size: self.parsed_cache.lock().len(),
338            optimized_size: self.optimized_cache.lock().len(),
339            parsed_hits: self.parsed_hits.load(Ordering::Relaxed),
340            parsed_misses: self.parsed_misses.load(Ordering::Relaxed),
341            optimized_hits: self.optimized_hits.load(Ordering::Relaxed),
342            optimized_misses: self.optimized_misses.load(Ordering::Relaxed),
343            invalidations: self.invalidations.load(Ordering::Relaxed),
344        }
345    }
346
347    /// Estimates heap memory used by both caches.
348    #[must_use]
349    pub fn heap_memory_bytes(&self) -> (usize, usize, usize) {
350        let parsed = self.parsed_cache.lock();
351        let optimized = self.optimized_cache.lock();
352        let parsed_bytes = parsed.heap_memory_bytes();
353        let optimized_bytes = optimized.heap_memory_bytes();
354        let count = parsed.len() + optimized.len();
355        (parsed_bytes, optimized_bytes, count)
356    }
357
358    /// Resets hit/miss counters and invalidation counter.
359    pub fn reset_stats(&self) {
360        self.parsed_hits.store(0, Ordering::Relaxed);
361        self.parsed_misses.store(0, Ordering::Relaxed);
362        self.optimized_hits.store(0, Ordering::Relaxed);
363        self.optimized_misses.store(0, Ordering::Relaxed);
364        self.invalidations.store(0, Ordering::Relaxed);
365    }
366}
367
368impl Default for QueryCache {
369    fn default() -> Self {
370        // Default capacity of 1000 queries
371        Self::new(1000)
372    }
373}
374
/// Cache statistics.
///
/// A point-in-time snapshot of sizes and counters; all fields are plain
/// values copied out of the cache.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of entries in parsed cache.
    pub parsed_size: usize,
    /// Number of entries in optimized cache.
    pub optimized_size: usize,
    /// Number of parsed cache hits.
    pub parsed_hits: u64,
    /// Number of parsed cache misses.
    pub parsed_misses: u64,
    /// Number of optimized cache hits.
    pub optimized_hits: u64,
    /// Number of optimized cache misses.
    pub optimized_misses: u64,
    /// Number of times the cache was invalidated (cleared due to DDL).
    pub invalidations: u64,
}

impl CacheStats {
    /// Returns the hit rate for parsed cache (0.0 to 1.0).
    #[must_use]
    pub fn parsed_hit_rate(&self) -> f64 {
        Self::hit_rate(self.parsed_hits, self.parsed_misses)
    }

    /// Returns the hit rate for optimized cache (0.0 to 1.0).
    #[must_use]
    pub fn optimized_hit_rate(&self) -> f64 {
        Self::hit_rate(self.optimized_hits, self.optimized_misses)
    }

    /// Returns the total cache size.
    #[must_use]
    pub fn total_size(&self) -> usize {
        self.parsed_size + self.optimized_size
    }

    /// Returns the total hit rate across both cache levels (0.0 to 1.0).
    #[must_use]
    pub fn total_hit_rate(&self) -> f64 {
        Self::hit_rate(
            self.parsed_hits + self.optimized_hits,
            self.parsed_misses + self.optimized_misses,
        )
    }

    /// Shared hit-rate computation; yields 0.0 when there was no traffic.
    fn hit_rate(hits: u64, misses: u64) -> f64 {
        let total = hits + misses;
        if total == 0 {
            0.0
        } else {
            hits as f64 / total as f64
        }
    }
}
436
/// A caching wrapper for the query processor.
///
/// This type wraps a query processor and adds caching capabilities.
/// Use this for production deployments where query caching is beneficial.
pub struct CachingQueryProcessor<P> {
    /// The underlying processor.
    processor: P,
    /// The query cache (owns the cached plans and the hit/miss statistics).
    cache: QueryCache,
}
447
448impl<P> CachingQueryProcessor<P> {
449    /// Creates a new caching processor.
450    pub fn new(processor: P, cache: QueryCache) -> Self {
451        Self { processor, cache }
452    }
453
454    /// Creates a new caching processor with default cache settings.
455    pub fn with_default_cache(processor: P) -> Self {
456        Self::new(processor, QueryCache::default())
457    }
458
459    /// Returns a reference to the cache.
460    #[must_use]
461    pub fn cache(&self) -> &QueryCache {
462        &self.cache
463    }
464
465    /// Returns a reference to the underlying processor.
466    #[must_use]
467    pub fn processor(&self) -> &P {
468        &self.processor
469    }
470
471    /// Returns cache statistics.
472    #[must_use]
473    pub fn stats(&self) -> CacheStats {
474        self.cache.stats()
475    }
476
477    /// Clears the cache.
478    pub fn clear_cache(&self) {
479        self.cache.clear();
480    }
481}
482
#[cfg(test)]
mod tests {
    use super::*;

    // Picks a query language available under the enabled feature set so the
    // tests can build `CacheKey`s regardless of which parser feature is
    // compiled in.
    #[cfg(feature = "gql")]
    fn test_language() -> QueryLanguage {
        QueryLanguage::Gql
    }

    #[cfg(not(feature = "gql"))]
    fn test_language() -> QueryLanguage {
        // Fallback for tests without gql feature
        // NOTE(review): if neither `cypher` nor `sparql` is enabled either,
        // this function has no return value and will not compile — presumably
        // at least one language feature is always on; confirm in Cargo.toml.
        #[cfg(feature = "cypher")]
        return QueryLanguage::Cypher;
        #[cfg(feature = "sparql")]
        return QueryLanguage::Sparql;
    }

    #[test]
    fn test_cache_key_normalization() {
        // Keys differ only in whitespace; normalize_query collapses runs of
        // whitespace to single spaces.
        let key1 = CacheKey::new("MATCH  (n)  RETURN n", test_language());
        let key2 = CacheKey::new("MATCH (n) RETURN n", test_language());

        // Both should normalize to the same key
        assert_eq!(key1.query(), key2.query());
    }

    #[test]
    fn test_cache_basic_operations() {
        let cache = QueryCache::new(10);
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());

        // Create a simple logical plan for testing
        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        // Initially empty (this lookup counts as the one expected miss)
        assert!(cache.get_parsed(&key).is_none());

        // Put and get (this lookup counts as the one expected hit)
        cache.put_parsed(key.clone(), plan.clone());
        assert!(cache.get_parsed(&key).is_some());

        // Stats
        let stats = cache.stats();
        assert_eq!(stats.parsed_size, 1);
        assert_eq!(stats.parsed_hits, 1);
        assert_eq!(stats.parsed_misses, 1);
    }

    #[test]
    fn test_cache_lru_eviction() {
        let cache = QueryCache::new(4); // 2 entries per cache level

        use crate::query::plan::{LogicalOperator, LogicalPlan};

        // Add 3 entries to parsed cache (capacity is 2)
        for i in 0..3 {
            let key = CacheKey::new(format!("QUERY {}", i), test_language());
            cache.put_parsed(key, LogicalPlan::new(LogicalOperator::Empty));
        }

        // First entry should be evicted
        let key0 = CacheKey::new("QUERY 0", test_language());
        assert!(cache.get_parsed(&key0).is_none());

        // Entry 1 and 2 should still be present
        let key1 = CacheKey::new("QUERY 1", test_language());
        let key2 = CacheKey::new("QUERY 2", test_language());
        assert!(cache.get_parsed(&key1).is_some());
        assert!(cache.get_parsed(&key2).is_some());
    }

    #[test]
    fn test_cache_invalidation() {
        let cache = QueryCache::new(10);
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());

        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        cache.put_parsed(key.clone(), plan.clone());
        cache.put_optimized(key.clone(), plan);

        assert!(cache.get_parsed(&key).is_some());
        assert!(cache.get_optimized(&key).is_some());

        // Invalidate
        cache.invalidate(&key);

        // Clear stats from previous gets
        cache.reset_stats();

        assert!(cache.get_parsed(&key).is_none());
        assert!(cache.get_optimized(&key).is_none());
    }

    #[test]
    fn test_cache_disabled() {
        let cache = QueryCache::disabled();
        let key = CacheKey::new("MATCH (n) RETURN n", test_language());

        use crate::query::plan::{LogicalOperator, LogicalPlan};
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        // Should not store anything
        cache.put_parsed(key.clone(), plan);
        assert!(cache.get_parsed(&key).is_none());

        // Stats should be zero
        let stats = cache.stats();
        assert_eq!(stats.parsed_size, 0);
    }

    #[test]
    fn test_cache_stats() {
        let cache = QueryCache::new(10);

        use crate::query::plan::{LogicalOperator, LogicalPlan};

        let key1 = CacheKey::new("QUERY 1", test_language());
        let key2 = CacheKey::new("QUERY 2", test_language());
        let plan = LogicalPlan::new(LogicalOperator::Empty);

        // Miss
        cache.get_optimized(&key1);

        // Put and hit
        cache.put_optimized(key1.clone(), plan);
        cache.get_optimized(&key1);
        cache.get_optimized(&key1);

        // Another miss
        cache.get_optimized(&key2);

        // 2 hits / 2 misses => hit rate 0.5
        let stats = cache.stats();
        assert_eq!(stats.optimized_hits, 2);
        assert_eq!(stats.optimized_misses, 2);
        assert!((stats.optimized_hit_rate() - 0.5).abs() < 0.01);
    }
}