Skip to main content

reddb_server/storage/query/planner/
cache.rs

1//! Query Plan Cache
2//!
3//! LRU cache for compiled query plans with TTL validation.
4//!
5//! # Features
6//!
7//! - LRU eviction policy
8//! - TTL-based invalidation
9//! - Thread-safe access
10//! - Statistics tracking
11
12use std::collections::HashMap;
13use std::time::{Duration, Instant};
14
15use super::{CacheStats, QueryPlan};
16
/// Runtime activation state of a cached plan.
///
/// A freshly created [`CachedPlan`] starts out [`Inactive`](Self::Inactive);
/// [`CachedPlan::record_observation`] moves it between the two states based
/// on how many rows the plan scanned compared to its current expectation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheEntryState {
    /// The plan has not yet met its row-scan expectation, or it was demoted
    /// after a large regression.
    Inactive,
    /// The plan met its row-scan expectation on a previous observation.
    Active,
}
22
23/// A cached query plan with metadata
24#[derive(Debug, Clone)]
25pub struct CachedPlan {
26    /// The compiled query plan
27    pub plan: QueryPlan,
28    /// When this plan was cached
29    pub cached_at: Instant,
30    /// Number of times this plan was accessed
31    pub access_count: u64,
32    /// Last access time
33    pub last_accessed: Instant,
34    /// Query shape key used for parameter-insensitive cache grouping.
35    pub shape_key: Option<String>,
36    /// Last exact query string stored in this slot.
37    pub exact_query: Option<std::sync::Arc<str>>,
38    /// Runtime activation state inspired by Mongo's active/inactive plan cache.
39    pub state: CacheEntryState,
40    /// Moving expectation for storage reads (`rows_scanned`) on this shape.
41    pub expected_rows_scanned: Option<u64>,
42    /// Last observed runtime reads for the shape.
43    pub last_observed_rows_scanned: Option<u64>,
44    /// Number of literal binds expected by the cached shape skeleton.
45    pub parameter_count: usize,
46    /// When true, the next cache lookup forces a fresh replan.
47    pub replan_pending: bool,
48}
49
50impl CachedPlan {
51    /// Create a new cached plan
52    pub fn new(plan: QueryPlan) -> Self {
53        let now = Instant::now();
54        Self {
55            plan,
56            cached_at: now,
57            access_count: 0,
58            last_accessed: now,
59            shape_key: None,
60            exact_query: None,
61            state: CacheEntryState::Inactive,
62            expected_rows_scanned: None,
63            last_observed_rows_scanned: None,
64            parameter_count: 0,
65            replan_pending: false,
66        }
67    }
68
69    pub fn with_shape_key(mut self, shape_key: impl Into<String>) -> Self {
70        self.shape_key = Some(shape_key.into());
71        self
72    }
73
74    pub fn with_exact_query(mut self, query: impl Into<String>) -> Self {
75        let s: String = query.into();
76        self.exact_query = Some(std::sync::Arc::<str>::from(s));
77        self
78    }
79
80    pub fn with_parameter_count(mut self, parameter_count: usize) -> Self {
81        self.parameter_count = parameter_count;
82        self
83    }
84
85    /// Check if the plan has expired
86    pub fn is_expired(&self, ttl: Duration) -> bool {
87        self.cached_at.elapsed() > ttl
88    }
89
90    /// Record an access
91    pub fn touch(&mut self) {
92        self.access_count += 1;
93        self.last_accessed = Instant::now();
94    }
95
96    pub fn matches_exact_query(&self, query: &str) -> bool {
97        self.exact_query.as_deref() == Some(query)
98    }
99
100    pub fn needs_replan(&self) -> bool {
101        self.replan_pending
102    }
103
104    pub fn record_observation(&mut self, rows_scanned: u64) {
105        self.last_observed_rows_scanned = Some(rows_scanned);
106        match (self.state, self.expected_rows_scanned) {
107            (_, None) => {
108                self.expected_rows_scanned = Some(rows_scanned.max(1));
109                self.replan_pending = false;
110            }
111            (CacheEntryState::Inactive, Some(expected)) => {
112                if rows_scanned <= expected {
113                    self.state = CacheEntryState::Active;
114                    self.expected_rows_scanned = Some(rows_scanned.max(1));
115                    self.replan_pending = false;
116                } else {
117                    self.expected_rows_scanned = Some(rows_scanned.min(expected.saturating_mul(2)));
118                }
119            }
120            (CacheEntryState::Active, Some(expected)) => {
121                if rows_scanned > expected.saturating_mul(10).max(10) {
122                    self.state = CacheEntryState::Inactive;
123                    self.expected_rows_scanned = Some(rows_scanned.max(1));
124                    self.replan_pending = true;
125                } else if rows_scanned < expected {
126                    self.expected_rows_scanned = Some(rows_scanned.max(1));
127                    self.replan_pending = false;
128                }
129            }
130        }
131    }
132}
133
134/// LRU cache for query plans
135pub struct PlanCache {
136    /// Cached plans by key
137    entries: HashMap<String, CachedPlan>,
138    /// LRU tracking - key ordering
139    lru_order: Vec<String>,
140    /// Maximum cache size
141    capacity: usize,
142    /// Time-to-live for entries
143    ttl: Duration,
144    /// Cache statistics
145    hits: u64,
146    misses: u64,
147}
148
149impl PlanCache {
150    /// Create a new plan cache with the given capacity
151    pub fn new(capacity: usize) -> Self {
152        Self {
153            entries: HashMap::with_capacity(capacity),
154            lru_order: Vec::with_capacity(capacity),
155            capacity,
156            ttl: Duration::from_secs(3600), // 1 hour default TTL
157            hits: 0,
158            misses: 0,
159        }
160    }
161
162    /// Set the TTL for cache entries
163    pub fn with_ttl(mut self, ttl: Duration) -> Self {
164        self.ttl = ttl;
165        self
166    }
167
168    /// Read-only cache probe — no LRU promotion, no mutation.
169    ///
170    /// Use this with a `read()` lock in the hot query path so concurrent
171    /// readers don't serialize on a write lock. Falls back to `None` when
172    /// the entry is expired or has a pending replan; the caller must then
173    /// re-acquire a `write()` lock and call `get()` to handle eviction and
174    /// miss insertion.
175    pub fn peek(&self, key: &str) -> Option<&CachedPlan> {
176        let entry = self.entries.get(key)?;
177        if entry.needs_replan() || entry.is_expired(self.ttl) {
178            return None;
179        }
180        Some(entry)
181    }
182
183    /// Get a cached plan by key
184    pub fn get(&mut self, key: &str) -> Option<&CachedPlan> {
185        if self
186            .entries
187            .get(key)
188            .is_some_and(|entry| entry.needs_replan())
189        {
190            self.remove(key);
191            self.misses += 1;
192            return None;
193        }
194
195        // Check if entry exists and is not expired
196        if let Some(entry) = self.entries.get_mut(key) {
197            if entry.is_expired(self.ttl) {
198                // Remove expired entry
199                self.remove(key);
200                self.misses += 1;
201                return None;
202            }
203
204            entry.touch();
205            self.promote(key);
206            self.hits += 1;
207            return self.entries.get(key);
208        }
209
210        self.misses += 1;
211        None
212    }
213
214    /// Insert a plan into the cache
215    pub fn insert(&mut self, key: String, plan: CachedPlan) {
216        // Remove existing entry if present
217        if self.entries.contains_key(&key) {
218            self.remove(&key);
219        }
220
221        // Evict if at capacity
222        while self.entries.len() >= self.capacity {
223            self.evict_lru();
224        }
225
226        // Insert new entry
227        self.entries.insert(key.clone(), plan);
228        self.lru_order.push(key);
229    }
230
231    /// Remove an entry from the cache
232    pub fn remove(&mut self, key: &str) -> Option<CachedPlan> {
233        if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
234            self.lru_order.remove(pos);
235        }
236        self.entries.remove(key)
237    }
238
239    /// Invalidate entries matching a predicate
240    pub fn invalidate<F>(&mut self, predicate: F)
241    where
242        F: Fn(&str) -> bool,
243    {
244        let keys_to_remove: Vec<String> = self
245            .entries
246            .keys()
247            .filter(|k| predicate(k))
248            .cloned()
249            .collect();
250
251        for key in keys_to_remove {
252            self.remove(&key);
253        }
254    }
255
256    /// Clear all entries
257    pub fn clear(&mut self) {
258        self.entries.clear();
259        self.lru_order.clear();
260    }
261
262    /// Get cache statistics
263    pub fn stats(&self) -> CacheStats {
264        CacheStats {
265            hits: self.hits,
266            misses: self.misses,
267            size: self.entries.len(),
268            capacity: self.capacity,
269        }
270    }
271
272    /// Promote a key to most recently used
273    fn promote(&mut self, key: &str) {
274        if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
275            let key = self.lru_order.remove(pos);
276            self.lru_order.push(key);
277        }
278    }
279
280    /// Evict the least recently used entry
281    fn evict_lru(&mut self) {
282        if let Some(key) = self.lru_order.first().cloned() {
283            self.remove(&key);
284        }
285    }
286
287    /// Prune expired entries
288    pub fn prune_expired(&mut self) {
289        let expired: Vec<String> = self
290            .entries
291            .iter()
292            .filter(|(_, v)| v.is_expired(self.ttl))
293            .map(|(k, _)| k.clone())
294            .collect();
295
296        for key in expired {
297            self.remove(&key);
298        }
299    }
300
301    pub fn record_observation(&mut self, key: &str, rows_scanned: u64) {
302        if let Some(entry) = self.entries.get_mut(key) {
303            entry.record_observation(rows_scanned);
304        }
305    }
306}
307
308impl Default for PlanCache {
309    fn default() -> Self {
310        Self::new(1000)
311    }
312}
313
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::query::ast::{Projection, QueryExpr, TableQuery};
    use crate::storage::query::planner::cost::PlanCost;

    /// Builds an unfiltered, all-columns table query over table `test`.
    fn make_table_expr() -> QueryExpr {
        QueryExpr::Table(TableQuery {
            table: "test".to_string(),
            source: None,
            alias: None,
            select_items: Vec::new(),
            columns: vec![Projection::All],
            where_expr: None,
            filter: None,
            group_by_exprs: Vec::new(),
            group_by: Vec::new(),
            having_expr: None,
            having: None,
            order_by: Vec::new(),
            limit: None,
            limit_param: None,
            offset: None,
            offset_param: None,
            expand: None,
            as_of: None,
        })
    }

    /// Builds a plan whose two query expressions are the same trivial table
    /// query, with a default cost.
    fn make_test_plan() -> QueryPlan {
        QueryPlan::new(make_table_expr(), make_table_expr(), PlanCost::default())
    }

    /// Shorthand for a fresh cache entry wrapping the test plan.
    fn make_cached_plan() -> CachedPlan {
        CachedPlan::new(make_test_plan())
    }

    #[test]
    fn test_cache_insert_and_get() {
        let mut cache = PlanCache::new(10);

        cache.insert(String::from("query1"), make_cached_plan());

        assert!(cache.get("query1").is_some());
        assert!(cache.get("query2").is_none());
    }

    #[test]
    fn test_cache_lru_eviction() {
        let mut cache = PlanCache::new(2);

        cache.insert(String::from("q1"), make_cached_plan());
        cache.insert(String::from("q2"), make_cached_plan());

        // Touch q1 so that q2 becomes the least recently used entry.
        let _ = cache.get("q1");

        // The cache is full, so inserting q3 must push out q2.
        cache.insert(String::from("q3"), make_cached_plan());

        assert!(cache.get("q1").is_some());
        assert!(cache.get("q2").is_none()); // Evicted
        assert!(cache.get("q3").is_some());
    }

    #[test]
    fn test_cache_stats() {
        let mut cache = PlanCache::new(10);
        cache.insert(String::from("q1"), make_cached_plan());

        let _ = cache.get("q1"); // Hit
        let _ = cache.get("q2"); // Miss
        let _ = cache.get("q1"); // Hit

        let stats = cache.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
    }

    #[test]
    fn test_cache_invalidation() {
        let mut cache = PlanCache::new(10);
        cache.insert(String::from("hosts_query1"), make_cached_plan());
        cache.insert(String::from("hosts_query2"), make_cached_plan());
        cache.insert(String::from("users_query"), make_cached_plan());

        // Drop every key in the `hosts_` namespace; other keys must survive.
        cache.invalidate(|k| k.starts_with("hosts_"));

        assert!(cache.get("hosts_query1").is_none());
        assert!(cache.get("hosts_query2").is_none());
        assert!(cache.get("users_query").is_some());
    }

    #[test]
    fn active_entry_forces_replan_after_large_regression() {
        let mut cache = PlanCache::new(10);
        cache.insert(String::from("q1"), make_cached_plan());

        // First observation sets the expectation, the second activates the
        // entry, and the third is far enough above it to demand a replan.
        cache.record_observation("q1", 10);
        cache.record_observation("q1", 10);
        cache.record_observation("q1", 500);

        assert!(cache.get("q1").is_none());
    }
}
440}