reddb_server/storage/query/planner/
cache.rs1use std::collections::HashMap;
13use std::time::{Duration, Instant};
14
15use super::{CacheStats, QueryPlan};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum CacheEntryState {
19 Inactive,
20 Active,
21}
22
23#[derive(Debug, Clone)]
25pub struct CachedPlan {
26 pub plan: QueryPlan,
28 pub cached_at: Instant,
30 pub access_count: u64,
32 pub last_accessed: Instant,
34 pub shape_key: Option<String>,
36 pub exact_query: Option<std::sync::Arc<str>>,
38 pub state: CacheEntryState,
40 pub expected_rows_scanned: Option<u64>,
42 pub last_observed_rows_scanned: Option<u64>,
44 pub parameter_count: usize,
46 pub replan_pending: bool,
48}
49
50impl CachedPlan {
51 pub fn new(plan: QueryPlan) -> Self {
53 let now = Instant::now();
54 Self {
55 plan,
56 cached_at: now,
57 access_count: 0,
58 last_accessed: now,
59 shape_key: None,
60 exact_query: None,
61 state: CacheEntryState::Inactive,
62 expected_rows_scanned: None,
63 last_observed_rows_scanned: None,
64 parameter_count: 0,
65 replan_pending: false,
66 }
67 }
68
69 pub fn with_shape_key(mut self, shape_key: impl Into<String>) -> Self {
70 self.shape_key = Some(shape_key.into());
71 self
72 }
73
74 pub fn with_exact_query(mut self, query: impl Into<String>) -> Self {
75 let s: String = query.into();
76 self.exact_query = Some(std::sync::Arc::<str>::from(s));
77 self
78 }
79
80 pub fn with_parameter_count(mut self, parameter_count: usize) -> Self {
81 self.parameter_count = parameter_count;
82 self
83 }
84
85 pub fn is_expired(&self, ttl: Duration) -> bool {
87 self.cached_at.elapsed() > ttl
88 }
89
90 pub fn touch(&mut self) {
92 self.access_count += 1;
93 self.last_accessed = Instant::now();
94 }
95
96 pub fn matches_exact_query(&self, query: &str) -> bool {
97 self.exact_query.as_deref() == Some(query)
98 }
99
100 pub fn needs_replan(&self) -> bool {
101 self.replan_pending
102 }
103
104 pub fn record_observation(&mut self, rows_scanned: u64) {
105 self.last_observed_rows_scanned = Some(rows_scanned);
106 match (self.state, self.expected_rows_scanned) {
107 (_, None) => {
108 self.expected_rows_scanned = Some(rows_scanned.max(1));
109 self.replan_pending = false;
110 }
111 (CacheEntryState::Inactive, Some(expected)) => {
112 if rows_scanned <= expected {
113 self.state = CacheEntryState::Active;
114 self.expected_rows_scanned = Some(rows_scanned.max(1));
115 self.replan_pending = false;
116 } else {
117 self.expected_rows_scanned = Some(rows_scanned.min(expected.saturating_mul(2)));
118 }
119 }
120 (CacheEntryState::Active, Some(expected)) => {
121 if rows_scanned > expected.saturating_mul(10).max(10) {
122 self.state = CacheEntryState::Inactive;
123 self.expected_rows_scanned = Some(rows_scanned.max(1));
124 self.replan_pending = true;
125 } else if rows_scanned < expected {
126 self.expected_rows_scanned = Some(rows_scanned.max(1));
127 self.replan_pending = false;
128 }
129 }
130 }
131 }
132}
133
134pub struct PlanCache {
136 entries: HashMap<String, CachedPlan>,
138 lru_order: Vec<String>,
140 capacity: usize,
142 ttl: Duration,
144 hits: u64,
146 misses: u64,
147}
148
149impl PlanCache {
150 pub fn new(capacity: usize) -> Self {
152 Self {
153 entries: HashMap::with_capacity(capacity),
154 lru_order: Vec::with_capacity(capacity),
155 capacity,
156 ttl: Duration::from_secs(3600), hits: 0,
158 misses: 0,
159 }
160 }
161
162 pub fn with_ttl(mut self, ttl: Duration) -> Self {
164 self.ttl = ttl;
165 self
166 }
167
168 pub fn peek(&self, key: &str) -> Option<&CachedPlan> {
176 let entry = self.entries.get(key)?;
177 if entry.needs_replan() || entry.is_expired(self.ttl) {
178 return None;
179 }
180 Some(entry)
181 }
182
183 pub fn get(&mut self, key: &str) -> Option<&CachedPlan> {
185 if self
186 .entries
187 .get(key)
188 .is_some_and(|entry| entry.needs_replan())
189 {
190 self.remove(key);
191 self.misses += 1;
192 return None;
193 }
194
195 if let Some(entry) = self.entries.get_mut(key) {
197 if entry.is_expired(self.ttl) {
198 self.remove(key);
200 self.misses += 1;
201 return None;
202 }
203
204 entry.touch();
205 self.promote(key);
206 self.hits += 1;
207 return self.entries.get(key);
208 }
209
210 self.misses += 1;
211 None
212 }
213
214 pub fn insert(&mut self, key: String, plan: CachedPlan) {
216 if self.entries.contains_key(&key) {
218 self.remove(&key);
219 }
220
221 while self.entries.len() >= self.capacity {
223 self.evict_lru();
224 }
225
226 self.entries.insert(key.clone(), plan);
228 self.lru_order.push(key);
229 }
230
231 pub fn remove(&mut self, key: &str) -> Option<CachedPlan> {
233 if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
234 self.lru_order.remove(pos);
235 }
236 self.entries.remove(key)
237 }
238
239 pub fn invalidate<F>(&mut self, predicate: F)
241 where
242 F: Fn(&str) -> bool,
243 {
244 let keys_to_remove: Vec<String> = self
245 .entries
246 .keys()
247 .filter(|k| predicate(k))
248 .cloned()
249 .collect();
250
251 for key in keys_to_remove {
252 self.remove(&key);
253 }
254 }
255
256 pub fn clear(&mut self) {
258 self.entries.clear();
259 self.lru_order.clear();
260 }
261
262 pub fn stats(&self) -> CacheStats {
264 CacheStats {
265 hits: self.hits,
266 misses: self.misses,
267 size: self.entries.len(),
268 capacity: self.capacity,
269 }
270 }
271
272 fn promote(&mut self, key: &str) {
274 if let Some(pos) = self.lru_order.iter().position(|k| k == key) {
275 let key = self.lru_order.remove(pos);
276 self.lru_order.push(key);
277 }
278 }
279
280 fn evict_lru(&mut self) {
282 if let Some(key) = self.lru_order.first().cloned() {
283 self.remove(&key);
284 }
285 }
286
287 pub fn prune_expired(&mut self) {
289 let expired: Vec<String> = self
290 .entries
291 .iter()
292 .filter(|(_, v)| v.is_expired(self.ttl))
293 .map(|(k, _)| k.clone())
294 .collect();
295
296 for key in expired {
297 self.remove(&key);
298 }
299 }
300
301 pub fn record_observation(&mut self, key: &str, rows_scanned: u64) {
302 if let Some(entry) = self.entries.get_mut(key) {
303 entry.record_observation(rows_scanned);
304 }
305 }
306}
307
308impl Default for PlanCache {
309 fn default() -> Self {
310 Self::new(1000)
311 }
312}
313
314#[cfg(test)]
315mod tests {
316 use super::*;
317 use crate::storage::query::ast::{Projection, QueryExpr, TableQuery};
318 use crate::storage::query::planner::cost::PlanCost;
319
320 fn make_test_plan() -> QueryPlan {
321 QueryPlan::new(
322 QueryExpr::Table(TableQuery {
323 table: "test".to_string(),
324 source: None,
325 alias: None,
326 select_items: Vec::new(),
327 columns: vec![Projection::All],
328 where_expr: None,
329 filter: None,
330 group_by_exprs: Vec::new(),
331 group_by: Vec::new(),
332 having_expr: None,
333 having: None,
334 order_by: vec![],
335 limit: None,
336 offset: None,
337 expand: None,
338 as_of: None,
339 }),
340 QueryExpr::Table(TableQuery {
341 table: "test".to_string(),
342 source: None,
343 alias: None,
344 select_items: Vec::new(),
345 columns: vec![Projection::All],
346 where_expr: None,
347 filter: None,
348 group_by_exprs: Vec::new(),
349 group_by: Vec::new(),
350 having_expr: None,
351 having: None,
352 order_by: vec![],
353 limit: None,
354 offset: None,
355 expand: None,
356 as_of: None,
357 }),
358 PlanCost::default(),
359 )
360 }
361
362 #[test]
363 fn test_cache_insert_and_get() {
364 let mut cache = PlanCache::new(10);
365 let plan = CachedPlan::new(make_test_plan());
366
367 cache.insert("query1".to_string(), plan);
368 assert!(cache.get("query1").is_some());
369 assert!(cache.get("query2").is_none());
370 }
371
372 #[test]
373 fn test_cache_lru_eviction() {
374 let mut cache = PlanCache::new(2);
375
376 cache.insert("q1".to_string(), CachedPlan::new(make_test_plan()));
377 cache.insert("q2".to_string(), CachedPlan::new(make_test_plan()));
378
379 let _ = cache.get("q1");
381
382 cache.insert("q3".to_string(), CachedPlan::new(make_test_plan()));
384
385 assert!(cache.get("q1").is_some());
386 assert!(cache.get("q2").is_none()); assert!(cache.get("q3").is_some());
388 }
389
390 #[test]
391 fn test_cache_stats() {
392 let mut cache = PlanCache::new(10);
393 cache.insert("q1".to_string(), CachedPlan::new(make_test_plan()));
394
395 let _ = cache.get("q1"); let _ = cache.get("q2"); let _ = cache.get("q1"); let stats = cache.stats();
400 assert_eq!(stats.hits, 2);
401 assert_eq!(stats.misses, 1);
402 }
403
404 #[test]
405 fn test_cache_invalidation() {
406 let mut cache = PlanCache::new(10);
407 cache.insert(
408 "hosts_query1".to_string(),
409 CachedPlan::new(make_test_plan()),
410 );
411 cache.insert(
412 "hosts_query2".to_string(),
413 CachedPlan::new(make_test_plan()),
414 );
415 cache.insert("users_query".to_string(), CachedPlan::new(make_test_plan()));
416
417 cache.invalidate(|k| k.starts_with("hosts_"));
419
420 assert!(cache.get("hosts_query1").is_none());
421 assert!(cache.get("hosts_query2").is_none());
422 assert!(cache.get("users_query").is_some());
423 }
424
425 #[test]
426 fn active_entry_forces_replan_after_large_regression() {
427 let mut cache = PlanCache::new(10);
428 cache.insert("q1".to_string(), CachedPlan::new(make_test_plan()));
429
430 cache.record_observation("q1", 10);
431 cache.record_observation("q1", 10);
432 cache.record_observation("q1", 500);
433
434 assert!(cache.get("q1").is_none());
435 }
436}