1use std::{
4 num::NonZeroUsize,
5 sync::{Arc, Mutex, MutexGuard},
6};
7
8use lru::LruCache;
9use selene_core::GraphId;
10
11use crate::{
12 ExecutionPlan, PipelineStatement,
13 ast::{Statement, format_procedure_call, format_read_statement},
14};
15
/// Mutex-guarded cache of execution plans for CALL statements.
///
/// Plans are stored in an LRU map keyed by [`CallPlanKey`]; a second LRU map
/// indexes them by source text so `get_source` can find a plan without
/// building a key first.
pub struct CallPlanCache {
    /// All mutable cache state; every access goes through `lock_inner`.
    inner: Mutex<CallPlanCacheInner>,
}
23
/// State protected by the mutex in [`CallPlanCache`].
struct CallPlanCacheInner {
    /// Primary store: cache key to shared execution plan, LRU-bounded.
    plans: LruCache<CallPlanKey, Arc<ExecutionPlan>>,
    /// Secondary index: source text to the entries recorded for that text.
    /// Each entry names a key in `plans`; the named plan may already have
    /// been evicted, which `get_source` treats as a miss.
    source_index: LruCache<Arc<str>, Vec<CallPlanSourceEntry>>,
    /// Running lookup and eviction counters.
    stats: CallPlanCacheStats,
}
29
/// Identity of a cached plan: graph, schema version, registry version and
/// the canonical text of the CALL statement.
///
/// Two keys are equal only when all four parts are equal.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct CallPlanKey {
    /// Graph the statement targets.
    graph_id: GraphId,
    /// Schema version supplied when the key was built.
    schema_version: u64,
    /// Procedure-registry version supplied when the key was built.
    registry_version: u64,
    /// Statement text as re-rendered by the AST formatter (see
    /// `canonical_call_source`), shared cheaply via `Arc`.
    canonical_source: Arc<str>,
}
38
/// One record in the source index: the version triple a source text was
/// cached under, plus the full key of the plan it maps to.
///
/// The triple repeats values that are also stored inside `key`; lookups
/// compare these fields directly.
#[derive(Clone, Debug, Eq, PartialEq)]
struct CallPlanSourceEntry {
    /// Graph the plan was cached for.
    graph_id: GraphId,
    /// Schema version the plan was cached under.
    schema_version: u64,
    /// Registry version the plan was cached under.
    registry_version: u64,
    /// Key to look up in the primary plan map.
    key: CallPlanKey,
}
46
/// Snapshot of the cache counters. All counters saturate instead of wrapping.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct CallPlanCacheStats {
    /// Lookups that returned a plan.
    pub hits: u64,
    /// Lookups that returned nothing, including source-index entries whose
    /// plan had already been evicted.
    pub misses: u64,
    /// Inserts of a new key that pushed a different entry out of the plan map.
    pub capacity_evictions: u64,
}
57
58impl CallPlanCache {
59 #[must_use]
61 pub fn new(capacity: NonZeroUsize) -> Self {
62 Self {
63 inner: Mutex::new(CallPlanCacheInner {
64 plans: LruCache::new(capacity),
65 source_index: LruCache::new(capacity),
66 stats: CallPlanCacheStats::default(),
67 }),
68 }
69 }
70
71 pub(crate) fn get_source(
72 &self,
73 graph_id: GraphId,
74 schema_version: u64,
75 registry_version: u64,
76 source: &str,
77 ) -> Option<Arc<ExecutionPlan>> {
78 let mut inner = self.lock_inner();
79 let Some(key) = inner.source_index.get(source).and_then(|entries| {
80 entries
81 .iter()
82 .find(|entry| {
83 entry.graph_id == graph_id
84 && entry.schema_version == schema_version
85 && entry.registry_version == registry_version
86 })
87 .map(|entry| entry.key.clone())
88 }) else {
89 inner.stats.misses = inner.stats.misses.saturating_add(1);
90 return None;
91 };
92 match inner.plans.get(&key) {
93 Some(plan) => {
94 let plan = Arc::clone(plan);
95 inner.stats.hits = inner.stats.hits.saturating_add(1);
96 Some(plan)
97 }
98 None => {
99 remove_source_entry(
100 &mut inner,
101 source,
102 graph_id,
103 schema_version,
104 registry_version,
105 );
106 inner.stats.misses = inner.stats.misses.saturating_add(1);
107 None
108 }
109 }
110 }
111
112 pub(crate) fn get(&self, key: &CallPlanKey) -> Option<Arc<ExecutionPlan>> {
113 let mut inner = self.lock_inner();
114 match inner.plans.get(key) {
115 Some(plan) => {
116 let plan = Arc::clone(plan);
117 inner.stats.hits = inner.stats.hits.saturating_add(1);
118 Some(plan)
119 }
120 None => {
121 inner.stats.misses = inner.stats.misses.saturating_add(1);
122 None
123 }
124 }
125 }
126
127 pub(crate) fn insert_with_source(
128 &self,
129 key: CallPlanKey,
130 source: Arc<str>,
131 plan: Arc<ExecutionPlan>,
132 ) {
133 self.insert_inner(key, Some(source), plan);
134 }
135
136 fn insert_inner(&self, key: CallPlanKey, source: Option<Arc<str>>, plan: Arc<ExecutionPlan>) {
137 let mut inner = self.lock_inner();
138 let replacing_existing = inner.plans.contains(&key);
139 if inner.plans.push(key.clone(), plan).is_some() && !replacing_existing {
140 inner.stats.capacity_evictions = inner.stats.capacity_evictions.saturating_add(1);
141 }
142 if let Some(source) = source {
143 let entry = CallPlanSourceEntry {
144 graph_id: key.graph_id,
145 schema_version: key.schema_version,
146 registry_version: key.registry_version,
147 key,
148 };
149 match inner.source_index.get_mut(source.as_ref()) {
150 Some(entries) => {
151 if let Some(existing) = entries.iter_mut().find(|existing| {
152 existing.graph_id == entry.graph_id
153 && existing.schema_version == entry.schema_version
154 && existing.registry_version == entry.registry_version
155 }) {
156 *existing = entry;
157 } else {
158 entries.push(entry);
159 }
160 }
161 None => {
162 inner.source_index.push(source, vec![entry]);
163 }
164 }
165 }
166 }
167
168 #[must_use]
170 pub fn stats(&self) -> CallPlanCacheStats {
171 self.lock_inner().stats
172 }
173
174 pub fn clear(&self) {
176 let mut inner = self.lock_inner();
177 inner.plans.clear();
178 inner.source_index.clear();
179 }
180
181 fn lock_inner(&self) -> MutexGuard<'_, CallPlanCacheInner> {
182 self.inner
183 .lock()
184 .unwrap_or_else(|poison| poison.into_inner())
185 }
186}
187
188fn remove_source_entry(
189 inner: &mut CallPlanCacheInner,
190 source: &str,
191 graph_id: GraphId,
192 schema_version: u64,
193 registry_version: u64,
194) {
195 let Some(entries) = inner.source_index.get_mut(source) else {
196 return;
197 };
198 entries.retain(|entry| {
199 !(entry.graph_id == graph_id
200 && entry.schema_version == schema_version
201 && entry.registry_version == registry_version)
202 });
203 if entries.is_empty() {
204 inner.source_index.pop(source);
205 }
206}
207
208impl CallPlanKey {
209 pub(crate) fn for_statement(
210 graph_id: GraphId,
211 schema_version: u64,
212 registry_version: u64,
213 statement: &Statement,
214 ) -> Option<Self> {
215 let canonical_source = canonical_call_source(statement)?;
216 Some(Self {
217 graph_id,
218 schema_version,
219 registry_version,
220 canonical_source: Arc::from(canonical_source),
221 })
222 }
223
224 #[must_use]
226 pub const fn graph_id(&self) -> GraphId {
227 self.graph_id
228 }
229
230 #[must_use]
232 pub const fn schema_version(&self) -> u64 {
233 self.schema_version
234 }
235
236 #[must_use]
238 pub const fn registry_version(&self) -> u64 {
239 self.registry_version
240 }
241
242 #[must_use]
244 pub fn canonical_source(&self) -> &str {
245 &self.canonical_source
246 }
247}
248
249fn canonical_call_source(statement: &Statement) -> Option<String> {
250 match statement {
251 Statement::Call(call) => format_procedure_call(call).ok(),
252 Statement::Query(pipeline) if is_call_rooted_pipeline(pipeline) => {
257 format_read_statement(statement).ok()
258 }
259 _ => None,
260 }
261}
262
263fn is_call_rooted_pipeline(pipeline: &crate::QueryPipeline) -> bool {
264 matches!(
265 pipeline.statements.as_slice(),
266 [PipelineStatement::Call(_)] | [PipelineStatement::Call(_), PipelineStatement::Return(_)]
267 )
268}
269
// Unit tests for key canonicalization and for the cache's hit/miss/eviction
// accounting. The stat assertions depend on the exact order of cache calls.
#[cfg(test)]
mod tests {
    use std::{num::NonZeroUsize, sync::Arc};

    use selene_core::GraphId;

    use super::*;
    use crate::{
        EmptyProcedureRegistry, ExecutionPlan, analyze, ast::format_procedure_call, parser::parse,
        plan,
    };

    // Builds a key for `source` with graph id 7, schema version 3 and
    // registry version 11.
    fn key(source: &str) -> CallPlanKey {
        key_with_registry(source, 11)
    }

    // Same as `key`, but with a caller-chosen registry version.
    fn key_with_registry(source: &str, registry_version: u64) -> CallPlanKey {
        let statement = parse(source).expect("source parses");
        CallPlanKey::for_statement(GraphId::new(7), 3, registry_version, &statement)
            .expect("source produces CALL cache key")
    }

    // Parses, analyzes and plans `source` against an empty procedure
    // registry; used only to obtain some plan value to store in the cache.
    fn plan_for(source: &str) -> Arc<ExecutionPlan> {
        let statement = parse(source).expect("source parses");
        let analyzed = analyze(statement, &EmptyProcedureRegistry, None).expect("source analyzes");
        Arc::new(plan(&analyzed, &EmptyProcedureRegistry).expect("source plans"))
    }

    // Keys must differ when argument expressions, yield order, yield aliases
    // or parameter type annotations differ.
    #[test]
    fn call_plan_cache_keys_arg_shape_and_yield_distinctly() {
        let arg_shape = key("CALL cache.echo(1 + 2) YIELD out");
        let arg_value = key("CALL cache.echo(3) YIELD out");
        let yield_order = key("CALL cache.echo() YIELD a, b");
        let yield_order_reversed = key("CALL cache.echo() YIELD b, a");
        let yield_alias = key("CALL cache.echo() YIELD out AS alias");

        // `1 + 2` is not folded to `3` for keying purposes.
        assert_ne!(arg_shape, arg_value);
        assert_ne!(yield_order, yield_order_reversed);
        assert_ne!(key("CALL cache.echo() YIELD out"), yield_alias);
        assert_ne!(
            key("CALL cache.echo($p)"),
            key("CALL cache.echo($p :: INT)")
        );
        assert_ne!(
            key("CALL cache.echo($p :: INT)"),
            key("CALL cache.echo($p :: STRING)")
        );
        // The type alias INT is rendered as INTEGER in the canonical text.
        assert_eq!(
            key("CALL cache.echo($p :: INT)").canonical_source(),
            "CALL cache.echo($p :: INTEGER)"
        );

        let statement =
            parse("CALL cache.echo(1 + 2, $p) YIELD out AS alias").expect("source parses");
        let Statement::Call(call) = statement else {
            panic!("expected top-level CALL");
        };
        // The formatter parenthesizes the binary expression.
        let formatted = format_procedure_call(&call).expect("procedure call formats");
        assert_eq!(formatted, "CALL cache.echo((1 + 2), $p) YIELD out AS alias");
    }

    // Sources that differ only in whitespace produce the same key.
    #[test]
    fn call_plan_key_canonicalizes_whitespace() {
        let compact = key("CALL cache.echo(1+2) YIELD out");
        let spaced = key("CALL cache.echo(1 + 2) YIELD out");

        assert_eq!(compact, spaced);
        assert_eq!(
            compact.canonical_source(),
            "CALL cache.echo((1 + 2)) YIELD out"
        );
    }

    // A CALL that follows a MATCH is not call-rooted, so no key is produced.
    #[test]
    fn embedded_pipeline_call_is_not_keyed() {
        let statement =
            parse("MATCH (n) CALL cache.echo(n) YIELD out RETURN out").expect("source parses");

        assert!(CallPlanKey::for_statement(GraphId::new(7), 3, 11, &statement).is_none());
    }

    // Changing any one of graph id, schema version or registry version
    // yields a different key, and the getters return what was passed in.
    #[test]
    fn key_carries_graph_id_schema_version_and_registry_version() {
        let statement = parse("CALL cache.echo()").expect("source parses");
        let graph_one = CallPlanKey::for_statement(GraphId::new(1), 0, 11, &statement)
            .expect("source produces key");
        let graph_two = CallPlanKey::for_statement(GraphId::new(2), 0, 11, &statement)
            .expect("source produces key");
        let schema_one = CallPlanKey::for_statement(GraphId::new(1), 1, 11, &statement)
            .expect("source produces key");
        let registry_one = CallPlanKey::for_statement(GraphId::new(1), 0, 12, &statement)
            .expect("source produces key");

        assert_ne!(graph_one, graph_two);
        assert_ne!(graph_one, schema_one);
        assert_ne!(graph_one, registry_one);
        assert_eq!(graph_one.graph_id(), GraphId::new(1));
        assert_eq!(graph_one.schema_version(), 0);
        assert_eq!(graph_one.registry_version(), 11);
    }

    // With capacity 1: miss, insert, hit, insert of a second key (evicting
    // the first), miss again.
    #[test]
    fn call_plan_cache_tracks_hits_misses_and_evictions() {
        let cache = CallPlanCache::new(NonZeroUsize::new(1).expect("nonzero"));
        let first_key = key("CALL cache.one()");
        let second_key = key("CALL cache.two()");

        // Miss #1: nothing cached yet.
        assert!(cache.get(&first_key).is_none());
        cache.insert_with_source(
            first_key.clone(),
            Arc::from("CALL cache.one()"),
            plan_for("RETURN 1"),
        );
        // Hit #1.
        assert!(cache.get(&first_key).is_some());
        // Capacity is 1, so this insert displaces `first_key`.
        cache.insert_with_source(
            second_key,
            Arc::from("CALL cache.two()"),
            plan_for("RETURN 2"),
        );
        // Miss #2: `first_key` was evicted.
        assert!(cache.get(&first_key).is_none());

        assert_eq!(
            cache.stats(),
            CallPlanCacheStats {
                hits: 1,
                misses: 2,
                capacity_evictions: 1,
            }
        );
    }

    // A plan inserted with a source can be fetched by source text and the
    // matching version triple, and the fetch counts as a hit.
    #[test]
    fn call_plan_cache_source_fast_path_hits_existing_plan() {
        let cache = CallPlanCache::new(NonZeroUsize::new(2).expect("nonzero"));
        let source = Arc::<str>::from("CALL cache.one()");
        let key = key(&source);

        cache.insert_with_source(key, Arc::clone(&source), plan_for("RETURN 1"));

        assert!(
            cache
                .get_source(GraphId::new(7), 3, 11, "CALL cache.one()")
                .is_some()
        );
        assert_eq!(cache.stats().hits, 1);
    }

    // Both an unknown source and a known source with a different registry
    // version count as misses.
    #[test]
    fn call_plan_cache_source_misses_are_recorded() {
        let cache = CallPlanCache::new(NonZeroUsize::new(2).expect("nonzero"));
        let source = Arc::<str>::from("CALL cache.one()");
        let key = key(&source);

        // Miss #1: source not indexed yet.
        assert!(
            cache
                .get_source(GraphId::new(7), 3, 11, "CALL cache.one()")
                .is_none()
        );
        cache.insert_with_source(key, Arc::clone(&source), plan_for("RETURN 1"));
        // Miss #2: source is indexed, but only under registry version 11.
        assert!(
            cache
                .get_source(GraphId::new(7), 3, 12, "CALL cache.one()")
                .is_none()
        );

        assert_eq!(cache.stats().misses, 2);
    }

    // With capacity 1, inserting the same source under a newer registry
    // version evicts the older plan; looking up the old version is a miss
    // and looking up the new version is a hit.
    #[test]
    fn call_plan_cache_stale_source_entries_are_recorded_as_misses() {
        let cache = CallPlanCache::new(NonZeroUsize::new(1).expect("nonzero"));
        let source = Arc::<str>::from("CALL cache.one()");
        let old_key = key_with_registry(&source, 11);
        let new_key = key_with_registry(&source, 12);

        cache.insert_with_source(old_key, Arc::clone(&source), plan_for("RETURN 1"));
        // Displaces the registry-version-11 plan.
        cache.insert_with_source(new_key, Arc::clone(&source), plan_for("RETURN 2"));

        assert!(
            cache
                .get_source(GraphId::new(7), 3, 11, "CALL cache.one()")
                .is_none()
        );
        assert!(
            cache
                .get_source(GraphId::new(7), 3, 12, "CALL cache.one()")
                .is_some()
        );

        assert_eq!(
            cache.stats(),
            CallPlanCacheStats {
                hits: 1,
                misses: 1,
                capacity_evictions: 1,
            }
        );
    }
}