Skip to main content

grafeo_engine/query/
plan.rs

1//! Logical query plan representation.
2//!
3//! The logical plan is the intermediate representation between parsed queries
4//! and physical execution. Both GQL and Cypher queries are translated to this
5//! common representation.
6
7use std::collections::HashMap;
8use std::fmt;
9
10use grafeo_common::types::Value;
11
/// Row count used by SKIP/LIMIT.
///
/// The count is either already known (`Literal`) or still refers to a query
/// parameter by name (`Parameter`) that has not been substituted yet.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum CountExpr {
    /// A concrete, already-resolved count.
    Literal(usize),
    /// The name of a parameter that has not been substituted yet (e.g., `$limit`).
    Parameter(String),
}

impl CountExpr {
    /// Extracts the concrete count.
    ///
    /// Intended for use after parameter substitution, when every count is
    /// expected to be a `Literal`.
    ///
    /// # Panics
    ///
    /// Panics if the expression is still an unresolved `Parameter`.
    pub fn value(&self) -> usize {
        match *self {
            Self::Literal(count) => count,
            Self::Parameter(ref name) => panic!("Unresolved parameter: ${name}"),
        }
    }

    /// Non-panicking counterpart of [`Self::value`].
    ///
    /// # Errors
    ///
    /// Returns a descriptive message if the expression is still an
    /// unresolved `Parameter`.
    pub fn try_value(&self) -> Result<usize, String> {
        if let Self::Parameter(name) = self {
            Err(format!("Unresolved SKIP/LIMIT parameter: ${name}"))
        } else {
            // Only `Literal` remains, so `value` cannot panic here.
            Ok(self.value())
        }
    }

    /// Returns the count as an `f64` for cardinality estimation.
    ///
    /// An unresolved parameter has no known value, so a fixed guess of 10 is
    /// used for it.
    pub fn estimate(&self) -> f64 {
        // Guess used when the real count is not known yet.
        const UNRESOLVED_ESTIMATE: f64 = 10.0;

        if let Self::Literal(count) = self {
            *count as f64
        } else {
            UNRESOLVED_ESTIMATE
        }
    }
}

impl fmt::Display for CountExpr {
    /// Writes a literal as its number and a parameter as `$name`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Parameter(name) => write!(f, "${name}"),
            Self::Literal(n) => write!(f, "{n}"),
        }
    }
}

impl From<usize> for CountExpr {
    /// Wraps a plain count as a resolved `Literal`.
    fn from(n: usize) -> Self {
        CountExpr::Literal(n)
    }
}

impl PartialEq<usize> for CountExpr {
    /// A `Literal` equals a `usize` with the same value; a `Parameter` never does.
    fn eq(&self, other: &usize) -> bool {
        match self {
            Self::Literal(count) => count == other,
            Self::Parameter(_) => false,
        }
    }
}
78
79/// A logical query plan.
80#[derive(Debug, Clone)]
81pub struct LogicalPlan {
82    /// The root operator of the plan.
83    pub root: LogicalOperator,
84    /// When true, return the plan tree as text instead of executing.
85    pub explain: bool,
86    /// When true, execute the query and return per-operator runtime metrics.
87    pub profile: bool,
88    /// Default parameter values from variable declarations (e.g., GraphQL
89    /// `query($limit: Int = 2)`). The processor merges these with caller-supplied
90    /// params, giving caller values higher precedence.
91    pub default_params: HashMap<String, Value>,
92}
93
94impl LogicalPlan {
95    /// Creates a new logical plan with the given root operator.
96    pub fn new(root: LogicalOperator) -> Self {
97        Self {
98            root,
99            explain: false,
100            profile: false,
101            default_params: HashMap::new(),
102        }
103    }
104
105    /// Creates an EXPLAIN plan that returns the plan tree without executing.
106    pub fn explain(root: LogicalOperator) -> Self {
107        Self {
108            root,
109            explain: true,
110            profile: false,
111            default_params: HashMap::new(),
112        }
113    }
114
115    /// Creates a PROFILE plan that executes and returns per-operator metrics.
116    pub fn profile(root: LogicalOperator) -> Self {
117        Self {
118            root,
119            explain: false,
120            profile: true,
121            default_params: HashMap::new(),
122        }
123    }
124}
125
/// A logical operator in the query plan.
///
/// Operators form a tree. Apart from [`LogicalOperator::Empty`], every
/// variant wraps an `*Op` struct holding that operator's parameters and,
/// where applicable, its child operator(s). See [`LogicalOperator::children`]
/// for the child layout of each variant.
///
/// When adding a variant, extend the exhaustive matches in `has_mutations`,
/// `children` and `map_children` as well.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum LogicalOperator {
    /// Scan all nodes, optionally filtered by label.
    NodeScan(NodeScanOp),

    /// Scan all edges, optionally filtered by type.
    EdgeScan(EdgeScanOp),

    /// Expand from nodes to neighbors via edges.
    Expand(ExpandOp),

    /// Filter rows based on a predicate.
    Filter(FilterOp),

    /// Project specific columns.
    Project(ProjectOp),

    /// Join two inputs.
    Join(JoinOp),

    /// Aggregate with grouping.
    Aggregate(AggregateOp),

    /// Limit the number of results.
    Limit(LimitOp),

    /// Skip a number of results.
    Skip(SkipOp),

    /// Sort results.
    Sort(SortOp),

    /// Remove duplicate results.
    Distinct(DistinctOp),

    /// Create a new node.
    CreateNode(CreateNodeOp),

    /// Create a new edge.
    CreateEdge(CreateEdgeOp),

    /// Delete a node.
    DeleteNode(DeleteNodeOp),

    /// Delete an edge.
    DeleteEdge(DeleteEdgeOp),

    /// Set properties on a node or edge.
    SetProperty(SetPropertyOp),

    /// Add labels to a node.
    AddLabel(AddLabelOp),

    /// Remove labels from a node.
    RemoveLabel(RemoveLabelOp),

    /// Return results (terminal operator).
    Return(ReturnOp),

    /// Empty result set. The only variant that carries no payload.
    Empty,

    // ==================== RDF/SPARQL Operators ====================
    // NOTE(review): several variants below (Bind, Unwind, MapCollect, Merge,
    // MergeRelationship, ShortestPath) do not look SPARQL-specific; the
    // section banner may be stale. Confirm before relying on it.
    /// Scan RDF triples matching a pattern.
    TripleScan(TripleScanOp),

    /// Union of multiple result sets.
    Union(UnionOp),

    /// Left outer join for OPTIONAL patterns.
    LeftJoin(LeftJoinOp),

    /// Anti-join for MINUS patterns.
    AntiJoin(AntiJoinOp),

    /// SPARQL CONSTRUCT: evaluate WHERE, substitute bindings into template,
    /// output (subject, predicate, object) columns.
    Construct(ConstructOp),

    /// Bind a variable to an expression.
    Bind(BindOp),

    /// Unwind a list into individual rows.
    Unwind(UnwindOp),

    /// Collect grouped key-value rows into a single Map value.
    /// Used for Gremlin `groupCount()` semantics.
    MapCollect(MapCollectOp),

    /// Merge a node pattern (match or create).
    Merge(MergeOp),

    /// Merge a relationship pattern (match or create).
    MergeRelationship(MergeRelationshipOp),

    /// Find shortest path between nodes.
    ShortestPath(ShortestPathOp),

    // ==================== SPARQL Update Operators ====================
    /// Insert RDF triples.
    InsertTriple(InsertTripleOp),

    /// Delete RDF triples.
    DeleteTriple(DeleteTripleOp),

    /// SPARQL MODIFY operation (DELETE/INSERT WHERE).
    /// Evaluates WHERE once, applies DELETE templates, then INSERT templates.
    Modify(ModifyOp),

    /// Clear a graph (remove all triples).
    ClearGraph(ClearGraphOp),

    /// Create a new named graph.
    CreateGraph(CreateGraphOp),

    /// Drop (remove) a named graph.
    DropGraph(DropGraphOp),

    /// Load data from a URL into a graph.
    LoadGraph(LoadGraphOp),

    /// Copy triples from one graph to another.
    CopyGraph(CopyGraphOp),

    /// Move triples from one graph to another.
    MoveGraph(MoveGraphOp),

    /// Add (merge) triples from one graph to another.
    AddGraph(AddGraphOp),

    /// Per-row aggregation over a list-valued column (horizontal aggregation, GE09).
    HorizontalAggregate(HorizontalAggregateOp),

    // ==================== Vector Search Operators ====================
    /// Scan using vector similarity search.
    VectorScan(VectorScanOp),

    /// Join graph patterns with vector similarity search.
    ///
    /// Computes vector distances between entities from the left input and
    /// a query vector, then joins with similarity scores. Useful for:
    /// - Filtering graph traversal results by vector similarity
    /// - Computing aggregated embeddings and finding similar entities
    /// - Combining multiple vector sources with graph structure
    VectorJoin(VectorJoinOp),

    /// Scan using full-text search with BM25 scoring.
    TextScan(TextScanOp),

    // ==================== Set Operations ====================
    /// Set difference: rows in left that are not in right.
    Except(ExceptOp),

    /// Set intersection: rows common to all inputs.
    Intersect(IntersectOp),

    /// Fallback: use left result if non-empty, otherwise right.
    Otherwise(OtherwiseOp),

    // ==================== Correlated Subquery ====================
    /// Apply (lateral join): evaluate a subplan per input row.
    Apply(ApplyOp),

    /// Parameter scan: leaf of a correlated inner plan that receives values
    /// from the outer Apply operator. The column names match `ApplyOp.shared_variables`.
    ParameterScan(ParameterScanOp),

    // ==================== DDL Operators ====================
    /// Define a property graph schema (SQL/PGQ DDL).
    CreatePropertyGraph(CreatePropertyGraphOp),

    // ==================== Multi-Way Join ====================
    /// Multi-way join using worst-case optimal join (leapfrog).
    /// Used for cyclic patterns (triangles, cliques) with 3+ relations.
    MultiWayJoin(MultiWayJoinOp),

    // ==================== Procedure Call Operators ====================
    /// Invoke a stored procedure (CALL ... YIELD).
    CallProcedure(CallProcedureOp),

    // ==================== Data Import Operators ====================
    /// Load data from a file (CSV, JSONL, or Parquet), producing one row per record.
    LoadData(LoadDataOp),
}
312
313impl LogicalOperator {
314    /// Returns `true` if this operator or any of its children perform mutations.
315    #[must_use]
316    pub fn has_mutations(&self) -> bool {
317        match self {
318            // Direct mutation operators
319            Self::CreateNode(_)
320            | Self::CreateEdge(_)
321            | Self::DeleteNode(_)
322            | Self::DeleteEdge(_)
323            | Self::SetProperty(_)
324            | Self::AddLabel(_)
325            | Self::RemoveLabel(_)
326            | Self::Merge(_)
327            | Self::MergeRelationship(_)
328            | Self::InsertTriple(_)
329            | Self::DeleteTriple(_)
330            | Self::Modify(_)
331            | Self::ClearGraph(_)
332            | Self::CreateGraph(_)
333            | Self::DropGraph(_)
334            | Self::LoadGraph(_)
335            | Self::CopyGraph(_)
336            | Self::MoveGraph(_)
337            | Self::AddGraph(_)
338            | Self::CreatePropertyGraph(_) => true,
339
340            // Operators with an `input` child
341            Self::Filter(op) => op.input.has_mutations(),
342            Self::Project(op) => op.input.has_mutations(),
343            Self::Aggregate(op) => op.input.has_mutations(),
344            Self::Limit(op) => op.input.has_mutations(),
345            Self::Skip(op) => op.input.has_mutations(),
346            Self::Sort(op) => op.input.has_mutations(),
347            Self::Distinct(op) => op.input.has_mutations(),
348            Self::Unwind(op) => op.input.has_mutations(),
349            Self::Bind(op) => op.input.has_mutations(),
350            Self::MapCollect(op) => op.input.has_mutations(),
351            Self::Return(op) => op.input.has_mutations(),
352            Self::HorizontalAggregate(op) => op.input.has_mutations(),
353            Self::VectorScan(op) => op.input.as_deref().is_some_and(Self::has_mutations),
354            Self::VectorJoin(op) => op.input.has_mutations(),
355            Self::TextScan(_) => false,
356
357            // Operators with two children
358            Self::Join(op) => op.left.has_mutations() || op.right.has_mutations(),
359            Self::LeftJoin(op) => op.left.has_mutations() || op.right.has_mutations(),
360            Self::AntiJoin(op) => op.left.has_mutations() || op.right.has_mutations(),
361            Self::Except(op) => op.left.has_mutations() || op.right.has_mutations(),
362            Self::Intersect(op) => op.left.has_mutations() || op.right.has_mutations(),
363            Self::Otherwise(op) => op.left.has_mutations() || op.right.has_mutations(),
364            Self::Union(op) => op.inputs.iter().any(|i| i.has_mutations()),
365            Self::MultiWayJoin(op) => op.inputs.iter().any(|i| i.has_mutations()),
366            Self::Apply(op) => op.input.has_mutations() || op.subplan.has_mutations(),
367
368            // Leaf operators (read-only)
369            Self::NodeScan(_)
370            | Self::EdgeScan(_)
371            | Self::Expand(_)
372            | Self::TripleScan(_)
373            | Self::ShortestPath(_)
374            | Self::Empty
375            | Self::ParameterScan(_)
376            | Self::CallProcedure(_)
377            | Self::LoadData(_) => false,
378            Self::Construct(op) => op.input.has_mutations(),
379        }
380    }
381
382    /// Returns references to the child operators.
383    ///
384    /// Used by [`crate::query::profile::build_profile_tree`] to walk the logical
385    /// plan tree in post-order, matching operators to profiling entries.
386    #[must_use]
387    pub fn children(&self) -> Vec<&LogicalOperator> {
388        match self {
389            // Optional single input
390            Self::NodeScan(op) => op.input.as_deref().into_iter().collect(),
391            Self::EdgeScan(op) => op.input.as_deref().into_iter().collect(),
392            Self::TripleScan(op) => op.input.as_deref().into_iter().collect(),
393            Self::VectorScan(op) => op.input.as_deref().into_iter().collect(),
394            Self::CreateNode(op) => op.input.as_deref().into_iter().collect(),
395            Self::InsertTriple(op) => op.input.as_deref().into_iter().collect(),
396            Self::DeleteTriple(op) => op.input.as_deref().into_iter().collect(),
397
398            // Single required input
399            Self::Expand(op) => vec![&*op.input],
400            Self::Filter(op) => vec![&*op.input],
401            Self::Project(op) => vec![&*op.input],
402            Self::Aggregate(op) => vec![&*op.input],
403            Self::Limit(op) => vec![&*op.input],
404            Self::Skip(op) => vec![&*op.input],
405            Self::Sort(op) => vec![&*op.input],
406            Self::Distinct(op) => vec![&*op.input],
407            Self::Return(op) => vec![&*op.input],
408            Self::Unwind(op) => vec![&*op.input],
409            Self::Bind(op) => vec![&*op.input],
410            Self::Construct(op) => vec![&*op.input],
411            Self::MapCollect(op) => vec![&*op.input],
412            Self::ShortestPath(op) => vec![&*op.input],
413            Self::Merge(op) => vec![&*op.input],
414            Self::MergeRelationship(op) => vec![&*op.input],
415            Self::CreateEdge(op) => vec![&*op.input],
416            Self::DeleteNode(op) => vec![&*op.input],
417            Self::DeleteEdge(op) => vec![&*op.input],
418            Self::SetProperty(op) => vec![&*op.input],
419            Self::AddLabel(op) => vec![&*op.input],
420            Self::RemoveLabel(op) => vec![&*op.input],
421            Self::HorizontalAggregate(op) => vec![&*op.input],
422            Self::VectorJoin(op) => vec![&*op.input],
423            Self::Modify(op) => vec![&*op.where_clause],
424
425            // Two children (left + right)
426            Self::Join(op) => vec![&*op.left, &*op.right],
427            Self::LeftJoin(op) => vec![&*op.left, &*op.right],
428            Self::AntiJoin(op) => vec![&*op.left, &*op.right],
429            Self::Except(op) => vec![&*op.left, &*op.right],
430            Self::Intersect(op) => vec![&*op.left, &*op.right],
431            Self::Otherwise(op) => vec![&*op.left, &*op.right],
432
433            // Two children (input + subplan)
434            Self::Apply(op) => vec![&*op.input, &*op.subplan],
435
436            // Vec children
437            Self::Union(op) => op.inputs.iter().collect(),
438            Self::MultiWayJoin(op) => op.inputs.iter().collect(),
439
440            // Leaf operators
441            Self::Empty
442            | Self::ParameterScan(_)
443            | Self::CallProcedure(_)
444            | Self::ClearGraph(_)
445            | Self::CreateGraph(_)
446            | Self::DropGraph(_)
447            | Self::LoadGraph(_)
448            | Self::CopyGraph(_)
449            | Self::MoveGraph(_)
450            | Self::AddGraph(_)
451            | Self::CreatePropertyGraph(_)
452            | Self::LoadData(_)
453            | Self::TextScan(_) => vec![],
454        }
455    }
456
457    /// Returns a new `LogicalOperator` with each child replaced by `f(child)`.
458    ///
459    /// Mirrors [`Self::children`] in arm coverage; any new operator variant
460    /// must extend both. Child-recursive optimizer passes (e.g. predicate
461    /// propagation) call this to descend without enumerating every variant
462    /// at every call site, eliminating a "forgot to recurse into the new
463    /// variant" bug class.
464    #[must_use]
465    pub fn map_children<F: FnMut(LogicalOperator) -> LogicalOperator>(self, mut f: F) -> Self {
466        match self {
467            // Optional single input
468            Self::NodeScan(mut op) => {
469                op.input = op.input.map(|i| Box::new(f(*i)));
470                Self::NodeScan(op)
471            }
472            Self::EdgeScan(mut op) => {
473                op.input = op.input.map(|i| Box::new(f(*i)));
474                Self::EdgeScan(op)
475            }
476            Self::TripleScan(mut op) => {
477                op.input = op.input.map(|i| Box::new(f(*i)));
478                Self::TripleScan(op)
479            }
480            Self::VectorScan(mut op) => {
481                op.input = op.input.map(|i| Box::new(f(*i)));
482                Self::VectorScan(op)
483            }
484            Self::CreateNode(mut op) => {
485                op.input = op.input.map(|i| Box::new(f(*i)));
486                Self::CreateNode(op)
487            }
488            Self::InsertTriple(mut op) => {
489                op.input = op.input.map(|i| Box::new(f(*i)));
490                Self::InsertTriple(op)
491            }
492            Self::DeleteTriple(mut op) => {
493                op.input = op.input.map(|i| Box::new(f(*i)));
494                Self::DeleteTriple(op)
495            }
496
497            // Single required input
498            Self::Expand(mut op) => {
499                op.input = Box::new(f(*op.input));
500                Self::Expand(op)
501            }
502            Self::Filter(mut op) => {
503                op.input = Box::new(f(*op.input));
504                Self::Filter(op)
505            }
506            Self::Project(mut op) => {
507                op.input = Box::new(f(*op.input));
508                Self::Project(op)
509            }
510            Self::Aggregate(mut op) => {
511                op.input = Box::new(f(*op.input));
512                Self::Aggregate(op)
513            }
514            Self::Limit(mut op) => {
515                op.input = Box::new(f(*op.input));
516                Self::Limit(op)
517            }
518            Self::Skip(mut op) => {
519                op.input = Box::new(f(*op.input));
520                Self::Skip(op)
521            }
522            Self::Sort(mut op) => {
523                op.input = Box::new(f(*op.input));
524                Self::Sort(op)
525            }
526            Self::Distinct(mut op) => {
527                op.input = Box::new(f(*op.input));
528                Self::Distinct(op)
529            }
530            Self::Return(mut op) => {
531                op.input = Box::new(f(*op.input));
532                Self::Return(op)
533            }
534            Self::Unwind(mut op) => {
535                op.input = Box::new(f(*op.input));
536                Self::Unwind(op)
537            }
538            Self::Bind(mut op) => {
539                op.input = Box::new(f(*op.input));
540                Self::Bind(op)
541            }
542            Self::Construct(mut op) => {
543                op.input = Box::new(f(*op.input));
544                Self::Construct(op)
545            }
546            Self::MapCollect(mut op) => {
547                op.input = Box::new(f(*op.input));
548                Self::MapCollect(op)
549            }
550            Self::ShortestPath(mut op) => {
551                op.input = Box::new(f(*op.input));
552                Self::ShortestPath(op)
553            }
554            Self::Merge(mut op) => {
555                op.input = Box::new(f(*op.input));
556                Self::Merge(op)
557            }
558            Self::MergeRelationship(mut op) => {
559                op.input = Box::new(f(*op.input));
560                Self::MergeRelationship(op)
561            }
562            Self::CreateEdge(mut op) => {
563                op.input = Box::new(f(*op.input));
564                Self::CreateEdge(op)
565            }
566            Self::DeleteNode(mut op) => {
567                op.input = Box::new(f(*op.input));
568                Self::DeleteNode(op)
569            }
570            Self::DeleteEdge(mut op) => {
571                op.input = Box::new(f(*op.input));
572                Self::DeleteEdge(op)
573            }
574            Self::SetProperty(mut op) => {
575                op.input = Box::new(f(*op.input));
576                Self::SetProperty(op)
577            }
578            Self::AddLabel(mut op) => {
579                op.input = Box::new(f(*op.input));
580                Self::AddLabel(op)
581            }
582            Self::RemoveLabel(mut op) => {
583                op.input = Box::new(f(*op.input));
584                Self::RemoveLabel(op)
585            }
586            Self::HorizontalAggregate(mut op) => {
587                op.input = Box::new(f(*op.input));
588                Self::HorizontalAggregate(op)
589            }
590            Self::VectorJoin(mut op) => {
591                op.input = Box::new(f(*op.input));
592                Self::VectorJoin(op)
593            }
594            Self::Modify(mut op) => {
595                op.where_clause = Box::new(f(*op.where_clause));
596                Self::Modify(op)
597            }
598
599            // Two children (left + right)
600            Self::Join(mut op) => {
601                op.left = Box::new(f(*op.left));
602                op.right = Box::new(f(*op.right));
603                Self::Join(op)
604            }
605            Self::LeftJoin(mut op) => {
606                op.left = Box::new(f(*op.left));
607                op.right = Box::new(f(*op.right));
608                Self::LeftJoin(op)
609            }
610            Self::AntiJoin(mut op) => {
611                op.left = Box::new(f(*op.left));
612                op.right = Box::new(f(*op.right));
613                Self::AntiJoin(op)
614            }
615            Self::Except(mut op) => {
616                op.left = Box::new(f(*op.left));
617                op.right = Box::new(f(*op.right));
618                Self::Except(op)
619            }
620            Self::Intersect(mut op) => {
621                op.left = Box::new(f(*op.left));
622                op.right = Box::new(f(*op.right));
623                Self::Intersect(op)
624            }
625            Self::Otherwise(mut op) => {
626                op.left = Box::new(f(*op.left));
627                op.right = Box::new(f(*op.right));
628                Self::Otherwise(op)
629            }
630
631            // Two children (input + subplan)
632            Self::Apply(mut op) => {
633                op.input = Box::new(f(*op.input));
634                op.subplan = Box::new(f(*op.subplan));
635                Self::Apply(op)
636            }
637
638            // Vec children
639            Self::Union(mut op) => {
640                op.inputs = op.inputs.into_iter().map(&mut f).collect();
641                Self::Union(op)
642            }
643            Self::MultiWayJoin(mut op) => {
644                op.inputs = op.inputs.into_iter().map(&mut f).collect();
645                Self::MultiWayJoin(op)
646            }
647
648            // Leaf operators
649            leaf @ (Self::Empty
650            | Self::ParameterScan(_)
651            | Self::CallProcedure(_)
652            | Self::ClearGraph(_)
653            | Self::CreateGraph(_)
654            | Self::DropGraph(_)
655            | Self::LoadGraph(_)
656            | Self::CopyGraph(_)
657            | Self::MoveGraph(_)
658            | Self::AddGraph(_)
659            | Self::CreatePropertyGraph(_)
660            | Self::LoadData(_)
661            | Self::TextScan(_)) => leaf,
662        }
663    }
664
665    /// Returns a compact display label for this operator, used in PROFILE output.
666    #[must_use]
667    pub fn display_label(&self) -> String {
668        match self {
669            Self::NodeScan(op) => {
670                let label = op.label.as_deref().unwrap_or("*");
671                format!("{}:{}", op.variable, label)
672            }
673            Self::EdgeScan(op) => {
674                let types = if op.edge_types.is_empty() {
675                    "*".to_string()
676                } else {
677                    op.edge_types.join("|")
678                };
679                format!("{}:{}", op.variable, types)
680            }
681            Self::Expand(op) => {
682                let types = if op.edge_types.is_empty() {
683                    "*".to_string()
684                } else {
685                    op.edge_types.join("|")
686                };
687                let dir = match op.direction {
688                    ExpandDirection::Outgoing => "->",
689                    ExpandDirection::Incoming => "<-",
690                    ExpandDirection::Both => "--",
691                };
692                format!(
693                    "({from}){dir}[:{types}]{dir}({to})",
694                    from = op.from_variable,
695                    to = op.to_variable,
696                )
697            }
698            Self::Filter(op) => {
699                let hint = match &op.pushdown_hint {
700                    Some(PushdownHint::IndexLookup { property }) => {
701                        format!(" [index: {property}]")
702                    }
703                    Some(PushdownHint::RangeScan { property }) => {
704                        format!(" [range: {property}]")
705                    }
706                    Some(PushdownHint::LabelFirst) => " [label-first]".to_string(),
707                    None => String::new(),
708                };
709                format!("{}{hint}", fmt_expr(&op.predicate))
710            }
711            Self::Project(op) => {
712                let cols: Vec<String> = op
713                    .projections
714                    .iter()
715                    .map(|p| match &p.alias {
716                        Some(alias) => alias.clone(),
717                        None => fmt_expr(&p.expression),
718                    })
719                    .collect();
720                cols.join(", ")
721            }
722            Self::Join(op) => format!("{:?}", op.join_type),
723            Self::Aggregate(op) => {
724                let groups: Vec<String> = op.group_by.iter().map(fmt_expr).collect();
725                format!("group: [{}]", groups.join(", "))
726            }
727            Self::Limit(op) => format!("{}", op.count),
728            Self::Skip(op) => format!("{}", op.count),
729            Self::Sort(op) => {
730                let keys: Vec<String> = op
731                    .keys
732                    .iter()
733                    .map(|k| {
734                        let dir = match k.order {
735                            SortOrder::Ascending => "ASC",
736                            SortOrder::Descending => "DESC",
737                        };
738                        format!("{} {dir}", fmt_expr(&k.expression))
739                    })
740                    .collect();
741                keys.join(", ")
742            }
743            Self::Distinct(_) => String::new(),
744            Self::Return(op) => {
745                let items: Vec<String> = op
746                    .items
747                    .iter()
748                    .map(|item| match &item.alias {
749                        Some(alias) => alias.clone(),
750                        None => fmt_expr(&item.expression),
751                    })
752                    .collect();
753                items.join(", ")
754            }
755            Self::Union(op) => format!("{} branches", op.inputs.len()),
756            Self::MultiWayJoin(op) => {
757                format!("{} inputs", op.inputs.len())
758            }
759            Self::LeftJoin(_) => String::new(),
760            Self::AntiJoin(_) => String::new(),
761            Self::Unwind(op) => op.variable.clone(),
762            Self::Bind(op) => op.variable.clone(),
763            Self::MapCollect(op) => op.alias.clone(),
764            Self::ShortestPath(op) => {
765                format!("{} -> {}", op.source_var, op.target_var)
766            }
767            Self::Merge(op) => op.variable.clone(),
768            Self::MergeRelationship(op) => op.variable.clone(),
769            Self::CreateNode(op) => {
770                let labels = op.labels.join(":");
771                format!("{}:{labels}", op.variable)
772            }
773            Self::CreateEdge(op) => {
774                format!(
775                    "[{}:{}]",
776                    op.variable.as_deref().unwrap_or("?"),
777                    op.edge_type
778                )
779            }
780            Self::DeleteNode(op) => op.variable.clone(),
781            Self::DeleteEdge(op) => op.variable.clone(),
782            Self::SetProperty(op) => op.variable.clone(),
783            Self::AddLabel(op) => {
784                let labels = op.labels.join(":");
785                format!("{}:{labels}", op.variable)
786            }
787            Self::RemoveLabel(op) => {
788                let labels = op.labels.join(":");
789                format!("{}:{labels}", op.variable)
790            }
791            Self::CallProcedure(op) => op.name.join("."),
792            Self::LoadData(op) => format!("{} AS {}", op.path, op.variable),
793            Self::Apply(_) => String::new(),
794            Self::VectorScan(op) => op.variable.clone(),
795            Self::VectorJoin(op) => op.right_variable.clone(),
796            Self::TextScan(op) => format!("{}:{}", op.variable, op.label),
797            _ => String::new(),
798        }
799    }
800}
801
802impl LogicalOperator {
803    /// Formats this operator tree as a human-readable plan for EXPLAIN output.
804    pub fn explain_tree(&self) -> String {
805        let mut output = String::new();
806        self.fmt_tree(&mut output, 0);
807        output
808    }
809
810    fn fmt_tree(&self, out: &mut String, depth: usize) {
811        use std::fmt::Write;
812
813        let indent = "  ".repeat(depth);
814        match self {
815            Self::NodeScan(op) => {
816                let label = op.label.as_deref().unwrap_or("*");
817                let _ = writeln!(out, "{indent}NodeScan ({var}:{label})", var = op.variable);
818                if let Some(input) = &op.input {
819                    input.fmt_tree(out, depth + 1);
820                }
821            }
822            Self::EdgeScan(op) => {
823                let types = if op.edge_types.is_empty() {
824                    "*".to_string()
825                } else {
826                    op.edge_types.join("|")
827                };
828                let _ = writeln!(out, "{indent}EdgeScan ({var}:{types})", var = op.variable);
829            }
830            Self::Expand(op) => {
831                let types = if op.edge_types.is_empty() {
832                    "*".to_string()
833                } else {
834                    op.edge_types.join("|")
835                };
836                let dir = match op.direction {
837                    ExpandDirection::Outgoing => "->",
838                    ExpandDirection::Incoming => "<-",
839                    ExpandDirection::Both => "--",
840                };
841                let hops = match (op.min_hops, op.max_hops) {
842                    (1, Some(1)) => String::new(),
843                    (min, Some(max)) if min == max => format!("*{min}"),
844                    (min, Some(max)) => format!("*{min}..{max}"),
845                    (min, None) => format!("*{min}.."),
846                };
847                let _ = writeln!(
848                    out,
849                    "{indent}Expand ({from}){dir}[:{types}{hops}]{dir}({to})",
850                    from = op.from_variable,
851                    to = op.to_variable,
852                );
853                op.input.fmt_tree(out, depth + 1);
854            }
855            Self::Filter(op) => {
856                let hint = match &op.pushdown_hint {
857                    Some(PushdownHint::IndexLookup { property }) => {
858                        format!(" [index: {property}]")
859                    }
860                    Some(PushdownHint::RangeScan { property }) => {
861                        format!(" [range: {property}]")
862                    }
863                    Some(PushdownHint::LabelFirst) => " [label-first]".to_string(),
864                    None => String::new(),
865                };
866                let _ = writeln!(
867                    out,
868                    "{indent}Filter ({expr}){hint}",
869                    expr = fmt_expr(&op.predicate)
870                );
871                op.input.fmt_tree(out, depth + 1);
872            }
873            Self::Project(op) => {
874                let cols: Vec<String> = op
875                    .projections
876                    .iter()
877                    .map(|p| {
878                        let expr = fmt_expr(&p.expression);
879                        match &p.alias {
880                            Some(alias) => format!("{expr} AS {alias}"),
881                            None => expr,
882                        }
883                    })
884                    .collect();
885                let _ = writeln!(out, "{indent}Project ({cols})", cols = cols.join(", "));
886                op.input.fmt_tree(out, depth + 1);
887            }
888            Self::Join(op) => {
889                let _ = writeln!(out, "{indent}Join ({ty:?})", ty = op.join_type);
890                op.left.fmt_tree(out, depth + 1);
891                op.right.fmt_tree(out, depth + 1);
892            }
893            Self::Aggregate(op) => {
894                let groups: Vec<String> = op.group_by.iter().map(fmt_expr).collect();
895                let aggs: Vec<String> = op
896                    .aggregates
897                    .iter()
898                    .map(|a| {
899                        let func = format!("{:?}", a.function).to_lowercase();
900                        match &a.alias {
901                            Some(alias) => format!("{func}(...) AS {alias}"),
902                            None => format!("{func}(...)"),
903                        }
904                    })
905                    .collect();
906                let _ = writeln!(
907                    out,
908                    "{indent}Aggregate (group: [{groups}], aggs: [{aggs}])",
909                    groups = groups.join(", "),
910                    aggs = aggs.join(", "),
911                );
912                op.input.fmt_tree(out, depth + 1);
913            }
914            Self::Limit(op) => {
915                let _ = writeln!(out, "{indent}Limit ({})", op.count);
916                op.input.fmt_tree(out, depth + 1);
917            }
918            Self::Skip(op) => {
919                let _ = writeln!(out, "{indent}Skip ({})", op.count);
920                op.input.fmt_tree(out, depth + 1);
921            }
922            Self::Sort(op) => {
923                let keys: Vec<String> = op
924                    .keys
925                    .iter()
926                    .map(|k| {
927                        let dir = match k.order {
928                            SortOrder::Ascending => "ASC",
929                            SortOrder::Descending => "DESC",
930                        };
931                        format!("{} {dir}", fmt_expr(&k.expression))
932                    })
933                    .collect();
934                let _ = writeln!(out, "{indent}Sort ({keys})", keys = keys.join(", "));
935                op.input.fmt_tree(out, depth + 1);
936            }
937            Self::Distinct(op) => {
938                let _ = writeln!(out, "{indent}Distinct");
939                op.input.fmt_tree(out, depth + 1);
940            }
941            Self::Return(op) => {
942                let items: Vec<String> = op
943                    .items
944                    .iter()
945                    .map(|item| {
946                        let expr = fmt_expr(&item.expression);
947                        match &item.alias {
948                            Some(alias) => format!("{expr} AS {alias}"),
949                            None => expr,
950                        }
951                    })
952                    .collect();
953                let distinct = if op.distinct { " DISTINCT" } else { "" };
954                let _ = writeln!(
955                    out,
956                    "{indent}Return{distinct} ({items})",
957                    items = items.join(", ")
958                );
959                op.input.fmt_tree(out, depth + 1);
960            }
961            Self::Union(op) => {
962                let _ = writeln!(out, "{indent}Union ({n} branches)", n = op.inputs.len());
963                for input in &op.inputs {
964                    input.fmt_tree(out, depth + 1);
965                }
966            }
967            Self::MultiWayJoin(op) => {
968                let vars = op.shared_variables.join(", ");
969                let _ = writeln!(
970                    out,
971                    "{indent}MultiWayJoin ({n} inputs, shared: [{vars}])",
972                    n = op.inputs.len()
973                );
974                for input in &op.inputs {
975                    input.fmt_tree(out, depth + 1);
976                }
977            }
978            Self::LeftJoin(op) => {
979                if let Some(cond) = &op.condition {
980                    let _ = writeln!(out, "{indent}LeftJoin (condition: {cond:?})");
981                } else {
982                    let _ = writeln!(out, "{indent}LeftJoin");
983                }
984                op.left.fmt_tree(out, depth + 1);
985                op.right.fmt_tree(out, depth + 1);
986            }
987            Self::AntiJoin(op) => {
988                let _ = writeln!(out, "{indent}AntiJoin");
989                op.left.fmt_tree(out, depth + 1);
990                op.right.fmt_tree(out, depth + 1);
991            }
992            Self::Unwind(op) => {
993                let _ = writeln!(out, "{indent}Unwind ({var})", var = op.variable);
994                op.input.fmt_tree(out, depth + 1);
995            }
996            Self::Bind(op) => {
997                let _ = writeln!(out, "{indent}Bind ({var})", var = op.variable);
998                op.input.fmt_tree(out, depth + 1);
999            }
1000            Self::MapCollect(op) => {
1001                let _ = writeln!(
1002                    out,
1003                    "{indent}MapCollect ({key} -> {val} AS {alias})",
1004                    key = op.key_var,
1005                    val = op.value_var,
1006                    alias = op.alias
1007                );
1008                op.input.fmt_tree(out, depth + 1);
1009            }
1010            Self::Apply(op) => {
1011                let _ = writeln!(out, "{indent}Apply");
1012                op.input.fmt_tree(out, depth + 1);
1013                op.subplan.fmt_tree(out, depth + 1);
1014            }
1015            Self::Except(op) => {
1016                let all = if op.all { " ALL" } else { "" };
1017                let _ = writeln!(out, "{indent}Except{all}");
1018                op.left.fmt_tree(out, depth + 1);
1019                op.right.fmt_tree(out, depth + 1);
1020            }
1021            Self::Intersect(op) => {
1022                let all = if op.all { " ALL" } else { "" };
1023                let _ = writeln!(out, "{indent}Intersect{all}");
1024                op.left.fmt_tree(out, depth + 1);
1025                op.right.fmt_tree(out, depth + 1);
1026            }
1027            Self::Otherwise(op) => {
1028                let _ = writeln!(out, "{indent}Otherwise");
1029                op.left.fmt_tree(out, depth + 1);
1030                op.right.fmt_tree(out, depth + 1);
1031            }
1032            Self::ShortestPath(op) => {
1033                let _ = writeln!(
1034                    out,
1035                    "{indent}ShortestPath ({from} -> {to})",
1036                    from = op.source_var,
1037                    to = op.target_var
1038                );
1039                op.input.fmt_tree(out, depth + 1);
1040            }
1041            Self::Merge(op) => {
1042                let _ = writeln!(out, "{indent}Merge ({var})", var = op.variable);
1043                op.input.fmt_tree(out, depth + 1);
1044            }
1045            Self::MergeRelationship(op) => {
1046                let _ = writeln!(out, "{indent}MergeRelationship ({var})", var = op.variable);
1047                op.input.fmt_tree(out, depth + 1);
1048            }
1049            Self::CreateNode(op) => {
1050                let labels = op.labels.join(":");
1051                let _ = writeln!(
1052                    out,
1053                    "{indent}CreateNode ({var}:{labels})",
1054                    var = op.variable
1055                );
1056                if let Some(input) = &op.input {
1057                    input.fmt_tree(out, depth + 1);
1058                }
1059            }
1060            Self::CreateEdge(op) => {
1061                let var = op.variable.as_deref().unwrap_or("?");
1062                let _ = writeln!(
1063                    out,
1064                    "{indent}CreateEdge ({from})-[{var}:{ty}]->({to})",
1065                    from = op.from_variable,
1066                    ty = op.edge_type,
1067                    to = op.to_variable
1068                );
1069                op.input.fmt_tree(out, depth + 1);
1070            }
1071            Self::DeleteNode(op) => {
1072                let _ = writeln!(out, "{indent}DeleteNode ({var})", var = op.variable);
1073                op.input.fmt_tree(out, depth + 1);
1074            }
1075            Self::DeleteEdge(op) => {
1076                let _ = writeln!(out, "{indent}DeleteEdge ({var})", var = op.variable);
1077                op.input.fmt_tree(out, depth + 1);
1078            }
1079            Self::SetProperty(op) => {
1080                let props: Vec<String> = op
1081                    .properties
1082                    .iter()
1083                    .map(|(k, _)| format!("{}.{k}", op.variable))
1084                    .collect();
1085                let _ = writeln!(
1086                    out,
1087                    "{indent}SetProperty ({props})",
1088                    props = props.join(", ")
1089                );
1090                op.input.fmt_tree(out, depth + 1);
1091            }
1092            Self::AddLabel(op) => {
1093                let labels = op.labels.join(":");
1094                let _ = writeln!(out, "{indent}AddLabel ({var}:{labels})", var = op.variable);
1095                op.input.fmt_tree(out, depth + 1);
1096            }
1097            Self::RemoveLabel(op) => {
1098                let labels = op.labels.join(":");
1099                let _ = writeln!(
1100                    out,
1101                    "{indent}RemoveLabel ({var}:{labels})",
1102                    var = op.variable
1103                );
1104                op.input.fmt_tree(out, depth + 1);
1105            }
1106            Self::CallProcedure(op) => {
1107                let _ = writeln!(
1108                    out,
1109                    "{indent}CallProcedure ({name})",
1110                    name = op.name.join(".")
1111                );
1112            }
1113            Self::LoadData(op) => {
1114                let format_name = match op.format {
1115                    LoadDataFormat::Csv => "LoadCsv",
1116                    LoadDataFormat::Jsonl => "LoadJsonl",
1117                    LoadDataFormat::Parquet => "LoadParquet",
1118                    _ => "LoadData",
1119                };
1120                let headers = if op.with_headers && op.format == LoadDataFormat::Csv {
1121                    " WITH HEADERS"
1122                } else {
1123                    ""
1124                };
1125                let _ = writeln!(
1126                    out,
1127                    "{indent}{format_name}{headers} ('{path}' AS {var})",
1128                    path = op.path,
1129                    var = op.variable,
1130                );
1131            }
1132            Self::TripleScan(op) => {
1133                let _ = writeln!(
1134                    out,
1135                    "{indent}TripleScan ({s} {p} {o})",
1136                    s = fmt_triple_component(&op.subject),
1137                    p = fmt_triple_component(&op.predicate),
1138                    o = fmt_triple_component(&op.object)
1139                );
1140                if let Some(input) = &op.input {
1141                    input.fmt_tree(out, depth + 1);
1142                }
1143            }
1144            Self::VectorScan(op) => {
1145                let metric = op.metric.map_or("default", |m| match m {
1146                    VectorMetric::Cosine => "cosine",
1147                    VectorMetric::Euclidean => "euclidean",
1148                    VectorMetric::DotProduct => "dot_product",
1149                    VectorMetric::Manhattan => "manhattan",
1150                });
1151                let mode = match op.k {
1152                    Some(k) => format!("top-{k}"),
1153                    None => "threshold".to_string(),
1154                };
1155                let _ = writeln!(
1156                    out,
1157                    "{indent}VectorScan ({var}:{label}.{prop}, {metric}, {mode})",
1158                    var = op.variable,
1159                    label = op.label.as_deref().unwrap_or("*"),
1160                    prop = op.property,
1161                );
1162                if let Some(input) = &op.input {
1163                    input.fmt_tree(out, depth + 1);
1164                }
1165            }
1166            Self::TextScan(op) => {
1167                let mode = match (op.k, op.threshold) {
1168                    (Some(k), _) => format!("top-{k}"),
1169                    (None, Some(t)) => format!("threshold>={t}"),
1170                    (None, None) => "default-top-100".to_string(),
1171                };
1172                let query = fmt_expr(&op.query);
1173                let _ = writeln!(
1174                    out,
1175                    "{indent}TextScan ({var}:{label}.{prop}, query={query}, {mode})",
1176                    var = op.variable,
1177                    label = op.label,
1178                    prop = op.property,
1179                );
1180            }
1181            Self::Empty => {
1182                let _ = writeln!(out, "{indent}Empty");
1183            }
1184            // Remaining operators: show a simple name
1185            _ => {
1186                let _ = writeln!(out, "{indent}{:?}", std::mem::discriminant(self));
1187            }
1188        }
1189    }
1190}
1191
1192/// Format a logical expression compactly for EXPLAIN output.
1193fn fmt_expr(expr: &LogicalExpression) -> String {
1194    match expr {
1195        LogicalExpression::Variable(name) => name.clone(),
1196        LogicalExpression::Property { variable, property } => format!("{variable}.{property}"),
1197        LogicalExpression::Literal(val) => format!("{val}"),
1198        LogicalExpression::Binary { left, op, right } => {
1199            format!("{} {op:?} {}", fmt_expr(left), fmt_expr(right))
1200        }
1201        LogicalExpression::Unary { op, operand } => {
1202            format!("{op:?} {}", fmt_expr(operand))
1203        }
1204        LogicalExpression::FunctionCall { name, args, .. } => {
1205            let arg_strs: Vec<String> = args.iter().map(fmt_expr).collect();
1206            format!("{name}({})", arg_strs.join(", "))
1207        }
1208        _ => format!("{expr:?}"),
1209    }
1210}
1211
1212/// Format a triple component for EXPLAIN output.
1213fn fmt_triple_component(comp: &TripleComponent) -> String {
1214    match comp {
1215        TripleComponent::Variable(name) => format!("?{name}"),
1216        TripleComponent::Iri(iri) => format!("<{iri}>"),
1217        TripleComponent::Literal(val) => format!("{val}"),
1218        TripleComponent::LangLiteral { value, lang } => format!("\"{value}\"@{lang}"),
1219        TripleComponent::BlankNode(label) => format!("_:{label}"),
1220    }
1221}
1222
/// Scan nodes from the graph.
///
/// Each scanned node is bound to `variable`. In EXPLAIN output a missing
/// `label` is rendered as `*`.
#[derive(Debug, Clone)]
pub struct NodeScanOp {
    /// Variable name to bind the node to.
    pub variable: String,
    /// Optional label filter (`None` = no label filter).
    pub label: Option<String>,
    /// Child operator (if any, for chained patterns).
    pub input: Option<Box<LogicalOperator>>,
}
1233
/// Scan edges from the graph.
///
/// Each scanned edge is bound to `variable`. In EXPLAIN output an empty
/// `edge_types` list is rendered as `*` and multiple types are joined with `|`.
#[derive(Debug, Clone)]
pub struct EdgeScanOp {
    /// Variable name to bind the edge to.
    pub variable: String,
    /// Edge type filter (empty = match all types).
    pub edge_types: Vec<String>,
    /// Child operator (if any).
    pub input: Option<Box<LogicalOperator>>,
}
1244
/// Path traversal mode for variable-length expansion.
///
/// The `Default` value is [`PathMode::Walk`]. The enum is `#[non_exhaustive]`,
/// so matches outside this crate must include a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum PathMode {
    /// Allows repeated nodes and edges (default).
    #[default]
    Walk,
    /// No repeated edges.
    Trail,
    /// No repeated nodes except endpoints.
    Simple,
    /// No repeated nodes at all.
    Acyclic,
}
1259
/// Expand from nodes to their neighbors.
///
/// Starting from the rows produced by `input`, follows edges from
/// `from_variable` and binds the reached node to `to_variable`.
///
/// In EXPLAIN output the hop bounds are shown as a suffix on the edge pattern:
/// nothing for exactly one hop (`min_hops == 1`, `max_hops == Some(1)`), `*n`
/// when both bounds are equal, `*min..max` otherwise, and `*min..` when
/// `max_hops` is `None`.
#[derive(Debug, Clone)]
pub struct ExpandOp {
    /// Source node variable.
    pub from_variable: String,
    /// Target node variable to bind.
    pub to_variable: String,
    /// Edge variable to bind (optional).
    pub edge_variable: Option<String>,
    /// Direction of expansion.
    pub direction: ExpandDirection,
    /// Edge type filter (empty = match all types, multiple = match any).
    pub edge_types: Vec<String>,
    /// Minimum hops (for variable-length patterns).
    pub min_hops: u32,
    /// Maximum hops (for variable-length patterns); `None` means no upper bound.
    pub max_hops: Option<u32>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
    /// Path alias for variable-length patterns (e.g., `p` in `p = (a)-[*1..3]->(b)`).
    /// When set, a path length column will be output under this name.
    pub path_alias: Option<String>,
    /// Path traversal mode (WALK, TRAIL, SIMPLE, ACYCLIC).
    pub path_mode: PathMode,
}
1285
/// Direction for edge expansion.
///
/// EXPLAIN output renders the variants as `->`, `<-` and `--` respectively.
/// The enum is `#[non_exhaustive]`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ExpandDirection {
    /// Follow outgoing edges.
    Outgoing,
    /// Follow incoming edges.
    Incoming,
    /// Follow edges in either direction.
    Both,
}
1297
/// Join two inputs.
///
/// A binary join of `left` and `right`. EXPLAIN output shows the `join_type`
/// (via its `Debug` form) followed by the left and then the right subtree;
/// the `conditions` are not rendered there.
#[derive(Debug, Clone)]
pub struct JoinOp {
    /// Left input.
    pub left: Box<LogicalOperator>,
    /// Right input.
    pub right: Box<LogicalOperator>,
    /// Join type.
    pub join_type: JoinType,
    /// Join conditions.
    pub conditions: Vec<JoinCondition>,
}
1310
/// Join type.
///
/// Selects the semantics of a [`JoinOp`]. The enum is `#[non_exhaustive]`,
/// so matches outside this crate must include a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum JoinType {
    /// Inner join.
    Inner,
    /// Left outer join.
    Left,
    /// Right outer join.
    Right,
    /// Full outer join.
    Full,
    /// Cross join (Cartesian product).
    Cross,
    /// Semi join (returns left rows with matching right rows).
    Semi,
    /// Anti join (returns left rows without matching right rows).
    Anti,
}
1330
/// A join condition.
///
/// Pairs one expression per join side. Used by [`JoinOp`] and
/// [`MultiWayJoinOp`].
// NOTE(review): presumably the pair is compared for equality; confirm against the planner.
#[derive(Debug, Clone)]
pub struct JoinCondition {
    /// Left expression.
    pub left: LogicalExpression,
    /// Right expression.
    pub right: LogicalExpression,
}
1339
/// Multi-way join for worst-case optimal joins (leapfrog).
///
/// Unlike binary `JoinOp`, this joins 3+ relations simultaneously
/// using the leapfrog trie join algorithm. Preferred for cyclic patterns
/// (triangles, cliques) where cascading binary joins hit O(N^2).
///
/// EXPLAIN output shows the number of inputs and the shared variables,
/// followed by every input subtree in order.
#[derive(Debug, Clone)]
pub struct MultiWayJoinOp {
    /// Input relations (one per relation in the join).
    pub inputs: Vec<LogicalOperator>,
    /// All pairwise join conditions.
    pub conditions: Vec<JoinCondition>,
    /// Variables shared across multiple inputs (intersection keys).
    pub shared_variables: Vec<String>,
}
1354
/// Aggregate with grouping.
///
/// Groups the rows of `input` by `group_by` and evaluates `aggregates` per
/// group. EXPLAIN output lists the grouping expressions and the aggregate
/// function names; the `having` filter is not rendered there.
#[derive(Debug, Clone)]
pub struct AggregateOp {
    /// Group by expressions.
    pub group_by: Vec<LogicalExpression>,
    /// Aggregate functions.
    pub aggregates: Vec<AggregateExpr>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
    /// HAVING clause filter (applied after aggregation).
    pub having: Option<LogicalExpression>,
}
1367
/// Whether a horizontal aggregate operates on edges or nodes.
///
/// Used by [`HorizontalAggregateOp`] to say what kind of IDs its list column
/// holds. The enum is `#[non_exhaustive]`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum EntityKind {
    /// Aggregate over edges in a path.
    Edge,
    /// Aggregate over nodes in a path.
    Node,
}
1377
/// Per-row aggregation over a list-valued column (horizontal aggregation, GE09).
///
/// For each input row, reads a list of entity IDs from `list_column`, accesses
/// `property` on each entity, computes the aggregate, and emits the scalar result.
///
/// The result is emitted under `alias`; one value is produced per input row
/// rather than per group.
#[derive(Debug, Clone)]
pub struct HorizontalAggregateOp {
    /// The list column name (e.g., `_path_edges_p`).
    pub list_column: String,
    /// Whether the list contains edge IDs or node IDs.
    pub entity_kind: EntityKind,
    /// The aggregate function to apply.
    pub function: AggregateFunction,
    /// The property to access on each entity.
    pub property: String,
    /// Output alias for the result column.
    pub alias: String,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1397
/// An aggregate expression.
///
/// Describes one aggregate call inside an [`AggregateOp`]. EXPLAIN output
/// shows only the lowercased function name (arguments elided as `(...)`)
/// and the alias, if any.
#[derive(Debug, Clone)]
pub struct AggregateExpr {
    /// Aggregate function.
    pub function: AggregateFunction,
    /// Expression to aggregate (first/only argument, y for binary set functions).
    pub expression: Option<LogicalExpression>,
    /// Second expression for binary set functions (x for COVAR, CORR, REGR_*).
    pub expression2: Option<LogicalExpression>,
    /// Whether to use DISTINCT.
    pub distinct: bool,
    /// Alias for the result.
    pub alias: Option<String>,
    /// Percentile parameter for PERCENTILE_DISC/PERCENTILE_CONT (0.0 to 1.0).
    pub percentile: Option<f64>,
    /// Separator string for GROUP_CONCAT / LISTAGG (defaults to space for GROUP_CONCAT, comma for LISTAGG).
    pub separator: Option<String>,
}
1416
/// Aggregate function.
///
/// Identifies which aggregate an [`AggregateExpr`] or
/// [`HorizontalAggregateOp`] computes. The binary set functions
/// (`Covar*`, `Corr`, `Regr*`) take two arguments, written `(y, x)`.
/// EXPLAIN output prints the lowercased `Debug` name of the variant.
/// The enum is `#[non_exhaustive]`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum AggregateFunction {
    /// Count all rows (COUNT(*)).
    Count,
    /// Count non-null values (COUNT(expr)).
    CountNonNull,
    /// Sum values.
    Sum,
    /// Average values.
    Avg,
    /// Minimum value.
    Min,
    /// Maximum value.
    Max,
    /// Collect into list.
    Collect,
    /// Sample standard deviation (STDEV).
    StdDev,
    /// Population standard deviation (STDEVP).
    StdDevPop,
    /// Sample variance (VAR_SAMP / VARIANCE).
    Variance,
    /// Population variance (VAR_POP).
    VariancePop,
    /// Discrete percentile (PERCENTILE_DISC).
    PercentileDisc,
    /// Continuous percentile (PERCENTILE_CONT).
    PercentileCont,
    /// Concatenate values with separator (GROUP_CONCAT).
    GroupConcat,
    /// Return an arbitrary value from the group (SAMPLE).
    Sample,
    /// Sample covariance (COVAR_SAMP(y, x)).
    CovarSamp,
    /// Population covariance (COVAR_POP(y, x)).
    CovarPop,
    /// Pearson correlation coefficient (CORR(y, x)).
    Corr,
    /// Regression slope (REGR_SLOPE(y, x)).
    RegrSlope,
    /// Regression intercept (REGR_INTERCEPT(y, x)).
    RegrIntercept,
    /// Coefficient of determination (REGR_R2(y, x)).
    RegrR2,
    /// Regression count of non-null pairs (REGR_COUNT(y, x)).
    RegrCount,
    /// Regression sum of squares for x (REGR_SXX(y, x)).
    RegrSxx,
    /// Regression sum of squares for y (REGR_SYY(y, x)).
    RegrSyy,
    /// Regression sum of cross-products (REGR_SXY(y, x)).
    RegrSxy,
    /// Regression average of x (REGR_AVGX(y, x)).
    RegrAvgx,
    /// Regression average of y (REGR_AVGY(y, x)).
    RegrAvgy,
}
1476
/// Hint about how a filter will be executed at the physical level.
///
/// Set during EXPLAIN annotation to communicate pushdown decisions.
///
/// EXPLAIN output appends the hint to the `Filter` line as `[index: <property>]`,
/// `[range: <property>]` or `[label-first]`. The enum is `#[non_exhaustive]`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum PushdownHint {
    /// Equality predicate resolved via a property index.
    IndexLookup {
        /// The indexed property name.
        property: String,
    },
    /// Range predicate resolved via a range/btree index.
    RangeScan {
        /// The indexed property name.
        property: String,
    },
    /// No index available, but label narrows the scan before filtering.
    LabelFirst,
}
1496
/// Filter rows based on a predicate.
///
/// EXPLAIN output shows the formatted `predicate`, followed by the
/// `pushdown_hint` annotation when one is present.
#[derive(Debug, Clone)]
pub struct FilterOp {
    /// The filter predicate.
    pub predicate: LogicalExpression,
    /// Input operator.
    pub input: Box<LogicalOperator>,
    /// Optional hint about pushdown strategy (populated by EXPLAIN).
    pub pushdown_hint: Option<PushdownHint>,
}
1507
/// Project specific columns.
///
/// Evaluates each entry of `projections` against the rows of `input`.
/// EXPLAIN output lists the projections as `expr` or `expr AS alias`.
#[derive(Debug, Clone)]
pub struct ProjectOp {
    /// Columns to project.
    pub projections: Vec<Projection>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
    /// When true, all input columns are passed through and the explicit
    /// projections are appended as additional output columns. Used by GQL
    /// LET clauses which add bindings without replacing the existing scope.
    pub pass_through_input: bool,
}
1520
/// A single projection (column selection or computation).
///
/// One output column of a [`ProjectOp`]: the expression to evaluate and an
/// optional name for the result.
#[derive(Debug, Clone)]
pub struct Projection {
    /// Expression to compute.
    pub expression: LogicalExpression,
    /// Alias for the result (`None` when the column is not renamed).
    pub alias: Option<String>,
}
1529
/// Limit the number of results.
///
/// `count` may still be an unresolved parameter reference; see [`CountExpr`]
/// for how to resolve or estimate it.
#[derive(Debug, Clone)]
pub struct LimitOp {
    /// Maximum number of rows to return (literal or parameter reference).
    pub count: CountExpr,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1538
/// Skip a number of results.
///
/// `count` may still be an unresolved parameter reference; see [`CountExpr`]
/// for how to resolve or estimate it.
#[derive(Debug, Clone)]
pub struct SkipOp {
    /// Number of rows to skip (literal or parameter reference).
    pub count: CountExpr,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1547
/// Sort results.
///
/// Orders the rows of `input` by `keys`. EXPLAIN output lists each key as
/// its expression followed by `ASC` or `DESC`.
#[derive(Debug, Clone)]
pub struct SortOp {
    /// Sort keys.
    pub keys: Vec<SortKey>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1556
/// A sort key.
///
/// One entry of a [`SortOp`]. EXPLAIN output shows the expression and the
/// direction only; the `nulls` placement is not rendered there.
#[derive(Debug, Clone)]
pub struct SortKey {
    /// Expression to sort by.
    pub expression: LogicalExpression,
    /// Sort order.
    pub order: SortOrder,
    /// Optional null ordering (NULLS FIRST / NULLS LAST).
    pub nulls: Option<NullsOrdering>,
}
1567
/// Sort order.
///
/// Direction of a [`SortKey`]; rendered as `ASC` / `DESC` in EXPLAIN output.
/// The enum is `#[non_exhaustive]`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SortOrder {
    /// Ascending order.
    Ascending,
    /// Descending order.
    Descending,
}
1577
/// Null ordering for sort operations.
///
/// Optional per-key setting on [`SortKey`] (NULLS FIRST / NULLS LAST).
/// The enum is `#[non_exhaustive]`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum NullsOrdering {
    /// Nulls sort before all non-null values.
    First,
    /// Nulls sort after all non-null values.
    Last,
}
1587
/// Remove duplicate results.
///
/// EXPLAIN output shows this operator as a bare `Distinct` line; the
/// `columns` restriction is not rendered there.
#[derive(Debug, Clone)]
pub struct DistinctOp {
    /// Input operator.
    pub input: Box<LogicalOperator>,
    /// Optional columns to use for deduplication.
    /// If None, all columns are used.
    pub columns: Option<Vec<String>>,
}
1597
/// Create a new node.
///
/// EXPLAIN output shows the variable and the labels joined with `:`;
/// the property assignments are not rendered there.
#[derive(Debug, Clone)]
pub struct CreateNodeOp {
    /// Variable name to bind the created node to.
    pub variable: String,
    /// Labels for the new node.
    pub labels: Vec<String>,
    /// Properties for the new node, as (name, value expression) pairs.
    pub properties: Vec<(String, LogicalExpression)>,
    /// Input operator (for chained creates).
    pub input: Option<Box<LogicalOperator>>,
}
1610
/// Create a new edge.
///
/// Creates an edge of type `edge_type` from `from_variable` to `to_variable`.
/// EXPLAIN output shows an edge without a bound variable as `?`.
#[derive(Debug, Clone)]
pub struct CreateEdgeOp {
    /// Variable name to bind the created edge to (`None` for an anonymous edge).
    pub variable: Option<String>,
    /// Source node variable.
    pub from_variable: String,
    /// Target node variable.
    pub to_variable: String,
    /// Edge type.
    pub edge_type: String,
    /// Properties for the new edge, as (name, value expression) pairs.
    pub properties: Vec<(String, LogicalExpression)>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1627
/// Delete a node.
///
/// Deletes the node bound to `variable` in each row of `input`. EXPLAIN
/// output shows only the variable; the `detach` flag is not rendered there.
#[derive(Debug, Clone)]
pub struct DeleteNodeOp {
    /// Variable of the node to delete.
    pub variable: String,
    /// Whether to detach (delete connected edges) before deleting.
    pub detach: bool,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1638
/// Delete an edge.
///
/// Deletes the edge bound to `variable` in each row of `input`.
#[derive(Debug, Clone)]
pub struct DeleteEdgeOp {
    /// Variable of the edge to delete.
    pub variable: String,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1647
/// Set properties on a node or edge.
///
/// EXPLAIN output lists each target as `variable.property`; the assigned
/// expressions and the `replace` / `is_edge` flags are not rendered there.
#[derive(Debug, Clone)]
pub struct SetPropertyOp {
    /// Variable of the entity to update.
    pub variable: String,
    /// Properties to set (name -> expression).
    pub properties: Vec<(String, LogicalExpression)>,
    /// Whether to replace all properties (vs. merge).
    pub replace: bool,
    /// Whether the target variable is an edge (vs. node).
    pub is_edge: bool,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1662
1663/// Add labels to a node.
1664#[derive(Debug, Clone)]
1665pub struct AddLabelOp {
1666    /// Variable of the node to update.
1667    pub variable: String,
1668    /// Labels to add.
1669    pub labels: Vec<String>,
1670    /// Input operator.
1671    pub input: Box<LogicalOperator>,
1672}
1673
1674/// Remove labels from a node.
1675#[derive(Debug, Clone)]
1676pub struct RemoveLabelOp {
1677    /// Variable of the node to update.
1678    pub variable: String,
1679    /// Labels to remove.
1680    pub labels: Vec<String>,
1681    /// Input operator.
1682    pub input: Box<LogicalOperator>,
1683}
1684
1685// ==================== RDF/SPARQL Operators ====================
1686
/// SPARQL dataset restriction from FROM / FROM NAMED clauses.
///
/// When present, restricts which graphs are visible to a triple scan:
/// - `default_graphs`: IRIs whose union forms the default graph (basic patterns).
/// - `named_graphs`: IRIs that enumerate the available named graphs (GRAPH patterns).
///
/// The derived `Default` yields two empty lists, i.e. no restriction at all.
#[derive(Debug, Clone, Default)]
pub struct DatasetRestriction {
    /// FROM IRIs: the default graph is the union of these named graphs.
    /// Empty means no FROM clause was specified (unrestricted default graph).
    pub default_graphs: Vec<String>,
    /// FROM NAMED IRIs: only these named graphs are available to GRAPH patterns.
    /// Empty means no FROM NAMED clause was specified (all named graphs visible).
    pub named_graphs: Vec<String>,
}
1701
1702/// Scan RDF triples matching a pattern.
1703#[derive(Debug, Clone)]
1704pub struct TripleScanOp {
1705    /// Subject pattern (variable name or IRI).
1706    pub subject: TripleComponent,
1707    /// Predicate pattern (variable name or IRI).
1708    pub predicate: TripleComponent,
1709    /// Object pattern (variable name, IRI, or literal).
1710    pub object: TripleComponent,
1711    /// Named graph (optional).
1712    pub graph: Option<TripleComponent>,
1713    /// Input operator (for chained patterns).
1714    pub input: Option<Box<LogicalOperator>>,
1715    /// Dataset restriction from SPARQL FROM / FROM NAMED clauses.
1716    pub dataset: Option<DatasetRestriction>,
1717}
1718
1719/// A component of a triple pattern.
1720#[derive(Debug, Clone)]
1721#[non_exhaustive]
1722pub enum TripleComponent {
1723    /// A variable to bind.
1724    Variable(String),
1725    /// A constant IRI.
1726    Iri(String),
1727    /// A constant literal value.
1728    Literal(Value),
1729    /// A language-tagged string literal (RDF `rdf:langString`).
1730    ///
1731    /// Carries the lexical value and the BCP47 language tag separately so that
1732    /// the tag survives the translator to planner to RDF store round-trip.
1733    LangLiteral {
1734        /// The lexical string value.
1735        value: String,
1736        /// BCP47 language tag, e.g. `"fr"`, `"en-GB"`.
1737        lang: String,
1738    },
1739    /// A blank node with a scoped label (used in INSERT DATA).
1740    BlankNode(String),
1741}
1742
1743impl TripleComponent {
1744    /// Returns the variable name if this component is a `Variable`, or `None`.
1745    #[must_use]
1746    pub fn as_variable(&self) -> Option<&str> {
1747        match self {
1748            Self::Variable(v) => Some(v),
1749            _ => None,
1750        }
1751    }
1752}
1753
1754/// Union of multiple result sets.
1755#[derive(Debug, Clone)]
1756pub struct UnionOp {
1757    /// Inputs to union together.
1758    pub inputs: Vec<LogicalOperator>,
1759}
1760
1761/// Set difference: rows in left that are not in right.
1762#[derive(Debug, Clone)]
1763pub struct ExceptOp {
1764    /// Left input.
1765    pub left: Box<LogicalOperator>,
1766    /// Right input (rows to exclude).
1767    pub right: Box<LogicalOperator>,
1768    /// If true, preserve duplicates (EXCEPT ALL); if false, deduplicate (EXCEPT DISTINCT).
1769    pub all: bool,
1770}
1771
1772/// Set intersection: rows common to both inputs.
1773#[derive(Debug, Clone)]
1774pub struct IntersectOp {
1775    /// Left input.
1776    pub left: Box<LogicalOperator>,
1777    /// Right input.
1778    pub right: Box<LogicalOperator>,
1779    /// If true, preserve duplicates (INTERSECT ALL); if false, deduplicate (INTERSECT DISTINCT).
1780    pub all: bool,
1781}
1782
1783/// Fallback operator: use left result if non-empty, otherwise use right.
1784#[derive(Debug, Clone)]
1785pub struct OtherwiseOp {
1786    /// Primary input (preferred).
1787    pub left: Box<LogicalOperator>,
1788    /// Fallback input (used only if left produces zero rows).
1789    pub right: Box<LogicalOperator>,
1790}
1791
1792/// Apply (lateral join): evaluate a subplan for each row of the outer input.
1793///
1794/// The subplan can reference variables bound by the outer input. Results are
1795/// concatenated (cross-product per row).
1796#[derive(Debug, Clone)]
1797pub struct ApplyOp {
1798    /// Outer input providing rows.
1799    pub input: Box<LogicalOperator>,
1800    /// Subplan to evaluate per outer row.
1801    pub subplan: Box<LogicalOperator>,
1802    /// Variables imported from the outer scope into the inner plan.
1803    /// When non-empty, the planner injects these via `ParameterState`.
1804    pub shared_variables: Vec<String>,
1805    /// When true, uses left-join semantics: outer rows with no matching inner
1806    /// rows are emitted with NULLs for the inner columns (OPTIONAL CALL).
1807    pub optional: bool,
1808}
1809
/// Parameter scan: leaf operator for correlated subquery inner plans.
///
/// Emits a single row containing the values injected from the outer Apply.
/// Column names correspond to the outer variables imported via WITH.
#[derive(Debug, Clone)]
pub struct ParameterScanOp {
    /// Column names for the injected parameters.
    pub columns: Vec<String>,
}
1819
1820/// Left outer join for OPTIONAL patterns.
1821#[derive(Debug, Clone)]
1822pub struct LeftJoinOp {
1823    /// Left (required) input.
1824    pub left: Box<LogicalOperator>,
1825    /// Right (optional) input.
1826    pub right: Box<LogicalOperator>,
1827    /// Optional filter condition.
1828    pub condition: Option<LogicalExpression>,
1829}
1830
1831/// Anti-join for MINUS patterns.
1832#[derive(Debug, Clone)]
1833pub struct AntiJoinOp {
1834    /// Left input (results to keep if no match on right).
1835    pub left: Box<LogicalOperator>,
1836    /// Right input (patterns to exclude).
1837    pub right: Box<LogicalOperator>,
1838}
1839
1840/// Bind a variable to an expression.
1841#[derive(Debug, Clone)]
1842pub struct BindOp {
1843    /// Expression to compute.
1844    pub expression: LogicalExpression,
1845    /// Variable to bind the result to.
1846    pub variable: String,
1847    /// Input operator.
1848    pub input: Box<LogicalOperator>,
1849}
1850
1851/// Unwind a list into individual rows.
1852///
1853/// For each input row, evaluates the expression (which should return a list)
1854/// and emits one row for each element in the list.
1855#[derive(Debug, Clone)]
1856pub struct UnwindOp {
1857    /// The list expression to unwind.
1858    pub expression: LogicalExpression,
1859    /// The variable name for each element.
1860    pub variable: String,
1861    /// Optional variable for 1-based element position (ORDINALITY).
1862    pub ordinality_var: Option<String>,
1863    /// Optional variable for 0-based element position (OFFSET).
1864    pub offset_var: Option<String>,
1865    /// Input operator.
1866    pub input: Box<LogicalOperator>,
1867}
1868
1869/// Collect grouped key-value rows into a single Map value.
1870/// Used for Gremlin `groupCount()` semantics.
1871#[derive(Debug, Clone)]
1872pub struct MapCollectOp {
1873    /// Variable holding the map key.
1874    pub key_var: String,
1875    /// Variable holding the map value.
1876    pub value_var: String,
1877    /// Output variable alias.
1878    pub alias: String,
1879    /// Input operator (typically a grouped aggregate).
1880    pub input: Box<LogicalOperator>,
1881}
1882
1883/// Merge a pattern (match or create).
1884///
1885/// MERGE tries to match a pattern in the graph. If found, returns the existing
1886/// elements (optionally applying ON MATCH SET). If not found, creates the pattern
1887/// (optionally applying ON CREATE SET).
1888#[derive(Debug, Clone)]
1889pub struct MergeOp {
1890    /// The node to merge.
1891    pub variable: String,
1892    /// Labels to match/create.
1893    pub labels: Vec<String>,
1894    /// Properties that must match (used for both matching and creation).
1895    pub match_properties: Vec<(String, LogicalExpression)>,
1896    /// Properties to set on CREATE.
1897    pub on_create: Vec<(String, LogicalExpression)>,
1898    /// Properties to set on MATCH.
1899    pub on_match: Vec<(String, LogicalExpression)>,
1900    /// Input operator.
1901    pub input: Box<LogicalOperator>,
1902}
1903
1904/// Merge a relationship pattern (match or create between two bound nodes).
1905///
1906/// MERGE on a relationship tries to find an existing relationship of the given type
1907/// between the source and target nodes. If found, returns the existing relationship
1908/// (optionally applying ON MATCH SET). If not found, creates it (optionally applying
1909/// ON CREATE SET).
1910#[derive(Debug, Clone)]
1911pub struct MergeRelationshipOp {
1912    /// Variable to bind the relationship to.
1913    pub variable: String,
1914    /// Source node variable (must already be bound).
1915    pub source_variable: String,
1916    /// Target node variable (must already be bound).
1917    pub target_variable: String,
1918    /// Relationship type.
1919    pub edge_type: String,
1920    /// Properties that must match (used for both matching and creation).
1921    pub match_properties: Vec<(String, LogicalExpression)>,
1922    /// Properties to set on CREATE.
1923    pub on_create: Vec<(String, LogicalExpression)>,
1924    /// Properties to set on MATCH.
1925    pub on_match: Vec<(String, LogicalExpression)>,
1926    /// Input operator.
1927    pub input: Box<LogicalOperator>,
1928}
1929
1930/// Find shortest path between two nodes.
1931///
1932/// This operator uses Dijkstra's algorithm to find the shortest path(s)
1933/// between a source node and a target node, optionally filtered by edge type.
1934#[derive(Debug, Clone)]
1935pub struct ShortestPathOp {
1936    /// Input operator providing source/target nodes.
1937    pub input: Box<LogicalOperator>,
1938    /// Variable name for the source node.
1939    pub source_var: String,
1940    /// Variable name for the target node.
1941    pub target_var: String,
1942    /// Edge type filter (empty = match all types, multiple = match any).
1943    pub edge_types: Vec<String>,
1944    /// Direction of edge traversal.
1945    pub direction: ExpandDirection,
1946    /// Variable name to bind the path result.
1947    pub path_alias: String,
1948    /// Whether to find all shortest paths (vs. just one).
1949    pub all_paths: bool,
1950}
1951
1952// ==================== SPARQL Update Operators ====================
1953
1954/// Insert RDF triples.
1955#[derive(Debug, Clone)]
1956pub struct InsertTripleOp {
1957    /// Subject of the triple.
1958    pub subject: TripleComponent,
1959    /// Predicate of the triple.
1960    pub predicate: TripleComponent,
1961    /// Object of the triple.
1962    pub object: TripleComponent,
1963    /// Named graph (optional).
1964    pub graph: Option<String>,
1965    /// Input operator (provides variable bindings).
1966    pub input: Option<Box<LogicalOperator>>,
1967}
1968
1969/// Delete RDF triples.
1970#[derive(Debug, Clone)]
1971pub struct DeleteTripleOp {
1972    /// Subject pattern.
1973    pub subject: TripleComponent,
1974    /// Predicate pattern.
1975    pub predicate: TripleComponent,
1976    /// Object pattern.
1977    pub object: TripleComponent,
1978    /// Named graph (optional).
1979    pub graph: Option<String>,
1980    /// Input operator (provides variable bindings).
1981    pub input: Option<Box<LogicalOperator>>,
1982}
1983
1984/// SPARQL MODIFY operation (DELETE/INSERT WHERE).
1985///
1986/// Per SPARQL 1.1 Update spec, this operator:
1987/// 1. Evaluates the WHERE clause once to get bindings
1988/// 2. Applies DELETE templates using those bindings
1989/// 3. Applies INSERT templates using the SAME bindings
1990///
1991/// This ensures DELETE and INSERT see consistent data.
1992#[derive(Debug, Clone)]
1993pub struct ModifyOp {
1994    /// DELETE triple templates (patterns with variables).
1995    pub delete_templates: Vec<TripleTemplate>,
1996    /// INSERT triple templates (patterns with variables).
1997    pub insert_templates: Vec<TripleTemplate>,
1998    /// WHERE clause that provides variable bindings.
1999    pub where_clause: Box<LogicalOperator>,
2000    /// Named graph context (for WITH clause).
2001    pub graph: Option<String>,
2002}
2003
2004/// A triple template for DELETE/INSERT operations.
2005#[derive(Debug, Clone)]
2006pub struct TripleTemplate {
2007    /// Subject (may be a variable).
2008    pub subject: TripleComponent,
2009    /// Predicate (may be a variable).
2010    pub predicate: TripleComponent,
2011    /// Object (may be a variable or literal).
2012    pub object: TripleComponent,
2013    /// Named graph (optional).
2014    pub graph: Option<String>,
2015}
2016
2017/// SPARQL CONSTRUCT: evaluate WHERE, substitute bindings into template.
2018///
2019/// Produces rows with columns `subject`, `predicate`, `object` by instantiating
2020/// the template once per binding from the WHERE clause.
2021#[derive(Debug, Clone)]
2022pub struct ConstructOp {
2023    /// Triple templates to instantiate.
2024    pub templates: Vec<TripleTemplate>,
2025    /// Input operator (WHERE clause evaluation).
2026    pub input: Box<LogicalOperator>,
2027}
2028
/// Clear all triples from a graph.
#[derive(Debug, Clone)]
pub struct ClearGraphOp {
    /// Target graph: `None` = default graph, `Some("")` = all named graphs,
    /// `Some(iri)` = specific graph.
    pub graph: Option<String>,
    /// Whether to silently ignore errors.
    pub silent: bool,
}
2037
/// Create a new named graph.
#[derive(Debug, Clone)]
pub struct CreateGraphOp {
    /// IRI of the graph to create.
    pub graph: String,
    /// Whether to silently ignore if graph already exists.
    pub silent: bool,
}
2046
/// Drop (remove) a named graph.
#[derive(Debug, Clone)]
pub struct DropGraphOp {
    /// Target graph (`None` = default graph).
    pub graph: Option<String>,
    /// Whether to silently ignore errors.
    pub silent: bool,
}
2055
/// Load data from a URL into a graph.
#[derive(Debug, Clone)]
pub struct LoadGraphOp {
    /// Source URL to load data from.
    pub source: String,
    /// Destination graph (`None` = default graph).
    pub destination: Option<String>,
    /// Whether to silently ignore errors.
    pub silent: bool,
}
2066
/// Copy triples from one graph to another.
#[derive(Debug, Clone)]
pub struct CopyGraphOp {
    /// Source graph.
    pub source: Option<String>,
    /// Destination graph.
    pub destination: Option<String>,
    /// Whether to silently ignore errors.
    pub silent: bool,
}
2077
/// Move triples from one graph to another.
#[derive(Debug, Clone)]
pub struct MoveGraphOp {
    /// Source graph.
    pub source: Option<String>,
    /// Destination graph.
    pub destination: Option<String>,
    /// Whether to silently ignore errors.
    pub silent: bool,
}
2088
/// Add (merge) triples from one graph to another.
#[derive(Debug, Clone)]
pub struct AddGraphOp {
    /// Source graph.
    pub source: Option<String>,
    /// Destination graph.
    pub destination: Option<String>,
    /// Whether to silently ignore errors.
    pub silent: bool,
}
2099
2100// ==================== Vector Search Operators ====================
2101
2102/// Vector similarity scan operation.
2103///
2104/// Performs approximate nearest neighbor search using a vector index (HNSW)
2105/// or brute-force search for small datasets. Returns nodes/edges whose
2106/// embeddings are similar to the query vector.
2107///
2108/// # Example GQL
2109///
2110/// ```gql
2111/// MATCH (m:Movie)
2112/// WHERE vector_similarity(m.embedding, $query_vector) > 0.8
2113/// RETURN m.title
2114/// ```
2115#[derive(Debug, Clone)]
2116pub struct VectorScanOp {
2117    /// Variable name to bind matching entities to.
2118    pub variable: String,
2119    /// Name of the vector index to use (None = brute-force).
2120    pub index_name: Option<String>,
2121    /// Property containing the vector embedding.
2122    pub property: String,
2123    /// Optional label filter (scan only nodes with this label).
2124    pub label: Option<String>,
2125    /// The query vector expression.
2126    pub query_vector: LogicalExpression,
2127    /// Number of nearest neighbors to return (None = threshold mode only).
2128    pub k: Option<usize>,
2129    /// Distance metric (None = use index default, typically cosine).
2130    pub metric: Option<VectorMetric>,
2131    /// Minimum similarity threshold (filters results below this).
2132    pub min_similarity: Option<f32>,
2133    /// Maximum distance threshold (filters results above this).
2134    pub max_distance: Option<f32>,
2135    /// Input operator (for hybrid queries combining graph + vector).
2136    pub input: Option<Box<LogicalOperator>>,
2137}
2138
/// Vector distance/similarity metric for vector scan operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum VectorMetric {
    /// Cosine similarity (1 - cosine_distance). Best for normalized embeddings.
    Cosine,
    /// Euclidean (L2) distance. Best when magnitude matters.
    Euclidean,
    /// Dot product. Best for maximum inner product search.
    DotProduct,
    /// Manhattan (L1) distance. Less sensitive to outliers.
    Manhattan,
}
2152
2153/// Join graph patterns with vector similarity search.
2154///
2155/// This operator takes entities from the left input and computes vector
2156/// similarity against a query vector, outputting (entity, distance) pairs.
2157///
2158/// # Use Cases
2159///
2160/// 1. **Hybrid graph + vector queries**: Find similar nodes after graph traversal
2161/// 2. **Aggregated embeddings**: Use AVG(embeddings) as query vector
2162/// 3. **Filtering by similarity**: Join with threshold-based filtering
2163///
2164/// # Example
2165///
2166/// ```gql
2167/// // Find movies similar to what the user liked
2168/// MATCH (u:User {id: $user_id})-[:LIKED]->(liked:Movie)
2169/// WITH avg(liked.embedding) AS user_taste
2170/// VECTOR JOIN (m:Movie) ON m.embedding
2171/// WHERE vector_similarity(m.embedding, user_taste) > 0.7
2172/// RETURN m.title
2173/// ```
2174#[derive(Debug, Clone)]
2175pub struct VectorJoinOp {
2176    /// Input operator providing entities to match against.
2177    pub input: Box<LogicalOperator>,
2178    /// Variable from input to extract vectors from (for entity-to-entity similarity).
2179    /// If None, uses `query_vector` directly.
2180    pub left_vector_variable: Option<String>,
2181    /// Property containing the left vector (used with `left_vector_variable`).
2182    pub left_property: Option<String>,
2183    /// The query vector expression (constant or computed).
2184    pub query_vector: LogicalExpression,
2185    /// Variable name to bind the right-side matching entities.
2186    pub right_variable: String,
2187    /// Property containing the right-side vector embeddings.
2188    pub right_property: String,
2189    /// Optional label filter for right-side entities.
2190    pub right_label: Option<String>,
2191    /// Name of vector index on right side (None = brute-force).
2192    pub index_name: Option<String>,
2193    /// Number of nearest neighbors per left-side entity.
2194    pub k: usize,
2195    /// Distance metric.
2196    pub metric: Option<VectorMetric>,
2197    /// Minimum similarity threshold.
2198    pub min_similarity: Option<f32>,
2199    /// Maximum distance threshold.
2200    pub max_distance: Option<f32>,
2201    /// Variable to bind the distance/similarity score.
2202    pub score_variable: Option<String>,
2203}
2204
2205/// Text search scan using BM25 inverted index.
2206#[derive(Debug, Clone)]
2207pub struct TextScanOp {
2208    /// Variable to bind matched nodes.
2209    pub variable: String,
2210    /// Label of nodes to search.
2211    pub label: String,
2212    /// Property holding the text to search.
2213    pub property: String,
2214    /// The search query expression (must resolve to a string).
2215    pub query: LogicalExpression,
2216    /// Top-k limit (None = threshold mode or default 100).
2217    pub k: Option<usize>,
2218    /// Minimum score threshold (None = top-k mode).
2219    pub threshold: Option<f64>,
2220    /// Optional column name to bind the BM25 score.
2221    pub score_column: Option<String>,
2222}
2223
2224/// Return results (terminal operator).
2225#[derive(Debug, Clone)]
2226pub struct ReturnOp {
2227    /// Items to return.
2228    pub items: Vec<ReturnItem>,
2229    /// Whether to return distinct results.
2230    pub distinct: bool,
2231    /// Input operator.
2232    pub input: Box<LogicalOperator>,
2233}
2234
2235/// A single return item.
2236#[derive(Debug, Clone)]
2237pub struct ReturnItem {
2238    /// Expression to return.
2239    pub expression: LogicalExpression,
2240    /// Alias for the result column.
2241    pub alias: Option<String>,
2242}
2243
2244/// Define a property graph schema (SQL/PGQ DDL).
2245#[derive(Debug, Clone)]
2246pub struct CreatePropertyGraphOp {
2247    /// Graph name.
2248    pub name: String,
2249    /// Node table schemas (label name + column definitions).
2250    pub node_tables: Vec<PropertyGraphNodeTable>,
2251    /// Edge table schemas (type name + column definitions + references).
2252    pub edge_tables: Vec<PropertyGraphEdgeTable>,
2253}
2254
/// A node table in a property graph definition.
#[derive(Debug, Clone)]
pub struct PropertyGraphNodeTable {
    /// Table name (maps to a node label).
    pub name: String,
    /// Column definitions as `(name, type_name)` pairs.
    pub columns: Vec<(String, String)>,
}
2263
/// An edge table in a property graph definition.
#[derive(Debug, Clone)]
pub struct PropertyGraphEdgeTable {
    /// Table name (maps to an edge type).
    pub name: String,
    /// Column definitions as `(name, type_name)` pairs.
    pub columns: Vec<(String, String)>,
    /// Source node table name.
    pub source_table: String,
    /// Target node table name.
    pub target_table: String,
}
2276
2277// ==================== Procedure Call Types ====================
2278
2279/// A CALL procedure operation.
2280///
2281/// ```text
2282/// CALL grafeo.pagerank({damping: 0.85}) YIELD nodeId, score
2283/// ```
2284#[derive(Debug, Clone)]
2285pub struct CallProcedureOp {
2286    /// Dotted procedure name, e.g. `["grafeo", "pagerank"]`.
2287    pub name: Vec<String>,
2288    /// Argument expressions (constants in Phase 1).
2289    pub arguments: Vec<LogicalExpression>,
2290    /// Optional YIELD clause: which columns to expose + aliases.
2291    pub yield_items: Option<Vec<ProcedureYield>>,
2292}
2293
/// A single YIELD item in a procedure call.
#[derive(Debug, Clone)]
pub struct ProcedureYield {
    /// Column name from the procedure result.
    pub field_name: String,
    /// Optional alias (`YIELD score AS rank`).
    pub alias: Option<String>,
}
2302
2303/// Re-export format enum from the physical operator.
2304pub use grafeo_core::execution::operators::LoadDataFormat;
2305
2306/// LOAD DATA operator: reads a file and produces rows.
2307///
2308/// With headers (CSV), each row is bound as a `Value::Map` with column names as keys.
2309/// Without headers (CSV), each row is bound as a `Value::List` of string values.
2310/// JSONL always produces `Value::Map`. Parquet always produces `Value::Map`.
2311#[derive(Debug, Clone)]
2312pub struct LoadDataOp {
2313    /// File format.
2314    pub format: LoadDataFormat,
2315    /// Whether the file has a header row (CSV only, ignored for JSONL/Parquet).
2316    pub with_headers: bool,
2317    /// File path (local filesystem).
2318    pub path: String,
2319    /// Variable name to bind each row to.
2320    pub variable: String,
2321    /// Field separator character (CSV only, default: comma).
2322    pub field_terminator: Option<char>,
2323}
2324
2325/// A logical expression.
2326#[derive(Debug, Clone)]
2327#[non_exhaustive]
2328pub enum LogicalExpression {
2329    /// A literal value.
2330    Literal(Value),
2331
2332    /// A variable reference.
2333    Variable(String),
2334
2335    /// Property access (e.g., n.name).
2336    Property {
2337        /// The variable to access.
2338        variable: String,
2339        /// The property name.
2340        property: String,
2341    },
2342
2343    /// Binary operation.
2344    Binary {
2345        /// Left operand.
2346        left: Box<LogicalExpression>,
2347        /// Operator.
2348        op: BinaryOp,
2349        /// Right operand.
2350        right: Box<LogicalExpression>,
2351    },
2352
2353    /// Unary operation.
2354    Unary {
2355        /// Operator.
2356        op: UnaryOp,
2357        /// Operand.
2358        operand: Box<LogicalExpression>,
2359    },
2360
2361    /// Function call.
2362    FunctionCall {
2363        /// Function name.
2364        name: String,
2365        /// Arguments.
2366        args: Vec<LogicalExpression>,
2367        /// Whether DISTINCT is applied (e.g., COUNT(DISTINCT x)).
2368        distinct: bool,
2369    },
2370
2371    /// List literal.
2372    List(Vec<LogicalExpression>),
2373
2374    /// Map literal (e.g., {name: 'Alix', age: 30}).
2375    Map(Vec<(String, LogicalExpression)>),
2376
2377    /// Index access (e.g., `list[0]`).
2378    IndexAccess {
2379        /// The base expression (typically a list or string).
2380        base: Box<LogicalExpression>,
2381        /// The index expression.
2382        index: Box<LogicalExpression>,
2383    },
2384
2385    /// Slice access (e.g., list[1..3]).
2386    SliceAccess {
2387        /// The base expression (typically a list or string).
2388        base: Box<LogicalExpression>,
2389        /// Start index (None means from beginning).
2390        start: Option<Box<LogicalExpression>>,
2391        /// End index (None means to end).
2392        end: Option<Box<LogicalExpression>>,
2393    },
2394
2395    /// CASE expression.
2396    Case {
2397        /// Test expression (for simple CASE).
2398        operand: Option<Box<LogicalExpression>>,
2399        /// WHEN clauses.
2400        when_clauses: Vec<(LogicalExpression, LogicalExpression)>,
2401        /// ELSE clause.
2402        else_clause: Option<Box<LogicalExpression>>,
2403    },
2404
2405    /// Parameter reference.
2406    Parameter(String),
2407
2408    /// Labels of a node.
2409    Labels(String),
2410
2411    /// Type of an edge.
2412    Type(String),
2413
2414    /// ID of a node or edge.
2415    Id(String),
2416
2417    /// List comprehension: [x IN list WHERE predicate | expression]
2418    ListComprehension {
2419        /// Variable name for each element.
2420        variable: String,
2421        /// The source list expression.
2422        list_expr: Box<LogicalExpression>,
2423        /// Optional filter predicate.
2424        filter_expr: Option<Box<LogicalExpression>>,
2425        /// The mapping expression for each element.
2426        map_expr: Box<LogicalExpression>,
2427    },
2428
2429    /// List predicate: all/any/none/single(x IN list WHERE pred).
2430    ListPredicate {
2431        /// The kind of list predicate.
2432        kind: ListPredicateKind,
2433        /// The iteration variable name.
2434        variable: String,
2435        /// The source list expression.
2436        list_expr: Box<LogicalExpression>,
2437        /// The predicate to test for each element.
2438        predicate: Box<LogicalExpression>,
2439    },
2440
2441    /// EXISTS subquery.
2442    ExistsSubquery(Box<LogicalOperator>),
2443
2444    /// COUNT subquery.
2445    CountSubquery(Box<LogicalOperator>),
2446
2447    /// VALUE subquery: returns scalar value from first row of inner query.
2448    ValueSubquery(Box<LogicalOperator>),
2449
2450    /// Map projection: `node { .prop1, .prop2, key: expr, .* }`.
2451    MapProjection {
2452        /// The base variable name.
2453        base: String,
2454        /// Projection entries (property selectors, literal entries, all-properties).
2455        entries: Vec<MapProjectionEntry>,
2456    },
2457
2458    /// reduce() accumulator: `reduce(acc = init, x IN list | expr)`.
2459    Reduce {
2460        /// Accumulator variable name.
2461        accumulator: String,
2462        /// Initial value for the accumulator.
2463        initial: Box<LogicalExpression>,
2464        /// Iteration variable name.
2465        variable: String,
2466        /// List to iterate over.
2467        list: Box<LogicalExpression>,
2468        /// Body expression evaluated per iteration (references both accumulator and variable).
2469        expression: Box<LogicalExpression>,
2470    },
2471
2472    /// Pattern comprehension: `[(pattern) WHERE pred | expr]`.
2473    ///
2474    /// Executes the inner subplan, evaluates the projection for each row,
2475    /// and collects the results into a list.
2476    PatternComprehension {
2477        /// The subplan produced by translating the pattern (+optional WHERE).
2478        subplan: Box<LogicalOperator>,
2479        /// The projection expression evaluated for each match.
2480        projection: Box<LogicalExpression>,
2481    },
2482}
2483
2484/// An entry in a map projection.
2485#[derive(Debug, Clone)]
2486#[non_exhaustive]
2487pub enum MapProjectionEntry {
2488    /// `.propertyName`: shorthand for `propertyName: base.propertyName`.
2489    PropertySelector(String),
2490    /// `key: expression`: explicit key-value pair.
2491    LiteralEntry(String, LogicalExpression),
2492    /// `.*`: include all properties of the base entity.
2493    AllProperties,
2494}
2495
/// The kind of list predicate function.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ListPredicateKind {
    /// `all(x IN list WHERE pred)`: true if pred holds for every element.
    All,
    /// `any(x IN list WHERE pred)`: true if pred holds for at least one element.
    Any,
    /// `none(x IN list WHERE pred)`: true if pred holds for no element.
    None,
    /// `single(x IN list WHERE pred)`: true if pred holds for exactly one element.
    Single,
}
2509
/// Binary operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum BinaryOp {
    /// Equality comparison (`=`).
    Eq,
    /// Inequality comparison (`<>`).
    Ne,
    /// Less than (`<`).
    Lt,
    /// Less than or equal (`<=`).
    Le,
    /// Greater than (`>`).
    Gt,
    /// Greater than or equal (`>=`).
    Ge,

    /// Logical AND.
    And,
    /// Logical OR.
    Or,
    /// Logical XOR.
    Xor,

    /// Addition (`+`).
    Add,
    /// Subtraction (`-`).
    Sub,
    /// Multiplication (`*`).
    Mul,
    /// Division (`/`).
    Div,
    /// Modulo (`%`).
    Mod,

    /// String concatenation.
    Concat,
    /// String starts with.
    StartsWith,
    /// String ends with.
    EndsWith,
    /// String contains.
    Contains,

    /// Collection membership (`IN`).
    In,
    /// Pattern matching (`LIKE`).
    Like,
    /// Regex matching (`=~`).
    Regex,
    /// Power/exponentiation (`^`).
    Pow,
}
2563
2564/// Unary operator: a logical, numeric, or null-check operation on a single operand.
2565#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2566#[non_exhaustive]
2567pub enum UnaryOp {
2568    /// Logical `NOT`.
2569    Not,
2570    /// Numeric negation (unary minus).
2571    Neg,
2572    /// `IS NULL` check.
2573    IsNull,
2574    /// `IS NOT NULL` check.
2575    IsNotNull,
2576}
2577
2578#[cfg(test)]
2579mod tests {
2580    use super::*;
2581
2582    #[test]
2583    fn test_simple_node_scan_plan() {
2584        let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2585            items: vec![ReturnItem {
2586                expression: LogicalExpression::Variable("n".into()),
2587                alias: None,
2588            }],
2589            distinct: false,
2590            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2591                variable: "n".into(),
2592                label: Some("Person".into()),
2593                input: None,
2594            })),
2595        }));
2596
2597        // Verify structure: the root is a non-distinct Return of one item over a NodeScan of `n:Person`.
2598        if let LogicalOperator::Return(ret) = &plan.root {
2599            assert_eq!(ret.items.len(), 1);
2600            assert!(!ret.distinct);
2601            if let LogicalOperator::NodeScan(scan) = ret.input.as_ref() {
2602                assert_eq!(scan.variable, "n");
2603                assert_eq!(scan.label, Some("Person".into()));
2604            } else {
2605                panic!("Expected NodeScan");
2606            }
2607        } else {
2608            panic!("Expected Return");
2609        }
2610    }
2611
2612    #[test]
2613    fn test_filter_plan() {
2614        let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp { // Return(n.name AS name) over Filter(n.age > 30) over NodeScan(n:Person)
2615            items: vec![ReturnItem {
2616                expression: LogicalExpression::Property {
2617                    variable: "n".into(),
2618                    property: "name".into(),
2619                },
2620                alias: Some("name".into()),
2621            }],
2622            distinct: false,
2623            input: Box::new(LogicalOperator::Filter(FilterOp {
2624                predicate: LogicalExpression::Binary {
2625                    left: Box::new(LogicalExpression::Property {
2626                        variable: "n".into(),
2627                        property: "age".into(),
2628                    }),
2629                    op: BinaryOp::Gt,
2630                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2631                },
2632                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2633                    variable: "n".into(),
2634                    label: Some("Person".into()),
2635                    input: None,
2636                })),
2637                pushdown_hint: None,
2638            })),
2639        }));
2640
2641        if let LogicalOperator::Return(ret) = &plan.root { // walk Return -> Filter -> predicate and check the operator survived
2642            if let LogicalOperator::Filter(filter) = ret.input.as_ref() {
2643                if let LogicalExpression::Binary { op, .. } = &filter.predicate {
2644                    assert_eq!(*op, BinaryOp::Gt);
2645                } else {
2646                    panic!("Expected Binary expression");
2647                }
2648            } else {
2649                panic!("Expected Filter");
2650            }
2651        } else {
2652            panic!("Expected Return");
2653        }
2654    }
2655
2656    // ========================================================================
2657    // has_mutations(): the index-scan operators carry an `input` subtree
2658    // (used to combine graph patterns with vector/text scoring) and must
2659    // recurse into it so a mutation buried under one is not misclassified
2660    // as read-only.
2661    // ========================================================================
2662
2663    fn read_only_scan() -> LogicalOperator { // fixture: NodeScan of `n:Article` with no input, used as a read-only subtree
2664        LogicalOperator::NodeScan(NodeScanOp {
2665            variable: "n".into(),
2666            label: Some("Article".into()),
2667            input: None,
2668        })
2669    }
2670
2671    fn mutating_create_node() -> LogicalOperator { // fixture: CreateNode of `n:Article` with no input, used as a mutating subtree
2672        LogicalOperator::CreateNode(CreateNodeOp {
2673            variable: "n".into(),
2674            labels: vec!["Article".into()],
2675            properties: vec![],
2676            input: None,
2677        })
2678    }
2679
2680    #[test]
2681    fn test_text_scan_is_leaf_no_mutations() {
2682        let op = LogicalOperator::TextScan(TextScanOp { // constructed without any `input` subtree
2683            variable: "doc".into(),
2684            label: "Article".into(),
2685            property: "body".into(),
2686            query: LogicalExpression::Literal(Value::String("rust".into())),
2687            k: Some(10),
2688            threshold: None,
2689            score_column: None,
2690        });
2691        assert!(!op.has_mutations(), "TextScan is a leaf and never mutates");
2692    }
2693
2694    #[test]
2695    fn test_vector_scan_no_input_no_mutations() {
2696        let op = LogicalOperator::VectorScan(VectorScanOp {
2697            variable: "doc".into(),
2698            index_name: None,
2699            property: "embedding".into(),
2700            label: Some("Article".into()),
2701            query_vector: LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into())),
2702            k: Some(10),
2703            metric: None,
2704            min_similarity: None,
2705            max_distance: None,
2706            input: None, // baseline case: no subtree to recurse into
2707        });
2708        assert!(!op.has_mutations(), "VectorScan with no input is read-only");
2709    }
2710
2711    #[test]
2712    fn test_vector_scan_recurses_into_mutating_input() {
2713        let op = LogicalOperator::VectorScan(VectorScanOp {
2714            variable: "doc".into(),
2715            index_name: None,
2716            property: "embedding".into(),
2717            label: Some("Article".into()),
2718            query_vector: LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into())),
2719            k: Some(10),
2720            metric: None,
2721            min_similarity: None,
2722            max_distance: None,
2723            input: Some(Box::new(mutating_create_node())), // a CreateNode buried under the scan
2724        });
2725        assert!(
2726            op.has_mutations(),
2727            "VectorScan must propagate mutations from its input subtree"
2728        );
2729    }
2730
2731    #[test]
2732    fn test_vector_scan_recurses_into_read_only_input() {
2733        let op = LogicalOperator::VectorScan(VectorScanOp {
2734            variable: "doc".into(),
2735            index_name: None,
2736            property: "embedding".into(),
2737            label: Some("Article".into()),
2738            query_vector: LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into())),
2739            k: Some(10),
2740            metric: None,
2741            min_similarity: None,
2742            max_distance: None,
2743            input: Some(Box::new(read_only_scan())), // an input is present, but it is a plain NodeScan
2744        });
2745        assert!(
2746            !op.has_mutations(),
2747            "VectorScan with read-only input is read-only"
2748        );
2749    }
2750
2751    #[test]
2752    fn test_vector_join_recurses_into_mutating_input() {
2753        let op = LogicalOperator::VectorJoin(VectorJoinOp {
2754            input: Box::new(mutating_create_node()), // VectorJoin's input is a plain Box here (not an Option); this one mutates
2755            left_vector_variable: None,
2756            left_property: None,
2757            query_vector: LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into())),
2758            right_variable: "m".into(),
2759            right_property: "embedding".into(),
2760            right_label: Some("Movie".into()),
2761            index_name: None,
2762            k: 10,
2763            metric: Some(VectorMetric::Cosine),
2764            min_similarity: None,
2765            max_distance: None,
2766            score_variable: None,
2767        });
2768        assert!(
2769            op.has_mutations(),
2770            "VectorJoin must recurse into input, was previously hard-coded false"
2771        );
2772    }
2773
2774    #[test]
2775    fn test_vector_join_with_read_only_input_is_read_only() {
2776        let op = LogicalOperator::VectorJoin(VectorJoinOp {
2777            input: Box::new(read_only_scan()), // same join as the mutating case, but over a plain NodeScan
2778            left_vector_variable: None,
2779            left_property: None,
2780            query_vector: LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into())),
2781            right_variable: "m".into(),
2782            right_property: "embedding".into(),
2783            right_label: Some("Movie".into()),
2784            index_name: None,
2785            k: 10,
2786            metric: Some(VectorMetric::Cosine),
2787            min_similarity: None,
2788            max_distance: None,
2789            score_variable: None,
2790        });
2791        assert!(!op.has_mutations()); // nothing in the input subtree mutates
2792    }
2793
2794    // ========================================================================
2795    // TextScan EXPLAIN/fmt_tree labeling: distinguishes top-k, threshold, and
2796    // the default-top-100 path (when both k and threshold are None).
2797    // ========================================================================
2798
2799    fn text_scan_with_modes(k: Option<usize>, threshold: Option<f64>) -> String { // helper: render a TextScan with the given k/threshold and return the tree text
2800        let plan = LogicalPlan::new(LogicalOperator::TextScan(TextScanOp {
2801            variable: "doc".into(),
2802            label: "Article".into(),
2803            property: "body".into(),
2804            query: LogicalExpression::Literal(Value::String("rust".into())),
2805            k,
2806            threshold,
2807            score_column: None,
2808        }));
2809        let mut out = String::new();
2810        plan.root.fmt_tree(&mut out, 0); // second argument is presumably the starting depth; confirm against fmt_tree
2811        out
2812    }
2813
2814    #[test]
2815    fn test_text_scan_display_top_k_mode() {
2816        let out = text_scan_with_modes(Some(10), None); // only k set
2817        assert!(out.contains("top-10"), "expected top-10 in:\n{out}");
2818        assert!(
2819            !out.contains("threshold"),
2820            "top-k mode should not say threshold:\n{out}"
2821        );
2822    }
2823
2824    #[test]
2825    fn test_text_scan_display_threshold_mode() {
2826        let out = text_scan_with_modes(None, Some(0.5)); // only threshold set
2827        assert!(
2828            out.contains("threshold>=0.5"),
2829            "expected threshold>=0.5 in:\n{out}"
2830        );
2831        assert!(
2832            !out.contains("top-"),
2833            "threshold mode should not say top-:\n{out}"
2834        );
2835    }
2836
2837    #[test]
2838    fn test_text_scan_display_default_mode_when_both_none() {
2839        let out = text_scan_with_modes(None, None); // neither k nor threshold set
2840        assert!(
2841            out.contains("default-top-100"),
2842            "expected default-top-100 (both k and threshold None) in:\n{out}"
2843        );
2844    }
2845
2846    #[test]
2847    fn test_text_scan_display_k_takes_precedence_over_threshold() {
2848        // When both are set, k wins: the label shows top-k only (top-k mode is what the planner actually executes).
2849        let out = text_scan_with_modes(Some(5), Some(0.3));
2850        assert!(out.contains("top-5"), "expected top-5 in:\n{out}");
2851        assert!(
2852            !out.contains("threshold"),
2853            "k should take precedence over threshold:\n{out}"
2854        );
2855    }
2856
2857    /// EXPLAIN tree for Project(Filter(Expand(NodeScan))) includes each
2858    /// operator name, uses 2-space indentation per depth, and renders
2859    /// `display_label`-style details (labels, edge types, hints, aliases).
2860    #[test]
2861    fn test_explain_tree_basic_operators() {
2862        let plan = LogicalOperator::Project(ProjectOp {
2863            projections: vec![Projection {
2864                expression: LogicalExpression::Property {
2865                    variable: "b".into(),
2866                    property: "name".into(),
2867                },
2868                alias: Some("name".into()),
2869            }],
2870            input: Box::new(LogicalOperator::Filter(FilterOp {
2871                predicate: LogicalExpression::Binary {
2872                    left: Box::new(LogicalExpression::Property {
2873                        variable: "b".into(),
2874                        property: "age".into(),
2875                    }),
2876                    op: BinaryOp::Gt,
2877                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2878                },
2879                input: Box::new(LogicalOperator::Expand(ExpandOp {
2880                    from_variable: "a".into(),
2881                    to_variable: "b".into(),
2882                    edge_variable: None,
2883                    direction: ExpandDirection::Outgoing,
2884                    edge_types: vec!["KNOWS".into()],
2885                    min_hops: 1,
2886                    max_hops: Some(1),
2887                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2888                        variable: "a".into(),
2889                        label: Some("Person".into()),
2890                        input: None,
2891                    })),
2892                    path_alias: None,
2893                    path_mode: PathMode::Walk,
2894                })),
2895                pushdown_hint: Some(PushdownHint::LabelFirst),
2896            })),
2897            pass_through_input: false,
2898        });
2899
2900        let tree = plan.explain_tree();
2901
2902        // Each operator appears with the expected name.
2903        assert!(tree.contains("Project"), "missing Project in:\n{tree}");
2904        assert!(tree.contains("Filter"), "missing Filter in:\n{tree}");
2905        assert!(tree.contains("Expand"), "missing Expand in:\n{tree}");
2906        assert!(tree.contains("NodeScan"), "missing NodeScan in:\n{tree}");
2907
2908        // Indentation: Project at depth 0, Filter at depth 1 (2 spaces),
2909        // Expand at depth 2 (4 spaces), NodeScan at depth 3 (6 spaces).
2910        assert!(tree.starts_with("Project"));
2911        assert!(
2912            tree.contains("\n  Filter"),
2913            "Filter should be indented by 2 spaces"
2914        );
2915        assert!(
2916            tree.contains("\n    Expand"),
2917            "Expand should be indented by 4 spaces"
2918        );
2919        assert!(
2920            tree.contains("\n      NodeScan"),
2921            "NodeScan should be indented by 6 spaces"
2922        );
2923
2924        // Details from display_label-style rendering appear: Person label,
2925        // KNOWS edge type, label-first pushdown hint, projection alias.
2926        assert!(tree.contains("Person"));
2927        assert!(tree.contains("KNOWS"));
2928        assert!(tree.contains("[label-first]"));
2929        assert!(tree.contains("AS name"));
2930    }
2931
2932    /// `has_mutations` recurses through Project/Filter into their inputs: a buried CreateNode counts, a NodeScan does not.
2933    #[test]
2934    fn test_has_mutations_recursive() {
2935        // Project(Filter(CreateNode)) ⇒ true
2936        let with_mutation = LogicalOperator::Project(ProjectOp {
2937            projections: vec![],
2938            input: Box::new(LogicalOperator::Filter(FilterOp {
2939                predicate: LogicalExpression::Literal(Value::Bool(true)),
2940                input: Box::new(LogicalOperator::CreateNode(CreateNodeOp {
2941                    variable: "n".into(),
2942                    labels: vec!["Person".into()],
2943                    properties: vec![],
2944                    input: None,
2945                })),
2946                pushdown_hint: None,
2947            })),
2948            pass_through_input: false,
2949        });
2950        assert!(with_mutation.has_mutations());
2951
2952        // Project(Filter(NodeScan)) ⇒ false (same wrappers, read-only leaf)
2953        let read_only = LogicalOperator::Project(ProjectOp {
2954            projections: vec![],
2955            input: Box::new(LogicalOperator::Filter(FilterOp {
2956                predicate: LogicalExpression::Literal(Value::Bool(true)),
2957                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2958                    variable: "n".into(),
2959                    label: None,
2960                    input: None,
2961                })),
2962                pushdown_hint: None,
2963            })),
2964            pass_through_input: false,
2965        });
2966        assert!(!read_only.has_mutations());
2967    }
2968
2969    /// Union returns all its branches in order via `children()`, and
2970    /// Apply returns its input first, then its subplan.
2971    #[test]
2972    fn test_children_collection_for_union_and_apply() {
2973        let leaf = |label: &str| { // distinct labels let the assertions identify each child by position
2974            LogicalOperator::NodeScan(NodeScanOp {
2975                variable: "n".into(),
2976                label: Some(label.into()),
2977                input: None,
2978            })
2979        };
2980
2981        let union = LogicalOperator::Union(UnionOp {
2982            inputs: vec![leaf("Amsterdam"), leaf("Berlin"), leaf("Prague")],
2983        });
2984        let children = union.children();
2985        assert_eq!(children.len(), 3);
2986        match children[0] {
2987            LogicalOperator::NodeScan(s) => assert_eq!(s.label.as_deref(), Some("Amsterdam")),
2988            _ => panic!("Expected NodeScan"),
2989        }
2990        match children[2] {
2991            LogicalOperator::NodeScan(s) => assert_eq!(s.label.as_deref(), Some("Prague")),
2992            _ => panic!("Expected NodeScan"),
2993        }
2994
2995        let apply = LogicalOperator::Apply(ApplyOp {
2996            input: Box::new(leaf("Person")),
2997            subplan: Box::new(leaf("Company")),
2998            shared_variables: vec![],
2999            optional: false,
3000        });
3001        let apply_children = apply.children();
3002        assert_eq!(apply_children.len(), 2);
3003        match apply_children[0] {
3004            LogicalOperator::NodeScan(s) => assert_eq!(s.label.as_deref(), Some("Person")),
3005            _ => panic!("Expected input NodeScan"),
3006        }
3007        match apply_children[1] {
3008            LogicalOperator::NodeScan(s) => assert_eq!(s.label.as_deref(), Some("Company")),
3009            _ => panic!("Expected subplan NodeScan"),
3010        }
3011    }
3012
3013    /// Unresolved `CountExpr::Parameter` estimates as 10.0; also checks literal accessors, `try_value` errors, `Display`, and `== usize`.
3014    #[test]
3015    fn test_count_expr_parameter_default() {
3016        let param = CountExpr::Parameter("limit".to_string());
3017        assert!((param.estimate() - 10.0).abs() < f64::EPSILON);
3018
3019        let literal = CountExpr::Literal(42);
3020        assert!((literal.estimate() - 42.0).abs() < f64::EPSILON);
3021        assert_eq!(literal.value(), 42);
3022        assert_eq!(literal.try_value(), Ok(42));
3023
3024        // try_value returns an error for unresolved parameters,
3025        // preserving the parameter name (with its `$` prefix) in the message.
3026        let err = param.try_value().unwrap_err();
3027        assert!(err.contains("$limit"), "error should mention $limit: {err}");
3028
3029        // Display/Equality sanity: literals print as the number, parameters as `$name`.
3030        assert_eq!(format!("{literal}"), "42");
3031        assert_eq!(format!("{param}"), "$limit");
3032        assert!(literal == 42usize);
3033    }
3034
3035    // ==================== CountExpr ====================
3036
3037    #[test]
3038    fn count_expr_literal_value() { // a literal yields its number from value(), try_value(), and estimate()
3039        let count = CountExpr::Literal(42);
3040        assert_eq!(count.value(), 42);
3041        assert_eq!(count.try_value(), Ok(42));
3042        assert!((count.estimate() - 42.0).abs() < f64::EPSILON);
3043    }
3044
3045    #[test]
3046    fn count_expr_parameter_try_value_errors() {
3047        let count = CountExpr::Parameter("limit".into());
3048        let err = count.try_value().unwrap_err();
3049        assert!(err.contains("$limit"));
3050        // Estimate falls back to the default (10.0) for unresolved parameters.
3051        assert!((count.estimate() - 10.0).abs() < f64::EPSILON);
3052    }
3053
3054    #[test]
3055    #[should_panic(expected = "Unresolved parameter: $rows")]
3056    fn count_expr_parameter_value_panics() { // value() on an unresolved parameter panics and names the parameter
3057        let count = CountExpr::Parameter("rows".into());
3058        let _ = count.value();
3059    }
3060
3061    #[test]
3062    fn count_expr_display_and_conversions() { // Display output, conversion from usize, and comparison against usize
3063        assert_eq!(format!("{}", CountExpr::Literal(7)), "7");
3064        assert_eq!(format!("{}", CountExpr::Parameter("n".into())), "$n");
3065        let from_usize: CountExpr = 3usize.into();
3066        assert_eq!(from_usize, CountExpr::Literal(3));
3067        assert_eq!(CountExpr::Literal(5), 5usize);
3068        assert!(CountExpr::Parameter("x".into()) != 5usize); // an unresolved parameter does not equal this number
3069    }
3070
3071    // ==================== LogicalPlan constructors ====================
3072
3073    #[test]
3074    fn logical_plan_constructors() { // new/explain/profile set the explain and profile flags as their names suggest
3075        let leaf = || LogicalOperator::Empty;
3076
3077        let normal = LogicalPlan::new(leaf());
3078        assert!(!normal.explain);
3079        assert!(!normal.profile);
3080        assert!(normal.default_params.is_empty());
3081
3082        let explained = LogicalPlan::explain(leaf());
3083        assert!(explained.explain);
3084        assert!(!explained.profile);
3085
3086        let profiled = LogicalPlan::profile(leaf());
3087        assert!(!profiled.explain);
3088        assert!(profiled.profile);
3089    }
3090
3091    // ==================== Helpers for tests ====================
3092
3093    fn var(name: &str) -> LogicalExpression { // shorthand for a variable-reference expression
3094        LogicalExpression::Variable(name.into())
3095    }
3096
3097    fn leaf_empty() -> Box<LogicalOperator> { // boxed `Empty` operator, ready to drop into an `input` field
3098        Box::new(LogicalOperator::Empty)
3099    }
3100
3101    fn leaf_node_scan(v: &str) -> Box<LogicalOperator> { // boxed unlabelled NodeScan binding `v`; used as a read-only leaf
3102        Box::new(LogicalOperator::NodeScan(NodeScanOp {
3103            variable: v.into(),
3104            label: None,
3105            input: None,
3106        }))
3107    }
3108
3109    fn leaf_create_node(v: &str) -> Box<LogicalOperator> { // boxed CreateNode of `v:Person`; used as a mutating leaf
3110        Box::new(LogicalOperator::CreateNode(CreateNodeOp {
3111            variable: v.into(),
3112            labels: vec!["Person".into()],
3113            properties: vec![],
3114            input: None,
3115        }))
3116    }
3117
3118    // ==================== has_mutations ====================
3119
3120    #[test]
3121    fn has_mutations_direct_operators_are_mutating() {
3122        // Representative direct mutation operators: each must report true by itself, over read-only or absent input.
3123        let op = LogicalOperator::CreateNode(CreateNodeOp {
3124            variable: "vincent".into(),
3125            labels: vec!["Person".into()],
3126            properties: vec![],
3127            input: None,
3128        });
3129        assert!(op.has_mutations());
3130
3131        let delete = LogicalOperator::DeleteNode(DeleteNodeOp {
3132            variable: "vincent".into(),
3133            detach: true,
3134            input: leaf_node_scan("vincent"),
3135        });
3136        assert!(delete.has_mutations());
3137
3138        let set_prop = LogicalOperator::SetProperty(SetPropertyOp {
3139            variable: "mia".into(),
3140            properties: vec![("city".into(), LogicalExpression::Literal(Value::Null))],
3141            replace: false,
3142            is_edge: false,
3143            input: leaf_node_scan("mia"),
3144        });
3145        assert!(set_prop.has_mutations());
3146
3147        let insert_triple = LogicalOperator::InsertTriple(InsertTripleOp {
3148            subject: TripleComponent::Iri("s".into()),
3149            predicate: TripleComponent::Iri("p".into()),
3150            object: TripleComponent::Iri("o".into()),
3151            graph: None,
3152            input: None,
3153        });
3154        assert!(insert_triple.has_mutations());
3155
3156        let clear = LogicalOperator::ClearGraph(ClearGraphOp {
3157            graph: None,
3158            silent: false,
3159        });
3160        assert!(clear.has_mutations());
3161
3162        let ddl = LogicalOperator::CreatePropertyGraph(CreatePropertyGraphOp {
3163            name: "g".into(),
3164            node_tables: vec![],
3165            edge_tables: vec![],
3166        });
3167        assert!(ddl.has_mutations());
3168    }
3169
3170    #[test]
3171    fn has_mutations_propagates_through_single_input_operators() {
3172        let base = || { // a mutating subtree (SetProperty over a NodeScan), rebuilt for each wrapper below
3173            LogicalOperator::SetProperty(SetPropertyOp {
3174                variable: "butch".into(),
3175                properties: vec![],
3176                replace: false,
3177                is_edge: false,
3178                input: leaf_node_scan("butch"),
3179            })
3180        };
3181
3182        // Filter, Project, Aggregate, Limit, Skip, Sort, Distinct, Unwind, Bind, MapCollect,
3183        // Return, HorizontalAggregate, Construct all wrap the input.
3184        let filter = LogicalOperator::Filter(FilterOp {
3185            predicate: var("x"),
3186            input: Box::new(base()),
3187            pushdown_hint: None,
3188        });
3189        assert!(filter.has_mutations());
3190
3191        let project = LogicalOperator::Project(ProjectOp {
3192            projections: vec![],
3193            input: Box::new(base()),
3194            pass_through_input: false,
3195        });
3196        assert!(project.has_mutations());
3197
3198        let agg = LogicalOperator::Aggregate(AggregateOp {
3199            group_by: vec![],
3200            aggregates: vec![],
3201            input: Box::new(base()),
3202            having: None,
3203        });
3204        assert!(agg.has_mutations());
3205
3206        let limit = LogicalOperator::Limit(LimitOp {
3207            count: CountExpr::Literal(10),
3208            input: Box::new(base()),
3209        });
3210        assert!(limit.has_mutations());
3211
3212        let skip = LogicalOperator::Skip(SkipOp {
3213            count: CountExpr::Literal(5),
3214            input: Box::new(base()),
3215        });
3216        assert!(skip.has_mutations());
3217
3218        let sort = LogicalOperator::Sort(SortOp {
3219            keys: vec![],
3220            input: Box::new(base()),
3221        });
3222        assert!(sort.has_mutations());
3223
3224        let distinct = LogicalOperator::Distinct(DistinctOp {
3225            input: Box::new(base()),
3226            columns: None,
3227        });
3228        assert!(distinct.has_mutations());
3229
3230        let unwind = LogicalOperator::Unwind(UnwindOp {
3231            expression: var("xs"),
3232            variable: "x".into(),
3233            ordinality_var: None,
3234            offset_var: None,
3235            input: Box::new(base()),
3236        });
3237        assert!(unwind.has_mutations());
3238
3239        let bind = LogicalOperator::Bind(BindOp {
3240            expression: var("x"),
3241            variable: "y".into(),
3242            input: Box::new(base()),
3243        });
3244        assert!(bind.has_mutations());
3245
3246        let map_collect = LogicalOperator::MapCollect(MapCollectOp {
3247            key_var: "k".into(),
3248            value_var: "v".into(),
3249            alias: "m".into(),
3250            input: Box::new(base()),
3251        });
3252        assert!(map_collect.has_mutations());
3253
3254        let ret = LogicalOperator::Return(ReturnOp {
3255            items: vec![],
3256            distinct: false,
3257            input: Box::new(base()),
3258        });
3259        assert!(ret.has_mutations());
3260
3261        let hagg = LogicalOperator::HorizontalAggregate(HorizontalAggregateOp {
3262            list_column: "_path".into(),
3263            entity_kind: EntityKind::Edge,
3264            function: AggregateFunction::Sum,
3265            property: "weight".into(),
3266            alias: "total".into(),
3267            input: Box::new(base()),
3268        });
3269        assert!(hagg.has_mutations());
3270
3271        let construct = LogicalOperator::Construct(ConstructOp {
3272            templates: vec![],
3273            input: Box::new(base()),
3274        });
3275        assert!(construct.has_mutations());
3276    }
3277
3278    #[test]
3279    fn has_mutations_vector_operators_are_readonly() { // read-only here because the inputs are absent or read-only, not unconditionally
3280        let vscan = LogicalOperator::VectorScan(VectorScanOp {
3281            variable: "m".into(),
3282            index_name: None,
3283            property: "embedding".into(),
3284            label: None,
3285            query_vector: LogicalExpression::Literal(Value::Null),
3286            k: Some(5),
3287            metric: Some(VectorMetric::Cosine),
3288            min_similarity: None,
3289            max_distance: None,
3290            input: None, // no subtree, so nothing below the scan can mutate
3291        });
3292        assert!(!vscan.has_mutations());
3293
3294        let vjoin = LogicalOperator::VectorJoin(VectorJoinOp {
3295            input: leaf_node_scan("m"), // read-only subtree
3296            left_vector_variable: None,
3297            left_property: None,
3298            query_vector: LogicalExpression::Literal(Value::Null),
3299            right_variable: "n".into(),
3300            right_property: "embedding".into(),
3301            right_label: None,
3302            index_name: None,
3303            k: 3,
3304            metric: None,
3305            min_similarity: None,
3306            max_distance: None,
3307            score_variable: None,
3308        });
3309        assert!(!vjoin.has_mutations());
3310    }
3311
3312    #[test]
3313    fn has_mutations_two_children_and_union_apply() { // a mutation in any child (left, right, any branch, subplan) must be reported
3314        let mutating = || *leaf_create_node("jules"); // unbox the helper's result to get an owned operator
3315        let read = || *leaf_node_scan("jules");
3316
3317        let join_readonly = LogicalOperator::Join(JoinOp {
3318            left: Box::new(read()),
3319            right: Box::new(read()),
3320            join_type: JoinType::Inner,
3321            conditions: vec![],
3322        });
3323        assert!(!join_readonly.has_mutations());
3324
3325        let join_right_mutates = LogicalOperator::Join(JoinOp {
3326            left: Box::new(read()),
3327            right: Box::new(mutating()),
3328            join_type: JoinType::Left,
3329            conditions: vec![],
3330        });
3331        assert!(join_right_mutates.has_mutations());
3332
3333        let left_join = LogicalOperator::LeftJoin(LeftJoinOp {
3334            left: Box::new(mutating()),
3335            right: Box::new(read()),
3336            condition: None,
3337        });
3338        assert!(left_join.has_mutations());
3339
3340        let anti_join = LogicalOperator::AntiJoin(AntiJoinOp {
3341            left: Box::new(read()),
3342            right: Box::new(mutating()),
3343        });
3344        assert!(anti_join.has_mutations());
3345
3346        let except = LogicalOperator::Except(ExceptOp {
3347            left: Box::new(read()),
3348            right: Box::new(read()),
3349            all: true,
3350        });
3351        assert!(!except.has_mutations());
3352
3353        let intersect = LogicalOperator::Intersect(IntersectOp {
3354            left: Box::new(mutating()),
3355            right: Box::new(read()),
3356            all: false,
3357        });
3358        assert!(intersect.has_mutations());
3359
3360        let otherwise = LogicalOperator::Otherwise(OtherwiseOp {
3361            left: Box::new(read()),
3362            right: Box::new(mutating()),
3363        });
3364        assert!(otherwise.has_mutations());
3365
3366        let union = LogicalOperator::Union(UnionOp {
3367            inputs: vec![read(), mutating(), read()], // the mutating branch sits in the middle
3368        });
3369        assert!(union.has_mutations());
3370
3371        let mwj = LogicalOperator::MultiWayJoin(MultiWayJoinOp {
3372            inputs: vec![read(), read()],
3373            conditions: vec![],
3374            shared_variables: vec!["a".into()],
3375        });
3376        assert!(!mwj.has_mutations());
3377
3378        let apply_readonly = LogicalOperator::Apply(ApplyOp {
3379            input: Box::new(read()),
3380            subplan: Box::new(read()),
3381            shared_variables: vec![],
3382            optional: false,
3383        });
3384        assert!(!apply_readonly.has_mutations());
3385
3386        let apply_inner_mutates = LogicalOperator::Apply(ApplyOp {
3387            input: Box::new(read()),
3388            subplan: Box::new(mutating()), // mutation only in the subplan, not the input
3389            shared_variables: vec![],
3390            optional: true,
3391        });
3392        assert!(apply_inner_mutates.has_mutations());
3393    }
3394
/// Checks that `has_mutations()` is `false` for `Empty`, `ParameterScan`,
/// `CallProcedure`, `LoadData` and a `TripleScan` built without an input.
#[test]
fn has_mutations_leaf_operators_are_readonly() {
    let empty = LogicalOperator::Empty;
    assert!(!empty.has_mutations());

    let parameter_scan = LogicalOperator::ParameterScan(ParameterScanOp {
        columns: vec!["a".into()],
    });
    assert!(!parameter_scan.has_mutations());

    let procedure_call = LogicalOperator::CallProcedure(CallProcedureOp {
        name: vec!["grafeo".into(), "pagerank".into()],
        arguments: vec![],
        yield_items: None,
    });
    assert!(!procedure_call.has_mutations());

    let csv_load = LogicalOperator::LoadData(LoadDataOp {
        format: LoadDataFormat::Csv,
        with_headers: true,
        path: "/tmp/x.csv".into(),
        variable: "row".into(),
        field_terminator: None,
    });
    assert!(!csv_load.has_mutations());

    // All three components are variables; no graph, input or dataset is attached.
    let triple_scan = LogicalOperator::TripleScan(TripleScanOp {
        subject: TripleComponent::Variable("s".into()),
        predicate: TripleComponent::Variable("p".into()),
        object: TripleComponent::Variable("o".into()),
        graph: None,
        input: None,
        dataset: None,
    });
    assert!(!triple_scan.has_mutations());
}
3434
3435    // ==================== children() ====================
3436
/// Checks that `children()` is empty for `Empty`, `CallProcedure`,
/// `CreateGraph` and `LoadData`.
#[test]
fn children_of_leaf_operators() {
    let empty = LogicalOperator::Empty;
    assert!(empty.children().is_empty());

    let procedure_call = LogicalOperator::CallProcedure(CallProcedureOp {
        name: vec!["p".into()],
        arguments: vec![],
        yield_items: None,
    });
    assert!(procedure_call.children().is_empty());

    let create_graph = LogicalOperator::CreateGraph(CreateGraphOp {
        graph: "g".into(),
        silent: false,
    });
    assert!(create_graph.children().is_empty());

    let jsonl_load = LogicalOperator::LoadData(LoadDataOp {
        format: LoadDataFormat::Jsonl,
        with_headers: false,
        path: "x.jsonl".into(),
        variable: "r".into(),
        field_terminator: None,
    });
    assert!(jsonl_load.children().is_empty());
}
3469
/// A `NodeScan` has no children without an input and one child with an input;
/// an `EdgeScan` that is given an input has one child.
#[test]
fn children_of_optional_input_operators() {
    // The same unlabelled scan of `n` each time; only the optional input varies.
    let node_scan = |input| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: "n".into(),
            label: None,
            input,
        })
    };

    let standalone_scan = node_scan(None);
    assert_eq!(standalone_scan.children().len(), 0);

    let chained_scan = node_scan(Some(leaf_empty()));
    assert_eq!(chained_scan.children().len(), 1);

    let chained_edge_scan = LogicalOperator::EdgeScan(EdgeScanOp {
        variable: "e".into(),
        edge_types: vec![],
        input: Some(leaf_empty()),
    });
    assert_eq!(chained_edge_scan.children().len(), 1);
}
3493
/// `Join` and `Apply` each report two children; a `Union` over three inputs
/// reports three.
#[test]
fn children_of_two_child_operators() {
    let cross_join = LogicalOperator::Join(JoinOp {
        left: leaf_empty(),
        right: leaf_empty(),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    assert_eq!(cross_join.children().len(), 2);

    let apply_op = LogicalOperator::Apply(ApplyOp {
        input: leaf_empty(),
        subplan: leaf_empty(),
        shared_variables: vec![],
        optional: false,
    });
    assert_eq!(apply_op.children().len(), 2);

    // Union stores its inputs by value, so each boxed leaf is unboxed.
    let three_way_union = LogicalOperator::Union(UnionOp {
        inputs: (0..3).map(|_| *leaf_empty()).collect(),
    });
    assert_eq!(three_way_union.children().len(), 3);
}
3517
/// A `Modify` operator reports exactly one child. Only the count is asserted;
/// per the test name that child is expected to be the `where_clause`.
#[test]
fn children_of_modify_returns_where_clause() {
    let modify_op = LogicalOperator::Modify(ModifyOp {
        delete_templates: vec![],
        insert_templates: vec![],
        where_clause: leaf_empty(),
        graph: None,
    });

    let child_count = modify_op.children().len();
    assert_eq!(child_count, 1);
}
3528
3529    // ==================== display_label ====================
3530
/// Pins the exact `display_label()` text for node scans, edge scans and
/// single-hop expands in each of the three directions.
#[test]
fn display_label_spot_checks() {
    // Node scans: a missing label is rendered as `*`.
    let labelled_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "vincent".into(),
        label: Some("Person".into()),
        input: None,
    });
    assert_eq!(labelled_scan.display_label(), "vincent:Person");

    let unlabelled_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "mia".into(),
        label: None,
        input: None,
    });
    assert_eq!(unlabelled_scan.display_label(), "mia:*");

    // Edge scans over `e`: types are joined with `|`, an empty list becomes `*`.
    let edge_scan = |edge_types| {
        LogicalOperator::EdgeScan(EdgeScanOp {
            variable: "e".into(),
            edge_types,
            input: None,
        })
    };

    let typed_edge_scan = edge_scan(vec!["KNOWS".into(), "LIKES".into()]);
    assert_eq!(typed_edge_scan.display_label(), "e:KNOWS|LIKES");

    let untyped_edge_scan = edge_scan(vec![]);
    assert_eq!(untyped_edge_scan.display_label(), "e:*");

    // Exactly-one-hop expand from `a` to `b`; only direction and edge types vary.
    let single_hop = |direction, edge_types| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: "a".into(),
            to_variable: "b".into(),
            edge_variable: None,
            direction,
            edge_types,
            min_hops: 1,
            max_hops: Some(1),
            input: leaf_node_scan("a"),
            path_alias: None,
            path_mode: PathMode::Walk,
        })
    };

    let outgoing = single_hop(ExpandDirection::Outgoing, vec!["KNOWS".into()]);
    assert_eq!(outgoing.display_label(), "(a)->[:KNOWS]->(b)");

    let incoming = single_hop(ExpandDirection::Incoming, vec![]);
    assert_eq!(incoming.display_label(), "(a)<-[:*]<-(b)");

    let undirected = single_hop(ExpandDirection::Both, vec![]);
    assert_eq!(undirected.display_label(), "(a)--[:*]--(b)");
}
3603
/// A filter's `display_label()` carries a bracketed tag for each pushdown hint
/// and contains no `[` at all when no hint is set.
#[test]
fn display_label_filter_pushdown_hints() {
    let filter_with = |pushdown_hint: Option<PushdownHint>| {
        LogicalOperator::Filter(FilterOp {
            predicate: var("x"),
            input: leaf_empty(),
            pushdown_hint,
        })
    };

    let unhinted = filter_with(None);
    assert!(!unhinted.display_label().contains('['));

    // Each hint paired with the tag its label must contain.
    let hinted_cases = vec![
        (
            PushdownHint::IndexLookup {
                property: "name".into(),
            },
            "[index: name]",
        ),
        (
            PushdownHint::RangeScan {
                property: "age".into(),
            },
            "[range: age]",
        ),
        (PushdownHint::LabelFirst, "[label-first]"),
    ];
    for (hint, expected_tag) in hinted_cases {
        let filter = filter_with(Some(hint));
        assert!(filter.display_label().contains(expected_tag));
    }
}
3631
/// Checks `display_label()` for the row-shaping operators: `Project`, `Join`,
/// `Aggregate`, `Limit`, `Skip`, `Sort`, `Distinct` and `Return`.
#[test]
fn display_label_projection_join_sort_return() {
    // Projection: one aliased variable and one un-aliased property access.
    let projection = LogicalOperator::Project(ProjectOp {
        projections: vec![
            Projection {
                expression: var("n"),
                alias: Some("person".into()),
            },
            Projection {
                expression: LogicalExpression::Property {
                    variable: "n".into(),
                    property: "city".into(),
                },
                alias: None,
            },
        ],
        input: leaf_empty(),
        pass_through_input: false,
    });
    let projection_label = projection.display_label();
    assert!(projection_label.contains("person"));
    assert!(projection_label.contains("n.city"));

    let cross_join = LogicalOperator::Join(JoinOp {
        left: leaf_empty(),
        right: leaf_empty(),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    assert_eq!(cross_join.display_label(), "Cross");

    // Grouping key only, no aggregate functions.
    let grouping = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![var("city")],
        aggregates: vec![],
        input: leaf_empty(),
        having: None,
    });
    assert_eq!(grouping.display_label(), "group: [city]");

    // A literal count prints as the number; a parameter prints as `$name`.
    let literal_limit = LogicalOperator::Limit(LimitOp {
        count: CountExpr::Literal(10),
        input: leaf_empty(),
    });
    assert_eq!(literal_limit.display_label(), "10");

    let parameter_skip = LogicalOperator::Skip(SkipOp {
        count: CountExpr::Parameter("off".into()),
        input: leaf_empty(),
    });
    assert_eq!(parameter_skip.display_label(), "$off");

    let two_key_sort = LogicalOperator::Sort(SortOp {
        keys: vec![
            SortKey {
                expression: var("a"),
                order: SortOrder::Ascending,
                nulls: None,
            },
            SortKey {
                expression: var("b"),
                order: SortOrder::Descending,
                nulls: None,
            },
        ],
        input: leaf_empty(),
    });
    let sort_label = two_key_sort.display_label();
    assert!(sort_label.contains("a ASC"));
    assert!(sort_label.contains("b DESC"));

    let plain_distinct = LogicalOperator::Distinct(DistinctOp {
        input: leaf_empty(),
        columns: None,
    });
    assert_eq!(plain_distinct.display_label(), "");

    // Return: one aliased item and one bare variable.
    let returning = LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: var("n"),
                alias: Some("node".into()),
            },
            ReturnItem {
                expression: var("m"),
                alias: None,
            },
        ],
        distinct: true,
        input: leaf_empty(),
    });
    let return_label = returning.display_label();
    assert!(return_label.contains("node"));
    assert!(return_label.contains('m'));
}
3726
/// Pins the exact `display_label()` text for the operators not covered by the
/// other label tests: set/join operators, binding operators, mutations, data
/// sources, vector operators and `Empty`.
#[test]
fn display_label_remaining_operators() {
    // ---- multi-input operators ----
    let two_branch_union = LogicalOperator::Union(UnionOp {
        inputs: vec![*leaf_empty(), *leaf_empty()],
    });
    assert_eq!(two_branch_union.display_label(), "2 branches");

    let three_input_join = LogicalOperator::MultiWayJoin(MultiWayJoinOp {
        inputs: vec![*leaf_empty(), *leaf_empty(), *leaf_empty()],
        conditions: vec![],
        shared_variables: vec![],
    });
    assert_eq!(three_input_join.display_label(), "3 inputs");

    let left_join = LogicalOperator::LeftJoin(LeftJoinOp {
        left: leaf_empty(),
        right: leaf_empty(),
        condition: None,
    });
    assert_eq!(left_join.display_label(), "");

    let anti_join = LogicalOperator::AntiJoin(AntiJoinOp {
        left: leaf_empty(),
        right: leaf_empty(),
    });
    assert_eq!(anti_join.display_label(), "");

    // ---- operators labelled by the variable or alias they introduce ----
    let unwind_op = LogicalOperator::Unwind(UnwindOp {
        expression: var("xs"),
        variable: "item".into(),
        ordinality_var: None,
        offset_var: None,
        input: leaf_empty(),
    });
    assert_eq!(unwind_op.display_label(), "item");

    let bind_op = LogicalOperator::Bind(BindOp {
        expression: var("x"),
        variable: "y".into(),
        input: leaf_empty(),
    });
    assert_eq!(bind_op.display_label(), "y");

    let map_collect = LogicalOperator::MapCollect(MapCollectOp {
        key_var: "k".into(),
        value_var: "v".into(),
        alias: "counts".into(),
        input: leaf_empty(),
    });
    assert_eq!(map_collect.display_label(), "counts");

    let shortest_path = LogicalOperator::ShortestPath(ShortestPathOp {
        input: leaf_empty(),
        source_var: "a".into(),
        target_var: "b".into(),
        edge_types: vec![],
        direction: ExpandDirection::Outgoing,
        path_alias: "p".into(),
        all_paths: false,
    });
    assert_eq!(shortest_path.display_label(), "a -> b");

    // ---- mutations ----
    let merge_node = LogicalOperator::Merge(MergeOp {
        variable: "django".into(),
        labels: vec![],
        match_properties: vec![],
        on_create: vec![],
        on_match: vec![],
        input: leaf_empty(),
    });
    assert_eq!(merge_node.display_label(), "django");

    let merge_edge = LogicalOperator::MergeRelationship(MergeRelationshipOp {
        variable: "r".into(),
        source_variable: "a".into(),
        target_variable: "b".into(),
        edge_type: "KNOWS".into(),
        match_properties: vec![],
        on_create: vec![],
        on_match: vec![],
        input: leaf_empty(),
    });
    assert_eq!(merge_edge.display_label(), "r");

    let create_node = LogicalOperator::CreateNode(CreateNodeOp {
        variable: "shosanna".into(),
        labels: vec!["Person".into(), "Hero".into()],
        properties: vec![],
        input: None,
    });
    assert_eq!(create_node.display_label(), "shosanna:Person:Hero");

    // A KNOWS edge from `a` to `b`; only the optional edge variable varies.
    let create_edge = |variable| {
        LogicalOperator::CreateEdge(CreateEdgeOp {
            variable,
            from_variable: "a".into(),
            to_variable: "b".into(),
            edge_type: "KNOWS".into(),
            properties: vec![],
            input: leaf_empty(),
        })
    };

    let named_edge = create_edge(Some("r".into()));
    assert_eq!(named_edge.display_label(), "[r:KNOWS]");

    // An edge without a variable is shown with a `?` placeholder.
    let anonymous_edge = create_edge(None);
    assert_eq!(anonymous_edge.display_label(), "[?:KNOWS]");

    let delete_node = LogicalOperator::DeleteNode(DeleteNodeOp {
        variable: "hans".into(),
        detach: false,
        input: leaf_empty(),
    });
    assert_eq!(delete_node.display_label(), "hans");

    let delete_edge = LogicalOperator::DeleteEdge(DeleteEdgeOp {
        variable: "r".into(),
        input: leaf_empty(),
    });
    assert_eq!(delete_edge.display_label(), "r");

    let set_property = LogicalOperator::SetProperty(SetPropertyOp {
        variable: "beatrix".into(),
        properties: vec![],
        replace: false,
        is_edge: false,
        input: leaf_empty(),
    });
    assert_eq!(set_property.display_label(), "beatrix");

    let add_labels = LogicalOperator::AddLabel(AddLabelOp {
        variable: "n".into(),
        labels: vec!["A".into(), "B".into()],
        input: leaf_empty(),
    });
    assert_eq!(add_labels.display_label(), "n:A:B");

    let remove_label = LogicalOperator::RemoveLabel(RemoveLabelOp {
        variable: "n".into(),
        labels: vec!["A".into()],
        input: leaf_empty(),
    });
    assert_eq!(remove_label.display_label(), "n:A");

    // ---- procedure calls and data sources ----
    let procedure_call = LogicalOperator::CallProcedure(CallProcedureOp {
        name: vec!["grafeo".into(), "pagerank".into()],
        arguments: vec![],
        yield_items: None,
    });
    assert_eq!(procedure_call.display_label(), "grafeo.pagerank");

    let csv_load = LogicalOperator::LoadData(LoadDataOp {
        format: LoadDataFormat::Csv,
        with_headers: true,
        path: "data.csv".into(),
        variable: "r".into(),
        field_terminator: None,
    });
    assert_eq!(csv_load.display_label(), "data.csv AS r");

    let apply_op = LogicalOperator::Apply(ApplyOp {
        input: leaf_empty(),
        subplan: leaf_empty(),
        shared_variables: vec![],
        optional: false,
    });
    assert_eq!(apply_op.display_label(), "");

    // ---- vector operators: labelled by the variable they bind ----
    let vector_scan = LogicalOperator::VectorScan(VectorScanOp {
        variable: "m".into(),
        index_name: None,
        property: "embedding".into(),
        label: None,
        query_vector: LogicalExpression::Literal(Value::Null),
        k: Some(5),
        metric: None,
        min_similarity: None,
        max_distance: None,
        input: None,
    });
    assert_eq!(vector_scan.display_label(), "m");

    let vector_join = LogicalOperator::VectorJoin(VectorJoinOp {
        input: leaf_empty(),
        left_vector_variable: None,
        left_property: None,
        query_vector: LogicalExpression::Literal(Value::Null),
        right_variable: "t".into(),
        right_property: "emb".into(),
        right_label: None,
        index_name: None,
        k: 3,
        metric: None,
        min_similarity: None,
        max_distance: None,
        score_variable: None,
    });
    assert_eq!(vector_join.display_label(), "t");

    // `Empty` has nothing to show and yields an empty label.
    let empty = LogicalOperator::Empty;
    assert_eq!(empty.display_label(), "");
}
3933
3934    // ==================== explain_tree / fmt_tree ====================
3935
/// Checks the `explain_tree()` text for a labelled node scan with an `Empty`
/// input, an unlabelled node scan, and an untyped edge scan.
#[test]
fn explain_tree_covers_all_common_arms() {
    // A labelled scan with `Empty` as its input: the output must mention both.
    let scan_over_empty = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".into(),
        label: Some("Person".into()),
        input: Some(Box::new(LogicalOperator::Empty)),
    });
    let rendered = scan_over_empty.explain_tree();
    assert!(rendered.contains("NodeScan (n:Person)"));
    assert!(rendered.contains("Empty"));

    // Without a label the scan is rendered with `*`.
    let unlabelled_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".into(),
        label: None,
        input: None,
    });
    let rendered = unlabelled_scan.explain_tree();
    assert!(rendered.contains("NodeScan (n:*)"));

    // Likewise for an edge scan with no edge types.
    let untyped_edge_scan = LogicalOperator::EdgeScan(EdgeScanOp {
        variable: "e".into(),
        edge_types: vec![],
        input: None,
    });
    let rendered = untyped_edge_scan.explain_tree();
    assert!(rendered.contains("EdgeScan (e:*)"));
}
3962
/// Checks how `explain_tree()` renders an expand's hop range and direction:
/// a single hop, a fixed hop count, a bounded range and an open-ended range.
#[test]
fn explain_tree_expand_variants() {
    // Renders a KNOWS expand from `a` to `b` over a node scan of `a`.
    let render = |min_hops, max_hops, direction| {
        let expand = LogicalOperator::Expand(ExpandOp {
            from_variable: "a".into(),
            to_variable: "b".into(),
            edge_variable: None,
            direction,
            edge_types: vec!["KNOWS".into()],
            min_hops,
            max_hops,
            input: leaf_node_scan("a"),
            path_alias: None,
            path_mode: PathMode::Walk,
        });
        expand.explain_tree()
    };

    let single_hop = render(1, Some(1), ExpandDirection::Outgoing);
    assert!(single_hop.contains("(a)->[:KNOWS]->(b)"));

    let exactly_two = render(2, Some(2), ExpandDirection::Incoming);
    assert!(exactly_two.contains("*2"));
    assert!(exactly_two.contains("<-"));

    let one_to_three = render(1, Some(3), ExpandDirection::Both);
    assert!(one_to_three.contains("*1..3"));
    assert!(one_to_three.contains("--"));

    let two_or_more = render(2, None, ExpandDirection::Outgoing);
    assert!(two_or_more.contains("*2.."));
}
3992
/// Checks that `explain_tree()` shows the matching bracketed tag for each of
/// the three pushdown hints on a filter.
#[test]
fn explain_tree_filter_with_all_hints() {
    // Renders the filter `n.age = 30` over a node scan of `n` with the given hint.
    let render_with = |hint| {
        let filter = LogicalOperator::Filter(FilterOp {
            predicate: LogicalExpression::Binary {
                left: Box::new(LogicalExpression::Property {
                    variable: "n".into(),
                    property: "age".into(),
                }),
                op: BinaryOp::Eq,
                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
            },
            input: leaf_node_scan("n"),
            pushdown_hint: Some(hint),
        });
        filter.explain_tree()
    };

    let index_lookup = PushdownHint::IndexLookup {
        property: "age".into(),
    };
    assert!(render_with(index_lookup).contains("[index: age]"));

    let range_scan = PushdownHint::RangeScan {
        property: "age".into(),
    };
    assert!(render_with(range_scan).contains("[range: age]"));

    assert!(render_with(PushdownHint::LabelFirst).contains("[label-first]"));
}
4029
/// Checks the `explain_tree()` text for `Project`, `Aggregate`, `Sort`,
/// `Return DISTINCT`, `Limit`, `Skip` and `Distinct`.
#[test]
fn explain_tree_projection_aggregate_sort_return() {
    // Projection: an aliased item must be rendered as `<expr> AS <alias>`.
    let projection = LogicalOperator::Project(ProjectOp {
        projections: vec![
            Projection {
                expression: var("n"),
                alias: Some("who".into()),
            },
            Projection {
                expression: var("m"),
                alias: None,
            },
        ],
        input: leaf_empty(),
        pass_through_input: true,
    });
    let projection_text = projection.explain_tree();
    assert!(projection_text.contains("Project"));
    assert!(projection_text.contains("n AS who"));

    // Non-distinct aggregate expression; only function, argument and alias vary.
    let aggregate_expr = |function, expression, alias| AggregateExpr {
        function,
        expression,
        expression2: None,
        distinct: false,
        alias,
        percentile: None,
        separator: None,
    };
    let aggregation = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![var("city")],
        aggregates: vec![
            aggregate_expr(AggregateFunction::Count, None, Some("c".into())),
            aggregate_expr(AggregateFunction::Sum, Some(var("x")), None),
        ],
        input: leaf_empty(),
        having: None,
    });
    let aggregation_text = aggregation.explain_tree();
    assert!(aggregation_text.contains("Aggregate"));
    assert!(aggregation_text.contains("count(...) AS c"));
    assert!(aggregation_text.contains("sum(...)"));

    let descending_sort = LogicalOperator::Sort(SortOp {
        keys: vec![SortKey {
            expression: var("age"),
            order: SortOrder::Descending,
            nulls: None,
        }],
        input: leaf_empty(),
    });
    let sort_text = descending_sort.explain_tree();
    assert!(sort_text.contains("age DESC"));

    let distinct_return = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: var("n"),
            alias: Some("who".into()),
        }],
        distinct: true,
        input: leaf_empty(),
    });
    let return_text = distinct_return.explain_tree();
    assert!(return_text.contains("Return DISTINCT"));
    assert!(return_text.contains("n AS who"));

    // Literal counts appear in parentheses after the operator name.
    let limit_five = LogicalOperator::Limit(LimitOp {
        count: CountExpr::Literal(5),
        input: leaf_empty(),
    });
    let limit_text = limit_five.explain_tree();
    assert!(limit_text.contains("Limit (5)"));

    let skip_two = LogicalOperator::Skip(SkipOp {
        count: CountExpr::Literal(2),
        input: leaf_empty(),
    });
    let skip_text = skip_two.explain_tree();
    assert!(skip_text.contains("Skip (2)"));

    let plain_distinct = LogicalOperator::Distinct(DistinctOp {
        input: leaf_empty(),
        columns: None,
    });
    let distinct_text = plain_distinct.explain_tree();
    assert!(distinct_text.contains("Distinct"));
}
4120
/// Checks the `explain_tree()` headers of the join family (`Join`, `LeftJoin`,
/// `AntiJoin`, `MultiWayJoin`) and the set operators (`Union`, `Except`,
/// `Intersect`, `Otherwise`).
#[test]
fn explain_tree_joins_and_set_ops() {
    let inner_join = LogicalOperator::Join(JoinOp {
        left: leaf_empty(),
        right: leaf_empty(),
        join_type: JoinType::Inner,
        conditions: vec![],
    });
    assert!(inner_join.explain_tree().contains("Join (Inner)"));

    // A left join mentions its condition only when one is set.
    let render_left_join = |condition| {
        LogicalOperator::LeftJoin(LeftJoinOp {
            left: leaf_empty(),
            right: leaf_empty(),
            condition,
        })
        .explain_tree()
    };

    let with_condition = render_left_join(Some(var("x")));
    assert!(with_condition.contains("LeftJoin (condition:"));

    let without_condition = render_left_join(None);
    assert!(without_condition.contains("LeftJoin"));
    assert!(!without_condition.contains("condition:"));

    let anti_join = LogicalOperator::AntiJoin(AntiJoinOp {
        left: leaf_empty(),
        right: leaf_empty(),
    });
    assert!(anti_join.explain_tree().contains("AntiJoin"));

    let two_branch_union = LogicalOperator::Union(UnionOp {
        inputs: vec![*leaf_empty(), *leaf_empty()],
    });
    assert!(two_branch_union.explain_tree().contains("Union (2 branches)"));

    let multi_way_join = LogicalOperator::MultiWayJoin(MultiWayJoinOp {
        inputs: vec![*leaf_empty(), *leaf_empty()],
        conditions: vec![],
        shared_variables: vec!["a".into(), "b".into()],
    });
    let multi_way_text = multi_way_join.explain_tree();
    assert!(multi_way_text.contains("MultiWayJoin"));
    assert!(multi_way_text.contains("shared: [a, b]"));

    // With `all` set the header gains an `ALL` suffix. Without it the name is
    // followed directly by a newline.
    let render_except = |all| {
        LogicalOperator::Except(ExceptOp {
            left: leaf_empty(),
            right: leaf_empty(),
            all,
        })
        .explain_tree()
    };
    assert!(render_except(true).contains("Except ALL"));
    assert!(render_except(false).contains("Except\n"));

    let render_intersect = |all| {
        LogicalOperator::Intersect(IntersectOp {
            left: leaf_empty(),
            right: leaf_empty(),
            all,
        })
        .explain_tree()
    };
    assert!(render_intersect(true).contains("Intersect ALL"));
    assert!(render_intersect(false).contains("Intersect\n"));

    let otherwise_op = LogicalOperator::Otherwise(OtherwiseOp {
        left: leaf_empty(),
        right: leaf_empty(),
    });
    assert!(otherwise_op.explain_tree().contains("Otherwise"));
}
4203
/// Checks the `explain_tree()` headers of `Unwind`, `Bind`, `MapCollect`,
/// `Apply` and `ShortestPath`.
#[test]
fn explain_tree_unwind_bind_mapcollect_apply_sp() {
    let unwind_op = LogicalOperator::Unwind(UnwindOp {
        expression: var("xs"),
        variable: "item".into(),
        ordinality_var: None,
        offset_var: None,
        input: leaf_empty(),
    });
    let unwind_text = unwind_op.explain_tree();
    assert!(unwind_text.contains("Unwind (item)"));

    let bind_op = LogicalOperator::Bind(BindOp {
        expression: var("x"),
        variable: "y".into(),
        input: leaf_empty(),
    });
    let bind_text = bind_op.explain_tree();
    assert!(bind_text.contains("Bind (y)"));

    // MapCollect is rendered as `<key> -> <value> AS <alias>`.
    let map_collect = LogicalOperator::MapCollect(MapCollectOp {
        key_var: "k".into(),
        value_var: "v".into(),
        alias: "m".into(),
        input: leaf_empty(),
    });
    let map_collect_text = map_collect.explain_tree();
    assert!(map_collect_text.contains("MapCollect"));
    assert!(map_collect_text.contains("k -> v AS m"));

    // Only the operator name is checked for an optional Apply.
    let optional_apply = LogicalOperator::Apply(ApplyOp {
        input: leaf_empty(),
        subplan: leaf_empty(),
        shared_variables: vec!["a".into()],
        optional: true,
    });
    let apply_text = optional_apply.explain_tree();
    assert!(apply_text.contains("Apply"));

    let shortest_path = LogicalOperator::ShortestPath(ShortestPathOp {
        input: leaf_empty(),
        source_var: "a".into(),
        target_var: "b".into(),
        edge_types: vec![],
        direction: ExpandDirection::Outgoing,
        path_alias: "p".into(),
        all_paths: false,
    });
    let shortest_path_text = shortest_path.explain_tree();
    assert!(shortest_path_text.contains("ShortestPath (a -> b)"));
}
4251
4252    #[test]
4253    fn explain_tree_mutations() {
4254        let merge = LogicalOperator::Merge(MergeOp {
4255            variable: "vincent".into(),
4256            labels: vec!["Person".into()],
4257            match_properties: vec![],
4258            on_create: vec![],
4259            on_match: vec![],
4260            input: leaf_empty(),
4261        });
4262        assert!(merge.explain_tree().contains("Merge (vincent)"));
4263
4264        let merge_rel = LogicalOperator::MergeRelationship(MergeRelationshipOp {
4265            variable: "r".into(),
4266            source_variable: "a".into(),
4267            target_variable: "b".into(),
4268            edge_type: "KNOWS".into(),
4269            match_properties: vec![],
4270            on_create: vec![],
4271            on_match: vec![],
4272            input: leaf_empty(),
4273        });
4274        assert!(merge_rel.explain_tree().contains("MergeRelationship (r)"));
4275
4276        let cnode = LogicalOperator::CreateNode(CreateNodeOp {
4277            variable: "mia".into(),
4278            labels: vec!["Person".into()],
4279            properties: vec![],
4280            input: Some(leaf_empty()),
4281        });
4282        let s = cnode.explain_tree();
4283        assert!(s.contains("CreateNode (mia:Person)"));
4284        assert!(s.contains("Empty"));
4285
4286        let cnode_no_input = LogicalOperator::CreateNode(CreateNodeOp {
4287            variable: "mia".into(),
4288            labels: vec![],
4289            properties: vec![],
4290            input: None,
4291        });
4292        assert!(cnode_no_input.explain_tree().contains("CreateNode (mia:)"));
4293
4294        let cedge = LogicalOperator::CreateEdge(CreateEdgeOp {
4295            variable: Some("r".into()),
4296            from_variable: "a".into(),
4297            to_variable: "b".into(),
4298            edge_type: "KNOWS".into(),
4299            properties: vec![],
4300            input: leaf_empty(),
4301        });
4302        assert!(
4303            cedge
4304                .explain_tree()
4305                .contains("CreateEdge (a)-[r:KNOWS]->(b)")
4306        );
4307
4308        let cedge_anon = LogicalOperator::CreateEdge(CreateEdgeOp {
4309            variable: None,
4310            from_variable: "a".into(),
4311            to_variable: "b".into(),
4312            edge_type: "KNOWS".into(),
4313            properties: vec![],
4314            input: leaf_empty(),
4315        });
4316        assert!(cedge_anon.explain_tree().contains("[?:KNOWS]"));
4317
4318        let dnode = LogicalOperator::DeleteNode(DeleteNodeOp {
4319            variable: "butch".into(),
4320            detach: true,
4321            input: leaf_empty(),
4322        });
4323        assert!(dnode.explain_tree().contains("DeleteNode (butch)"));
4324
4325        let dedge = LogicalOperator::DeleteEdge(DeleteEdgeOp {
4326            variable: "r".into(),
4327            input: leaf_empty(),
4328        });
4329        assert!(dedge.explain_tree().contains("DeleteEdge (r)"));
4330
4331        let set_prop = LogicalOperator::SetProperty(SetPropertyOp {
4332            variable: "n".into(),
4333            properties: vec![("name".into(), var("x")), ("age".into(), var("y"))],
4334            replace: false,
4335            is_edge: false,
4336            input: leaf_empty(),
4337        });
4338        let s = set_prop.explain_tree();
4339        assert!(s.contains("SetProperty"));
4340        assert!(s.contains("n.name"));
4341        assert!(s.contains("n.age"));
4342
4343        let add_lbl = LogicalOperator::AddLabel(AddLabelOp {
4344            variable: "n".into(),
4345            labels: vec!["A".into()],
4346            input: leaf_empty(),
4347        });
4348        assert!(add_lbl.explain_tree().contains("AddLabel (n:A)"));
4349
4350        let rm_lbl = LogicalOperator::RemoveLabel(RemoveLabelOp {
4351            variable: "n".into(),
4352            labels: vec!["A".into(), "B".into()],
4353            input: leaf_empty(),
4354        });
4355        assert!(rm_lbl.explain_tree().contains("RemoveLabel (n:A:B)"));
4356    }
4357
4358    #[test]
4359    fn explain_tree_call_and_load_data() {
4360        let call = LogicalOperator::CallProcedure(CallProcedureOp {
4361            name: vec!["grafeo".into(), "pagerank".into()],
4362            arguments: vec![],
4363            yield_items: None,
4364        });
4365        assert!(
4366            call.explain_tree()
4367                .contains("CallProcedure (grafeo.pagerank)")
4368        );
4369
4370        let csv = LogicalOperator::LoadData(LoadDataOp {
4371            format: LoadDataFormat::Csv,
4372            with_headers: true,
4373            path: "data.csv".into(),
4374            variable: "row".into(),
4375            field_terminator: None,
4376        });
4377        let s = csv.explain_tree();
4378        assert!(s.contains("LoadCsv"));
4379        assert!(s.contains("WITH HEADERS"));
4380        assert!(s.contains("data.csv"));
4381        assert!(s.contains("AS row"));
4382
4383        let csv_no_hdr = LogicalOperator::LoadData(LoadDataOp {
4384            format: LoadDataFormat::Csv,
4385            with_headers: false,
4386            path: "data.csv".into(),
4387            variable: "row".into(),
4388            field_terminator: None,
4389        });
4390        assert!(!csv_no_hdr.explain_tree().contains("WITH HEADERS"));
4391
4392        let jsonl = LogicalOperator::LoadData(LoadDataOp {
4393            format: LoadDataFormat::Jsonl,
4394            with_headers: false,
4395            path: "data.jsonl".into(),
4396            variable: "r".into(),
4397            field_terminator: None,
4398        });
4399        assert!(jsonl.explain_tree().contains("LoadJsonl"));
4400
4401        let parquet = LogicalOperator::LoadData(LoadDataOp {
4402            format: LoadDataFormat::Parquet,
4403            with_headers: false,
4404            path: "data.parquet".into(),
4405            variable: "r".into(),
4406            field_terminator: None,
4407        });
4408        assert!(parquet.explain_tree().contains("LoadParquet"));
4409    }
4410
4411    #[test]
4412    fn explain_tree_triple_scan_and_fallback() {
4413        let ts = LogicalOperator::TripleScan(TripleScanOp {
4414            subject: TripleComponent::Variable("s".into()),
4415            predicate: TripleComponent::Iri("http://ex/p".into()),
4416            object: TripleComponent::Literal(Value::Int64(5)),
4417            graph: None,
4418            input: Some(leaf_empty()),
4419            dataset: None,
4420        });
4421        let s = ts.explain_tree();
4422        assert!(s.contains("TripleScan"));
4423        assert!(s.contains("?s"));
4424        assert!(s.contains("<http://ex/p>"));
4425        assert!(s.contains("Empty"));
4426
4427        let ts_no_input = LogicalOperator::TripleScan(TripleScanOp {
4428            subject: TripleComponent::Variable("s".into()),
4429            predicate: TripleComponent::Variable("p".into()),
4430            object: TripleComponent::Variable("o".into()),
4431            graph: None,
4432            input: None,
4433            dataset: None,
4434        });
4435        assert!(ts_no_input.explain_tree().contains("TripleScan"));
4436
4437        // Fallback arm for operators without a specific formatter.
4438        let graph_op = LogicalOperator::CreateGraph(CreateGraphOp {
4439            graph: "g".into(),
4440            silent: false,
4441        });
4442        let out = graph_op.explain_tree();
4443        assert!(!out.is_empty());
4444    }
4445
4446    // ==================== fmt_expr helper ====================
4447
4448    #[test]
4449    fn fmt_expr_covers_common_variants() {
4450        let v = var("n");
4451        assert_eq!(fmt_expr(&v), "n");
4452
4453        let p = LogicalExpression::Property {
4454            variable: "n".into(),
4455            property: "age".into(),
4456        };
4457        assert_eq!(fmt_expr(&p), "n.age");
4458
4459        let lit = LogicalExpression::Literal(Value::Int64(42));
4460        assert_eq!(fmt_expr(&lit), "42");
4461
4462        let bin = LogicalExpression::Binary {
4463            left: Box::new(var("a")),
4464            op: BinaryOp::Eq,
4465            right: Box::new(LogicalExpression::Literal(Value::Int64(1))),
4466        };
4467        let s = fmt_expr(&bin);
4468        assert!(s.contains("Eq"));
4469        assert!(s.contains('a'));
4470
4471        let un = LogicalExpression::Unary {
4472            op: UnaryOp::Not,
4473            operand: Box::new(var("a")),
4474        };
4475        let s = fmt_expr(&un);
4476        assert!(s.contains("Not"));
4477
4478        let fc = LogicalExpression::FunctionCall {
4479            name: "toLower".into(),
4480            args: vec![var("name")],
4481            distinct: false,
4482        };
4483        assert_eq!(fmt_expr(&fc), "toLower(name)");
4484
4485        // Fallback arm: non-common variant hits the `_ => format!("{expr:?}")` path.
4486        let list = LogicalExpression::List(vec![var("a")]);
4487        let out = fmt_expr(&list);
4488        assert!(out.contains("List") || out.contains('['));
4489    }
4490
4491    // ==================== fmt_triple_component helper ====================
4492
4493    #[test]
4494    fn fmt_triple_component_variants() {
4495        assert_eq!(
4496            fmt_triple_component(&TripleComponent::Variable("s".into())),
4497            "?s"
4498        );
4499        assert_eq!(
4500            fmt_triple_component(&TripleComponent::Iri("http://ex/p".into())),
4501            "<http://ex/p>"
4502        );
4503        assert!(fmt_triple_component(&TripleComponent::Literal(Value::Int64(10))).contains("10"));
4504        assert_eq!(
4505            fmt_triple_component(&TripleComponent::LangLiteral {
4506                value: "hello".into(),
4507                lang: "en".into(),
4508            }),
4509            "\"hello\"@en"
4510        );
4511        assert_eq!(
4512            fmt_triple_component(&TripleComponent::BlankNode("b0".into())),
4513            "_:b0"
4514        );
4515    }
4516
4517    // ==================== TripleComponent::as_variable ====================
4518
4519    #[test]
4520    fn triple_component_as_variable() {
4521        assert_eq!(
4522            TripleComponent::Variable("s".into()).as_variable(),
4523            Some("s")
4524        );
4525        assert_eq!(
4526            TripleComponent::Iri("http://ex/p".into()).as_variable(),
4527            None
4528        );
4529        assert_eq!(
4530            TripleComponent::Literal(Value::Int64(1)).as_variable(),
4531            None
4532        );
4533        assert_eq!(TripleComponent::BlankNode("b".into()).as_variable(), None);
4534        assert_eq!(
4535            TripleComponent::LangLiteral {
4536                value: "v".into(),
4537                lang: "en".into(),
4538            }
4539            .as_variable(),
4540            None
4541        );
4542    }
4543}