1use std::{
22 borrow::Borrow,
23 num::NonZeroUsize,
24 sync::{Arc, Mutex, MutexGuard},
25};
26
27use lru::LruCache;
28use selene_core::GraphId;
29
30use crate::{ExecutionPlan, ImplDefinedCaps, PipelineOp, SubqueryBody};
31
/// Bounded LRU cache of execution plans keyed by query source text.
///
/// Every entry records the schema version supplied when it was inserted, and
/// the cache keeps running counters describing how lookups and inserts went.
pub struct PlanCache {
    /// Cached plans, keyed by the query text they were planned from.
    inner: LruCache<CacheKey, CachedPlan>,
    /// Hit / miss / invalidation / eviction counters for this cache.
    stats: PlanCacheStats,
}
37
/// Key of the per-cache LRU map: the query source text.
///
/// The derived `Hash` and `Eq` delegate to the single `source` field, so they
/// agree with the `str` implementations. That agreement is what makes the
/// `Borrow<str>` impl below sound and lets the map be probed with a plain
/// `&str` instead of an owned key.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct CacheKey {
    /// Query text, shared so cloning the key never copies the string.
    source: Arc<str>,
}

impl Borrow<str> for CacheKey {
    /// Exposes the key as the query text it wraps.
    fn borrow(&self) -> &str {
        self.source.as_ref()
    }
}
48
/// Value stored in [`PlanCache`]: a plan plus the schema version it belongs to.
struct CachedPlan {
    /// Shared handle to the cached plan; lookups hand out clones of this `Arc`.
    plan: Arc<ExecutionPlan>,
    /// Schema version passed to `PlanCache::insert`; a lookup with a different
    /// version treats the entry as stale.
    schema_version_at_plan: u64,
}
53
/// Plan cache that can be used through a shared reference.
///
/// All state lives behind a single [`Mutex`], so every operation takes `&self`.
pub struct SharedPlanCache {
    /// Cached plans and counters, guarded by one lock.
    inner: Mutex<SharedPlanCacheInner>,
}
66
/// Mutable state of [`SharedPlanCache`], only ever accessed under its mutex.
struct SharedPlanCacheInner {
    /// Cached plans in LRU order.
    plans: LruCache<SharedPlanCacheKey, Arc<ExecutionPlan>>,
    /// Hit / miss / eviction counters.
    stats: SharedPlanCacheStats,
}
71
/// Owned key of the shared cache.
///
/// Every field takes part in the derived `Eq` and `Hash`, so a lookup only
/// hits when all of them match the values used at insert time.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct SharedPlanCacheKey {
    /// Graph the plan was produced for.
    graph_id: GraphId,
    /// Schema version supplied by the caller.
    schema_version: u64,
    /// Registry version supplied by the caller.
    // NOTE(review): presumably the procedure registry version; confirm at the call site.
    registry_version: u64,
    /// Query source text.
    source: Arc<str>,
    /// Implementation-defined capability limits supplied by the caller.
    caps: ImplDefinedCaps,
    /// Index-selection flag supplied by the caller.
    index_selection: bool,
}
81
/// Snapshot of the counters maintained by [`PlanCache`].
///
/// All counters use saturating addition, so they stop at `u64::MAX` instead of
/// wrapping.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct PlanCacheStats {
    /// Lookups that found an entry planned against the requested schema version.
    pub hits: u64,
    /// Lookups that found no entry for the source text.
    pub misses: u64,
    /// Lookups that found an entry for a different schema version and removed it.
    pub stale_invalidations: u64,
    /// Inserts of a new key that pushed another entry out of the cache.
    pub capacity_evictions: u64,
}
94
/// Snapshot of the counters maintained by [`SharedPlanCache`].
///
/// All counters use saturating addition, so they stop at `u64::MAX` instead of
/// wrapping.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SharedPlanCacheStats {
    /// Lookups that found an entry for the full key.
    pub hits: u64,
    /// Lookups that found no entry for the full key.
    pub misses: u64,
    /// Inserts of a new key that pushed another entry out of the cache.
    pub capacity_evictions: u64,
}
105
106impl PlanCache {
107 #[must_use]
109 pub fn new(capacity: NonZeroUsize) -> Self {
110 Self {
111 inner: LruCache::new(capacity),
112 stats: PlanCacheStats::default(),
113 }
114 }
115
116 pub(crate) fn get(&mut self, source: &str, schema_version: u64) -> Option<Arc<ExecutionPlan>> {
117 match self.inner.get(source) {
118 Some(cached) if cached.schema_version_at_plan == schema_version => {
119 let plan = Arc::clone(&cached.plan);
120 self.stats.hits = self.stats.hits.saturating_add(1);
121 trace_cache_event("hit", schema_version, source);
122 Some(plan)
123 }
124 Some(_) => {
125 self.inner.pop(source);
126 self.stats.stale_invalidations = self.stats.stale_invalidations.saturating_add(1);
127 trace_cache_event("stale", schema_version, source);
128 None
129 }
130 None => {
131 self.stats.misses = self.stats.misses.saturating_add(1);
132 trace_cache_event("miss", schema_version, source);
133 None
134 }
135 }
136 }
137
138 pub(crate) fn insert(
139 &mut self,
140 source: Arc<str>,
141 plan: Arc<ExecutionPlan>,
142 schema_version: u64,
143 ) {
144 if !is_cacheable(&plan) {
145 return;
146 }
147
148 let replacing_existing = self.inner.contains(source.as_ref());
149 let key = CacheKey { source };
150 let cached = CachedPlan {
151 plan,
152 schema_version_at_plan: schema_version,
153 };
154 if self.inner.push(key, cached).is_some() && !replacing_existing {
155 self.stats.capacity_evictions = self.stats.capacity_evictions.saturating_add(1);
156 }
157 }
158
159 #[must_use]
161 pub const fn stats(&self) -> PlanCacheStats {
162 self.stats
163 }
164
165 pub fn clear(&mut self) {
167 self.inner.clear();
168 }
169}
170
171impl SharedPlanCache {
172 #[must_use]
174 pub fn new(capacity: NonZeroUsize) -> Self {
175 Self {
176 inner: Mutex::new(SharedPlanCacheInner {
177 plans: LruCache::new(capacity),
178 stats: SharedPlanCacheStats::default(),
179 }),
180 }
181 }
182
183 pub(crate) fn get(&self, key: SharedPlanCacheLookup<'_>) -> Option<Arc<ExecutionPlan>> {
184 let mut inner = self.lock_inner();
185 let key = SharedPlanCacheKey::from_lookup(key);
186 match inner.plans.get(&key) {
187 Some(plan) => {
188 let plan = Arc::clone(plan);
189 inner.stats.hits = inner.stats.hits.saturating_add(1);
190 Some(plan)
191 }
192 None => {
193 inner.stats.misses = inner.stats.misses.saturating_add(1);
194 None
195 }
196 }
197 }
198
199 pub(crate) fn insert(&self, key: SharedPlanCacheInsert, plan: Arc<ExecutionPlan>) {
200 if !is_cacheable(&plan) {
201 return;
202 }
203
204 let mut inner = self.lock_inner();
205 let key = SharedPlanCacheKey::from_insert(key);
206 let replacing_existing = inner.plans.contains(&key);
207 if inner.plans.push(key, plan).is_some() && !replacing_existing {
208 inner.stats.capacity_evictions = inner.stats.capacity_evictions.saturating_add(1);
209 }
210 }
211
212 #[must_use]
214 pub fn stats(&self) -> SharedPlanCacheStats {
215 self.lock_inner().stats
216 }
217
218 pub fn clear(&self) {
220 self.lock_inner().plans.clear();
221 }
222
223 fn lock_inner(&self) -> MutexGuard<'_, SharedPlanCacheInner> {
224 self.inner
225 .lock()
226 .unwrap_or_else(|poison| poison.into_inner())
227 }
228}
229
/// Arguments for `SharedPlanCache::get`.
///
/// Mirrors [`SharedPlanCacheInsert`] field for field, except that the query
/// text is borrowed for the duration of the lookup.
pub(crate) struct SharedPlanCacheLookup<'a> {
    /// Graph the plan is requested for.
    pub(crate) graph_id: GraphId,
    /// Schema version to match.
    pub(crate) schema_version: u64,
    /// Registry version to match.
    pub(crate) registry_version: u64,
    /// Query source text, borrowed from the caller.
    pub(crate) source: &'a str,
    /// Capability limits to match.
    pub(crate) caps: ImplDefinedCaps,
    /// Index-selection flag to match.
    pub(crate) index_selection: bool,
}
238
/// Arguments for `SharedPlanCache::insert`.
///
/// Mirrors [`SharedPlanCacheLookup`] field for field, except that the query
/// text is owned so it can be moved into the cache key without copying.
pub(crate) struct SharedPlanCacheInsert {
    /// Graph the plan was produced for.
    pub(crate) graph_id: GraphId,
    /// Schema version to store the plan under.
    pub(crate) schema_version: u64,
    /// Registry version to store the plan under.
    pub(crate) registry_version: u64,
    /// Query source text, moved into the cache key.
    pub(crate) source: Arc<str>,
    /// Capability limits to store the plan under.
    pub(crate) caps: ImplDefinedCaps,
    /// Index-selection flag to store the plan under.
    pub(crate) index_selection: bool,
}
247
248impl SharedPlanCacheKey {
249 fn from_lookup(value: SharedPlanCacheLookup<'_>) -> Self {
250 Self {
251 graph_id: value.graph_id,
252 schema_version: value.schema_version,
253 registry_version: value.registry_version,
254 source: Arc::from(value.source),
255 caps: value.caps,
256 index_selection: value.index_selection,
257 }
258 }
259
260 fn from_insert(value: SharedPlanCacheInsert) -> Self {
261 Self {
262 graph_id: value.graph_id,
263 schema_version: value.schema_version,
264 registry_version: value.registry_version,
265 source: value.source,
266 caps: value.caps,
267 index_selection: value.index_selection,
268 }
269 }
270}
271
272fn is_cacheable(plan: &ExecutionPlan) -> bool {
273 !contains_call(plan)
274}
275
276fn contains_call(plan: &ExecutionPlan) -> bool {
277 plan.pipeline.iter().any(op_contains_call)
278 || plan.subqueries.iter().any(|subquery| match &subquery.body {
279 SubqueryBody::Pattern(_) => false,
280 SubqueryBody::Plan(plan) => contains_call(plan),
281 })
282}
283
284fn op_contains_call(op: &PipelineOp) -> bool {
285 match op {
286 PipelineOp::Call(_) => true,
287 PipelineOp::CallSubquery(subquery) => contains_call(&subquery.body),
288 PipelineOp::Union { rhs, .. }
289 | PipelineOp::Chain(rhs)
290 | PipelineOp::CorrelatedChain(rhs)
291 | PipelineOp::ExplainPlan { inner: rhs, .. } => contains_call(rhs),
292 PipelineOp::Match(_) | PipelineOp::OptionalMatch(_) => false,
293 _ => false,
294 }
295}
296
297fn trace_cache_event(result: &'static str, schema_version: u64, source: &str) {
298 #[cfg(feature = "plan-cache-source-tracing")]
299 tracing::debug!(
300 result,
301 schema_version,
302 source_len = source.len(),
303 source_prefix = %SourcePrefix(source),
304 "session plan cache lookup"
305 );
306
307 #[cfg(not(feature = "plan-cache-source-tracing"))]
308 tracing::debug!(
309 result,
310 schema_version,
311 source_len = source.len(),
312 "session plan cache lookup"
313 );
314}
315
/// Display adapter that prints at most the first 200 characters of a string,
/// followed by `...` when anything was cut off.
#[cfg(feature = "plan-cache-source-tracing")]
struct SourcePrefix<'a>(&'a str);

#[cfg(feature = "plan-cache-source-tracing")]
impl std::fmt::Display for SourcePrefix<'_> {
    /// Writes the (possibly truncated) text without allocating.
    ///
    /// The limit is counted in `char`s, so the cut always falls on a UTF-8
    /// character boundary.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `nth(200)` is the 201st character; its byte offset is exactly where
        // the 200-character prefix ends. `None` means the text fits entirely.
        match self.0.char_indices().nth(200) {
            Some((prefix_end, _)) => {
                formatter.write_str(&self.0[..prefix_end])?;
                formatter.write_str("...")
            }
            None => formatter.write_str(self.0),
        }
    }
}
331
// Unit tests for the session-level `PlanCache` and the `SharedPlanCache`.
#[cfg(test)]
mod tests {
    use std::{num::NonZeroUsize, sync::Arc};

    use selene_core::{DbString, db_string};

    use super::*;
    use crate::{
        BindingTableSchema, EmptyProcedureRegistry, ExprId, ImplDefinedCaps, PipelineOpId,
        PlannedCall, PlannedSubquery, ProcedureHandle, ProcedureMutability, ProcedureOutputSchema,
        ProcedureTier, SourceSpan, StatementCategory, SubqueryBody, SubqueryKind, analyze::analyze,
        parser::parse, plan::plan,
    };

    /// Runs `source` through parse, analyze and plan with an empty procedure
    /// registry and returns the resulting plan.
    fn planned(source: &str) -> Arc<ExecutionPlan> {
        let statement = parse(source).expect("test source parses");
        let analyzed =
            analyze(statement, &EmptyProcedureRegistry, None).expect("test source analyzes");
        Arc::new(plan(&analyzed, &EmptyProcedureRegistry).expect("test source plans"))
    }

    /// Converts a test literal into a `DbString`, panicking if it is rejected.
    fn admitted(value: &str) -> DbString {
        db_string(value).expect("test name admits")
    }

    /// Hand-built plan whose pipeline is a single `Call` operator.
    fn call_plan() -> Arc<ExecutionPlan> {
        Arc::new(ExecutionPlan {
            category: StatementCategory::ReadOnly,
            pattern_plan: None,
            pipeline: vec![PipelineOp::Call(PlannedCall {
                optional: false,
                procedure: Box::from([admitted("cache"), admitted("call")]),
                handle: ProcedureHandle::new(1),
                args: Vec::new(),
                yield_cols: Vec::new(),
                output_schema: ProcedureOutputSchema::default(),
                yield_schema: Vec::new(),
                tier: ProcedureTier::Graph,
                mutability: ProcedureMutability::Read,
                span: SourceSpan::default(),
            })],
            output_schema: BindingTableSchema {
                columns: Vec::new(),
            },
            impl_defined_caps: ImplDefinedCaps::default(),
            expr_ids: Default::default(),
            subqueries: Default::default(),
            next_expr_id: ExprId::new(0),
            next_pipeline_op_id: PipelineOpId::new(1),
        })
    }

    /// Hand-built plan whose pipeline is an `ExplainPlan` wrapping `call_plan()`.
    fn explain_call_plan() -> Arc<ExecutionPlan> {
        let inner = call_plan();
        Arc::new(ExecutionPlan {
            category: StatementCategory::ReadOnly,
            pattern_plan: None,
            pipeline: vec![PipelineOp::ExplainPlan {
                inner: Box::new(inner.as_ref().clone()),
                span: SourceSpan::default(),
            }],
            output_schema: BindingTableSchema {
                columns: Vec::new(),
            },
            impl_defined_caps: ImplDefinedCaps::default(),
            expr_ids: Default::default(),
            subqueries: Default::default(),
            next_expr_id: ExprId::new(0),
            next_pipeline_op_id: PipelineOpId::new(1),
        })
    }

    /// Hand-built plan with an empty pipeline whose only call lives inside a
    /// plan-bodied expression subquery.
    fn expression_subquery_call_plan() -> Arc<ExecutionPlan> {
        let mut plan = ExecutionPlan {
            category: StatementCategory::ReadOnly,
            pattern_plan: None,
            pipeline: Vec::new(),
            output_schema: BindingTableSchema {
                columns: Vec::new(),
            },
            impl_defined_caps: ImplDefinedCaps::default(),
            expr_ids: Default::default(),
            subqueries: Default::default(),
            next_expr_id: ExprId::new(0),
            next_pipeline_op_id: PipelineOpId::new(0),
        };
        plan.subqueries.insert(
            ExprId::new(7),
            PlannedSubquery {
                kind: SubqueryKind::Value,
                body: SubqueryBody::Plan(Box::new(call_plan().as_ref().clone())),
                outer_binding_refs: Vec::new(),
                span: SourceSpan::default(),
            },
        );
        Arc::new(plan)
    }

    // First lookup misses, lookup after insert hits, and both are counted.
    #[test]
    fn plan_cache_basic_hit_miss() {
        let mut cache = PlanCache::new(NonZeroUsize::new(2).unwrap());
        assert!(cache.get("RETURN 1", 0).is_none());

        cache.insert(Arc::from("RETURN 1"), planned("RETURN 1"), 0);
        assert!(cache.get("RETURN 1", 0).is_some());

        assert_eq!(
            cache.stats(),
            PlanCacheStats {
                hits: 1,
                misses: 1,
                stale_invalidations: 0,
                capacity_evictions: 0,
            }
        );
    }

    // A third insert into a capacity-2 cache evicts the oldest entry and is
    // counted as exactly one capacity eviction.
    #[test]
    fn plan_cache_lru_evicts_oldest() {
        let mut cache = PlanCache::new(NonZeroUsize::new(2).unwrap());
        cache.insert(Arc::from("RETURN 1"), planned("RETURN 1"), 0);
        cache.insert(Arc::from("RETURN 2"), planned("RETURN 2"), 0);
        cache.insert(Arc::from("RETURN 3"), planned("RETURN 3"), 0);

        assert!(cache.get("RETURN 1", 0).is_none());
        assert!(cache.get("RETURN 2", 0).is_some());
        assert!(cache.get("RETURN 3", 0).is_some());
        assert_eq!(cache.stats().capacity_evictions, 1);
    }

    // A lookup with a different schema version removes the entry and counts a
    // stale invalidation; only the following lookup counts as a miss.
    #[test]
    fn plan_cache_schema_version_mismatch_is_miss() {
        let mut cache = PlanCache::new(NonZeroUsize::new(2).unwrap());
        cache.insert(Arc::from("RETURN 1"), planned("RETURN 1"), 0);

        assert!(cache.get("RETURN 1", 1).is_none());
        assert_eq!(cache.stats().stale_invalidations, 1);
        assert!(cache.get("RETURN 1", 1).is_none());
        assert_eq!(cache.stats().misses, 1);
    }

    // `clear` drops the cached plans; the counters keep accumulating across it.
    #[test]
    fn plan_cache_clear_resets_state() {
        let mut cache = PlanCache::new(NonZeroUsize::new(2).unwrap());
        cache.insert(Arc::from("RETURN 1"), planned("RETURN 1"), 0);
        assert!(cache.get("RETURN 1", 0).is_some());

        cache.clear();
        assert!(cache.get("RETURN 1", 0).is_none());

        assert_eq!(
            cache.stats(),
            PlanCacheStats {
                hits: 1,
                misses: 1,
                stale_invalidations: 0,
                capacity_evictions: 0,
            }
        );
    }

    // A plan with a top-level `Call` operator is never stored.
    #[test]
    fn cache_skips_call_plans() {
        let mut cache = PlanCache::new(NonZeroUsize::new(2).unwrap());
        cache.insert(Arc::from("CALL cache.call()"), call_plan(), 0);

        assert!(cache.get("CALL cache.call()", 0).is_none());
        assert_eq!(cache.stats().misses, 1);
    }

    // A call nested inside an `ExplainPlan` operator also prevents caching.
    #[test]
    fn cache_skips_explain_call_plans() {
        let mut cache = PlanCache::new(NonZeroUsize::new(2).unwrap());
        cache.insert(
            Arc::from("EXPLAIN CALL cache.call()"),
            explain_call_plan(),
            0,
        );

        assert!(cache.get("EXPLAIN CALL cache.call()", 0).is_none());
        assert_eq!(cache.stats().misses, 1);
    }

    // A call reachable only through a plan-bodied subquery prevents caching.
    #[test]
    fn cache_skips_expression_subqueries_that_contain_calls() {
        let mut cache = PlanCache::new(NonZeroUsize::new(2).unwrap());
        cache.insert(
            Arc::from("RETURN VALUE { CALL cache.call() RETURN 1 LIMIT 1 } AS v"),
            expression_subquery_call_plan(),
            0,
        );

        assert!(
            cache
                .get(
                    "RETURN VALUE { CALL cache.call() RETURN 1 LIMIT 1 } AS v",
                    0
                )
                .is_none()
        );
        assert_eq!(cache.stats().misses, 1);
    }

    // Shared cache: miss before insert, hit after, with matching counters.
    #[test]
    fn shared_plan_cache_hits_across_sessions() {
        let cache = SharedPlanCache::new(NonZeroUsize::new(2).unwrap());
        let lookup = |source| SharedPlanCacheLookup {
            graph_id: selene_core::GraphId::new(7),
            schema_version: 0,
            registry_version: 0,
            source,
            caps: ImplDefinedCaps::DEFAULT,
            index_selection: true,
        };

        assert!(cache.get(lookup("RETURN 1")).is_none());
        cache.insert(
            SharedPlanCacheInsert {
                graph_id: selene_core::GraphId::new(7),
                schema_version: 0,
                registry_version: 0,
                source: Arc::from("RETURN 1"),
                caps: ImplDefinedCaps::DEFAULT,
                index_selection: true,
            },
            planned("RETURN 1"),
        );
        assert!(cache.get(lookup("RETURN 1")).is_some());

        assert_eq!(
            cache.stats(),
            SharedPlanCacheStats {
                hits: 1,
                misses: 1,
                capacity_evictions: 0,
            }
        );
    }

    // Shared cache: changing `caps` or `index_selection` alone yields a
    // different key and therefore a miss.
    #[test]
    fn shared_plan_cache_keys_planning_settings() {
        let cache = SharedPlanCache::new(NonZeroUsize::new(4).unwrap());
        let base = SharedPlanCacheInsert {
            graph_id: selene_core::GraphId::new(7),
            schema_version: 0,
            registry_version: 0,
            source: Arc::from("RETURN 1"),
            caps: ImplDefinedCaps::DEFAULT,
            index_selection: true,
        };
        cache.insert(base, planned("RETURN 1"));

        assert!(
            cache
                .get(SharedPlanCacheLookup {
                    graph_id: selene_core::GraphId::new(7),
                    schema_version: 0,
                    registry_version: 0,
                    source: "RETURN 1",
                    caps: ImplDefinedCaps::DEFAULT,
                    index_selection: true,
                })
                .is_some()
        );
        assert!(
            cache
                .get(SharedPlanCacheLookup {
                    graph_id: selene_core::GraphId::new(7),
                    schema_version: 0,
                    registry_version: 0,
                    source: "RETURN 1",
                    caps: ImplDefinedCaps::DEFAULT.with_max_list_length(1),
                    index_selection: true,
                })
                .is_none()
        );
        assert!(
            cache
                .get(SharedPlanCacheLookup {
                    graph_id: selene_core::GraphId::new(7),
                    schema_version: 0,
                    registry_version: 0,
                    source: "RETURN 1",
                    caps: ImplDefinedCaps::DEFAULT,
                    index_selection: false,
                })
                .is_none()
        );
    }
}