Skip to main content

selene_gql/runtime/
plan_cache.rs

//! Session-scoped execution-plan cache.
//!
//! # Why the cache key is schema-epoch-only
//!
//! Cached plans are the *optimized* plans (`Session::execute_source` optimizes
//! before insert). The cache freshness check compares the stored
//! `schema_version_at_plan` against the graph's current `schema_version`, which
//! bumps only on schema-changing commits (CREATE / DROP INDEX, type DDL). The
//! snapshot `generation` counter — which bumps on *every* commit including
//! data-only writes — is deliberately NOT part of the key: optimizer index
//! *selection* depends only on the set of indexes (schema), never on row data,
//! so a chosen access path (LabelIndex / TypedIndexRange / CompositeLookup)
//! stays correct as rows mutate within an epoch. Adding `generation` would
//! evict every cached plan on every data write for zero correctness benefit.
//!
//! FUTURE: wiring live cardinality statistics into the cost model would make
//! plan choice data-dependent. At that point the key MUST also include
//! `generation`; that is out of scope for the structural index-selection
//! optimizer wired today.
20
21use std::{
22    borrow::Borrow,
23    num::NonZeroUsize,
24    sync::{Arc, Mutex, MutexGuard},
25};
26
27use lru::LruCache;
28use selene_core::GraphId;
29
30use crate::{ExecutionPlan, ImplDefinedCaps, PipelineOp, SubqueryBody};
31
32/// LRU-cached execution plans keyed by source text.
33pub struct PlanCache {
34    inner: LruCache<CacheKey, CachedPlan>,
35    stats: PlanCacheStats,
36}
37
/// Owned key of a [`PlanCache`] entry: the query source text.
///
/// The derived `Hash` / `Eq` must stay equivalent to those of the wrapped
/// string, because the `Borrow<str>` impl below is what allows lookups by
/// plain `&str`. Adding a second field would silently break that contract.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct CacheKey {
    /// Query source text the plan was built from.
    source: Arc<str>,
}

impl Borrow<str> for CacheKey {
    /// Expose the key as the source text so the cache can be probed with `&str`
    /// without allocating an owned key.
    fn borrow(&self) -> &str {
        self.source.as_ref()
    }
}
48
49struct CachedPlan {
50    plan: Arc<ExecutionPlan>,
51    schema_version_at_plan: u64,
52}
53
54/// Shared LRU cache for non-CALL source-string execution plans.
55///
56/// The cache is caller-owned so embedders can share one
57/// `Arc<SharedPlanCache>` across short-lived sessions executing against the
58/// same graph. Keys include graph identity, schema epoch, procedure-registry
59/// epoch, source text, implementation-defined caps, and the optimizer
60/// index-selection mode, matching every setting currently baked into a lowered
61/// or optimized plan. Procedure-CALL-containing plans remain owned by
62/// [`crate::CallPlanCache`].
63pub struct SharedPlanCache {
64    inner: Mutex<SharedPlanCacheInner>,
65}
66
67struct SharedPlanCacheInner {
68    plans: LruCache<SharedPlanCacheKey, Arc<ExecutionPlan>>,
69    stats: SharedPlanCacheStats,
70}
71
72#[derive(Clone, Debug, Eq, Hash, PartialEq)]
73struct SharedPlanCacheKey {
74    graph_id: GraphId,
75    schema_version: u64,
76    registry_version: u64,
77    source: Arc<str>,
78    caps: ImplDefinedCaps,
79    index_selection: bool,
80}
81
/// Counters for [`PlanCache`] lookup and eviction behavior.
///
/// Each lookup increments exactly one of `hits`, `misses`, or
/// `stale_invalidations`. Counters saturate instead of wrapping.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct PlanCacheStats {
    /// Successful source + schema-version lookups.
    pub hits: u64,
    /// Lookups whose source key was not present in the cache. Stale entries
    /// are counted under `stale_invalidations`, not here.
    pub misses: u64,
    /// Entries found but invalidated because their schema version was stale.
    pub stale_invalidations: u64,
    /// Entries evicted by LRU capacity pressure. Replacing an entry that has
    /// the same source text is not counted.
    pub capacity_evictions: u64,
}
94
/// Counters for [`SharedPlanCache`] lookup and eviction behavior.
///
/// Each lookup increments exactly one of `hits` or `misses`. Counters saturate
/// instead of wrapping.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SharedPlanCacheStats {
    /// Successful key lookups.
    pub hits: u64,
    /// Lookups whose key was not present in the cache.
    pub misses: u64,
    /// Entries evicted by LRU capacity pressure. Replacing an entry that has
    /// an identical key is not counted.
    pub capacity_evictions: u64,
}
105
106impl PlanCache {
107    /// Create an empty plan cache with the given entry capacity.
108    #[must_use]
109    pub fn new(capacity: NonZeroUsize) -> Self {
110        Self {
111            inner: LruCache::new(capacity),
112            stats: PlanCacheStats::default(),
113        }
114    }
115
116    pub(crate) fn get(&mut self, source: &str, schema_version: u64) -> Option<Arc<ExecutionPlan>> {
117        match self.inner.get(source) {
118            Some(cached) if cached.schema_version_at_plan == schema_version => {
119                let plan = Arc::clone(&cached.plan);
120                self.stats.hits = self.stats.hits.saturating_add(1);
121                trace_cache_event("hit", schema_version, source);
122                Some(plan)
123            }
124            Some(_) => {
125                self.inner.pop(source);
126                self.stats.stale_invalidations = self.stats.stale_invalidations.saturating_add(1);
127                trace_cache_event("stale", schema_version, source);
128                None
129            }
130            None => {
131                self.stats.misses = self.stats.misses.saturating_add(1);
132                trace_cache_event("miss", schema_version, source);
133                None
134            }
135        }
136    }
137
138    pub(crate) fn insert(
139        &mut self,
140        source: Arc<str>,
141        plan: Arc<ExecutionPlan>,
142        schema_version: u64,
143    ) {
144        if !is_cacheable(&plan) {
145            return;
146        }
147
148        let replacing_existing = self.inner.contains(source.as_ref());
149        let key = CacheKey { source };
150        let cached = CachedPlan {
151            plan,
152            schema_version_at_plan: schema_version,
153        };
154        if self.inner.push(key, cached).is_some() && !replacing_existing {
155            self.stats.capacity_evictions = self.stats.capacity_evictions.saturating_add(1);
156        }
157    }
158
159    /// Return a snapshot of the cache counters.
160    #[must_use]
161    pub const fn stats(&self) -> PlanCacheStats {
162        self.stats
163    }
164
165    /// Remove all cached plans while preserving accumulated counters.
166    pub fn clear(&mut self) {
167        self.inner.clear();
168    }
169}
170
171impl SharedPlanCache {
172    /// Create an empty shared source-plan cache with the given entry capacity.
173    #[must_use]
174    pub fn new(capacity: NonZeroUsize) -> Self {
175        Self {
176            inner: Mutex::new(SharedPlanCacheInner {
177                plans: LruCache::new(capacity),
178                stats: SharedPlanCacheStats::default(),
179            }),
180        }
181    }
182
183    pub(crate) fn get(&self, key: SharedPlanCacheLookup<'_>) -> Option<Arc<ExecutionPlan>> {
184        let mut inner = self.lock_inner();
185        let key = SharedPlanCacheKey::from_lookup(key);
186        match inner.plans.get(&key) {
187            Some(plan) => {
188                let plan = Arc::clone(plan);
189                inner.stats.hits = inner.stats.hits.saturating_add(1);
190                Some(plan)
191            }
192            None => {
193                inner.stats.misses = inner.stats.misses.saturating_add(1);
194                None
195            }
196        }
197    }
198
199    pub(crate) fn insert(&self, key: SharedPlanCacheInsert, plan: Arc<ExecutionPlan>) {
200        if !is_cacheable(&plan) {
201            return;
202        }
203
204        let mut inner = self.lock_inner();
205        let key = SharedPlanCacheKey::from_insert(key);
206        let replacing_existing = inner.plans.contains(&key);
207        if inner.plans.push(key, plan).is_some() && !replacing_existing {
208            inner.stats.capacity_evictions = inner.stats.capacity_evictions.saturating_add(1);
209        }
210    }
211
212    /// Return a snapshot of the cache counters.
213    #[must_use]
214    pub fn stats(&self) -> SharedPlanCacheStats {
215        self.lock_inner().stats
216    }
217
218    /// Remove all cached plans while preserving accumulated counters.
219    pub fn clear(&self) {
220        self.lock_inner().plans.clear();
221    }
222
223    fn lock_inner(&self) -> MutexGuard<'_, SharedPlanCacheInner> {
224        self.inner
225            .lock()
226            .unwrap_or_else(|poison| poison.into_inner())
227    }
228}
229
230pub(crate) struct SharedPlanCacheLookup<'a> {
231    pub(crate) graph_id: GraphId,
232    pub(crate) schema_version: u64,
233    pub(crate) registry_version: u64,
234    pub(crate) source: &'a str,
235    pub(crate) caps: ImplDefinedCaps,
236    pub(crate) index_selection: bool,
237}
238
239pub(crate) struct SharedPlanCacheInsert {
240    pub(crate) graph_id: GraphId,
241    pub(crate) schema_version: u64,
242    pub(crate) registry_version: u64,
243    pub(crate) source: Arc<str>,
244    pub(crate) caps: ImplDefinedCaps,
245    pub(crate) index_selection: bool,
246}
247
248impl SharedPlanCacheKey {
249    fn from_lookup(value: SharedPlanCacheLookup<'_>) -> Self {
250        Self {
251            graph_id: value.graph_id,
252            schema_version: value.schema_version,
253            registry_version: value.registry_version,
254            source: Arc::from(value.source),
255            caps: value.caps,
256            index_selection: value.index_selection,
257        }
258    }
259
260    fn from_insert(value: SharedPlanCacheInsert) -> Self {
261        Self {
262            graph_id: value.graph_id,
263            schema_version: value.schema_version,
264            registry_version: value.registry_version,
265            source: value.source,
266            caps: value.caps,
267            index_selection: value.index_selection,
268        }
269    }
270}
271
272fn is_cacheable(plan: &ExecutionPlan) -> bool {
273    !contains_call(plan)
274}
275
276fn contains_call(plan: &ExecutionPlan) -> bool {
277    plan.pipeline.iter().any(op_contains_call)
278        || plan.subqueries.iter().any(|subquery| match &subquery.body {
279            SubqueryBody::Pattern(_) => false,
280            SubqueryBody::Plan(plan) => contains_call(plan),
281        })
282}
283
284fn op_contains_call(op: &PipelineOp) -> bool {
285    match op {
286        PipelineOp::Call(_) => true,
287        PipelineOp::CallSubquery(subquery) => contains_call(&subquery.body),
288        PipelineOp::Union { rhs, .. }
289        | PipelineOp::Chain(rhs)
290        | PipelineOp::CorrelatedChain(rhs)
291        | PipelineOp::ExplainPlan { inner: rhs, .. } => contains_call(rhs),
292        PipelineOp::Match(_) | PipelineOp::OptionalMatch(_) => false,
293        _ => false,
294    }
295}
296
297fn trace_cache_event(result: &'static str, schema_version: u64, source: &str) {
298    #[cfg(feature = "plan-cache-source-tracing")]
299    tracing::debug!(
300        result,
301        schema_version,
302        source_len = source.len(),
303        source_prefix = %SourcePrefix(source),
304        "session plan cache lookup"
305    );
306
307    #[cfg(not(feature = "plan-cache-source-tracing"))]
308    tracing::debug!(
309        result,
310        schema_version,
311        source_len = source.len(),
312        "session plan cache lookup"
313    );
314}
315
/// Display adapter that prints at most the first 200 characters of a query
/// source, followed by `...` when anything was cut off.
#[cfg(feature = "plan-cache-source-tracing")]
struct SourcePrefix<'a>(&'a str);

#[cfg(feature = "plan-cache-source-tracing")]
impl std::fmt::Display for SourcePrefix<'_> {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Maximum number of characters (not bytes) of source text to print.
        const MAX_CHARS: usize = 200;

        // The byte offset of the character just past the limit is, by
        // construction, a valid char boundary, so the prefix can be sliced
        // straight out of the borrowed source without building a temporary
        // `String`.
        match self.0.char_indices().nth(MAX_CHARS) {
            Some((cut, _)) => {
                formatter.write_str(&self.0[..cut])?;
                formatter.write_str("...")
            }
            // Within the limit (or empty): print the source unchanged.
            None => formatter.write_str(self.0),
        }
    }
}
331
#[cfg(test)]
mod tests {
    use std::{num::NonZeroUsize, sync::Arc};

    use selene_core::{DbString, db_string};

    use super::*;
    use crate::{
        BindingTableSchema, EmptyProcedureRegistry, ExprId, ImplDefinedCaps, PipelineOpId,
        PlannedCall, PlannedSubquery, ProcedureHandle, ProcedureMutability, ProcedureOutputSchema,
        ProcedureTier, SourceSpan, StatementCategory, SubqueryBody, SubqueryKind, analyze::analyze,
        parser::parse, plan::plan,
    };

    /// Non-zero cache capacity for tests.
    fn capacity(entries: usize) -> NonZeroUsize {
        NonZeroUsize::new(entries).expect("test capacity is non-zero")
    }

    /// Run `source` through parse -> analyze -> plan with an empty procedure
    /// registry.
    fn planned(source: &str) -> Arc<ExecutionPlan> {
        let statement = parse(source).expect("test source parses");
        let analyzed =
            analyze(statement, &EmptyProcedureRegistry, None).expect("test source analyzes");
        Arc::new(plan(&analyzed, &EmptyProcedureRegistry).expect("test source plans"))
    }

    fn admitted(value: &str) -> DbString {
        db_string(value).expect("test name admits")
    }

    /// Hand-built read-only plan with the given pipeline and nothing else.
    fn bare_plan(pipeline: Vec<PipelineOp>, next_pipeline_op_id: PipelineOpId) -> ExecutionPlan {
        ExecutionPlan {
            category: StatementCategory::ReadOnly,
            pattern_plan: None,
            pipeline,
            output_schema: BindingTableSchema {
                columns: Vec::new(),
            },
            impl_defined_caps: ImplDefinedCaps::default(),
            expr_ids: Default::default(),
            subqueries: Default::default(),
            next_expr_id: ExprId::new(0),
            next_pipeline_op_id,
        }
    }

    /// Plan whose pipeline is a single procedure CALL.
    fn call_plan_body() -> ExecutionPlan {
        bare_plan(
            vec![PipelineOp::Call(PlannedCall {
                optional: false,
                procedure: Box::from([admitted("cache"), admitted("call")]),
                handle: ProcedureHandle::new(1),
                args: Vec::new(),
                yield_cols: Vec::new(),
                output_schema: ProcedureOutputSchema::default(),
                yield_schema: Vec::new(),
                tier: ProcedureTier::Graph,
                mutability: ProcedureMutability::Read,
                span: SourceSpan::default(),
            })],
            PipelineOpId::new(1),
        )
    }

    fn call_plan() -> Arc<ExecutionPlan> {
        Arc::new(call_plan_body())
    }

    /// EXPLAIN wrapper around the CALL plan.
    fn explain_call_plan() -> Arc<ExecutionPlan> {
        Arc::new(bare_plan(
            vec![PipelineOp::ExplainPlan {
                inner: Box::new(call_plan_body()),
                span: SourceSpan::default(),
            }],
            PipelineOpId::new(1),
        ))
    }

    /// Plan with an empty pipeline whose only CALL sits inside an expression
    /// subquery.
    fn expression_subquery_call_plan() -> Arc<ExecutionPlan> {
        let mut outer = bare_plan(Vec::new(), PipelineOpId::new(0));
        outer.subqueries.insert(
            ExprId::new(7),
            PlannedSubquery {
                kind: SubqueryKind::Value,
                body: SubqueryBody::Plan(Box::new(call_plan_body())),
                outer_binding_refs: Vec::new(),
                span: SourceSpan::default(),
            },
        );
        Arc::new(outer)
    }

    /// Baseline shared-cache lookup; tests override single fields with struct
    /// update syntax.
    fn shared_lookup(source: &str) -> SharedPlanCacheLookup<'_> {
        SharedPlanCacheLookup {
            graph_id: selene_core::GraphId::new(7),
            schema_version: 0,
            registry_version: 0,
            source,
            caps: ImplDefinedCaps::DEFAULT,
            index_selection: true,
        }
    }

    /// Insert arguments that match [`shared_lookup`] for the same source.
    fn shared_insert(source: &str) -> SharedPlanCacheInsert {
        SharedPlanCacheInsert {
            graph_id: selene_core::GraphId::new(7),
            schema_version: 0,
            registry_version: 0,
            source: Arc::from(source),
            caps: ImplDefinedCaps::DEFAULT,
            index_selection: true,
        }
    }

    #[test]
    fn plan_cache_basic_hit_miss() {
        let mut cache = PlanCache::new(capacity(2));
        assert!(cache.get("RETURN 1", 0).is_none());

        cache.insert(Arc::from("RETURN 1"), planned("RETURN 1"), 0);
        assert!(cache.get("RETURN 1", 0).is_some());

        assert_eq!(
            cache.stats(),
            PlanCacheStats {
                hits: 1,
                misses: 1,
                stale_invalidations: 0,
                capacity_evictions: 0,
            }
        );
    }

    #[test]
    fn plan_cache_lru_evicts_oldest() {
        let mut cache = PlanCache::new(capacity(2));
        cache.insert(Arc::from("RETURN 1"), planned("RETURN 1"), 0);
        cache.insert(Arc::from("RETURN 2"), planned("RETURN 2"), 0);
        cache.insert(Arc::from("RETURN 3"), planned("RETURN 3"), 0);

        assert!(cache.get("RETURN 1", 0).is_none());
        assert!(cache.get("RETURN 2", 0).is_some());
        assert!(cache.get("RETURN 3", 0).is_some());
        assert_eq!(cache.stats().capacity_evictions, 1);
    }

    #[test]
    fn plan_cache_replacing_entry_is_not_counted_as_eviction() {
        let mut cache = PlanCache::new(capacity(2));
        cache.insert(Arc::from("RETURN 1"), planned("RETURN 1"), 0);
        cache.insert(Arc::from("RETURN 1"), planned("RETURN 1"), 1);

        assert_eq!(cache.stats().capacity_evictions, 0);
        // The replacement carries the newer schema version.
        assert!(cache.get("RETURN 1", 1).is_some());
    }

    #[test]
    fn plan_cache_schema_version_mismatch_is_miss() {
        let mut cache = PlanCache::new(capacity(2));
        cache.insert(Arc::from("RETURN 1"), planned("RETURN 1"), 0);

        // First lookup finds the stale entry and removes it ...
        assert!(cache.get("RETURN 1", 1).is_none());
        assert_eq!(cache.stats().stale_invalidations, 1);
        // ... so the second one is a plain miss.
        assert!(cache.get("RETURN 1", 1).is_none());
        assert_eq!(cache.stats().misses, 1);
    }

    #[test]
    fn plan_cache_clear_resets_state() {
        let mut cache = PlanCache::new(capacity(2));
        cache.insert(Arc::from("RETURN 1"), planned("RETURN 1"), 0);
        assert!(cache.get("RETURN 1", 0).is_some());

        cache.clear();
        assert!(cache.get("RETURN 1", 0).is_none());

        // Entries are gone, but the counters keep accumulating across `clear`.
        assert_eq!(
            cache.stats(),
            PlanCacheStats {
                hits: 1,
                misses: 1,
                stale_invalidations: 0,
                capacity_evictions: 0,
            }
        );
    }

    #[test]
    fn cache_skips_call_plans() {
        let mut cache = PlanCache::new(capacity(2));
        cache.insert(Arc::from("CALL cache.call()"), call_plan(), 0);

        assert!(cache.get("CALL cache.call()", 0).is_none());
        assert_eq!(cache.stats().misses, 1);
    }

    #[test]
    fn cache_skips_explain_call_plans() {
        let mut cache = PlanCache::new(capacity(2));
        cache.insert(
            Arc::from("EXPLAIN CALL cache.call()"),
            explain_call_plan(),
            0,
        );

        assert!(cache.get("EXPLAIN CALL cache.call()", 0).is_none());
        assert_eq!(cache.stats().misses, 1);
    }

    #[test]
    fn cache_skips_expression_subqueries_that_contain_calls() {
        let source = "RETURN VALUE { CALL cache.call() RETURN 1 LIMIT 1 } AS v";
        let mut cache = PlanCache::new(capacity(2));
        cache.insert(Arc::from(source), expression_subquery_call_plan(), 0);

        assert!(cache.get(source, 0).is_none());
        assert_eq!(cache.stats().misses, 1);
    }

    #[test]
    fn shared_plan_cache_hits_across_sessions() {
        let cache = SharedPlanCache::new(capacity(2));

        assert!(cache.get(shared_lookup("RETURN 1")).is_none());
        cache.insert(shared_insert("RETURN 1"), planned("RETURN 1"));
        assert!(cache.get(shared_lookup("RETURN 1")).is_some());

        assert_eq!(
            cache.stats(),
            SharedPlanCacheStats {
                hits: 1,
                misses: 1,
                capacity_evictions: 0,
            }
        );
    }

    #[test]
    fn shared_plan_cache_keys_planning_settings() {
        let cache = SharedPlanCache::new(capacity(4));
        cache.insert(shared_insert("RETURN 1"), planned("RETURN 1"));

        assert!(cache.get(shared_lookup("RETURN 1")).is_some());
        assert!(
            cache
                .get(SharedPlanCacheLookup {
                    caps: ImplDefinedCaps::DEFAULT.with_max_list_length(1),
                    ..shared_lookup("RETURN 1")
                })
                .is_none()
        );
        assert!(
            cache
                .get(SharedPlanCacheLookup {
                    index_selection: false,
                    ..shared_lookup("RETURN 1")
                })
                .is_none()
        );
    }

    #[test]
    fn shared_plan_cache_counts_capacity_evictions() {
        let cache = SharedPlanCache::new(capacity(1));
        cache.insert(shared_insert("RETURN 1"), planned("RETURN 1"));
        // Same key again: a replacement, not an eviction.
        cache.insert(shared_insert("RETURN 1"), planned("RETURN 1"));
        assert_eq!(cache.stats().capacity_evictions, 0);

        cache.insert(shared_insert("RETURN 2"), planned("RETURN 2"));
        assert_eq!(cache.stats().capacity_evictions, 1);
        assert!(cache.get(shared_lookup("RETURN 1")).is_none());
        assert!(cache.get(shared_lookup("RETURN 2")).is_some());
    }

    #[test]
    fn shared_plan_cache_skips_call_plans() {
        let cache = SharedPlanCache::new(capacity(2));
        cache.insert(shared_insert("CALL cache.call()"), call_plan());

        assert!(cache.get(shared_lookup("CALL cache.call()")).is_none());
        assert_eq!(
            cache.stats(),
            SharedPlanCacheStats {
                hits: 0,
                misses: 1,
                capacity_evictions: 0,
            }
        );
    }
}