// reddb_server/storage/query/planner/cache.rs

1//! Query Plan Cache
2//!
3//! LRU cache for compiled query plans with TTL validation.
4//!
5//! # Features
6//!
7//! - LRU eviction policy
8//! - TTL-based invalidation
9//! - Thread-safe access
10//! - Statistics tracking
11
12use std::collections::HashMap;
13use std::time::{Duration, Instant};
14
15use super::{CacheStats, QueryPlan};
16
/// Runtime activation state of a cached plan, inspired by Mongo's
/// active/inactive plan-cache lifecycle (see `CachedPlan::record_observation`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheEntryState {
    /// Plan has not yet met (or has regressed from) its expected runtime
    /// cost; it must re-earn `Active` status via observed executions.
    Inactive,
    /// Plan's observed cost matched expectations and it is trusted for reuse.
    Active,
}
22
/// A cached query plan with metadata
#[derive(Debug, Clone)]
pub struct CachedPlan {
    /// The compiled query plan
    pub plan: QueryPlan,
    /// When this plan was cached (used for TTL checks in `is_expired`)
    pub cached_at: Instant,
    /// Number of times this plan was accessed
    pub access_count: u64,
    /// Last access time (updated by `touch`)
    pub last_accessed: Instant,
    /// Query shape key used for parameter-insensitive cache grouping.
    pub shape_key: Option<String>,
    /// Last exact query string stored in this slot.
    /// `Arc<str>` keeps clones of this entry cheap.
    pub exact_query: Option<std::sync::Arc<str>>,
    /// Runtime activation state inspired by Mongo's active/inactive plan cache.
    pub state: CacheEntryState,
    /// Moving expectation for storage reads (`rows_scanned`) on this shape.
    /// `None` until the first observation is recorded.
    pub expected_rows_scanned: Option<u64>,
    /// Last observed runtime reads for the shape.
    pub last_observed_rows_scanned: Option<u64>,
    /// Number of literal binds expected by the cached shape skeleton.
    pub parameter_count: usize,
    /// When true, the next cache lookup forces a fresh replan
    /// (set after a large cost regression; see `record_observation`).
    pub replan_pending: bool,
}
49
50impl CachedPlan {
51    /// Create a new cached plan
52    pub fn new(plan: QueryPlan) -> Self {
53        let now = Instant::now();
54        Self {
55            plan,
56            cached_at: now,
57            access_count: 0,
58            last_accessed: now,
59            shape_key: None,
60            exact_query: None,
61            state: CacheEntryState::Inactive,
62            expected_rows_scanned: None,
63            last_observed_rows_scanned: None,
64            parameter_count: 0,
65            replan_pending: false,
66        }
67    }
68
69    pub fn with_shape_key(mut self, shape_key: impl Into<String>) -> Self {
70        self.shape_key = Some(shape_key.into());
71        self
72    }
73
74    pub fn with_exact_query(mut self, query: impl Into<String>) -> Self {
75        let s: String = query.into();
76        self.exact_query = Some(std::sync::Arc::<str>::from(s));
77        self
78    }
79
80    pub fn with_parameter_count(mut self, parameter_count: usize) -> Self {
81        self.parameter_count = parameter_count;
82        self
83    }
84
85    /// Check if the plan has expired
86    pub fn is_expired(&self, ttl: Duration) -> bool {
87        self.cached_at.elapsed() > ttl
88    }
89
90    /// Record an access
91    pub fn touch(&mut self) {
92        self.access_count += 1;
93        self.last_accessed = Instant::now();
94    }
95
96    pub fn matches_exact_query(&self, query: &str) -> bool {
97        self.exact_query.as_deref() == Some(query)
98    }
99
100    pub fn needs_replan(&self) -> bool {
101        self.replan_pending
102    }
103
104    pub fn record_observation(&mut self, rows_scanned: u64) {
105        self.last_observed_rows_scanned = Some(rows_scanned);
106        match (self.state, self.expected_rows_scanned) {
107            (_, None) => {
108                self.expected_rows_scanned = Some(rows_scanned.max(1));
109                self.replan_pending = false;
110            }
111            (CacheEntryState::Inactive, Some(expected)) => {
112                if rows_scanned <= expected {
113                    self.state = CacheEntryState::Active;
114                    self.expected_rows_scanned = Some(rows_scanned.max(1));
115                    self.replan_pending = false;
116                } else {
117                    self.expected_rows_scanned = Some(rows_scanned.min(expected.saturating_mul(2)));
118                }
119            }
120            (CacheEntryState::Active, Some(expected)) => {
121                if rows_scanned > expected.saturating_mul(10).max(10) {
122                    self.state = CacheEntryState::Inactive;
123                    self.expected_rows_scanned = Some(rows_scanned.max(1));
124                    self.replan_pending = true;
125                } else if rows_scanned < expected {
126                    self.expected_rows_scanned = Some(rows_scanned.max(1));
127                    self.replan_pending = false;
128                }
129            }
130        }
131    }
132}
133
/// LRU cache for query plans
///
/// Not internally synchronized; callers are expected to wrap it in a lock
/// (see `peek`, which is designed for use under a read lock).
pub struct PlanCache {
    /// Cached plans by key
    entries: HashMap<String, CachedPlan>,
    /// LRU tracking - key ordering; front is least recently used,
    /// back is most recently used.
    lru_order: Vec<String>,
    /// Maximum cache size (number of entries before eviction)
    capacity: usize,
    /// Time-to-live for entries, checked on every lookup
    ttl: Duration,
    /// Cache statistics
    hits: u64,
    misses: u64,
}
148
149impl PlanCache {
150    /// Create a new plan cache with the given capacity
151    pub fn new(capacity: usize) -> Self {
152        Self {
153            entries: HashMap::with_capacity(capacity),
154            lru_order: Vec::with_capacity(capacity),
155            capacity,
156            ttl: Duration::from_secs(3600), // 1 hour default TTL
157            hits: 0,
158            misses: 0,
159        }
160    }
161
162    /// Set the TTL for cache entries
163    pub fn with_ttl(mut self, ttl: Duration) -> Self {
164        self.ttl = ttl;
165        self
166    }
167
168    /// Read-only cache probe — no LRU promotion, no mutation.
169    ///
170    /// Use this with a `read()` lock in the hot query path so concurrent
171    /// readers don't serialize on a write lock. Falls back to `None` when
172    /// the entry is expired or has a pending replan; the caller must then
173    /// re-acquire a `write()` lock and call `get()` to handle eviction and
174    /// miss insertion.
175    pub fn peek(&self, key: &str) -> Option<&CachedPlan> {
176        let entry = self.entries.get(key)?;
177        if entry.needs_replan() || entry.is_expired(self.ttl) {
178            return None;
179        }
180        Some(entry)
181    }
182
183    /// Get a cached plan by key
184    pub fn get(&mut self, key: &str) -> Option<&CachedPlan> {
185        if self
186            .entries
187            .get(key)
188            .is_some_and(|entry| entry.needs_replan())
189        {
190            self.remove(key);
191            self.misses += 1;
192            return None;
193        }
194
195        // Check if entry exists and is not expired
196        if let Some(entry) = self.entries.get_mut(key) {
197            if entry.is_expired(self.ttl) {
198                // Remove expired entry
199                self.remove(key);
200                self.misses += 1;
201                return None;
202            }
203
204            entry.touch();
205            self.promote(key);
206            self.hits += 1;
207            return self.entries.get(key);
208        }
209
210        self.misses += 1;
211        None
212    }
213
214    /// Insert a plan into the cache
215    pub fn insert(&mut self, key: String, plan: CachedPlan) {
216        // Remove existing entry if present
217        if self.entries.contains_key(&key) {
218            self.remove(&key);
219        }
220
221        // Evict if at capacity
222        while self.entries.len() >= self.capacity {
223            self.evict_lru();
224        }
225
226        // Insert new entry
227        self.entries.insert(key.clone(), plan);
228        self.lru_order.push(key);
229    }
230
231    /// Remove an entry from the cache
232    pub fn remove(&mut self, key: &str) -> Option<CachedPlan> {
233        if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
234            self.lru_order.remove(pos);
235        }
236        self.entries.remove(key)
237    }
238
239    /// Invalidate entries matching a predicate
240    pub fn invalidate<F>(&mut self, predicate: F)
241    where
242        F: Fn(&str) -> bool,
243    {
244        let keys_to_remove: Vec<String> = self
245            .entries
246            .keys()
247            .filter(|k| predicate(k))
248            .cloned()
249            .collect();
250
251        for key in keys_to_remove {
252            self.remove(&key);
253        }
254    }
255
256    /// Clear all entries
257    pub fn clear(&mut self) {
258        self.entries.clear();
259        self.lru_order.clear();
260    }
261
262    /// Get cache statistics
263    pub fn stats(&self) -> CacheStats {
264        CacheStats {
265            hits: self.hits,
266            misses: self.misses,
267            size: self.entries.len(),
268            capacity: self.capacity,
269        }
270    }
271
272    /// Promote a key to most recently used
273    fn promote(&mut self, key: &str) {
274        if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
275            let key = self.lru_order.remove(pos);
276            self.lru_order.push(key);
277        }
278    }
279
280    /// Evict the least recently used entry
281    fn evict_lru(&mut self) {
282        if let Some(key) = self.lru_order.first().cloned() {
283            self.remove(&key);
284        }
285    }
286
287    /// Prune expired entries
288    pub fn prune_expired(&mut self) {
289        let expired: Vec<String> = self
290            .entries
291            .iter()
292            .filter(|(_, v)| v.is_expired(self.ttl))
293            .map(|(k, _)| k.clone())
294            .collect();
295
296        for key in expired {
297            self.remove(&key);
298        }
299    }
300
301    pub fn record_observation(&mut self, key: &str, rows_scanned: u64) {
302        if let Some(entry) = self.entries.get_mut(key) {
303            entry.record_observation(rows_scanned);
304        }
305    }
306}
307
308impl Default for PlanCache {
309    fn default() -> Self {
310        Self::new(1000)
311    }
312}
313
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::query::ast::{Projection, QueryExpr, TableQuery};
    use crate::storage::query::planner::cost::PlanCost;

    /// Build a minimal `SELECT * FROM test` table expression.
    ///
    /// Extracted helper: `make_test_plan` previously duplicated this
    /// 18-line struct literal verbatim for both plan expressions.
    fn test_table_expr() -> QueryExpr {
        QueryExpr::Table(TableQuery {
            table: "test".to_string(),
            source: None,
            alias: None,
            select_items: Vec::new(),
            columns: vec![Projection::All],
            where_expr: None,
            filter: None,
            group_by_exprs: Vec::new(),
            group_by: Vec::new(),
            having_expr: None,
            having: None,
            order_by: vec![],
            limit: None,
            offset: None,
            expand: None,
            as_of: None,
        })
    }

    /// Minimal plan used as a cache payload in the tests below.
    fn make_test_plan() -> QueryPlan {
        QueryPlan::new(test_table_expr(), test_table_expr(), PlanCost::default())
    }

    #[test]
    fn test_cache_insert_and_get() {
        let mut cache = PlanCache::new(10);
        let plan = CachedPlan::new(make_test_plan());

        cache.insert("query1".to_string(), plan);
        assert!(cache.get("query1").is_some());
        assert!(cache.get("query2").is_none());
    }

    #[test]
    fn test_cache_lru_eviction() {
        let mut cache = PlanCache::new(2);

        cache.insert("q1".to_string(), CachedPlan::new(make_test_plan()));
        cache.insert("q2".to_string(), CachedPlan::new(make_test_plan()));

        // Access q1 to make it most recently used
        let _ = cache.get("q1");

        // Insert q3 - should evict q2 (LRU)
        cache.insert("q3".to_string(), CachedPlan::new(make_test_plan()));

        assert!(cache.get("q1").is_some());
        assert!(cache.get("q2").is_none()); // Evicted
        assert!(cache.get("q3").is_some());
    }

    #[test]
    fn test_cache_stats() {
        let mut cache = PlanCache::new(10);
        cache.insert("q1".to_string(), CachedPlan::new(make_test_plan()));

        let _ = cache.get("q1"); // Hit
        let _ = cache.get("q2"); // Miss
        let _ = cache.get("q1"); // Hit

        let stats = cache.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
    }

    #[test]
    fn test_cache_invalidation() {
        let mut cache = PlanCache::new(10);
        cache.insert(
            "hosts_query1".to_string(),
            CachedPlan::new(make_test_plan()),
        );
        cache.insert(
            "hosts_query2".to_string(),
            CachedPlan::new(make_test_plan()),
        );
        cache.insert("users_query".to_string(), CachedPlan::new(make_test_plan()));

        // Invalidate all hosts queries
        cache.invalidate(|k| k.starts_with("hosts_"));

        assert!(cache.get("hosts_query1").is_none());
        assert!(cache.get("hosts_query2").is_none());
        assert!(cache.get("users_query").is_some());
    }

    #[test]
    fn active_entry_forces_replan_after_large_regression() {
        let mut cache = PlanCache::new(10);
        cache.insert("q1".to_string(), CachedPlan::new(make_test_plan()));

        // Seed baseline (10), confirm it (activates), then regress 50x:
        // the entry flips to Inactive with replan_pending, so get() misses.
        cache.record_observation("q1", 10);
        cache.record_observation("q1", 10);
        cache.record_observation("q1", 500);

        assert!(cache.get("q1").is_none());
    }
}