// amaters_core/compute/plan_cache.rs
1//! Plan cache for the query planner
2//!
3//! Provides an LRU cache with TTL-based expiry for caching physical execution
4//! plans. This avoids re-planning identical queries when the same query is
5//! submitted multiple times within the TTL window.
6
7use crate::types::Query;
8use parking_lot::Mutex;
9use std::collections::{HashMap, VecDeque};
10use std::time::{Duration, Instant};
11
12use super::planner::PhysicalPlan;
13
14// ---------------------------------------------------------------------------
15// Plan cache
16// ---------------------------------------------------------------------------
17
/// Cache key: a blake3 hash of the normalized query representation
///
/// 32 bytes, `Copy`, `Eq`, and `Hash`, so it can be used directly as a
/// `HashMap` key and stored by value in the LRU queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CacheKey([u8; 32]);
21
22impl CacheKey {
23    /// Create a cache key from a query by normalizing its debug representation
24    /// and hashing it with blake3.
25    pub(crate) fn from_query(query: &Query) -> Self {
26        let raw = format!("{:?}", query);
27        let normalized = Self::normalize(&raw);
28        let hash = blake3::hash(normalized.as_bytes());
29        Self(*hash.as_bytes())
30    }
31
32    /// Normalize a query string: trim whitespace, lowercase the operation type
33    pub(crate) fn normalize(raw: &str) -> String {
34        let trimmed = raw.trim();
35        // Lowercase the first "word" (operation type like Filter, Get, etc.)
36        // The debug format is e.g. "Filter { collection: ..., predicate: ... }"
37        if let Some(idx) = trimmed.find(|c: char| !c.is_alphanumeric() && c != '_') {
38            let (op, rest) = trimmed.split_at(idx);
39            format!("{}{}", op.to_lowercase(), rest)
40        } else {
41            trimmed.to_lowercase()
42        }
43    }
44
45    /// Create a cache key from a raw string (for prefix-based operations)
46    #[allow(dead_code)]
47    fn from_str(s: &str) -> Self {
48        let hash = blake3::hash(s.as_bytes());
49        Self(*hash.as_bytes())
50    }
51}
52
/// A cached physical plan with metadata
///
/// `cached_at` drives TTL expiry on lookup; `hit_count` is bumped on every
/// cache hit.
#[derive(Debug, Clone)]
pub struct CachedPlan {
    /// The cached physical plan
    pub plan: PhysicalPlan,
    /// When the plan was cached (compared against the configured TTL)
    pub cached_at: Instant,
    /// Number of cache hits for this entry
    pub hit_count: u64,
    /// Normalized query string (matched via substring search during
    /// prefix-based invalidation)
    pub normalized_query: String,
}
65
/// Configuration for the plan cache
#[derive(Debug, Clone)]
pub struct PlanCacheConfig {
    /// Maximum number of entries in the cache; LRU eviction kicks in beyond this
    pub max_entries: usize,
    /// Time-to-live for cached plans; expired entries are dropped lazily on lookup
    pub ttl: Duration,
}
74
75impl Default for PlanCacheConfig {
76    fn default() -> Self {
77        Self {
78            max_entries: 1000,
79            ttl: Duration::from_secs(300), // 5 minutes
80        }
81    }
82}
83
/// Cache statistics
///
/// A running tally; TTL expiry of an entry counts as both a miss and an
/// eviction.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of cache hits
    pub hits: u64,
    /// Number of cache misses (including TTL-expired lookups)
    pub misses: u64,
    /// Number of evictions (LRU, TTL, or explicit invalidation)
    pub evictions: u64,
    /// Current number of entries in the cache
    pub size: usize,
}
96
/// LRU plan cache with TTL-based expiry
///
/// Implements a manual LRU cache using a `HashMap` for O(1) lookups and a
/// `VecDeque` to track access order for eviction. Thread-safe via
/// `parking_lot::Mutex`.
///
/// NOTE(review): the three fields are guarded by separate mutexes. Every
/// method that touches more than one field acquires `entries` first, which
/// serializes those methods and prevents lock-order inversion between
/// `lru_order` and `stats` — preserve that convention when adding methods.
pub struct PlanCache {
    /// Cache storage: key -> cached plan
    entries: Mutex<HashMap<CacheKey, CachedPlan>>,
    /// LRU order: front = least recently used, back = most recently used
    lru_order: Mutex<VecDeque<CacheKey>>,
    /// Configuration (immutable after construction)
    config: PlanCacheConfig,
    /// Running statistics
    stats: Mutex<CacheStats>,
}
112
113impl PlanCache {
114    /// Create a new plan cache with the given configuration
115    pub fn new(config: PlanCacheConfig) -> Self {
116        Self {
117            entries: Mutex::new(HashMap::new()),
118            lru_order: Mutex::new(VecDeque::new()),
119            config,
120            stats: Mutex::new(CacheStats::default()),
121        }
122    }
123
124    /// Look up a cached plan by its cache key.
125    ///
126    /// Returns `Some(PhysicalPlan)` if a fresh (non-expired) entry exists,
127    /// otherwise returns `None`. Updates LRU order and hit/miss statistics.
128    pub fn get(&self, key: &CacheKey) -> Option<PhysicalPlan> {
129        let mut entries = self.entries.lock();
130        let mut stats = self.stats.lock();
131
132        if let Some(entry) = entries.get_mut(key) {
133            // Check TTL
134            if entry.cached_at.elapsed() > self.config.ttl {
135                // Expired: remove and count as miss + eviction
136                entries.remove(key);
137                let mut lru = self.lru_order.lock();
138                lru.retain(|k| k != key);
139                stats.misses += 1;
140                stats.evictions += 1;
141                stats.size = entries.len();
142                return None;
143            }
144
145            // Cache hit
146            entry.hit_count += 1;
147            stats.hits += 1;
148
149            // Move to back of LRU (most recently used)
150            let mut lru = self.lru_order.lock();
151            lru.retain(|k| k != key);
152            lru.push_back(*key);
153
154            Some(entry.plan.clone())
155        } else {
156            stats.misses += 1;
157            None
158        }
159    }
160
161    /// Insert a plan into the cache, evicting the LRU entry if at capacity.
162    pub fn insert(&self, key: CacheKey, plan: PhysicalPlan, normalized_query: String) {
163        let mut entries = self.entries.lock();
164        let mut lru = self.lru_order.lock();
165        let mut stats = self.stats.lock();
166
167        // If the key already exists, update in place
168        if entries.contains_key(&key) {
169            lru.retain(|k| k != &key);
170        }
171
172        // Evict LRU entries if at capacity
173        while entries.len() >= self.config.max_entries {
174            if let Some(evicted_key) = lru.pop_front() {
175                entries.remove(&evicted_key);
176                stats.evictions += 1;
177            } else {
178                break;
179            }
180        }
181
182        entries.insert(
183            key,
184            CachedPlan {
185                plan,
186                cached_at: Instant::now(),
187                hit_count: 0,
188                normalized_query,
189            },
190        );
191        lru.push_back(key);
192        stats.size = entries.len();
193    }
194
195    /// Invalidate all cached plans
196    pub fn invalidate_all(&self) {
197        let mut entries = self.entries.lock();
198        let mut lru = self.lru_order.lock();
199        let mut stats = self.stats.lock();
200
201        let evicted = entries.len() as u64;
202        entries.clear();
203        lru.clear();
204        stats.evictions += evicted;
205        stats.size = 0;
206    }
207
208    /// Invalidate all cached plans whose normalized query starts with the
209    /// given prefix (e.g., a collection name).
210    pub fn invalidate_prefix(&self, prefix: &str) {
211        let normalized_prefix = prefix.trim().to_lowercase();
212        let mut entries = self.entries.lock();
213        let mut lru = self.lru_order.lock();
214        let mut stats = self.stats.lock();
215
216        let keys_to_remove: Vec<CacheKey> = entries
217            .iter()
218            .filter(|(_, v)| v.normalized_query.contains(&normalized_prefix))
219            .map(|(k, _)| *k)
220            .collect();
221
222        for key in &keys_to_remove {
223            entries.remove(key);
224            stats.evictions += 1;
225        }
226
227        lru.retain(|k| !keys_to_remove.contains(k));
228        stats.size = entries.len();
229    }
230
231    /// Return a snapshot of the current cache statistics
232    pub fn cache_stats(&self) -> CacheStats {
233        let stats = self.stats.lock();
234        stats.clone()
235    }
236}
237
238impl std::fmt::Debug for PlanCache {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        let stats = self.cache_stats();
241        f.debug_struct("PlanCache")
242            .field("max_entries", &self.config.max_entries)
243            .field("ttl", &self.config.ttl)
244            .field("stats", &stats)
245            .finish()
246    }
247}
248
#[cfg(test)]
mod tests {
    // These tests drive the cache indirectly through `QueryPlanner::plan`
    // and observe its effects via `cache_stats()`. The exact hit/miss/
    // eviction counts asserted below are coupled to the precise sequence of
    // `plan` calls in each test — be careful when reordering calls.
    use super::super::planner::QueryPlanner;
    use super::*;
    use crate::error::Result;
    use crate::types::Key;

    // -- Plan cache tests ---------------------------------------------------

    #[test]
    fn test_cache_hit_for_same_query() -> Result<()> {
        let planner = QueryPlanner::new().with_cache(PlanCacheConfig::default());

        let query = Query::Get {
            collection: "users".to_string(),
            key: Key::from_str("user:1"),
        };

        // First call: cache miss, plan gets cached
        let plan1 = planner.plan(&query)?;
        let stats1 = planner.cache_stats();
        assert_eq!(stats1.misses, 1, "first call should be a miss");
        assert_eq!(stats1.hits, 0);
        assert_eq!(stats1.size, 1, "one entry should be cached");

        // Second call: cache hit
        let plan2 = planner.plan(&query)?;
        let stats2 = planner.cache_stats();
        assert_eq!(stats2.hits, 1, "second call should be a hit");
        assert_eq!(stats2.misses, 1, "miss count should not change");

        // Plans should be structurally equivalent (compared via Debug output)
        assert_eq!(format!("{:?}", plan1), format!("{:?}", plan2));

        Ok(())
    }

    #[test]
    fn test_cache_miss_for_different_queries() -> Result<()> {
        let planner = QueryPlanner::new().with_cache(PlanCacheConfig::default());

        // Same collection, different keys: distinct cache keys expected
        let query_a = Query::Get {
            collection: "users".to_string(),
            key: Key::from_str("user:1"),
        };
        let query_b = Query::Get {
            collection: "users".to_string(),
            key: Key::from_str("user:2"),
        };

        let _plan_a = planner.plan(&query_a)?;
        let _plan_b = planner.plan(&query_b)?;

        let stats = planner.cache_stats();
        assert_eq!(stats.misses, 2, "both queries should miss");
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.size, 2);

        Ok(())
    }

    #[test]
    fn test_cache_ttl_expiry() -> Result<()> {
        let config = PlanCacheConfig {
            max_entries: 100,
            ttl: Duration::from_millis(50), // very short TTL
        };
        let planner = QueryPlanner::new().with_cache(config);

        let query = Query::Get {
            collection: "items".to_string(),
            key: Key::from_str("item:1"),
        };

        // Cache the plan
        let _plan1 = planner.plan(&query)?;
        let stats1 = planner.cache_stats();
        assert_eq!(stats1.misses, 1);

        // Wait for TTL to expire (sleep is 2x the TTL to avoid flakiness)
        std::thread::sleep(Duration::from_millis(100));

        // Should miss again because TTL expired
        let _plan2 = planner.plan(&query)?;
        let stats2 = planner.cache_stats();
        assert_eq!(stats2.misses, 2, "expired entry should cause a miss");
        assert_eq!(
            stats2.evictions, 1,
            "expired entry should count as eviction"
        );

        Ok(())
    }

    #[test]
    fn test_cache_lru_eviction() -> Result<()> {
        let config = PlanCacheConfig {
            max_entries: 3,
            ttl: Duration::from_secs(300),
        };
        let planner = QueryPlanner::new().with_cache(config);

        // Insert 3 entries (fills cache)
        for i in 0..3 {
            let query = Query::Get {
                collection: "data".to_string(),
                key: Key::from_str(&format!("key:{}", i)),
            };
            let _plan = planner.plan(&query)?;
        }

        let stats = planner.cache_stats();
        assert_eq!(stats.size, 3);
        assert_eq!(stats.evictions, 0);

        // Insert a 4th entry, should evict the LRU (key:0)
        let query_new = Query::Get {
            collection: "data".to_string(),
            key: Key::from_str("key:3"),
        };
        let _plan = planner.plan(&query_new)?;

        let stats = planner.cache_stats();
        assert_eq!(stats.size, 3, "size should remain at max_entries");
        assert_eq!(stats.evictions, 1, "one entry should have been evicted");

        // Access key:1 (was second oldest, but now should be in cache)
        let query_1 = Query::Get {
            collection: "data".to_string(),
            key: Key::from_str("key:1"),
        };
        let _plan = planner.plan(&query_1)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.hits, 1, "key:1 should still be in cache");

        // The evicted key:0 should miss
        let query_0 = Query::Get {
            collection: "data".to_string(),
            key: Key::from_str("key:0"),
        };
        let _plan = planner.plan(&query_0)?;
        let stats = planner.cache_stats();
        // key:0 was evicted, so this is a miss; also evicts key:2 (now LRU)
        assert!(
            stats.misses >= 5,
            "key:0 should have been evicted and cause a miss"
        );

        Ok(())
    }

    #[test]
    fn test_cache_stats_accuracy() -> Result<()> {
        let config = PlanCacheConfig {
            max_entries: 10,
            ttl: Duration::from_secs(300),
        };
        let planner = QueryPlanner::new().with_cache(config);

        // Start with zero stats
        let stats = planner.cache_stats();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);
        assert_eq!(stats.evictions, 0);
        assert_eq!(stats.size, 0);

        let query = Query::Get {
            collection: "stats_test".to_string(),
            key: Key::from_str("k1"),
        };

        // 1 miss
        let _p = planner.plan(&query)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.size, 1);

        // 5 hits
        for _ in 0..5 {
            let _p = planner.plan(&query)?;
        }
        let stats = planner.cache_stats();
        assert_eq!(stats.hits, 5);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.size, 1);

        Ok(())
    }

    #[test]
    fn test_cache_invalidate_all() -> Result<()> {
        let planner = QueryPlanner::new().with_cache(PlanCacheConfig::default());

        // Cache some entries
        for i in 0..5 {
            let query = Query::Get {
                collection: "inv_all".to_string(),
                key: Key::from_str(&format!("k:{}", i)),
            };
            let _p = planner.plan(&query)?;
        }

        let stats = planner.cache_stats();
        assert_eq!(stats.size, 5);

        // Invalidate all
        planner.invalidate_all();

        let stats = planner.cache_stats();
        assert_eq!(stats.size, 0, "all entries should be removed");
        assert_eq!(stats.evictions, 5, "all removed entries count as evictions");

        // Re-plan should miss
        let query = Query::Get {
            collection: "inv_all".to_string(),
            key: Key::from_str("k:0"),
        };
        let _p = planner.plan(&query)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.misses, 6, "re-plan after invalidation should miss");

        Ok(())
    }

    #[test]
    fn test_cache_invalidate_prefix() -> Result<()> {
        let planner = QueryPlanner::new().with_cache(PlanCacheConfig::default());

        // Cache entries for two different collections
        for i in 0..3 {
            let query = Query::Get {
                collection: "orders".to_string(),
                key: Key::from_str(&format!("o:{}", i)),
            };
            let _p = planner.plan(&query)?;
        }
        for i in 0..2 {
            let query = Query::Get {
                collection: "products".to_string(),
                key: Key::from_str(&format!("p:{}", i)),
            };
            let _p = planner.plan(&query)?;
        }

        let stats = planner.cache_stats();
        assert_eq!(stats.size, 5);

        // Invalidate only "orders" (matched as a substring of the
        // normalized query, since the collection name appears mid-string)
        planner.invalidate_prefix("orders");

        let stats = planner.cache_stats();
        assert_eq!(stats.size, 2, "only products should remain");
        assert_eq!(stats.evictions, 3, "3 orders entries evicted");

        // "products" entries should still hit
        let query = Query::Get {
            collection: "products".to_string(),
            key: Key::from_str("p:0"),
        };
        let _p = planner.plan(&query)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.hits, 1, "products entry should still be cached");

        Ok(())
    }

    #[test]
    fn test_cache_concurrent_access() -> Result<()> {
        use std::sync::Arc;

        let config = PlanCacheConfig {
            max_entries: 100,
            ttl: Duration::from_secs(300),
        };
        let planner = Arc::new(QueryPlanner::new().with_cache(config));

        let mut handles = Vec::new();

        // Spawn 8 threads, each planning the same 10 queries.
        // `thread_id % 2` makes pairs of threads share keys, so some
        // lookups are guaranteed to hit once the first writer has cached.
        for thread_id in 0..8 {
            let planner_clone = Arc::clone(&planner);
            let handle = std::thread::spawn(move || -> Result<()> {
                for i in 0..10 {
                    let query = Query::Get {
                        collection: "concurrent".to_string(),
                        key: Key::from_str(&format!("k:{}:{}", thread_id % 2, i)),
                    };
                    let _plan = planner_clone.plan(&query)?;
                }
                Ok(())
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().expect("thread should not panic")?;
        }

        let stats = planner.cache_stats();

        // With 8 threads and 10 queries each (20 distinct keys: thread_id%2 x 10),
        // we should have at most 20 unique entries
        assert!(stats.size <= 20, "should have at most 20 entries");
        // Total operations = 80, misses <= 80, some should be hits
        let total = stats.hits + stats.misses;
        assert_eq!(total, 80, "total ops should be 80");
        // With 4 threads sharing the same keys as another 4, we expect some hits
        assert!(
            stats.hits > 0,
            "should have some cache hits from concurrent access"
        );

        Ok(())
    }

    #[test]
    fn test_cache_key_normalization() {
        // Same query should produce the same key regardless of whitespace variations
        // (note: key_a/key_b hold normalized *strings*, not CacheKeys)
        let key_a = CacheKey::normalize("  Filter { collection: \"x\" }  ");
        let key_b = CacheKey::normalize("Filter { collection: \"x\" }");
        assert_eq!(key_a, key_b);

        // Operation type should be lowercased
        let normalized = CacheKey::normalize("FILTER { collection: \"x\" }");
        assert!(normalized.starts_with("filter"));
    }

    #[test]
    fn test_planner_without_cache() -> Result<()> {
        use super::super::planner::PhysicalPlan;

        // Verify that a planner without cache works normally
        let planner = QueryPlanner::new();
        assert!(planner.plan_cache().is_none());

        let query = Query::Get {
            collection: "no_cache".to_string(),
            key: Key::from_str("k1"),
        };

        let plan = planner.plan(&query)?;
        assert!(matches!(plan, PhysicalPlan::PointGet { .. }));

        // cache_stats should return defaults
        let stats = planner.cache_stats();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);

        // invalidate should be a no-op
        planner.invalidate_all();
        planner.invalidate_prefix("anything");

        Ok(())
    }

    #[test]
    fn test_cache_with_filter_queries() -> Result<()> {
        use super::super::planner::PhysicalPlan;
        use crate::types::{CipherBlob, Predicate, col};

        let planner = QueryPlanner::new().with_cache(PlanCacheConfig::default());

        let query = Query::Filter {
            collection: "users".to_string(),
            predicate: Predicate::Gt(col("age"), CipherBlob::new(vec![18])),
        };

        // First plan: miss
        let plan1 = planner.plan(&query)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.misses, 1);

        // Second plan: hit
        let plan2 = planner.plan(&query)?;
        let stats = planner.cache_stats();
        assert_eq!(stats.hits, 1);

        // Both plans should be FheFilter
        assert!(matches!(plan1, PhysicalPlan::FheFilter { .. }));
        assert!(matches!(plan2, PhysicalPlan::FheFilter { .. }));

        Ok(())
    }

    #[test]
    fn test_plan_cache_debug() {
        // The custom Debug impl reports config fields plus a stats snapshot
        let cache = PlanCache::new(PlanCacheConfig::default());
        let debug_str = format!("{:?}", cache);
        assert!(debug_str.contains("PlanCache"));
        assert!(debug_str.contains("max_entries"));
    }

    #[test]
    fn test_cache_lru_order_updated_on_access() -> Result<()> {
        let config = PlanCacheConfig {
            max_entries: 3,
            ttl: Duration::from_secs(300),
        };
        let planner = QueryPlanner::new().with_cache(config);

        // Insert 3 entries: key:0, key:1, key:2
        // LRU order: key:0 (oldest) -> key:1 -> key:2 (newest)
        for i in 0..3 {
            let query = Query::Get {
                collection: "lru".to_string(),
                key: Key::from_str(&format!("key:{}", i)),
            };
            let _p = planner.plan(&query)?;
        }

        // Access key:0 to move it to the back
        let query_0 = Query::Get {
            collection: "lru".to_string(),
            key: Key::from_str("key:0"),
        };
        let _p = planner.plan(&query_0)?;

        // Now LRU order: key:1 (oldest) -> key:2 -> key:0 (newest)
        // Insert key:3 should evict key:1 (not key:0)
        let query_3 = Query::Get {
            collection: "lru".to_string(),
            key: Key::from_str("key:3"),
        };
        let _p = planner.plan(&query_3)?;

        // key:0 should still be cached (was accessed recently)
        let _p = planner.plan(&query_0)?;
        let stats = planner.cache_stats();
        // key:0 was accessed twice as hit (once to move, once now)
        assert!(
            stats.hits >= 2,
            "key:0 should still be in cache after LRU reorder"
        );

        // key:1 should have been evicted
        let query_1 = Query::Get {
            collection: "lru".to_string(),
            key: Key::from_str("key:1"),
        };
        let _p = planner.plan(&query_1)?;
        let stats = planner.cache_stats();
        // This causes another eviction (key:2 is now LRU) and a miss
        assert!(
            stats.evictions >= 2,
            "key:1 eviction + new eviction for reinsertion"
        );

        Ok(())
    }
}