1use std::fmt;
4
5use super::ast::*;
6use crate::db::DbStats;
7
/// The plan produced by [`plan`] for a single statement.
///
/// Two plans compare equal when their verb and every step (operation, cost
/// estimate and cardinality estimate) are equal.
#[derive(Debug, Clone, PartialEq)]
pub struct QueryPlan {
    /// Steps in the order the planner emitted them. Empty for statements
    /// that are planned as [`PlanVerb::Other`].
    pub steps: Vec<PlanStep>,
    /// The statement category this plan was built for.
    pub verb: PlanVerb,
}
16
/// Statement category recorded on a [`QueryPlan`].
///
/// The planning functions visible in this file only ever assign `Recall`,
/// `Think`, `Correct`, `Supersede`, `Merge`, `Retract`, `Inspect`, `History`,
/// `Trace` and `Other`. The remaining variants are not produced here.
/// NOTE(review): presumably they are reserved for statements planned
/// elsewhere — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlanVerb {
    /// Assigned by `plan_recall`, and also by `plan_traverse`.
    Recall,
    /// Assigned by `plan_think`.
    Think,
    Remember,
    Forget,
    /// Assigned by `plan_correct`.
    Correct,
    /// Assigned by `plan_supersede`.
    Supersede,
    /// Assigned by `plan_merge_memory`.
    Merge,
    /// Assigned by `plan_retract`.
    Retract,
    Connect,
    /// Assigned by `plan_inspect`.
    Inspect,
    /// Assigned by `plan_history`.
    History,
    /// Assigned by `plan_trace`.
    Trace,
    Consolidate,
    Watch,
    /// Assigned by `plan` to statements that receive an empty step list.
    Other,
}
36
/// One operation of a [`QueryPlan`] together with the planner's estimates.
#[derive(Debug, Clone, PartialEq)]
pub struct PlanStep {
    /// The operation this step describes.
    pub op: PlanOp,
    /// Heuristic, unit-less cost. `QueryPlan`'s `Display` impl adds these up
    /// into the "Total estimated cost" line.
    pub estimated_cost: f64,
    /// Estimated number of rows for this step, shown as "est. rows" by
    /// `QueryPlan`'s `Display` impl.
    pub estimated_cardinality: u64,
}
45
/// Index suggestion attached to a [`PlanOp::VectorSearch`] step.
///
/// Only `IvfHnsw`, `SeqScan` and `Auto` are chosen by the planning functions
/// visible in this file; the other variants are not constructed here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexHint {
    /// Chosen by `plan_recall` when the total record count exceeds 1000.
    IvfHnsw,
    BTree,
    Bitmap,
    Fts,
    LabelList,
    /// Chosen by `plan_traverse` for its single-result seed lookup.
    SeqScan,
    /// Chosen by `plan_recall` when the total record count is 1000 or less
    /// (including when no statistics are available).
    Auto,
}
64
/// The operation performed by a [`PlanStep`].
///
/// Each variant carries the parameters the planner copied out of the
/// statement. The `Display` impl renders a one-line description of each
/// variant.
#[derive(Debug, Clone, PartialEq)]
pub enum PlanOp {
    /// Restrict to the listed layers. `plan_recall` always emits this first.
    LayerFilter {
        layers: Vec<hirn_core::types::Layer>,
    },
    /// Restrict to one namespace (emitted by `plan_recall` and `plan_history`).
    NamespaceFilter { namespace: String },
    /// Apply the statement's temporal clause.
    TemporalFilter { temporal: TemporalClause },
    /// A `WHERE importance <op> <threshold>` condition with a float value.
    ImportanceFilter { op: ComparisonOp, threshold: f64 },
    /// A `WHERE confidence <op> <threshold>` condition with a float value.
    ConfidenceFilter { op: ComparisonOp, threshold: f64 },
    /// Filter built from the statement's `involving` entity list.
    EntityFilter { entities: Vec<String> },
    /// Any other `WHERE` condition, carried verbatim.
    ConditionFilter { condition: WhereCondition },
    /// Vector search for `query`, returning at most `limit` rows.
    VectorSearch {
        query: String,
        limit: usize,
        index_hint: IndexHint,
    },
    /// Graph expansion to `depth`, with optional edge-weight floor and
    /// activation mode.
    GraphExpand {
        depth: usize,
        min_weight: Option<f32>,
        activation: Option<ActivationModeAst>,
    },
    /// Not constructed by any planning function visible in this file.
    ActivationPass {
        decay: f64,
        inhibition: f64,
        max_iter: usize,
    },
    /// Emitted by `plan_recall` when the statement sets `follow_causes`.
    CausalTraverse { depth: usize },
    /// Emitted by `plan_recall` after all filters and expansions.
    ScoreAndRank,
    /// Emitted for a `group_by` clause; rendered as `GROUP BY {field} {function}`.
    Aggregate {
        field: String,
        function: AggFunction,
    },
    /// Emitted for the statement's projection list.
    Project { fields: Vec<String> },
    /// Emitted when the statement requests a result format.
    FormatOutput { format: OutputFormat },
    /// Emitted by `plan_think` only; `budget` defaults to 4096 and `format`
    /// to `OutputFormat::Context` when the statement does not set them.
    ContextAssemble { budget: usize, format: OutputFormat },
    /// Cap the result at `n` rows.
    LimitResults { n: usize },
    /// First step of the correct / supersede / merge / retract plans.
    ResolveActiveHead { target: String },
    /// Second step of a correct plan; `fields` are the updated field names.
    CorrectRecord { target: String, fields: Vec<String> },
    /// Second step of a supersede plan; `fields` are the updated field names.
    SupersedeRecord { target: String, fields: Vec<String> },
    /// Second step of a merge plan.
    MergeMemory {
        target: String,
        sources: Vec<String>,
        fields: Vec<String>,
    },
    /// Second step of a retract plan.
    RetractRecord { target: String },
    /// Sole step of an inspect plan.
    InspectRecord { target: String },
    /// Final step of a history plan.
    HistoryRecord { target: String },
    /// Sole step of a trace plan.
    TraceProvenance { target: String },
    /// Emitted once per subquery filter, ahead of the vector search.
    SubqueryResolve { field: String, subquery: Subquery },
    /// Emitted when the statement carries an `as_of` snapshot.
    TimeTravelFilter { snapshot: RecallSnapshotAst },
}
144
145pub fn plan(stmt: &Statement, stats: Option<&DbStats>) -> QueryPlan {
152 match stmt {
153 Statement::Recall(r) => plan_recall(r, stats),
154 Statement::Think(t) => plan_think(t, stats),
155 Statement::Correct(c) => plan_correct(c),
156 Statement::Supersede(s) => plan_supersede(s),
157 Statement::MergeMemory(m) => plan_merge_memory(m),
158 Statement::Retract(r) => plan_retract(r),
159 Statement::Inspect(i) => plan_inspect(i),
160 Statement::History(h) => plan_history(h),
161 Statement::Trace(t) => plan_trace(t),
162 Statement::Traverse(t) => plan_traverse(t),
163 Statement::Explain(e) => plan(&e.inner, stats),
164 Statement::CreateRealm(_)
165 | Statement::DropRealm(_)
166 | Statement::Grant(_)
167 | Statement::Revoke(_)
168 | Statement::ShowPolicies(_)
169 | Statement::ExplainPolicy(_)
170 | Statement::RecallEvents(_)
171 | Statement::ShowCluster
172 | Statement::SetTierPolicy(_)
173 | Statement::ExplainCauses(_)
174 | Statement::WhatIf(_)
175 | Statement::Counterfactual(_) => QueryPlan {
176 verb: PlanVerb::Other,
177 steps: Vec::new(),
178 },
179 }
180}
181
182fn plan_recall(r: &RecallStmt, stats: Option<&DbStats>) -> QueryPlan {
183 let total = stats.map(|s| s.total_count).unwrap_or(0);
184 let mut remaining = total;
185 let mut steps = Vec::new();
186
187 let layer_cardinality = estimate_layer_cardinality(&r.layers, stats);
189 remaining = remaining.min(layer_cardinality);
190 steps.push(PlanStep {
191 op: PlanOp::LayerFilter {
192 layers: r.layers.clone(),
193 },
194 estimated_cost: 0.01,
195 estimated_cardinality: remaining,
196 });
197
198 if let Some(ref ns) = r.namespace {
200 remaining = (remaining as f64 * 0.3) as u64; steps.push(PlanStep {
202 op: PlanOp::NamespaceFilter {
203 namespace: ns.clone(),
204 },
205 estimated_cost: 0.02,
206 estimated_cardinality: remaining,
207 });
208 }
209
210 for sf in &r.subquery_filters {
212 remaining = (remaining as f64 * 0.2) as u64; steps.push(PlanStep {
214 op: PlanOp::SubqueryResolve {
215 field: sf.field.clone(),
216 subquery: sf.subquery.clone(),
217 },
218 estimated_cost: 0.8, estimated_cardinality: remaining.max(1),
220 });
221 }
222
223 if let Some(ref snapshot) = r.as_of {
225 remaining = (remaining as f64 * 0.6) as u64; steps.push(PlanStep {
227 op: PlanOp::TimeTravelFilter {
228 snapshot: snapshot.clone(),
229 },
230 estimated_cost: 0.15,
231 estimated_cardinality: remaining.max(1),
232 });
233 }
234
235 let mut scalar_filters: Vec<(PlanOp, f64, f64)> = Vec::new(); if let Some(ref entities) = r.involving {
240 let sel = (0.1_f64).powi(entities.len() as i32).max(0.01);
243 scalar_filters.push((
244 PlanOp::EntityFilter {
245 entities: entities.clone(),
246 },
247 0.08,
248 sel,
249 ));
250 }
251
252 if let Some(ref tc) = r.temporal {
253 let sel = estimate_temporal_selectivity(tc, stats);
254 scalar_filters.push((
255 PlanOp::TemporalFilter {
256 temporal: tc.clone(),
257 },
258 0.1,
259 sel,
260 ));
261 }
262
263 let mut general_conditions = Vec::new();
264 for wc in &r.where_clauses {
265 match wc.field.as_str() {
266 "importance" => {
267 if let ConditionValue::Float(v) = wc.value {
268 let sel = estimate_threshold_selectivity(v, &wc.op);
269 scalar_filters.push((
270 PlanOp::ImportanceFilter {
271 op: wc.op,
272 threshold: v,
273 },
274 0.05,
275 sel,
276 ));
277 }
278 }
279 "confidence" => {
280 if let ConditionValue::Float(v) = wc.value {
281 let sel = estimate_threshold_selectivity(v, &wc.op);
282 scalar_filters.push((
283 PlanOp::ConfidenceFilter {
284 op: wc.op,
285 threshold: v,
286 },
287 0.05,
288 sel,
289 ));
290 }
291 }
292 _ => general_conditions.push(wc.clone()),
293 }
294 }
295
296 scalar_filters.sort_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal));
298
299 let has_highly_selective_scalar = scalar_filters
302 .first()
303 .map(|(_, _, sel)| *sel < 0.3)
304 .unwrap_or(false);
305 let scalar_before_vector = has_highly_selective_scalar && total > 100;
306
307 if scalar_before_vector {
308 for (op, cost, sel) in &scalar_filters {
309 remaining = (remaining as f64 * sel) as u64;
310 steps.push(PlanStep {
311 op: op.clone(),
312 estimated_cost: *cost,
313 estimated_cardinality: remaining.max(1),
314 });
315 }
316 }
317
318 let limit = r.limit.unwrap_or(10);
320 let index_hint = if total > 1000 {
321 IndexHint::IvfHnsw
322 } else {
323 IndexHint::Auto
324 };
325 let vector_cardinality = limit as u64;
326 steps.push(PlanStep {
327 op: PlanOp::VectorSearch {
328 query: r.about.clone(),
329 limit,
330 index_hint,
331 },
332 estimated_cost: if total > 1000 { 1.0 } else { 0.5 },
333 estimated_cardinality: vector_cardinality,
334 });
335 remaining = remaining.min(vector_cardinality);
336
337 if !scalar_before_vector {
339 for (op, cost, sel) in &scalar_filters {
340 remaining = (remaining as f64 * sel) as u64;
341 steps.push(PlanStep {
342 op: op.clone(),
343 estimated_cost: *cost,
344 estimated_cardinality: remaining.max(1),
345 });
346 }
347 }
348
349 for cond in general_conditions {
351 remaining = (remaining as f64 * 0.5) as u64; steps.push(PlanStep {
353 op: PlanOp::ConditionFilter { condition: cond },
354 estimated_cost: 0.05,
355 estimated_cardinality: remaining.max(1),
356 });
357 }
358
359 if let Some(ref ex) = r.expand {
361 let graph_output = remaining * (ex.depth as u64 + 1);
362 steps.push(PlanStep {
363 op: PlanOp::GraphExpand {
364 depth: ex.depth,
365 min_weight: ex.min_weight,
366 activation: ex.activation,
367 },
368 estimated_cost: compute_graph_cost(ex.depth),
369 estimated_cardinality: graph_output,
370 });
371 remaining = graph_output;
372 }
373
374 if let Some(depth) = r.follow_causes {
376 steps.push(PlanStep {
377 op: PlanOp::CausalTraverse { depth },
378 estimated_cost: compute_graph_cost(depth),
379 estimated_cardinality: remaining,
380 });
381 }
382
383 steps.push(PlanStep {
385 op: PlanOp::ScoreAndRank,
386 estimated_cost: 0.1,
387 estimated_cardinality: remaining,
388 });
389
390 if let Some(ref gb) = r.group_by {
392 let group_count = (remaining as f64 * 0.1).max(1.0) as u64;
394 steps.push(PlanStep {
395 op: PlanOp::Aggregate {
396 field: gb.field.clone(),
397 function: gb.function,
398 },
399 estimated_cost: 0.15,
400 estimated_cardinality: group_count,
401 });
402 remaining = group_count;
403 }
404
405 if let Some(ref fields) = r.projection {
407 steps.push(PlanStep {
408 op: PlanOp::Project {
409 fields: fields.clone(),
410 },
411 estimated_cost: 0.02,
412 estimated_cardinality: remaining,
413 });
414 }
415
416 if let Some(fmt) = r.result_format {
418 steps.push(PlanStep {
419 op: PlanOp::FormatOutput { format: fmt },
420 estimated_cost: 0.05,
421 estimated_cardinality: remaining,
422 });
423 }
424
425 let final_count = remaining.min(limit as u64);
427 steps.push(PlanStep {
428 op: PlanOp::LimitResults { n: limit },
429 estimated_cost: 0.01,
430 estimated_cardinality: final_count,
431 });
432
433 QueryPlan {
434 steps,
435 verb: PlanVerb::Recall,
436 }
437}
438
439fn plan_think(t: &ThinkStmt, stats: Option<&DbStats>) -> QueryPlan {
440 let recall_equivalent = RecallStmt {
442 layers: vec![
443 hirn_core::types::Layer::Episodic,
444 hirn_core::types::Layer::Semantic,
445 ],
446 about: t.about.clone(),
447 involving: t.involving.clone(),
448 temporal: t.temporal.clone(),
449 expand: t.expand.clone(),
450 follow_causes: t.follow_causes,
451 where_clauses: t.where_clauses.clone(),
452 modality: None,
453 resource_roles: None,
454 hydration_modes: None,
455 artifact_kinds: None,
456 group_by: None,
457 projection: None,
458 output_format: t.output_format,
459 result_format: None,
460 as_of: None,
461 subquery_filters: vec![],
462 budget: t.budget,
463 namespace: t.namespace.clone(),
464 consistency: t.consistency,
465 limit: t.limit,
466 hybrid: false,
467 depth_mode: None,
468 with_prospective: None,
469 with_mcfa: None,
470 with_conflicts: false,
471 provenance_depth: None,
472 topic: None,
473 from_realms: None,
474 };
475
476 let mut plan = plan_recall(&recall_equivalent, stats);
477 plan.verb = PlanVerb::Think;
478
479 let budget = t.budget.unwrap_or(4096);
481 let format = t.output_format.unwrap_or(OutputFormat::Context);
482 let cardinality = plan
483 .steps
484 .last()
485 .map(|s| s.estimated_cardinality)
486 .unwrap_or(0);
487 let assemble = PlanStep {
488 op: PlanOp::ContextAssemble { budget, format },
489 estimated_cost: 0.5,
490 estimated_cardinality: cardinality,
491 };
492
493 if let Some(pos) = plan
495 .steps
496 .iter()
497 .position(|s| matches!(s.op, PlanOp::LimitResults { .. }))
498 {
499 plan.steps.insert(pos, assemble);
500 } else {
501 plan.steps.push(assemble);
502 }
503
504 plan
505}
506
507fn plan_correct(c: &CorrectStmt) -> QueryPlan {
508 QueryPlan {
509 steps: vec![
510 PlanStep {
511 op: PlanOp::ResolveActiveHead {
512 target: c.target.to_string(),
513 },
514 estimated_cost: 0.1,
515 estimated_cardinality: 1,
516 },
517 PlanStep {
518 op: PlanOp::CorrectRecord {
519 target: c.target.to_string(),
520 fields: c
521 .updates
522 .iter()
523 .map(|update| update.field.clone())
524 .collect(),
525 },
526 estimated_cost: 0.2,
527 estimated_cardinality: 1,
528 },
529 ],
530 verb: PlanVerb::Correct,
531 }
532}
533
534fn plan_supersede(s: &SupersedeStmt) -> QueryPlan {
535 QueryPlan {
536 steps: vec![
537 PlanStep {
538 op: PlanOp::ResolveActiveHead {
539 target: s.target.to_string(),
540 },
541 estimated_cost: 0.1,
542 estimated_cardinality: 1,
543 },
544 PlanStep {
545 op: PlanOp::SupersedeRecord {
546 target: s.target.to_string(),
547 fields: s
548 .updates
549 .iter()
550 .map(|update| update.field.clone())
551 .collect(),
552 },
553 estimated_cost: 0.2,
554 estimated_cardinality: 1,
555 },
556 ],
557 verb: PlanVerb::Supersede,
558 }
559}
560
561fn plan_merge_memory(m: &MergeMemoryStmt) -> QueryPlan {
562 QueryPlan {
563 steps: vec![
564 PlanStep {
565 op: PlanOp::ResolveActiveHead {
566 target: m.target.to_string(),
567 },
568 estimated_cost: 0.1,
569 estimated_cardinality: 1,
570 },
571 PlanStep {
572 op: PlanOp::MergeMemory {
573 target: m.target.to_string(),
574 sources: m.sources.iter().map(ToString::to_string).collect(),
575 fields: m
576 .updates
577 .iter()
578 .map(|update| update.field.clone())
579 .collect(),
580 },
581 estimated_cost: 0.3,
582 estimated_cardinality: 1,
583 },
584 ],
585 verb: PlanVerb::Merge,
586 }
587}
588
589fn plan_retract(r: &RetractStmt) -> QueryPlan {
590 QueryPlan {
591 steps: vec![
592 PlanStep {
593 op: PlanOp::ResolveActiveHead {
594 target: r.target.to_string(),
595 },
596 estimated_cost: 0.1,
597 estimated_cardinality: 1,
598 },
599 PlanStep {
600 op: PlanOp::RetractRecord {
601 target: r.target.to_string(),
602 },
603 estimated_cost: 0.2,
604 estimated_cardinality: 1,
605 },
606 ],
607 verb: PlanVerb::Retract,
608 }
609}
610
611fn plan_traverse(t: &TraverseStmt) -> QueryPlan {
612 let mut steps = vec![];
613
614 steps.push(PlanStep {
616 op: PlanOp::VectorSearch {
617 query: t.from.clone(),
618 limit: 1,
619 index_hint: IndexHint::SeqScan,
620 },
621 estimated_cost: 0.5,
622 estimated_cardinality: 1,
623 });
624
625 steps.push(PlanStep {
627 op: PlanOp::GraphExpand {
628 depth: t.depth,
629 min_weight: None,
630 activation: None,
631 },
632 estimated_cost: 0.5 * t.depth as f64,
633 estimated_cardinality: (t.depth * 5) as u64,
634 });
635
636 for wc in &t.where_clauses {
638 steps.push(PlanStep {
639 op: PlanOp::ConditionFilter {
640 condition: wc.clone(),
641 },
642 estimated_cost: 0.2,
643 estimated_cardinality: (t.depth * 3) as u64,
644 });
645 }
646
647 if let Some(n) = t.limit {
649 steps.push(PlanStep {
650 op: PlanOp::LimitResults { n },
651 estimated_cost: 0.0,
652 estimated_cardinality: n as u64,
653 });
654 }
655
656 QueryPlan {
657 steps,
658 verb: PlanVerb::Recall, }
660}
661
662fn plan_inspect(i: &InspectStmt) -> QueryPlan {
663 QueryPlan {
664 steps: vec![PlanStep {
665 op: PlanOp::InspectRecord {
666 target: i.target.to_string(),
667 },
668 estimated_cost: 0.1,
669 estimated_cardinality: 1,
670 }],
671 verb: PlanVerb::Inspect,
672 }
673}
674
675fn plan_history(h: &HistoryStmt) -> QueryPlan {
676 let mut steps = vec![PlanStep {
677 op: PlanOp::HistoryRecord {
678 target: h.target.to_string(),
679 },
680 estimated_cost: 0.15,
681 estimated_cardinality: 1,
682 }];
683
684 if let Some(namespace) = &h.namespace {
685 steps.insert(
686 0,
687 PlanStep {
688 op: PlanOp::NamespaceFilter {
689 namespace: namespace.clone(),
690 },
691 estimated_cost: 0.05,
692 estimated_cardinality: 1,
693 },
694 );
695 }
696
697 QueryPlan {
698 steps,
699 verb: PlanVerb::History,
700 }
701}
702
703fn plan_trace(t: &TraceStmt) -> QueryPlan {
704 QueryPlan {
705 steps: vec![PlanStep {
706 op: PlanOp::TraceProvenance {
707 target: t.target.to_string(),
708 },
709 estimated_cost: 0.1,
710 estimated_cardinality: 1,
711 }],
712 verb: PlanVerb::Trace,
713 }
714}
715
716fn estimate_layer_cardinality(layers: &[hirn_core::types::Layer], stats: Option<&DbStats>) -> u64 {
720 let Some(s) = stats else { return 0 };
721 let mut total = 0u64;
722 for l in layers {
723 match l {
724 hirn_core::types::Layer::Episodic => total += s.episodic_count,
725 hirn_core::types::Layer::Semantic => total += s.semantic_count,
726 hirn_core::types::Layer::Working => total += s.working_count,
727 hirn_core::types::Layer::Procedural => total += 0, }
729 }
730 total
731}
732
733fn estimate_temporal_selectivity(tc: &TemporalClause, stats: Option<&DbStats>) -> f64 {
735 let Some(stats) = stats else { return 0.5 };
736 if stats.total_count == 0 {
737 return 1.0;
738 }
739
740 match tc {
743 TemporalClause::Between { .. } => 0.2, TemporalClause::After(_) => 0.4, TemporalClause::Before(_) => 0.4, }
747}
748
749fn estimate_threshold_selectivity(threshold: f64, op: &ComparisonOp) -> f64 {
751 match op {
753 ComparisonOp::Gt | ComparisonOp::Gte => (1.0 - threshold).max(0.01),
754 ComparisonOp::Lt | ComparisonOp::Lte => threshold.max(0.01),
755 ComparisonOp::Eq => 0.05, ComparisonOp::Neq => 0.95, }
758}
759
/// Heuristic cost of a graph operation: half the square of `depth`.
fn compute_graph_cost(depth: usize) -> f64 {
    const QUADRATIC_FACTOR: f64 = 0.5;
    let hops = depth as f64;
    hops.powi(2) * QUADRATIC_FACTOR
}
764
765impl fmt::Display for QueryPlan {
768 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
769 writeln!(f, "QueryPlan ({:?}):", self.verb)?;
770 for (i, step) in self.steps.iter().enumerate() {
771 writeln!(
772 f,
773 " Step {}: {} (est. cost: {:.3}, est. rows: {})",
774 i + 1,
775 step.op,
776 step.estimated_cost,
777 step.estimated_cardinality,
778 )?;
779 }
780 let total: f64 = self.steps.iter().map(|s| s.estimated_cost).sum();
781 writeln!(f, " Total estimated cost: {total:.3}")?;
782 Ok(())
783 }
784}
785
786impl fmt::Display for PlanOp {
787 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
788 match self {
789 Self::LayerFilter { layers } => {
790 let names: Vec<&str> = layers
791 .iter()
792 .map(|l| match l {
793 hirn_core::types::Layer::Episodic => "episodic",
794 hirn_core::types::Layer::Semantic => "semantic",
795 hirn_core::types::Layer::Working => "working",
796 hirn_core::types::Layer::Procedural => "procedural",
797 })
798 .collect();
799 write!(f, "LayerFilter({})", names.join(", "))
800 }
801 Self::NamespaceFilter { namespace } => write!(f, "NamespaceFilter({namespace})"),
802 Self::TemporalFilter { temporal } => write!(f, "TemporalFilter({temporal})"),
803 Self::ImportanceFilter { op, threshold } => {
804 write!(f, "ImportanceFilter({op} {threshold})")
805 }
806 Self::ConfidenceFilter { op, threshold } => {
807 write!(f, "ConfidenceFilter({op} {threshold})")
808 }
809 Self::ConditionFilter { condition } => write!(f, "ConditionFilter({condition})"),
810 Self::EntityFilter { entities } => {
811 write!(f, "EntityFilter({})", entities.join(", "))
812 }
813 Self::VectorSearch {
814 query,
815 limit,
816 index_hint,
817 } => {
818 write!(
819 f,
820 "VectorSearch(\"{query}\", limit={limit}, index={index_hint:?})"
821 )
822 }
823 Self::GraphExpand {
824 depth,
825 min_weight,
826 activation,
827 } => {
828 write!(f, "GraphExpand(depth={depth}")?;
829 if let Some(mw) = min_weight {
830 write!(f, ", min_weight={mw}")?;
831 }
832 if let Some(am) = activation {
833 write!(f, ", activation={am}")?;
834 }
835 write!(f, ")")
836 }
837 Self::ActivationPass {
838 decay,
839 inhibition,
840 max_iter,
841 } => write!(
842 f,
843 "ActivationPass(decay={decay}, inhibition={inhibition}, max_iter={max_iter})"
844 ),
845 Self::CausalTraverse { depth } => write!(f, "CausalTraverse(depth={depth})"),
846 Self::ScoreAndRank => write!(f, "ScoreAndRank"),
847 Self::Aggregate { field, function } => {
848 write!(f, "Aggregate(GROUP BY {field} {function})")
849 }
850 Self::Project { fields } => write!(f, "Project({})", fields.join(", ")),
851 Self::FormatOutput { format } => write!(f, "FormatOutput({format})"),
852 Self::ContextAssemble { budget, format } => {
853 write!(f, "ContextAssemble(budget={budget}, format={format})")
854 }
855 Self::LimitResults { n } => write!(f, "LimitResults({n})"),
856 Self::ResolveActiveHead { target } => write!(f, "ResolveActiveHead({target})"),
857 Self::CorrectRecord { target, fields } => {
858 write!(f, "CorrectRecord({target}; fields={})", fields.join(", "))
859 }
860 Self::SupersedeRecord { target, fields } => {
861 write!(f, "SupersedeRecord({target}; fields={})", fields.join(", "))
862 }
863 Self::MergeMemory {
864 target,
865 sources,
866 fields,
867 } => write!(
868 f,
869 "MergeMemory(target={target}; sources={}; fields={})",
870 sources.join(", "),
871 fields.join(", ")
872 ),
873 Self::RetractRecord { target } => write!(f, "RetractRecord({target})"),
874 Self::InspectRecord { target } => write!(f, "InspectRecord({target})"),
875 Self::HistoryRecord { target } => write!(f, "HistoryRecord({target})"),
876 Self::TraceProvenance { target } => write!(f, "TraceProvenance({target})"),
877 Self::SubqueryResolve { field, .. } => write!(f, "SubqueryResolve({field})"),
878 Self::TimeTravelFilter { snapshot } => {
879 write!(f, "TimeTravelFilter(AS OF {snapshot})")
880 }
881 }
882 }
883}
884
885#[cfg(test)]
886mod tests {
887 use super::*;
888 use crate::ql::parser;
889
890 fn empty_stats() -> DbStats {
891 DbStats {
892 working_count: 0,
893 episodic_count: 0,
894 semantic_count: 0,
895 edge_count: 0,
896 procedural_count: 0,
897 total_count: 0,
898 file_size_bytes: 0,
899 }
900 }
901
902 fn large_stats() -> DbStats {
903 DbStats {
904 working_count: 10,
905 episodic_count: 5000,
906 semantic_count: 2000,
907 edge_count: 0,
908 procedural_count: 0,
909 total_count: 7010,
910 file_size_bytes: 100_000_000,
911 }
912 }
913
914 #[test]
915 fn simple_recall_plan() {
916 let stmt = parser::parse(r#"RECALL episodic ABOUT "test""#).unwrap();
917 let p = plan(&stmt, None);
918 assert_eq!(p.verb, PlanVerb::Recall);
919
920 let op_names: Vec<String> = p.steps.iter().map(|s| format!("{}", s.op)).collect();
921 assert!(op_names[0].starts_with("LayerFilter"));
922 assert!(op_names.iter().any(|o| o.starts_with("VectorSearch")));
923 assert!(op_names.iter().any(|o| o.starts_with("ScoreAndRank")));
924 assert!(op_names.last().unwrap().starts_with("LimitResults"));
925 }
926
927 #[test]
928 fn recall_with_expand_includes_graph_step() {
929 let stmt = parser::parse(
930 r#"RECALL episodic ABOUT "test" EXPAND GRAPH DEPTH 2 ACTIVATION spreading"#,
931 )
932 .unwrap();
933 let p = plan(&stmt, None);
934 assert!(
935 p.steps
936 .iter()
937 .any(|s| matches!(s.op, PlanOp::GraphExpand { .. }))
938 );
939 }
940
941 #[test]
942 fn think_includes_context_assemble() {
943 let stmt = parser::parse(r#"THINK ABOUT "optimize" BUDGET 4096"#).unwrap();
944 let p = plan(&stmt, None);
945 assert_eq!(p.verb, PlanVerb::Think);
946 assert!(
947 p.steps
948 .iter()
949 .any(|s| matches!(s.op, PlanOp::ContextAssemble { .. }))
950 );
951 }
952
953 #[test]
954 fn plan_display_readable() {
955 let stmt = parser::parse(r#"RECALL episodic ABOUT "test" LIMIT 5"#).unwrap();
956 let p = plan(&stmt, None);
957 let display = format!("{p}");
958 assert!(display.contains("QueryPlan"));
959 assert!(display.contains("Step 1"));
960 assert!(display.contains("Total estimated cost"));
961 }
962
963 #[test]
964 fn highly_selective_temporal_before_vector_with_large_stats() {
965 let stmt = parser::parse(
967 r#"RECALL episodic ABOUT "test" BETWEEN "2026-03-01" AND "2026-03-14" LIMIT 5"#,
968 )
969 .unwrap();
970 let p = plan(&stmt, Some(&large_stats()));
971
972 let temporal_pos = p
973 .steps
974 .iter()
975 .position(|s| matches!(s.op, PlanOp::TemporalFilter { .. }))
976 .unwrap();
977 let vector_pos = p
978 .steps
979 .iter()
980 .position(|s| matches!(s.op, PlanOp::VectorSearch { .. }))
981 .unwrap();
982 assert!(
983 temporal_pos < vector_pos,
984 "highly selective temporal filter should run before vector search with large dataset"
985 );
986 }
987
988 #[test]
989 fn moderate_temporal_after_vector_with_large_stats() {
990 let stmt =
992 parser::parse(r#"RECALL episodic ABOUT "test" AFTER "2026-03-14" LIMIT 5"#).unwrap();
993 let p = plan(&stmt, Some(&large_stats()));
994
995 let temporal_pos = p
996 .steps
997 .iter()
998 .position(|s| matches!(s.op, PlanOp::TemporalFilter { .. }))
999 .unwrap();
1000 let vector_pos = p
1001 .steps
1002 .iter()
1003 .position(|s| matches!(s.op, PlanOp::VectorSearch { .. }))
1004 .unwrap();
1005 assert!(
1006 temporal_pos > vector_pos,
1007 "moderate-selectivity temporal filter should run after vector search"
1008 );
1009 }
1010
1011 #[test]
1012 fn wide_temporal_after_vector_with_small_stats() {
1013 let stmt =
1014 parser::parse(r#"RECALL episodic ABOUT "test" AFTER "2020-01-01" LIMIT 5"#).unwrap();
1015 let p = plan(&stmt, Some(&empty_stats()));
1016
1017 let temporal_pos = p
1019 .steps
1020 .iter()
1021 .position(|s| matches!(s.op, PlanOp::TemporalFilter { .. }));
1022 let vector_pos = p
1023 .steps
1024 .iter()
1025 .position(|s| matches!(s.op, PlanOp::VectorSearch { .. }))
1026 .unwrap();
1027
1028 if let Some(tp) = temporal_pos {
1029 assert!(
1030 tp > vector_pos,
1031 "temporal filter should run after vector search with small dataset"
1032 );
1033 }
1034 }
1035
1036 #[test]
1037 fn layer_filter_always_first() {
1038 let queries = [
1039 r#"RECALL episodic ABOUT "x""#,
1040 r#"RECALL semantic ABOUT "y" LIMIT 5"#,
1041 r#"RECALL episodic ABOUT "z" EXPAND GRAPH DEPTH 2"#,
1042 ];
1043 for q in queries {
1044 let stmt = parser::parse(q).unwrap();
1045 let p = plan(&stmt, None);
1046 assert!(
1047 matches!(p.steps[0].op, PlanOp::LayerFilter { .. }),
1048 "LayerFilter must be first step for: {q}"
1049 );
1050 }
1051 }
1052
1053 #[test]
1054 fn explain_no_side_effects() {
1055 let stmt = parser::parse(r#"RECALL episodic ABOUT "test""#).unwrap();
1057 let p1 = plan(&stmt, None);
1058 let p2 = plan(&stmt, None);
1059 assert_eq!(p1, p2);
1060 }
1061
1062 #[test]
1063 fn importance_filter_before_graph() {
1064 let stmt = parser::parse(
1065 r#"RECALL episodic ABOUT "test" EXPAND GRAPH DEPTH 2 WHERE importance > 0.5"#,
1066 )
1067 .unwrap();
1068 let p = plan(&stmt, None);
1069
1070 let imp_pos = p
1071 .steps
1072 .iter()
1073 .position(|s| matches!(s.op, PlanOp::ImportanceFilter { .. }))
1074 .unwrap();
1075 let graph_pos = p
1076 .steps
1077 .iter()
1078 .position(|s| matches!(s.op, PlanOp::GraphExpand { .. }))
1079 .unwrap();
1080 assert!(
1081 imp_pos < graph_pos,
1082 "importance filter should run before graph expansion"
1083 );
1084 }
1085
1086 #[test]
1089 fn entity_filter_before_vector_with_large_stats() {
1090 let stmt = parser::parse(
1092 r#"RECALL episodic ABOUT "auth" INVOLVING "JWT" AFTER "2026-03-01" LIMIT 5"#,
1093 )
1094 .unwrap();
1095 let p = plan(&stmt, Some(&large_stats()));
1096
1097 let entity_pos = p
1098 .steps
1099 .iter()
1100 .position(|s| matches!(s.op, PlanOp::EntityFilter { .. }))
1101 .unwrap();
1102 let vector_pos = p
1103 .steps
1104 .iter()
1105 .position(|s| matches!(s.op, PlanOp::VectorSearch { .. }))
1106 .unwrap();
1107 assert!(
1108 entity_pos < vector_pos,
1109 "entity filter should be placed before vector search (high selectivity)"
1110 );
1111 }
1112
1113 #[test]
1114 fn explain_shows_estimated_cardinalities() {
1115 let stmt = parser::parse(
1116 r#"RECALL episodic ABOUT "auth" INVOLVING "JWT" AFTER "2026-03-01" LIMIT 5"#,
1117 )
1118 .unwrap();
1119 let p = plan(&stmt, Some(&large_stats()));
1120 let display = format!("{p}");
1121
1122 assert!(
1124 display.contains("est. rows"),
1125 "plan display should show estimated rows"
1126 );
1127 for step in &p.steps {
1129 assert!(
1130 step.estimated_cardinality > 0,
1131 "cardinality should be > 0: {:?}",
1132 step.op
1133 );
1134 }
1135 }
1136
1137 #[test]
1138 fn plan_changes_with_data_distribution() {
1139 let stmt = parser::parse(
1141 r#"RECALL episodic ABOUT "test" BETWEEN "2026-03-01" AND "2026-03-14" LIMIT 5"#,
1142 )
1143 .unwrap();
1144 let plan_empty = plan(&stmt, Some(&empty_stats()));
1145 let plan_large = plan(&stmt, Some(&large_stats()));
1146
1147 let temporal_pos_large = plan_large
1149 .steps
1150 .iter()
1151 .position(|s| matches!(s.op, PlanOp::TemporalFilter { .. }))
1152 .unwrap();
1153 let vector_pos_large = plan_large
1154 .steps
1155 .iter()
1156 .position(|s| matches!(s.op, PlanOp::VectorSearch { .. }))
1157 .unwrap();
1158 assert!(temporal_pos_large < vector_pos_large);
1159
1160 let temporal_pos_empty = plan_empty
1162 .steps
1163 .iter()
1164 .position(|s| matches!(s.op, PlanOp::TemporalFilter { .. }))
1165 .unwrap();
1166 let vector_pos_empty = plan_empty
1167 .steps
1168 .iter()
1169 .position(|s| matches!(s.op, PlanOp::VectorSearch { .. }))
1170 .unwrap();
1171 assert!(temporal_pos_empty > vector_pos_empty);
1172 }
1173
1174 #[test]
1175 fn ivf_hnsw_for_large_dataset() {
1176 let stmt = parser::parse(r#"RECALL episodic ABOUT "test""#).unwrap();
1177 let p = plan(&stmt, Some(&large_stats()));
1178 let vs = p
1179 .steps
1180 .iter()
1181 .find(|s| matches!(s.op, PlanOp::VectorSearch { .. }))
1182 .unwrap();
1183 match &vs.op {
1184 PlanOp::VectorSearch { index_hint, .. } => {
1185 assert_eq!(*index_hint, IndexHint::IvfHnsw);
1186 }
1187 _ => unreachable!(),
1188 }
1189 }
1190
1191 #[test]
1192 fn auto_index_for_small_dataset() {
1193 let stmt = parser::parse(r#"RECALL episodic ABOUT "test""#).unwrap();
1194 let p = plan(&stmt, Some(&empty_stats()));
1195 let vs = p
1196 .steps
1197 .iter()
1198 .find(|s| matches!(s.op, PlanOp::VectorSearch { .. }))
1199 .unwrap();
1200 match &vs.op {
1201 PlanOp::VectorSearch { index_hint, .. } => {
1202 assert_eq!(*index_hint, IndexHint::Auto);
1203 }
1204 _ => unreachable!(),
1205 }
1206 }
1207
1208 #[test]
1209 fn hybrid_entity_and_vector_plan() {
1210 let stmt =
1212 parser::parse(r#"RECALL episodic ABOUT "auth" INVOLVING "JWT" LIMIT 5"#).unwrap();
1213 let p = plan(&stmt, Some(&large_stats()));
1214
1215 assert!(
1217 p.steps
1218 .iter()
1219 .any(|s| matches!(s.op, PlanOp::EntityFilter { .. })),
1220 "should have entity filter"
1221 );
1222 assert!(
1223 p.steps
1224 .iter()
1225 .any(|s| matches!(s.op, PlanOp::VectorSearch { .. })),
1226 "should have vector search"
1227 );
1228 }
1229
1230 #[test]
1233 fn group_by_produces_aggregate_step() {
1234 let stmt =
1235 parser::parse(r#"RECALL episodic ABOUT "test" GROUP BY entity_type COUNT"#).unwrap();
1236 let p = plan(&stmt, None);
1237 assert!(
1238 p.steps
1239 .iter()
1240 .any(|s| matches!(s.op, PlanOp::Aggregate { .. })),
1241 "should have Aggregate step"
1242 );
1243 let agg_pos = p
1245 .steps
1246 .iter()
1247 .position(|s| matches!(s.op, PlanOp::Aggregate { .. }))
1248 .unwrap();
1249 let rank_pos = p
1250 .steps
1251 .iter()
1252 .position(|s| matches!(s.op, PlanOp::ScoreAndRank))
1253 .unwrap();
1254 assert!(agg_pos > rank_pos);
1255 }
1256
1257 #[test]
1258 fn select_produces_project_step() {
1259 let stmt = parser::parse(r#"RECALL episodic ABOUT "test" SELECT id, summary, importance"#)
1260 .unwrap();
1261 let p = plan(&stmt, None);
1262 let proj = p
1263 .steps
1264 .iter()
1265 .find(|s| matches!(s.op, PlanOp::Project { .. }))
1266 .unwrap();
1267 match &proj.op {
1268 PlanOp::Project { fields } => {
1269 assert_eq!(fields, &["id", "summary", "importance"]);
1270 }
1271 _ => unreachable!(),
1272 }
1273 }
1274
1275 #[test]
1276 fn format_json_produces_format_step() {
1277 let stmt = parser::parse(r#"RECALL episodic ABOUT "test" FORMAT json"#).unwrap();
1278 let p = plan(&stmt, None);
1279 assert!(
1280 p.steps.iter().any(|s| matches!(
1281 s.op,
1282 PlanOp::FormatOutput {
1283 format: OutputFormat::Json
1284 }
1285 )),
1286 "should have FormatOutput(Json) step"
1287 );
1288 }
1289
1290 #[test]
1291 fn format_csv_produces_format_step() {
1292 let stmt = parser::parse(r#"RECALL episodic ABOUT "test" FORMAT csv"#).unwrap();
1293 let p = plan(&stmt, None);
1294 assert!(
1295 p.steps.iter().any(|s| matches!(
1296 s.op,
1297 PlanOp::FormatOutput {
1298 format: OutputFormat::Csv
1299 }
1300 )),
1301 "should have FormatOutput(Csv) step"
1302 );
1303 }
1304
1305 #[test]
1306 fn no_group_by_no_aggregate_step() {
1307 let stmt = parser::parse(r#"RECALL episodic ABOUT "test""#).unwrap();
1308 let p = plan(&stmt, None);
1309 assert!(
1310 !p.steps
1311 .iter()
1312 .any(|s| matches!(s.op, PlanOp::Aggregate { .. })),
1313 "should not have Aggregate step without GROUP BY"
1314 );
1315 }
1316
1317 #[test]
1320 fn subquery_produces_resolve_step() {
1321 let stmt = parser::parse(
1322 r#"RECALL episodic ABOUT "outage" WHERE entity IN (RECALL semantic ABOUT "services")"#,
1323 )
1324 .unwrap();
1325 let p = plan(&stmt, None);
1326 assert!(
1327 p.steps.iter().any(
1328 |s| matches!(&s.op, PlanOp::SubqueryResolve { field, .. } if field == "entity")
1329 ),
1330 "should have SubqueryResolve step for entity field"
1331 );
1332 }
1333
1334 #[test]
1335 fn as_of_produces_time_travel_step() {
1336 let stmt = parser::parse(r#"RECALL episodic ABOUT "deploy" AS OF "2026-03-01T12:00:00Z""#)
1337 .unwrap();
1338 let p = plan(&stmt, None);
1339 assert!(
1340 p.steps.iter().any(|s| matches!(
1341 &s.op,
1342 PlanOp::TimeTravelFilter { snapshot }
1343 if snapshot
1344 == &RecallSnapshotAst::Unqualified(
1345 "2026-03-01T12:00:00Z".to_string()
1346 )
1347 )),
1348 "should have TimeTravelFilter step"
1349 );
1350 }
1351
1352 #[test]
1353 fn recorded_as_of_produces_structured_time_travel_step() {
1354 let stmt = parser::parse(
1355 r#"RECALL episodic ABOUT "deploy" AS OF RECORDED "2026-03-01T12:00:00Z""#,
1356 )
1357 .unwrap();
1358 let p = plan(&stmt, None);
1359 assert!(
1360 p.steps.iter().any(|s| matches!(
1361 &s.op,
1362 PlanOp::TimeTravelFilter { snapshot }
1363 if snapshot
1364 == &RecallSnapshotAst::Recorded(
1365 "2026-03-01T12:00:00Z".to_string()
1366 )
1367 )),
1368 "should have structured TimeTravelFilter step"
1369 );
1370 }
1371
1372 #[test]
1373 fn subquery_step_before_vector_search() {
1374 let stmt = parser::parse(
1375 r#"RECALL episodic ABOUT "test" WHERE entity IN (RECALL semantic ABOUT "svc")"#,
1376 )
1377 .unwrap();
1378 let p = plan(&stmt, None);
1379 let sq_idx = p
1380 .steps
1381 .iter()
1382 .position(|s| matches!(s.op, PlanOp::SubqueryResolve { .. }));
1383 let vs_idx = p
1384 .steps
1385 .iter()
1386 .position(|s| matches!(s.op, PlanOp::VectorSearch { .. }));
1387 assert!(
1388 sq_idx.unwrap() < vs_idx.unwrap(),
1389 "SubqueryResolve should appear before VectorSearch"
1390 );
1391 }
1392
1393 #[test]
1394 fn no_as_of_no_time_travel_step() {
1395 let stmt = parser::parse(r#"RECALL episodic ABOUT "test""#).unwrap();
1396 let p = plan(&stmt, None);
1397 assert!(
1398 !p.steps
1399 .iter()
1400 .any(|s| matches!(s.op, PlanOp::TimeTravelFilter { .. })),
1401 "should not have TimeTravelFilter step without AS OF"
1402 );
1403 }
1404
1405 #[test]
1408 fn traverse_plan_has_graph_expand() {
1409 let stmt = parser::parse(r#"TRAVERSE FROM "node1" DEPTH 3"#).unwrap();
1410 let p = plan(&stmt, None);
1411 assert!(
1412 p.steps
1413 .iter()
1414 .any(|s| matches!(&s.op, PlanOp::GraphExpand { depth: 3, .. })),
1415 "should have GraphExpand step with depth 3"
1416 );
1417 }
1418
1419 #[test]
1420 fn traverse_plan_has_vector_search_for_start() {
1421 let stmt = parser::parse(r#"TRAVERSE FROM "node1" DEPTH 2"#).unwrap();
1422 let p = plan(&stmt, None);
1423 assert!(
1424 p.steps
1425 .iter()
1426 .any(|s| matches!(&s.op, PlanOp::VectorSearch { .. })),
1427 "should have VectorSearch step for start node lookup"
1428 );
1429 }
1430
1431 #[test]
1432 fn traverse_with_where_has_condition_filter() {
1433 let stmt = parser::parse(r#"TRAVERSE FROM "root" DEPTH 3 WHERE weight > 0.5"#).unwrap();
1434 let p = plan(&stmt, None);
1435 assert!(
1436 p.steps
1437 .iter()
1438 .any(|s| matches!(s.op, PlanOp::ConditionFilter { .. })),
1439 "should have ConditionFilter step"
1440 );
1441 }
1442
1443 #[test]
1444 fn traverse_with_limit_has_limit_step() {
1445 let stmt = parser::parse(r#"TRAVERSE FROM "root" DEPTH 2 LIMIT 10"#).unwrap();
1446 let p = plan(&stmt, None);
1447 assert!(
1448 p.steps
1449 .iter()
1450 .any(|s| matches!(s.op, PlanOp::LimitResults { n: 10 })),
1451 "should have LimitResults step"
1452 );
1453 }
1454
1455 #[test]
1458 fn explain_plans_inner_recall() {
1459 let stmt =
1460 parser::parse(r#"EXPLAIN RECALL episodic ABOUT "test" WHERE importance > 0.5 LIMIT 5"#)
1461 .unwrap();
1462 let p = plan(&stmt, None);
1463 assert_eq!(p.verb, PlanVerb::Recall);
1465 assert!(
1466 p.steps
1467 .iter()
1468 .any(|s| matches!(s.op, PlanOp::VectorSearch { .. }))
1469 );
1470 assert!(
1471 p.steps
1472 .iter()
1473 .any(|s| matches!(s.op, PlanOp::ImportanceFilter { .. }))
1474 );
1475 assert!(
1476 p.steps
1477 .iter()
1478 .any(|s| matches!(s.op, PlanOp::LimitResults { .. }))
1479 );
1480 }
1481
1482 #[test]
1483 fn explain_analyze_plans_same_as_explain() {
1484 let stmt1 = parser::parse(r#"EXPLAIN RECALL episodic ABOUT "q" LIMIT 3"#).unwrap();
1485 let stmt2 = parser::parse(r#"EXPLAIN ANALYZE RECALL episodic ABOUT "q" LIMIT 3"#).unwrap();
1486 let p1 = plan(&stmt1, None);
1487 let p2 = plan(&stmt2, None);
1488 assert_eq!(p1.steps.len(), p2.steps.len());
1490 assert_eq!(p1.verb, p2.verb);
1491 }
1492}