Skip to main content

selene_gql/runtime/
call_plan_cache.rs

1//! Shared execution-plan cache for procedure-call-rooted statements.
2
3use std::{
4    num::NonZeroUsize,
5    sync::{Arc, Mutex, MutexGuard},
6};
7
8use lru::LruCache;
9use selene_core::GraphId;
10
11use crate::{
12    ExecutionPlan, PipelineStatement,
13    ast::{Statement, format_procedure_call, format_read_statement},
14};
15
/// Shared LRU cache for procedure-call execution plans.
///
/// The cache is caller-owned so an embedder can share one
/// `Arc<CallPlanCache>` across all sessions executing against the same graph.
///
/// All mutable state sits behind a single [`Mutex`], which is why every
/// method on this type takes `&self`.
pub struct CallPlanCache {
    // Plans, the source index, and the counters, all guarded by one lock.
    inner: Mutex<CallPlanCacheInner>,
}
23
/// State guarded by the [`CallPlanCache`] mutex.
struct CallPlanCacheInner {
    /// Cached plans, keyed by the full [`CallPlanKey`].
    plans: LruCache<CallPlanKey, Arc<ExecutionPlan>>,
    /// Source text (as supplied at insert time) mapped to the keys recorded
    /// for it, one entry per `(graph_id, schema_version, registry_version)`.
    source_index: LruCache<Arc<str>, Vec<CallPlanSourceEntry>>,
    /// Hit, miss, and capacity-eviction counters.
    stats: CallPlanCacheStats,
}
29
30/// Stable key for a cached procedure-call plan.
31#[derive(Clone, Debug, Eq, Hash, PartialEq)]
32pub struct CallPlanKey {
33    graph_id: GraphId,
34    schema_version: u64,
35    registry_version: u64,
36    canonical_source: Arc<str>,
37}
38
39#[derive(Clone, Debug, Eq, PartialEq)]
40struct CallPlanSourceEntry {
41    graph_id: GraphId,
42    schema_version: u64,
43    registry_version: u64,
44    key: CallPlanKey,
45}
46
/// Lookup and eviction counters reported by [`CallPlanCache`].
///
/// The value is `Copy`; a returned snapshot does not track later activity.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct CallPlanCacheStats {
    /// Lookups that returned a cached plan.
    pub hits: u64,
    /// Lookups that found no cached plan.
    pub misses: u64,
    /// Plans dropped because inserting a new key exceeded capacity.
    pub capacity_evictions: u64,
}
57
58impl CallPlanCache {
59    /// Create an empty shared CALL plan cache with the given entry capacity.
60    #[must_use]
61    pub fn new(capacity: NonZeroUsize) -> Self {
62        Self {
63            inner: Mutex::new(CallPlanCacheInner {
64                plans: LruCache::new(capacity),
65                source_index: LruCache::new(capacity),
66                stats: CallPlanCacheStats::default(),
67            }),
68        }
69    }
70
71    pub(crate) fn get_source(
72        &self,
73        graph_id: GraphId,
74        schema_version: u64,
75        registry_version: u64,
76        source: &str,
77    ) -> Option<Arc<ExecutionPlan>> {
78        let mut inner = self.lock_inner();
79        let Some(key) = inner.source_index.get(source).and_then(|entries| {
80            entries
81                .iter()
82                .find(|entry| {
83                    entry.graph_id == graph_id
84                        && entry.schema_version == schema_version
85                        && entry.registry_version == registry_version
86                })
87                .map(|entry| entry.key.clone())
88        }) else {
89            inner.stats.misses = inner.stats.misses.saturating_add(1);
90            return None;
91        };
92        match inner.plans.get(&key) {
93            Some(plan) => {
94                let plan = Arc::clone(plan);
95                inner.stats.hits = inner.stats.hits.saturating_add(1);
96                Some(plan)
97            }
98            None => {
99                remove_source_entry(
100                    &mut inner,
101                    source,
102                    graph_id,
103                    schema_version,
104                    registry_version,
105                );
106                inner.stats.misses = inner.stats.misses.saturating_add(1);
107                None
108            }
109        }
110    }
111
112    pub(crate) fn get(&self, key: &CallPlanKey) -> Option<Arc<ExecutionPlan>> {
113        let mut inner = self.lock_inner();
114        match inner.plans.get(key) {
115            Some(plan) => {
116                let plan = Arc::clone(plan);
117                inner.stats.hits = inner.stats.hits.saturating_add(1);
118                Some(plan)
119            }
120            None => {
121                inner.stats.misses = inner.stats.misses.saturating_add(1);
122                None
123            }
124        }
125    }
126
127    pub(crate) fn insert_with_source(
128        &self,
129        key: CallPlanKey,
130        source: Arc<str>,
131        plan: Arc<ExecutionPlan>,
132    ) {
133        self.insert_inner(key, Some(source), plan);
134    }
135
136    fn insert_inner(&self, key: CallPlanKey, source: Option<Arc<str>>, plan: Arc<ExecutionPlan>) {
137        let mut inner = self.lock_inner();
138        let replacing_existing = inner.plans.contains(&key);
139        if inner.plans.push(key.clone(), plan).is_some() && !replacing_existing {
140            inner.stats.capacity_evictions = inner.stats.capacity_evictions.saturating_add(1);
141        }
142        if let Some(source) = source {
143            let entry = CallPlanSourceEntry {
144                graph_id: key.graph_id,
145                schema_version: key.schema_version,
146                registry_version: key.registry_version,
147                key,
148            };
149            match inner.source_index.get_mut(source.as_ref()) {
150                Some(entries) => {
151                    if let Some(existing) = entries.iter_mut().find(|existing| {
152                        existing.graph_id == entry.graph_id
153                            && existing.schema_version == entry.schema_version
154                            && existing.registry_version == entry.registry_version
155                    }) {
156                        *existing = entry;
157                    } else {
158                        entries.push(entry);
159                    }
160                }
161                None => {
162                    inner.source_index.push(source, vec![entry]);
163                }
164            }
165        }
166    }
167
168    /// Return a snapshot of the cache counters.
169    #[must_use]
170    pub fn stats(&self) -> CallPlanCacheStats {
171        self.lock_inner().stats
172    }
173
174    /// Remove all cached plans while preserving accumulated counters.
175    pub fn clear(&self) {
176        let mut inner = self.lock_inner();
177        inner.plans.clear();
178        inner.source_index.clear();
179    }
180
181    fn lock_inner(&self) -> MutexGuard<'_, CallPlanCacheInner> {
182        self.inner
183            .lock()
184            .unwrap_or_else(|poison| poison.into_inner())
185    }
186}
187
188fn remove_source_entry(
189    inner: &mut CallPlanCacheInner,
190    source: &str,
191    graph_id: GraphId,
192    schema_version: u64,
193    registry_version: u64,
194) {
195    let Some(entries) = inner.source_index.get_mut(source) else {
196        return;
197    };
198    entries.retain(|entry| {
199        !(entry.graph_id == graph_id
200            && entry.schema_version == schema_version
201            && entry.registry_version == registry_version)
202    });
203    if entries.is_empty() {
204        inner.source_index.pop(source);
205    }
206}
207
208impl CallPlanKey {
209    pub(crate) fn for_statement(
210        graph_id: GraphId,
211        schema_version: u64,
212        registry_version: u64,
213        statement: &Statement,
214    ) -> Option<Self> {
215        let canonical_source = canonical_call_source(statement)?;
216        Some(Self {
217            graph_id,
218            schema_version,
219            registry_version,
220            canonical_source: Arc::from(canonical_source),
221        })
222    }
223
224    /// Return the graph identity carried by this cache key.
225    #[must_use]
226    pub const fn graph_id(&self) -> GraphId {
227        self.graph_id
228    }
229
230    /// Return the schema-version epoch carried by this cache key.
231    #[must_use]
232    pub const fn schema_version(&self) -> u64 {
233        self.schema_version
234    }
235
236    /// Return the procedure-registry epoch carried by this cache key.
237    #[must_use]
238    pub const fn registry_version(&self) -> u64 {
239        self.registry_version
240    }
241
242    /// Return the canonical CALL source carried by this cache key.
243    #[must_use]
244    pub fn canonical_source(&self) -> &str {
245        &self.canonical_source
246    }
247}
248
249fn canonical_call_source(statement: &Statement) -> Option<String> {
250    match statement {
251        Statement::Call(call) => format_procedure_call(call).ok(),
252        // A CALL-rooted query pipeline canonicalizes through the read-side
253        // formatter. The structural test is allocation-free; the statement is
254        // formatted exactly once (here), with `.ok()` propagating a format
255        // failure as a cache miss.
256        Statement::Query(pipeline) if is_call_rooted_pipeline(pipeline) => {
257            format_read_statement(statement).ok()
258        }
259        _ => None,
260    }
261}
262
263fn is_call_rooted_pipeline(pipeline: &crate::QueryPipeline) -> bool {
264    matches!(
265        pipeline.statements.as_slice(),
266        [PipelineStatement::Call(_)] | [PipelineStatement::Call(_), PipelineStatement::Return(_)]
267    )
268}
269
// Unit tests for key construction and for the cache's hit, miss, and
// eviction accounting.
#[cfg(test)]
mod tests {
    use std::{num::NonZeroUsize, sync::Arc};

    use selene_core::GraphId;

    use super::*;
    use crate::{
        EmptyProcedureRegistry, ExecutionPlan, analyze, ast::format_procedure_call, parser::parse,
        plan,
    };

    // Key for `source` with the default registry version (11) used by these tests.
    fn key(source: &str) -> CallPlanKey {
        key_with_registry(source, 11)
    }

    // Parse `source` and build its key for graph 7, schema version 3, and the
    // given registry version. Panics if the source does not parse or is not
    // keyable.
    fn key_with_registry(source: &str, registry_version: u64) -> CallPlanKey {
        let statement = parse(source).expect("source parses");
        CallPlanKey::for_statement(GraphId::new(7), 3, registry_version, &statement)
            .expect("source produces CALL cache key")
    }

    // Parse, analyze, and plan `source` against the empty procedure registry.
    // The cache tests only need some plan value to store.
    fn plan_for(source: &str) -> Arc<ExecutionPlan> {
        let statement = parse(source).expect("source parses");
        let analyzed = analyze(statement, &EmptyProcedureRegistry, None).expect("source analyzes");
        Arc::new(plan(&analyzed, &EmptyProcedureRegistry).expect("source plans"))
    }

    // Argument expression shape, YIELD order, YIELD aliases, and parameter
    // type annotations must each produce a different key.
    #[test]
    fn call_plan_cache_keys_arg_shape_and_yield_distinctly() {
        let arg_shape = key("CALL cache.echo(1 + 2) YIELD out");
        let arg_value = key("CALL cache.echo(3) YIELD out");
        let yield_order = key("CALL cache.echo() YIELD a, b");
        let yield_order_reversed = key("CALL cache.echo() YIELD b, a");
        let yield_alias = key("CALL cache.echo() YIELD out AS alias");

        assert_ne!(arg_shape, arg_value);
        assert_ne!(yield_order, yield_order_reversed);
        assert_ne!(key("CALL cache.echo() YIELD out"), yield_alias);
        assert_ne!(
            key("CALL cache.echo($p)"),
            key("CALL cache.echo($p :: INT)")
        );
        assert_ne!(
            key("CALL cache.echo($p :: INT)"),
            key("CALL cache.echo($p :: STRING)")
        );
        // The canonical form spells the `INT` annotation as `INTEGER`.
        assert_eq!(
            key("CALL cache.echo($p :: INT)").canonical_source(),
            "CALL cache.echo($p :: INTEGER)"
        );

        let statement =
            parse("CALL cache.echo(1 + 2, $p) YIELD out AS alias").expect("source parses");
        let Statement::Call(call) = statement else {
            panic!("expected top-level CALL");
        };
        // The formatter parenthesizes the binary expression argument.
        let formatted = format_procedure_call(&call).expect("procedure call formats");
        assert_eq!(formatted, "CALL cache.echo((1 + 2), $p) YIELD out AS alias");
    }

    // Sources that differ only in whitespace must map to the same key.
    #[test]
    fn call_plan_key_canonicalizes_whitespace() {
        let compact = key("CALL cache.echo(1+2) YIELD out");
        let spaced = key("CALL cache.echo(1 + 2) YIELD out");

        assert_eq!(compact, spaced);
        assert_eq!(
            compact.canonical_source(),
            "CALL cache.echo((1 + 2)) YIELD out"
        );
    }

    // A CALL that follows a MATCH is not CALL-rooted, so no key is produced.
    #[test]
    fn embedded_pipeline_call_is_not_keyed() {
        let statement =
            parse("MATCH (n) CALL cache.echo(n) YIELD out RETURN out").expect("source parses");

        assert!(CallPlanKey::for_statement(GraphId::new(7), 3, 11, &statement).is_none());
    }

    // Changing any one of graph id, schema version, or registry version must
    // change the key, and the accessors must return what was passed in.
    #[test]
    fn key_carries_graph_id_schema_version_and_registry_version() {
        let statement = parse("CALL cache.echo()").expect("source parses");
        let graph_one = CallPlanKey::for_statement(GraphId::new(1), 0, 11, &statement)
            .expect("source produces key");
        let graph_two = CallPlanKey::for_statement(GraphId::new(2), 0, 11, &statement)
            .expect("source produces key");
        let schema_one = CallPlanKey::for_statement(GraphId::new(1), 1, 11, &statement)
            .expect("source produces key");
        let registry_one = CallPlanKey::for_statement(GraphId::new(1), 0, 12, &statement)
            .expect("source produces key");

        assert_ne!(graph_one, graph_two);
        assert_ne!(graph_one, schema_one);
        assert_ne!(graph_one, registry_one);
        assert_eq!(graph_one.graph_id(), GraphId::new(1));
        assert_eq!(graph_one.schema_version(), 0);
        assert_eq!(graph_one.registry_version(), 11);
    }

    // With capacity 1: miss, insert, hit, a second insert that evicts the
    // first key, then a miss on the first key again.
    #[test]
    fn call_plan_cache_tracks_hits_misses_and_evictions() {
        let cache = CallPlanCache::new(NonZeroUsize::new(1).expect("nonzero"));
        let first_key = key("CALL cache.one()");
        let second_key = key("CALL cache.two()");

        // Miss #1: nothing has been inserted yet.
        assert!(cache.get(&first_key).is_none());
        cache.insert_with_source(
            first_key.clone(),
            Arc::from("CALL cache.one()"),
            plan_for("RETURN 1"),
        );
        // Hit #1.
        assert!(cache.get(&first_key).is_some());
        // Capacity is 1, so this insert evicts `first_key`.
        cache.insert_with_source(
            second_key,
            Arc::from("CALL cache.two()"),
            plan_for("RETURN 2"),
        );
        // Miss #2: `first_key` was evicted.
        assert!(cache.get(&first_key).is_none());

        assert_eq!(
            cache.stats(),
            CallPlanCacheStats {
                hits: 1,
                misses: 2,
                capacity_evictions: 1,
            }
        );
    }

    // A source lookup with the same graph and version triple as the insert
    // must return the plan and count one hit.
    #[test]
    fn call_plan_cache_source_fast_path_hits_existing_plan() {
        let cache = CallPlanCache::new(NonZeroUsize::new(2).expect("nonzero"));
        let source = Arc::<str>::from("CALL cache.one()");
        let key = key(&source);

        cache.insert_with_source(key, Arc::clone(&source), plan_for("RETURN 1"));

        assert!(
            cache
                .get_source(GraphId::new(7), 3, 11, "CALL cache.one()")
                .is_some()
        );
        assert_eq!(cache.stats().hits, 1);
    }

    // Both an unindexed source and an indexed source queried under a
    // different registry version count as misses.
    #[test]
    fn call_plan_cache_source_misses_are_recorded() {
        let cache = CallPlanCache::new(NonZeroUsize::new(2).expect("nonzero"));
        let source = Arc::<str>::from("CALL cache.one()");
        let key = key(&source);

        // Miss #1: the source has not been inserted yet.
        assert!(
            cache
                .get_source(GraphId::new(7), 3, 11, "CALL cache.one()")
                .is_none()
        );
        cache.insert_with_source(key, Arc::clone(&source), plan_for("RETURN 1"));
        // Miss #2: inserted under registry version 11, queried under 12.
        assert!(
            cache
                .get_source(GraphId::new(7), 3, 12, "CALL cache.one()")
                .is_none()
        );

        assert_eq!(cache.stats().misses, 2);
    }

    // With capacity 1, inserting the same source under a newer registry
    // version evicts the older plan. A lookup for the old version must then
    // miss, and a lookup for the new version must hit.
    #[test]
    fn call_plan_cache_stale_source_entries_are_recorded_as_misses() {
        let cache = CallPlanCache::new(NonZeroUsize::new(1).expect("nonzero"));
        let source = Arc::<str>::from("CALL cache.one()");
        let old_key = key_with_registry(&source, 11);
        let new_key = key_with_registry(&source, 12);

        cache.insert_with_source(old_key, Arc::clone(&source), plan_for("RETURN 1"));
        // Evicts the registry-11 plan (the one counted capacity eviction).
        cache.insert_with_source(new_key, Arc::clone(&source), plan_for("RETURN 2"));

        // The registry-11 plan is gone, so this is the one counted miss.
        assert!(
            cache
                .get_source(GraphId::new(7), 3, 11, "CALL cache.one()")
                .is_none()
        );
        // The registry-12 plan is still cached: the one counted hit.
        assert!(
            cache
                .get_source(GraphId::new(7), 3, 12, "CALL cache.one()")
                .is_some()
        );

        assert_eq!(
            cache.stats(),
            CallPlanCacheStats {
                hits: 1,
                misses: 1,
                capacity_evictions: 1,
            }
        );
    }
}
468}