Skip to main content

grafeo_engine/query/
plan.rs

1//! Logical query plan representation.
2//!
3//! The logical plan is the intermediate representation between parsed queries
4//! and physical execution. Both GQL and Cypher queries are translated to this
5//! common representation.
6
7use std::collections::HashMap;
8use std::fmt;
9
10use grafeo_common::types::Value;
11
/// A row count used by SKIP and LIMIT.
///
/// The count is either already known ([`CountExpr::Literal`]) or still refers
/// to a query parameter by name ([`CountExpr::Parameter`]).
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum CountExpr {
    /// A concrete, already-resolved count.
    Literal(usize),
    /// The name of a parameter that has not been substituted yet
    /// (displayed with a leading `$`, e.g., `$limit`).
    Parameter(String),
}

impl CountExpr {
    /// Returns the literal count.
    ///
    /// Only call this once parameter substitution has run; use
    /// [`CountExpr::try_value`] when an unresolved parameter must not abort.
    ///
    /// # Panics
    ///
    /// Panics if the expression is still a `Parameter` reference.
    pub fn value(&self) -> usize {
        match *self {
            Self::Parameter(ref name) => panic!("Unresolved parameter: ${name}"),
            Self::Literal(count) => count,
        }
    }

    /// Returns the literal count without panicking.
    ///
    /// # Errors
    ///
    /// Returns a message naming the parameter if the expression is still a
    /// `Parameter` reference.
    pub fn try_value(&self) -> Result<usize, String> {
        match *self {
            Self::Parameter(ref name) => Err(format!("Unresolved SKIP/LIMIT parameter: ${name}")),
            Self::Literal(count) => Ok(count),
        }
    }

    /// Returns the count as an `f64` for cardinality estimation.
    ///
    /// A literal is converted directly; an unresolved parameter falls back to
    /// a fixed guess of 10 rows.
    pub fn estimate(&self) -> f64 {
        // Stand-in row count while the real parameter value is unknown.
        const UNRESOLVED_ESTIMATE: f64 = 10.0;

        match *self {
            Self::Parameter(_) => UNRESOLVED_ESTIMATE,
            Self::Literal(count) => count as f64,
        }
    }
}

impl fmt::Display for CountExpr {
    /// Writes a literal as its number and a parameter as `$` followed by its name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Parameter(name) => {
                f.write_str("$")?;
                f.write_str(name)
            }
            Self::Literal(count) => write!(f, "{count}"),
        }
    }
}

impl From<usize> for CountExpr {
    /// Wraps a plain integer as a resolved literal count.
    fn from(n: usize) -> Self {
        CountExpr::Literal(n)
    }
}

impl PartialEq<usize> for CountExpr {
    /// Compares against a plain integer; an unresolved parameter never equals one.
    fn eq(&self, other: &usize) -> bool {
        match self {
            Self::Literal(count) => count == other,
            Self::Parameter(_) => false,
        }
    }
}
78
79/// A logical query plan.
80#[derive(Debug, Clone)]
81pub struct LogicalPlan {
82    /// The root operator of the plan.
83    pub root: LogicalOperator,
84    /// When true, return the plan tree as text instead of executing.
85    pub explain: bool,
86    /// When true, execute the query and return per-operator runtime metrics.
87    pub profile: bool,
88    /// Default parameter values from variable declarations (e.g., GraphQL
89    /// `query($limit: Int = 2)`). The processor merges these with caller-supplied
90    /// params, giving caller values higher precedence.
91    pub default_params: HashMap<String, Value>,
92}
93
94impl LogicalPlan {
95    /// Creates a new logical plan with the given root operator.
96    pub fn new(root: LogicalOperator) -> Self {
97        Self {
98            root,
99            explain: false,
100            profile: false,
101            default_params: HashMap::new(),
102        }
103    }
104
105    /// Creates an EXPLAIN plan that returns the plan tree without executing.
106    pub fn explain(root: LogicalOperator) -> Self {
107        Self {
108            root,
109            explain: true,
110            profile: false,
111            default_params: HashMap::new(),
112        }
113    }
114
115    /// Creates a PROFILE plan that executes and returns per-operator metrics.
116    pub fn profile(root: LogicalOperator) -> Self {
117        Self {
118            root,
119            explain: false,
120            profile: true,
121            default_params: HashMap::new(),
122        }
123    }
124}
125
/// A logical operator in the query plan.
///
/// Each variant is one kind of node in the plan tree. Every variant except
/// [`LogicalOperator::Empty`] wraps an operator-specific payload struct, and
/// payloads that consume rows from other operators hold those operators
/// (see [`LogicalOperator::children`]).
///
/// The enum is `#[non_exhaustive]`, so `match` expressions outside this crate
/// need a wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum LogicalOperator {
    /// Scan all nodes, optionally filtered by label.
    NodeScan(NodeScanOp),

    /// Scan all edges, optionally filtered by type.
    EdgeScan(EdgeScanOp),

    /// Expand from nodes to neighbors via edges.
    Expand(ExpandOp),

    /// Filter rows based on a predicate.
    Filter(FilterOp),

    /// Project specific columns.
    Project(ProjectOp),

    /// Join two inputs.
    Join(JoinOp),

    /// Aggregate with grouping.
    Aggregate(AggregateOp),

    /// Limit the number of results.
    Limit(LimitOp),

    /// Skip a number of results.
    Skip(SkipOp),

    /// Sort results.
    Sort(SortOp),

    /// Remove duplicate results.
    Distinct(DistinctOp),

    /// Create a new node.
    CreateNode(CreateNodeOp),

    /// Create a new edge.
    CreateEdge(CreateEdgeOp),

    /// Delete a node.
    DeleteNode(DeleteNodeOp),

    /// Delete an edge.
    DeleteEdge(DeleteEdgeOp),

    /// Set properties on a node or edge.
    SetProperty(SetPropertyOp),

    /// Add labels to a node.
    AddLabel(AddLabelOp),

    /// Remove labels from a node.
    RemoveLabel(RemoveLabelOp),

    /// Return results (terminal operator).
    Return(ReturnOp),

    /// Empty result set.
    Empty,

    // ==================== RDF/SPARQL Operators ====================
    // NOTE(review): this banner marks where the RDF operators start, but the
    // group below also holds operators that are not RDF-specific according to
    // their own docs (e.g. Unwind, MapCollect, Merge, ShortestPath).
    /// Scan RDF triples matching a pattern.
    TripleScan(TripleScanOp),

    /// Union of multiple result sets.
    Union(UnionOp),

    /// Left outer join for OPTIONAL patterns.
    LeftJoin(LeftJoinOp),

    /// Anti-join for MINUS patterns.
    AntiJoin(AntiJoinOp),

    /// SPARQL CONSTRUCT: evaluate WHERE, substitute bindings into template,
    /// output (subject, predicate, object) columns.
    Construct(ConstructOp),

    /// Bind a variable to an expression.
    Bind(BindOp),

    /// Unwind a list into individual rows.
    Unwind(UnwindOp),

    /// Collect grouped key-value rows into a single Map value.
    /// Used for Gremlin `groupCount()` semantics.
    MapCollect(MapCollectOp),

    /// Merge a node pattern (match or create).
    Merge(MergeOp),

    /// Merge a relationship pattern (match or create).
    MergeRelationship(MergeRelationshipOp),

    /// Find shortest path between nodes.
    ShortestPath(ShortestPathOp),

    // ==================== SPARQL Update Operators ====================
    /// Insert RDF triples.
    InsertTriple(InsertTripleOp),

    /// Delete RDF triples.
    DeleteTriple(DeleteTripleOp),

    /// SPARQL MODIFY operation (DELETE/INSERT WHERE).
    /// Evaluates WHERE once, applies DELETE templates, then INSERT templates.
    Modify(ModifyOp),

    /// Clear a graph (remove all triples).
    ClearGraph(ClearGraphOp),

    /// Create a new named graph.
    CreateGraph(CreateGraphOp),

    /// Drop (remove) a named graph.
    DropGraph(DropGraphOp),

    /// Load data from a URL into a graph.
    LoadGraph(LoadGraphOp),

    /// Copy triples from one graph to another.
    CopyGraph(CopyGraphOp),

    /// Move triples from one graph to another.
    MoveGraph(MoveGraphOp),

    /// Add (merge) triples from one graph to another.
    AddGraph(AddGraphOp),

    // NOTE(review): HorizontalAggregate sits under the SPARQL Update banner
    // but is a per-row aggregation, not an update operator.
    /// Per-row aggregation over a list-valued column (horizontal aggregation, GE09).
    HorizontalAggregate(HorizontalAggregateOp),

    // ==================== Vector Search Operators ====================
    /// Scan using vector similarity search.
    VectorScan(VectorScanOp),

    /// Join graph patterns with vector similarity search.
    ///
    /// Computes vector distances between entities from the left input and
    /// a query vector, then joins with similarity scores. Useful for:
    /// - Filtering graph traversal results by vector similarity
    /// - Computing aggregated embeddings and finding similar entities
    /// - Combining multiple vector sources with graph structure
    VectorJoin(VectorJoinOp),

    /// Scan using full-text search with BM25 scoring.
    TextScan(TextScanOp),

    // ==================== Set Operations ====================
    /// Set difference: rows in left that are not in right.
    Except(ExceptOp),

    /// Set intersection: rows common to all inputs.
    Intersect(IntersectOp),

    /// Fallback: use left result if non-empty, otherwise right.
    Otherwise(OtherwiseOp),

    // ==================== Correlated Subquery ====================
    /// Apply (lateral join): evaluate a subplan per input row.
    Apply(ApplyOp),

    /// Parameter scan: leaf of a correlated inner plan that receives values
    /// from the outer Apply operator. The column names match `ApplyOp.shared_variables`.
    ParameterScan(ParameterScanOp),

    // ==================== DDL Operators ====================
    /// Define a property graph schema (SQL/PGQ DDL).
    CreatePropertyGraph(CreatePropertyGraphOp),

    // ==================== Multi-Way Join ====================
    /// Multi-way join using worst-case optimal join (leapfrog).
    /// Used for cyclic patterns (triangles, cliques) with 3+ relations.
    MultiWayJoin(MultiWayJoinOp),

    // ==================== Procedure Call Operators ====================
    /// Invoke a stored procedure (CALL ... YIELD).
    CallProcedure(CallProcedureOp),

    // ==================== Data Import Operators ====================
    /// Load data from a file (CSV, JSONL, or Parquet), producing one row per record.
    LoadData(LoadDataOp),
}
312
313impl LogicalOperator {
314    /// Returns `true` if this operator or any of its children perform mutations.
315    #[must_use]
316    pub fn has_mutations(&self) -> bool {
317        match self {
318            // Direct mutation operators
319            Self::CreateNode(_)
320            | Self::CreateEdge(_)
321            | Self::DeleteNode(_)
322            | Self::DeleteEdge(_)
323            | Self::SetProperty(_)
324            | Self::AddLabel(_)
325            | Self::RemoveLabel(_)
326            | Self::Merge(_)
327            | Self::MergeRelationship(_)
328            | Self::InsertTriple(_)
329            | Self::DeleteTriple(_)
330            | Self::Modify(_)
331            | Self::ClearGraph(_)
332            | Self::CreateGraph(_)
333            | Self::DropGraph(_)
334            | Self::LoadGraph(_)
335            | Self::CopyGraph(_)
336            | Self::MoveGraph(_)
337            | Self::AddGraph(_)
338            | Self::CreatePropertyGraph(_) => true,
339
340            // Operators with an `input` child
341            Self::Filter(op) => op.input.has_mutations(),
342            Self::Project(op) => op.input.has_mutations(),
343            Self::Aggregate(op) => op.input.has_mutations(),
344            Self::Limit(op) => op.input.has_mutations(),
345            Self::Skip(op) => op.input.has_mutations(),
346            Self::Sort(op) => op.input.has_mutations(),
347            Self::Distinct(op) => op.input.has_mutations(),
348            Self::Unwind(op) => op.input.has_mutations(),
349            Self::Bind(op) => op.input.has_mutations(),
350            Self::MapCollect(op) => op.input.has_mutations(),
351            Self::Return(op) => op.input.has_mutations(),
352            Self::HorizontalAggregate(op) => op.input.has_mutations(),
353            Self::VectorScan(op) => op.input.as_deref().is_some_and(Self::has_mutations),
354            Self::VectorJoin(op) => op.input.has_mutations(),
355            Self::TextScan(_) => false,
356
357            // Operators with two children
358            Self::Join(op) => op.left.has_mutations() || op.right.has_mutations(),
359            Self::LeftJoin(op) => op.left.has_mutations() || op.right.has_mutations(),
360            Self::AntiJoin(op) => op.left.has_mutations() || op.right.has_mutations(),
361            Self::Except(op) => op.left.has_mutations() || op.right.has_mutations(),
362            Self::Intersect(op) => op.left.has_mutations() || op.right.has_mutations(),
363            Self::Otherwise(op) => op.left.has_mutations() || op.right.has_mutations(),
364            Self::Union(op) => op.inputs.iter().any(|i| i.has_mutations()),
365            Self::MultiWayJoin(op) => op.inputs.iter().any(|i| i.has_mutations()),
366            Self::Apply(op) => op.input.has_mutations() || op.subplan.has_mutations(),
367
368            // Leaf operators (read-only)
369            Self::NodeScan(_)
370            | Self::EdgeScan(_)
371            | Self::Expand(_)
372            | Self::TripleScan(_)
373            | Self::ShortestPath(_)
374            | Self::Empty
375            | Self::ParameterScan(_)
376            | Self::CallProcedure(_)
377            | Self::LoadData(_) => false,
378            Self::Construct(op) => op.input.has_mutations(),
379        }
380    }
381
382    /// Returns references to the child operators.
383    ///
384    /// Used by [`crate::query::profile::build_profile_tree`] to walk the logical
385    /// plan tree in post-order, matching operators to profiling entries.
386    #[must_use]
387    pub fn children(&self) -> Vec<&LogicalOperator> {
388        match self {
389            // Optional single input
390            Self::NodeScan(op) => op.input.as_deref().into_iter().collect(),
391            Self::EdgeScan(op) => op.input.as_deref().into_iter().collect(),
392            Self::TripleScan(op) => op.input.as_deref().into_iter().collect(),
393            Self::VectorScan(op) => op.input.as_deref().into_iter().collect(),
394            Self::CreateNode(op) => op.input.as_deref().into_iter().collect(),
395            Self::InsertTriple(op) => op.input.as_deref().into_iter().collect(),
396            Self::DeleteTriple(op) => op.input.as_deref().into_iter().collect(),
397
398            // Single required input
399            Self::Expand(op) => vec![&*op.input],
400            Self::Filter(op) => vec![&*op.input],
401            Self::Project(op) => vec![&*op.input],
402            Self::Aggregate(op) => vec![&*op.input],
403            Self::Limit(op) => vec![&*op.input],
404            Self::Skip(op) => vec![&*op.input],
405            Self::Sort(op) => vec![&*op.input],
406            Self::Distinct(op) => vec![&*op.input],
407            Self::Return(op) => vec![&*op.input],
408            Self::Unwind(op) => vec![&*op.input],
409            Self::Bind(op) => vec![&*op.input],
410            Self::Construct(op) => vec![&*op.input],
411            Self::MapCollect(op) => vec![&*op.input],
412            Self::ShortestPath(op) => vec![&*op.input],
413            Self::Merge(op) => vec![&*op.input],
414            Self::MergeRelationship(op) => vec![&*op.input],
415            Self::CreateEdge(op) => vec![&*op.input],
416            Self::DeleteNode(op) => vec![&*op.input],
417            Self::DeleteEdge(op) => vec![&*op.input],
418            Self::SetProperty(op) => vec![&*op.input],
419            Self::AddLabel(op) => vec![&*op.input],
420            Self::RemoveLabel(op) => vec![&*op.input],
421            Self::HorizontalAggregate(op) => vec![&*op.input],
422            Self::VectorJoin(op) => vec![&*op.input],
423            Self::Modify(op) => vec![&*op.where_clause],
424
425            // Two children (left + right)
426            Self::Join(op) => vec![&*op.left, &*op.right],
427            Self::LeftJoin(op) => vec![&*op.left, &*op.right],
428            Self::AntiJoin(op) => vec![&*op.left, &*op.right],
429            Self::Except(op) => vec![&*op.left, &*op.right],
430            Self::Intersect(op) => vec![&*op.left, &*op.right],
431            Self::Otherwise(op) => vec![&*op.left, &*op.right],
432
433            // Two children (input + subplan)
434            Self::Apply(op) => vec![&*op.input, &*op.subplan],
435
436            // Vec children
437            Self::Union(op) => op.inputs.iter().collect(),
438            Self::MultiWayJoin(op) => op.inputs.iter().collect(),
439
440            // Leaf operators
441            Self::Empty
442            | Self::ParameterScan(_)
443            | Self::CallProcedure(_)
444            | Self::ClearGraph(_)
445            | Self::CreateGraph(_)
446            | Self::DropGraph(_)
447            | Self::LoadGraph(_)
448            | Self::CopyGraph(_)
449            | Self::MoveGraph(_)
450            | Self::AddGraph(_)
451            | Self::CreatePropertyGraph(_)
452            | Self::LoadData(_)
453            | Self::TextScan(_) => vec![],
454        }
455    }
456
457    /// Returns a compact display label for this operator, used in PROFILE output.
458    #[must_use]
459    pub fn display_label(&self) -> String {
460        match self {
461            Self::NodeScan(op) => {
462                let label = op.label.as_deref().unwrap_or("*");
463                format!("{}:{}", op.variable, label)
464            }
465            Self::EdgeScan(op) => {
466                let types = if op.edge_types.is_empty() {
467                    "*".to_string()
468                } else {
469                    op.edge_types.join("|")
470                };
471                format!("{}:{}", op.variable, types)
472            }
473            Self::Expand(op) => {
474                let types = if op.edge_types.is_empty() {
475                    "*".to_string()
476                } else {
477                    op.edge_types.join("|")
478                };
479                let dir = match op.direction {
480                    ExpandDirection::Outgoing => "->",
481                    ExpandDirection::Incoming => "<-",
482                    ExpandDirection::Both => "--",
483                };
484                format!(
485                    "({from}){dir}[:{types}]{dir}({to})",
486                    from = op.from_variable,
487                    to = op.to_variable,
488                )
489            }
490            Self::Filter(op) => {
491                let hint = match &op.pushdown_hint {
492                    Some(PushdownHint::IndexLookup { property }) => {
493                        format!(" [index: {property}]")
494                    }
495                    Some(PushdownHint::RangeScan { property }) => {
496                        format!(" [range: {property}]")
497                    }
498                    Some(PushdownHint::LabelFirst) => " [label-first]".to_string(),
499                    None => String::new(),
500                };
501                format!("{}{hint}", fmt_expr(&op.predicate))
502            }
503            Self::Project(op) => {
504                let cols: Vec<String> = op
505                    .projections
506                    .iter()
507                    .map(|p| match &p.alias {
508                        Some(alias) => alias.clone(),
509                        None => fmt_expr(&p.expression),
510                    })
511                    .collect();
512                cols.join(", ")
513            }
514            Self::Join(op) => format!("{:?}", op.join_type),
515            Self::Aggregate(op) => {
516                let groups: Vec<String> = op.group_by.iter().map(fmt_expr).collect();
517                format!("group: [{}]", groups.join(", "))
518            }
519            Self::Limit(op) => format!("{}", op.count),
520            Self::Skip(op) => format!("{}", op.count),
521            Self::Sort(op) => {
522                let keys: Vec<String> = op
523                    .keys
524                    .iter()
525                    .map(|k| {
526                        let dir = match k.order {
527                            SortOrder::Ascending => "ASC",
528                            SortOrder::Descending => "DESC",
529                        };
530                        format!("{} {dir}", fmt_expr(&k.expression))
531                    })
532                    .collect();
533                keys.join(", ")
534            }
535            Self::Distinct(_) => String::new(),
536            Self::Return(op) => {
537                let items: Vec<String> = op
538                    .items
539                    .iter()
540                    .map(|item| match &item.alias {
541                        Some(alias) => alias.clone(),
542                        None => fmt_expr(&item.expression),
543                    })
544                    .collect();
545                items.join(", ")
546            }
547            Self::Union(op) => format!("{} branches", op.inputs.len()),
548            Self::MultiWayJoin(op) => {
549                format!("{} inputs", op.inputs.len())
550            }
551            Self::LeftJoin(_) => String::new(),
552            Self::AntiJoin(_) => String::new(),
553            Self::Unwind(op) => op.variable.clone(),
554            Self::Bind(op) => op.variable.clone(),
555            Self::MapCollect(op) => op.alias.clone(),
556            Self::ShortestPath(op) => {
557                format!("{} -> {}", op.source_var, op.target_var)
558            }
559            Self::Merge(op) => op.variable.clone(),
560            Self::MergeRelationship(op) => op.variable.clone(),
561            Self::CreateNode(op) => {
562                let labels = op.labels.join(":");
563                format!("{}:{labels}", op.variable)
564            }
565            Self::CreateEdge(op) => {
566                format!(
567                    "[{}:{}]",
568                    op.variable.as_deref().unwrap_or("?"),
569                    op.edge_type
570                )
571            }
572            Self::DeleteNode(op) => op.variable.clone(),
573            Self::DeleteEdge(op) => op.variable.clone(),
574            Self::SetProperty(op) => op.variable.clone(),
575            Self::AddLabel(op) => {
576                let labels = op.labels.join(":");
577                format!("{}:{labels}", op.variable)
578            }
579            Self::RemoveLabel(op) => {
580                let labels = op.labels.join(":");
581                format!("{}:{labels}", op.variable)
582            }
583            Self::CallProcedure(op) => op.name.join("."),
584            Self::LoadData(op) => format!("{} AS {}", op.path, op.variable),
585            Self::Apply(_) => String::new(),
586            Self::VectorScan(op) => op.variable.clone(),
587            Self::VectorJoin(op) => op.right_variable.clone(),
588            Self::TextScan(op) => format!("{}:{}", op.variable, op.label),
589            _ => String::new(),
590        }
591    }
592}
593
594impl LogicalOperator {
595    /// Formats this operator tree as a human-readable plan for EXPLAIN output.
596    pub fn explain_tree(&self) -> String {
597        let mut output = String::new();
598        self.fmt_tree(&mut output, 0);
599        output
600    }
601
602    fn fmt_tree(&self, out: &mut String, depth: usize) {
603        use std::fmt::Write;
604
605        let indent = "  ".repeat(depth);
606        match self {
607            Self::NodeScan(op) => {
608                let label = op.label.as_deref().unwrap_or("*");
609                let _ = writeln!(out, "{indent}NodeScan ({var}:{label})", var = op.variable);
610                if let Some(input) = &op.input {
611                    input.fmt_tree(out, depth + 1);
612                }
613            }
614            Self::EdgeScan(op) => {
615                let types = if op.edge_types.is_empty() {
616                    "*".to_string()
617                } else {
618                    op.edge_types.join("|")
619                };
620                let _ = writeln!(out, "{indent}EdgeScan ({var}:{types})", var = op.variable);
621            }
622            Self::Expand(op) => {
623                let types = if op.edge_types.is_empty() {
624                    "*".to_string()
625                } else {
626                    op.edge_types.join("|")
627                };
628                let dir = match op.direction {
629                    ExpandDirection::Outgoing => "->",
630                    ExpandDirection::Incoming => "<-",
631                    ExpandDirection::Both => "--",
632                };
633                let hops = match (op.min_hops, op.max_hops) {
634                    (1, Some(1)) => String::new(),
635                    (min, Some(max)) if min == max => format!("*{min}"),
636                    (min, Some(max)) => format!("*{min}..{max}"),
637                    (min, None) => format!("*{min}.."),
638                };
639                let _ = writeln!(
640                    out,
641                    "{indent}Expand ({from}){dir}[:{types}{hops}]{dir}({to})",
642                    from = op.from_variable,
643                    to = op.to_variable,
644                );
645                op.input.fmt_tree(out, depth + 1);
646            }
647            Self::Filter(op) => {
648                let hint = match &op.pushdown_hint {
649                    Some(PushdownHint::IndexLookup { property }) => {
650                        format!(" [index: {property}]")
651                    }
652                    Some(PushdownHint::RangeScan { property }) => {
653                        format!(" [range: {property}]")
654                    }
655                    Some(PushdownHint::LabelFirst) => " [label-first]".to_string(),
656                    None => String::new(),
657                };
658                let _ = writeln!(
659                    out,
660                    "{indent}Filter ({expr}){hint}",
661                    expr = fmt_expr(&op.predicate)
662                );
663                op.input.fmt_tree(out, depth + 1);
664            }
665            Self::Project(op) => {
666                let cols: Vec<String> = op
667                    .projections
668                    .iter()
669                    .map(|p| {
670                        let expr = fmt_expr(&p.expression);
671                        match &p.alias {
672                            Some(alias) => format!("{expr} AS {alias}"),
673                            None => expr,
674                        }
675                    })
676                    .collect();
677                let _ = writeln!(out, "{indent}Project ({cols})", cols = cols.join(", "));
678                op.input.fmt_tree(out, depth + 1);
679            }
680            Self::Join(op) => {
681                let _ = writeln!(out, "{indent}Join ({ty:?})", ty = op.join_type);
682                op.left.fmt_tree(out, depth + 1);
683                op.right.fmt_tree(out, depth + 1);
684            }
685            Self::Aggregate(op) => {
686                let groups: Vec<String> = op.group_by.iter().map(fmt_expr).collect();
687                let aggs: Vec<String> = op
688                    .aggregates
689                    .iter()
690                    .map(|a| {
691                        let func = format!("{:?}", a.function).to_lowercase();
692                        match &a.alias {
693                            Some(alias) => format!("{func}(...) AS {alias}"),
694                            None => format!("{func}(...)"),
695                        }
696                    })
697                    .collect();
698                let _ = writeln!(
699                    out,
700                    "{indent}Aggregate (group: [{groups}], aggs: [{aggs}])",
701                    groups = groups.join(", "),
702                    aggs = aggs.join(", "),
703                );
704                op.input.fmt_tree(out, depth + 1);
705            }
706            Self::Limit(op) => {
707                let _ = writeln!(out, "{indent}Limit ({})", op.count);
708                op.input.fmt_tree(out, depth + 1);
709            }
710            Self::Skip(op) => {
711                let _ = writeln!(out, "{indent}Skip ({})", op.count);
712                op.input.fmt_tree(out, depth + 1);
713            }
714            Self::Sort(op) => {
715                let keys: Vec<String> = op
716                    .keys
717                    .iter()
718                    .map(|k| {
719                        let dir = match k.order {
720                            SortOrder::Ascending => "ASC",
721                            SortOrder::Descending => "DESC",
722                        };
723                        format!("{} {dir}", fmt_expr(&k.expression))
724                    })
725                    .collect();
726                let _ = writeln!(out, "{indent}Sort ({keys})", keys = keys.join(", "));
727                op.input.fmt_tree(out, depth + 1);
728            }
729            Self::Distinct(op) => {
730                let _ = writeln!(out, "{indent}Distinct");
731                op.input.fmt_tree(out, depth + 1);
732            }
733            Self::Return(op) => {
734                let items: Vec<String> = op
735                    .items
736                    .iter()
737                    .map(|item| {
738                        let expr = fmt_expr(&item.expression);
739                        match &item.alias {
740                            Some(alias) => format!("{expr} AS {alias}"),
741                            None => expr,
742                        }
743                    })
744                    .collect();
745                let distinct = if op.distinct { " DISTINCT" } else { "" };
746                let _ = writeln!(
747                    out,
748                    "{indent}Return{distinct} ({items})",
749                    items = items.join(", ")
750                );
751                op.input.fmt_tree(out, depth + 1);
752            }
753            Self::Union(op) => {
754                let _ = writeln!(out, "{indent}Union ({n} branches)", n = op.inputs.len());
755                for input in &op.inputs {
756                    input.fmt_tree(out, depth + 1);
757                }
758            }
759            Self::MultiWayJoin(op) => {
760                let vars = op.shared_variables.join(", ");
761                let _ = writeln!(
762                    out,
763                    "{indent}MultiWayJoin ({n} inputs, shared: [{vars}])",
764                    n = op.inputs.len()
765                );
766                for input in &op.inputs {
767                    input.fmt_tree(out, depth + 1);
768                }
769            }
770            Self::LeftJoin(op) => {
771                if let Some(cond) = &op.condition {
772                    let _ = writeln!(out, "{indent}LeftJoin (condition: {cond:?})");
773                } else {
774                    let _ = writeln!(out, "{indent}LeftJoin");
775                }
776                op.left.fmt_tree(out, depth + 1);
777                op.right.fmt_tree(out, depth + 1);
778            }
779            Self::AntiJoin(op) => {
780                let _ = writeln!(out, "{indent}AntiJoin");
781                op.left.fmt_tree(out, depth + 1);
782                op.right.fmt_tree(out, depth + 1);
783            }
784            Self::Unwind(op) => {
785                let _ = writeln!(out, "{indent}Unwind ({var})", var = op.variable);
786                op.input.fmt_tree(out, depth + 1);
787            }
788            Self::Bind(op) => {
789                let _ = writeln!(out, "{indent}Bind ({var})", var = op.variable);
790                op.input.fmt_tree(out, depth + 1);
791            }
792            Self::MapCollect(op) => {
793                let _ = writeln!(
794                    out,
795                    "{indent}MapCollect ({key} -> {val} AS {alias})",
796                    key = op.key_var,
797                    val = op.value_var,
798                    alias = op.alias
799                );
800                op.input.fmt_tree(out, depth + 1);
801            }
802            Self::Apply(op) => {
803                let _ = writeln!(out, "{indent}Apply");
804                op.input.fmt_tree(out, depth + 1);
805                op.subplan.fmt_tree(out, depth + 1);
806            }
807            Self::Except(op) => {
808                let all = if op.all { " ALL" } else { "" };
809                let _ = writeln!(out, "{indent}Except{all}");
810                op.left.fmt_tree(out, depth + 1);
811                op.right.fmt_tree(out, depth + 1);
812            }
813            Self::Intersect(op) => {
814                let all = if op.all { " ALL" } else { "" };
815                let _ = writeln!(out, "{indent}Intersect{all}");
816                op.left.fmt_tree(out, depth + 1);
817                op.right.fmt_tree(out, depth + 1);
818            }
819            Self::Otherwise(op) => {
820                let _ = writeln!(out, "{indent}Otherwise");
821                op.left.fmt_tree(out, depth + 1);
822                op.right.fmt_tree(out, depth + 1);
823            }
824            Self::ShortestPath(op) => {
825                let _ = writeln!(
826                    out,
827                    "{indent}ShortestPath ({from} -> {to})",
828                    from = op.source_var,
829                    to = op.target_var
830                );
831                op.input.fmt_tree(out, depth + 1);
832            }
833            Self::Merge(op) => {
834                let _ = writeln!(out, "{indent}Merge ({var})", var = op.variable);
835                op.input.fmt_tree(out, depth + 1);
836            }
837            Self::MergeRelationship(op) => {
838                let _ = writeln!(out, "{indent}MergeRelationship ({var})", var = op.variable);
839                op.input.fmt_tree(out, depth + 1);
840            }
841            Self::CreateNode(op) => {
842                let labels = op.labels.join(":");
843                let _ = writeln!(
844                    out,
845                    "{indent}CreateNode ({var}:{labels})",
846                    var = op.variable
847                );
848                if let Some(input) = &op.input {
849                    input.fmt_tree(out, depth + 1);
850                }
851            }
852            Self::CreateEdge(op) => {
853                let var = op.variable.as_deref().unwrap_or("?");
854                let _ = writeln!(
855                    out,
856                    "{indent}CreateEdge ({from})-[{var}:{ty}]->({to})",
857                    from = op.from_variable,
858                    ty = op.edge_type,
859                    to = op.to_variable
860                );
861                op.input.fmt_tree(out, depth + 1);
862            }
863            Self::DeleteNode(op) => {
864                let _ = writeln!(out, "{indent}DeleteNode ({var})", var = op.variable);
865                op.input.fmt_tree(out, depth + 1);
866            }
867            Self::DeleteEdge(op) => {
868                let _ = writeln!(out, "{indent}DeleteEdge ({var})", var = op.variable);
869                op.input.fmt_tree(out, depth + 1);
870            }
871            Self::SetProperty(op) => {
872                let props: Vec<String> = op
873                    .properties
874                    .iter()
875                    .map(|(k, _)| format!("{}.{k}", op.variable))
876                    .collect();
877                let _ = writeln!(
878                    out,
879                    "{indent}SetProperty ({props})",
880                    props = props.join(", ")
881                );
882                op.input.fmt_tree(out, depth + 1);
883            }
884            Self::AddLabel(op) => {
885                let labels = op.labels.join(":");
886                let _ = writeln!(out, "{indent}AddLabel ({var}:{labels})", var = op.variable);
887                op.input.fmt_tree(out, depth + 1);
888            }
889            Self::RemoveLabel(op) => {
890                let labels = op.labels.join(":");
891                let _ = writeln!(
892                    out,
893                    "{indent}RemoveLabel ({var}:{labels})",
894                    var = op.variable
895                );
896                op.input.fmt_tree(out, depth + 1);
897            }
898            Self::CallProcedure(op) => {
899                let _ = writeln!(
900                    out,
901                    "{indent}CallProcedure ({name})",
902                    name = op.name.join(".")
903                );
904            }
905            Self::LoadData(op) => {
906                let format_name = match op.format {
907                    LoadDataFormat::Csv => "LoadCsv",
908                    LoadDataFormat::Jsonl => "LoadJsonl",
909                    LoadDataFormat::Parquet => "LoadParquet",
910                    _ => "LoadData",
911                };
912                let headers = if op.with_headers && op.format == LoadDataFormat::Csv {
913                    " WITH HEADERS"
914                } else {
915                    ""
916                };
917                let _ = writeln!(
918                    out,
919                    "{indent}{format_name}{headers} ('{path}' AS {var})",
920                    path = op.path,
921                    var = op.variable,
922                );
923            }
924            Self::TripleScan(op) => {
925                let _ = writeln!(
926                    out,
927                    "{indent}TripleScan ({s} {p} {o})",
928                    s = fmt_triple_component(&op.subject),
929                    p = fmt_triple_component(&op.predicate),
930                    o = fmt_triple_component(&op.object)
931                );
932                if let Some(input) = &op.input {
933                    input.fmt_tree(out, depth + 1);
934                }
935            }
936            Self::VectorScan(op) => {
937                let metric = op.metric.map_or("default", |m| match m {
938                    VectorMetric::Cosine => "cosine",
939                    VectorMetric::Euclidean => "euclidean",
940                    VectorMetric::DotProduct => "dot_product",
941                    VectorMetric::Manhattan => "manhattan",
942                });
943                let mode = match op.k {
944                    Some(k) => format!("top-{k}"),
945                    None => "threshold".to_string(),
946                };
947                let _ = writeln!(
948                    out,
949                    "{indent}VectorScan ({var}:{label}.{prop}, {metric}, {mode})",
950                    var = op.variable,
951                    label = op.label.as_deref().unwrap_or("*"),
952                    prop = op.property,
953                );
954                if let Some(input) = &op.input {
955                    input.fmt_tree(out, depth + 1);
956                }
957            }
958            Self::TextScan(op) => {
959                let mode = match (op.k, op.threshold) {
960                    (Some(k), _) => format!("top-{k}"),
961                    (None, Some(t)) => format!("threshold>={t}"),
962                    (None, None) => "default-top-100".to_string(),
963                };
964                let query = fmt_expr(&op.query);
965                let _ = writeln!(
966                    out,
967                    "{indent}TextScan ({var}:{label}.{prop}, query={query}, {mode})",
968                    var = op.variable,
969                    label = op.label,
970                    prop = op.property,
971                );
972            }
973            Self::Empty => {
974                let _ = writeln!(out, "{indent}Empty");
975            }
976            // Remaining operators: show a simple name
977            _ => {
978                let _ = writeln!(out, "{indent}{:?}", std::mem::discriminant(self));
979            }
980        }
981    }
982}
983
984/// Format a logical expression compactly for EXPLAIN output.
985fn fmt_expr(expr: &LogicalExpression) -> String {
986    match expr {
987        LogicalExpression::Variable(name) => name.clone(),
988        LogicalExpression::Property { variable, property } => format!("{variable}.{property}"),
989        LogicalExpression::Literal(val) => format!("{val}"),
990        LogicalExpression::Binary { left, op, right } => {
991            format!("{} {op:?} {}", fmt_expr(left), fmt_expr(right))
992        }
993        LogicalExpression::Unary { op, operand } => {
994            format!("{op:?} {}", fmt_expr(operand))
995        }
996        LogicalExpression::FunctionCall { name, args, .. } => {
997            let arg_strs: Vec<String> = args.iter().map(fmt_expr).collect();
998            format!("{name}({})", arg_strs.join(", "))
999        }
1000        _ => format!("{expr:?}"),
1001    }
1002}
1003
1004/// Format a triple component for EXPLAIN output.
1005fn fmt_triple_component(comp: &TripleComponent) -> String {
1006    match comp {
1007        TripleComponent::Variable(name) => format!("?{name}"),
1008        TripleComponent::Iri(iri) => format!("<{iri}>"),
1009        TripleComponent::Literal(val) => format!("{val}"),
1010        TripleComponent::LangLiteral { value, lang } => format!("\"{value}\"@{lang}"),
1011        TripleComponent::BlankNode(label) => format!("_:{label}"),
1012    }
1013}
1014
/// Scan nodes from the graph.
///
/// Each scanned node is bound to `variable`. `label`, when present, acts as a
/// filter on the scan.
#[derive(Debug, Clone)]
pub struct NodeScanOp {
    /// Variable name to bind the node to.
    pub variable: String,
    /// Optional label filter.
    // NOTE(review): `None` presumably means "no label restriction"; confirm against the planner.
    pub label: Option<String>,
    /// Child operator (if any, for chained patterns).
    /// `None` means this scan has no child in the plan tree.
    pub input: Option<Box<LogicalOperator>>,
}
1025
/// Scan edges from the graph.
///
/// Each scanned edge is bound to `variable`; `edge_types` optionally narrows
/// the scan to particular edge types.
#[derive(Debug, Clone)]
pub struct EdgeScanOp {
    /// Variable name to bind the edge to.
    pub variable: String,
    /// Edge type filter (empty = match all types).
    // NOTE(review): with several entries this presumably matches any of them,
    // mirroring the wording on `ExpandOp::edge_types`; confirm.
    pub edge_types: Vec<String>,
    /// Child operator (if any).
    /// `None` means this scan has no child in the plan tree.
    pub input: Option<Box<LogicalOperator>>,
}
1036
/// Path traversal mode for variable-length expansion.
///
/// Stored in `ExpandOp::path_mode`. `Walk` is the value returned by
/// `PathMode::default()`. The enum is `#[non_exhaustive]`, so `match`
/// expressions in other crates need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum PathMode {
    /// Allows repeated nodes and edges (default).
    #[default]
    Walk,
    /// No repeated edges.
    Trail,
    /// No repeated nodes except endpoints.
    Simple,
    /// No repeated nodes at all.
    Acyclic,
}
1051
/// Expand from nodes to their neighbors.
///
/// Starts from the node bound to `from_variable` in the rows produced by
/// `input`, and binds each reached neighbor to `to_variable` (plus the edge to
/// `edge_variable`, when one is named). `min_hops` / `max_hops` carry the hop
/// bounds used by variable-length patterns.
#[derive(Debug, Clone)]
pub struct ExpandOp {
    /// Source node variable.
    pub from_variable: String,
    /// Target node variable to bind.
    pub to_variable: String,
    /// Edge variable to bind (optional).
    pub edge_variable: Option<String>,
    /// Direction of expansion.
    pub direction: ExpandDirection,
    /// Edge type filter (empty = match all types, multiple = match any).
    pub edge_types: Vec<String>,
    /// Minimum hops (for variable-length patterns).
    pub min_hops: u32,
    /// Maximum hops (for variable-length patterns).
    // NOTE(review): `None` presumably means "no upper bound"; confirm against the planner.
    pub max_hops: Option<u32>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
    /// Path alias for variable-length patterns (e.g., `p` in `p = (a)-[*1..3]->(b)`).
    /// When set, a path length column will be output under this name.
    pub path_alias: Option<String>,
    /// Path traversal mode (WALK, TRAIL, SIMPLE, ACYCLIC).
    pub path_mode: PathMode,
}
1077
/// Direction for edge expansion.
///
/// Stored in `ExpandOp::direction`. The enum is `#[non_exhaustive]`, so
/// `match` expressions in other crates need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ExpandDirection {
    /// Follow outgoing edges.
    Outgoing,
    /// Follow incoming edges.
    Incoming,
    /// Follow edges in either direction.
    Both,
}
1089
/// Join two inputs.
///
/// A binary join: `join_type` selects the join semantics and `conditions`
/// lists the expression pairs relating the two sides.
#[derive(Debug, Clone)]
pub struct JoinOp {
    /// Left input.
    pub left: Box<LogicalOperator>,
    /// Right input.
    pub right: Box<LogicalOperator>,
    /// Join type.
    pub join_type: JoinType,
    /// Join conditions.
    // NOTE(review): presumably empty for `JoinType::Cross`; confirm against the translators.
    pub conditions: Vec<JoinCondition>,
}
1102
/// Join type.
///
/// Stored in `JoinOp::join_type`. The enum is `#[non_exhaustive]`, so `match`
/// expressions in other crates need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum JoinType {
    /// Inner join.
    Inner,
    /// Left outer join.
    Left,
    /// Right outer join.
    Right,
    /// Full outer join.
    Full,
    /// Cross join (Cartesian product).
    Cross,
    /// Semi join (returns left rows with matching right rows).
    Semi,
    /// Anti join (returns left rows without matching right rows).
    Anti,
}
1122
/// A join condition.
///
/// Pairs one expression for the left side with one for the right side. Used
/// by `JoinOp::conditions` and `MultiWayJoinOp::conditions`.
// NOTE(review): the struct carries no comparison operator; the two sides are
// presumably compared for equality by the consumer. Confirm.
#[derive(Debug, Clone)]
pub struct JoinCondition {
    /// Left expression.
    pub left: LogicalExpression,
    /// Right expression.
    pub right: LogicalExpression,
}
1131
/// Multi-way join for worst-case optimal joins (leapfrog).
///
/// Unlike binary `JoinOp`, this joins 3+ relations simultaneously
/// using the leapfrog trie join algorithm. Preferred for cyclic patterns
/// (triangles, cliques) where cascading binary joins hit O(N^2).
///
/// In EXPLAIN output this operator is rendered with its input count and the
/// comma-separated `shared_variables`, followed by each input as a child.
#[derive(Debug, Clone)]
pub struct MultiWayJoinOp {
    /// Input relations (one per relation in the join).
    pub inputs: Vec<LogicalOperator>,
    /// All pairwise join conditions.
    pub conditions: Vec<JoinCondition>,
    /// Variables shared across multiple inputs (intersection keys).
    pub shared_variables: Vec<String>,
}
1146
/// Aggregate with grouping.
///
/// Holds the grouping expressions, the aggregate expressions to compute, and
/// an optional HAVING filter.
#[derive(Debug, Clone)]
pub struct AggregateOp {
    /// Group by expressions.
    // NOTE(review): an empty list presumably means a single global group; confirm.
    pub group_by: Vec<LogicalExpression>,
    /// Aggregate functions.
    pub aggregates: Vec<AggregateExpr>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
    /// HAVING clause filter (applied after aggregation).
    /// `None` when there is no HAVING clause.
    pub having: Option<LogicalExpression>,
}
1159
/// Whether a horizontal aggregate operates on edges or nodes.
///
/// Stored in `HorizontalAggregateOp::entity_kind`. The enum is
/// `#[non_exhaustive]`, so `match` expressions in other crates need a
/// wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum EntityKind {
    /// Aggregate over edges in a path.
    Edge,
    /// Aggregate over nodes in a path.
    Node,
}
1169
/// Per-row aggregation over a list-valued column (horizontal aggregation, GE09).
///
/// For each input row, reads a list of entity IDs from `list_column`, accesses
/// `property` on each entity, computes the aggregate, and emits the scalar result.
///
/// The result is emitted under `alias`; `entity_kind` says whether the IDs in
/// the list refer to edges or to nodes.
#[derive(Debug, Clone)]
pub struct HorizontalAggregateOp {
    /// The list column name (e.g., `_path_edges_p`).
    pub list_column: String,
    /// Whether the list contains edge IDs or node IDs.
    pub entity_kind: EntityKind,
    /// The aggregate function to apply.
    pub function: AggregateFunction,
    /// The property to access on each entity.
    pub property: String,
    /// Output alias for the result column.
    pub alias: String,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1189
/// An aggregate expression.
///
/// One entry of `AggregateOp::aggregates`: the function to apply, its
/// argument expression(s), and function-specific options (`distinct`,
/// `percentile`, `separator`).
#[derive(Debug, Clone)]
pub struct AggregateExpr {
    /// Aggregate function.
    pub function: AggregateFunction,
    /// Expression to aggregate (first/only argument, y for binary set functions).
    // NOTE(review): presumably `None` for argument-less forms such as COUNT(*); confirm.
    pub expression: Option<LogicalExpression>,
    /// Second expression for binary set functions (x for COVAR, CORR, REGR_*).
    pub expression2: Option<LogicalExpression>,
    /// Whether to use DISTINCT.
    pub distinct: bool,
    /// Alias for the result.
    pub alias: Option<String>,
    /// Percentile parameter for PERCENTILE_DISC/PERCENTILE_CONT (0.0 to 1.0).
    // NOTE(review): the 0.0..=1.0 range is not enforced by this type.
    pub percentile: Option<f64>,
    /// Separator string for GROUP_CONCAT / LISTAGG (defaults to space for GROUP_CONCAT, comma for LISTAGG).
    /// `None` selects the default.
    pub separator: Option<String>,
}
1208
/// Aggregate function.
///
/// Identifies which aggregate to compute. Carried by `AggregateExpr::function`
/// and `HorizontalAggregateOp::function`. For the two-argument statistical
/// functions the argument order is `(y, x)`, as noted on each variant.
///
/// The enum is `#[non_exhaustive]`, so `match` expressions in other crates
/// need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum AggregateFunction {
    /// Count all rows (COUNT(*)).
    Count,
    /// Count non-null values (COUNT(expr)).
    CountNonNull,
    /// Sum values.
    Sum,
    /// Average values.
    Avg,
    /// Minimum value.
    Min,
    /// Maximum value.
    Max,
    /// Collect into list.
    Collect,
    /// Sample standard deviation (STDEV).
    StdDev,
    /// Population standard deviation (STDEVP).
    StdDevPop,
    /// Sample variance (VAR_SAMP / VARIANCE).
    Variance,
    /// Population variance (VAR_POP).
    VariancePop,
    /// Discrete percentile (PERCENTILE_DISC).
    PercentileDisc,
    /// Continuous percentile (PERCENTILE_CONT).
    PercentileCont,
    /// Concatenate values with separator (GROUP_CONCAT).
    GroupConcat,
    /// Return an arbitrary value from the group (SAMPLE).
    Sample,
    /// Sample covariance (COVAR_SAMP(y, x)).
    CovarSamp,
    /// Population covariance (COVAR_POP(y, x)).
    CovarPop,
    /// Pearson correlation coefficient (CORR(y, x)).
    Corr,
    /// Regression slope (REGR_SLOPE(y, x)).
    RegrSlope,
    /// Regression intercept (REGR_INTERCEPT(y, x)).
    RegrIntercept,
    /// Coefficient of determination (REGR_R2(y, x)).
    RegrR2,
    /// Regression count of non-null pairs (REGR_COUNT(y, x)).
    RegrCount,
    /// Regression sum of squares for x (REGR_SXX(y, x)).
    RegrSxx,
    /// Regression sum of squares for y (REGR_SYY(y, x)).
    RegrSyy,
    /// Regression sum of cross-products (REGR_SXY(y, x)).
    RegrSxy,
    /// Regression average of x (REGR_AVGX(y, x)).
    RegrAvgx,
    /// Regression average of y (REGR_AVGY(y, x)).
    RegrAvgy,
}
1268
/// Hint about how a filter will be executed at the physical level.
///
/// Set during EXPLAIN annotation to communicate pushdown decisions.
///
/// Stored in `FilterOp::pushdown_hint`. The enum is `#[non_exhaustive]`, so
/// `match` expressions in other crates need a wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum PushdownHint {
    /// Equality predicate resolved via a property index.
    IndexLookup {
        /// The indexed property name.
        property: String,
    },
    /// Range predicate resolved via a range/btree index.
    RangeScan {
        /// The indexed property name.
        property: String,
    },
    /// No index available, but label narrows the scan before filtering.
    LabelFirst,
}
1288
/// Filter rows based on a predicate.
///
/// `pushdown_hint` is informational: it records how the filter is expected to
/// run physically and is filled in for EXPLAIN.
#[derive(Debug, Clone)]
pub struct FilterOp {
    /// The filter predicate.
    pub predicate: LogicalExpression,
    /// Input operator.
    pub input: Box<LogicalOperator>,
    /// Optional hint about pushdown strategy (populated by EXPLAIN).
    pub pushdown_hint: Option<PushdownHint>,
}
1299
/// Project specific columns.
///
/// Output columns are given by `projections`. With `pass_through_input` set,
/// the input's columns are kept as well and the projections are appended.
#[derive(Debug, Clone)]
pub struct ProjectOp {
    /// Columns to project.
    pub projections: Vec<Projection>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
    /// When true, all input columns are passed through and the explicit
    /// projections are appended as additional output columns. Used by GQL
    /// LET clauses which add bindings without replacing the existing scope.
    pub pass_through_input: bool,
}
1312
/// A single projection (column selection or computation).
///
/// One entry of `ProjectOp::projections`.
#[derive(Debug, Clone)]
pub struct Projection {
    /// Expression to compute.
    pub expression: LogicalExpression,
    /// Alias for the result.
    // NOTE(review): how the output column is named when this is `None` is
    // decided outside this struct; confirm against the planner.
    pub alias: Option<String>,
}
1321
/// Limit the number of results.
///
/// The row count is a `CountExpr`, so it may still be an unresolved parameter
/// reference. `CountExpr::value` panics in that state; `CountExpr::try_value`
/// returns an error instead.
#[derive(Debug, Clone)]
pub struct LimitOp {
    /// Maximum number of rows to return (literal or parameter reference).
    pub count: CountExpr,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1330
/// Skip a number of results.
///
/// The row count is a `CountExpr`, so it may still be an unresolved parameter
/// reference. `CountExpr::value` panics in that state; `CountExpr::try_value`
/// returns an error instead.
#[derive(Debug, Clone)]
pub struct SkipOp {
    /// Number of rows to skip (literal or parameter reference).
    pub count: CountExpr,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1339
/// Sort results.
///
/// Orders the rows of `input` by the listed `keys`.
// NOTE(review): earlier keys presumably take precedence over later ones; confirm.
#[derive(Debug, Clone)]
pub struct SortOp {
    /// Sort keys.
    pub keys: Vec<SortKey>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1348
/// A sort key.
///
/// One entry of `SortOp::keys`: the expression to order by, the direction,
/// and an optional explicit placement of nulls.
#[derive(Debug, Clone)]
pub struct SortKey {
    /// Expression to sort by.
    pub expression: LogicalExpression,
    /// Sort order.
    pub order: SortOrder,
    /// Optional null ordering (NULLS FIRST / NULLS LAST).
    // NOTE(review): the placement of nulls when this is `None` is decided by
    // the executor, not by this struct; confirm the default there.
    pub nulls: Option<NullsOrdering>,
}
1359
/// Sort order.
///
/// Stored in `SortKey::order`. The enum is `#[non_exhaustive]`, so `match`
/// expressions in other crates need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SortOrder {
    /// Ascending order.
    Ascending,
    /// Descending order.
    Descending,
}
1369
/// Null ordering for sort operations.
///
/// Stored in `SortKey::nulls`. The enum is `#[non_exhaustive]`, so `match`
/// expressions in other crates need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum NullsOrdering {
    /// Nulls sort before all non-null values.
    First,
    /// Nulls sort after all non-null values.
    Last,
}
1379
/// Remove duplicate results.
///
/// Deduplication considers either every column or only the listed `columns`.
#[derive(Debug, Clone)]
pub struct DistinctOp {
    /// Input operator.
    pub input: Box<LogicalOperator>,
    /// Optional columns to use for deduplication.
    /// If None, all columns are used.
    pub columns: Option<Vec<String>>,
}
1389
/// Create a new node.
///
/// In EXPLAIN output this is rendered as `CreateNode (variable:labels)` with
/// the labels joined by `:`; properties are not shown there.
#[derive(Debug, Clone)]
pub struct CreateNodeOp {
    /// Variable name to bind the created node to.
    pub variable: String,
    /// Labels for the new node.
    pub labels: Vec<String>,
    /// Properties for the new node.
    /// Each entry is a `(property name, value expression)` pair.
    pub properties: Vec<(String, LogicalExpression)>,
    /// Input operator (for chained creates).
    /// `None` means this create has no child in the plan tree.
    pub input: Option<Box<LogicalOperator>>,
}
1402
/// Create a new edge.
///
/// The edge runs from the node bound to `from_variable` to the node bound to
/// `to_variable`. In EXPLAIN output an unnamed edge (`variable == None`) is
/// shown with `?` in place of the variable name.
#[derive(Debug, Clone)]
pub struct CreateEdgeOp {
    /// Variable name to bind the created edge to.
    /// `None` when the edge is not given a name.
    pub variable: Option<String>,
    /// Source node variable.
    pub from_variable: String,
    /// Target node variable.
    pub to_variable: String,
    /// Edge type.
    pub edge_type: String,
    /// Properties for the new edge.
    /// Each entry is a `(property name, value expression)` pair.
    pub properties: Vec<(String, LogicalExpression)>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1419
/// Delete a node.
///
/// The node to delete is the one bound to `variable` in the rows produced by
/// `input`.
#[derive(Debug, Clone)]
pub struct DeleteNodeOp {
    /// Variable of the node to delete.
    pub variable: String,
    /// Whether to detach (delete connected edges) before deleting.
    // NOTE(review): behavior when `false` and the node still has edges is
    // decided by the executor; confirm there.
    pub detach: bool,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1430
/// Delete an edge.
///
/// The edge to delete is the one bound to `variable` in the rows produced by
/// `input`.
#[derive(Debug, Clone)]
pub struct DeleteEdgeOp {
    /// Variable of the edge to delete.
    pub variable: String,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1439
/// Set properties on a node or edge.
///
/// In EXPLAIN output only the targets are listed, as `variable.property`
/// entries separated by commas; the value expressions are not shown.
#[derive(Debug, Clone)]
pub struct SetPropertyOp {
    /// Variable of the entity to update.
    pub variable: String,
    /// Properties to set (name -> expression).
    pub properties: Vec<(String, LogicalExpression)>,
    /// Whether to replace all properties (vs. merge).
    pub replace: bool,
    /// Whether the target variable is an edge (vs. node).
    pub is_edge: bool,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1454
/// Add labels to a node.
///
/// In EXPLAIN output this is rendered as `AddLabel (variable:labels)` with the
/// labels joined by `:`.
#[derive(Debug, Clone)]
pub struct AddLabelOp {
    /// Variable of the node to update.
    pub variable: String,
    /// Labels to add.
    pub labels: Vec<String>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1465
/// Remove labels from a node.
///
/// In EXPLAIN output this is rendered as `RemoveLabel (variable:labels)` with
/// the labels joined by `:`.
#[derive(Debug, Clone)]
pub struct RemoveLabelOp {
    /// Variable of the node to update.
    pub variable: String,
    /// Labels to remove.
    pub labels: Vec<String>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1476
1477// ==================== RDF/SPARQL Operators ====================
1478
/// SPARQL dataset restriction from FROM / FROM NAMED clauses.
///
/// When present, restricts which graphs are visible to a triple scan:
/// - `default_graphs`: IRIs whose union forms the default graph (basic patterns).
/// - `named_graphs`: IRIs that enumerate the available named graphs (GRAPH patterns).
///
/// The derived `Default` value has both lists empty, which per the field
/// documentation below means neither clause was specified.
#[derive(Debug, Clone, Default)]
pub struct DatasetRestriction {
    /// FROM IRIs: the default graph is the union of these named graphs.
    /// Empty means no FROM clause was specified (unrestricted default graph).
    pub default_graphs: Vec<String>,
    /// FROM NAMED IRIs: only these named graphs are available to GRAPH patterns.
    /// Empty means no FROM NAMED clause was specified (all named graphs visible).
    pub named_graphs: Vec<String>,
}
1493
/// Scan RDF triples matching a pattern.
///
/// In EXPLAIN output only the subject, predicate and object components are
/// shown (followed by the input, if any); `graph` and `dataset` are not
/// rendered there.
#[derive(Debug, Clone)]
pub struct TripleScanOp {
    /// Subject pattern (variable name or IRI).
    pub subject: TripleComponent,
    /// Predicate pattern (variable name or IRI).
    pub predicate: TripleComponent,
    /// Object pattern (variable name, IRI, or literal).
    pub object: TripleComponent,
    /// Named graph (optional).
    pub graph: Option<TripleComponent>,
    /// Input operator (for chained patterns).
    /// `None` means this scan has no child in the plan tree.
    pub input: Option<Box<LogicalOperator>>,
    /// Dataset restriction from SPARQL FROM / FROM NAMED clauses.
    /// `None` when no restriction is attached to this scan.
    pub dataset: Option<DatasetRestriction>,
}
1510
/// A component of a triple pattern.
///
/// Used for the subject, predicate, object and graph positions of
/// `TripleScanOp`. The enum is `#[non_exhaustive]`, so `match` expressions in
/// other crates need a wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum TripleComponent {
    /// A variable to bind.
    Variable(String),
    /// A constant IRI.
    Iri(String),
    /// A constant literal value.
    Literal(Value),
    /// A language-tagged string literal (RDF `rdf:langString`).
    ///
    /// Carries the lexical value and the BCP47 language tag separately so that
    /// the tag survives the translator to planner to RDF store round-trip.
    LangLiteral {
        /// The lexical string value.
        value: String,
        /// BCP47 language tag, e.g. `"fr"`, `"en-GB"`.
        lang: String,
    },
    /// A blank node with a scoped label (used in INSERT DATA).
    BlankNode(String),
}
1534
1535impl TripleComponent {
1536    /// Returns the variable name if this component is a `Variable`, or `None`.
1537    #[must_use]
1538    pub fn as_variable(&self) -> Option<&str> {
1539        match self {
1540            Self::Variable(v) => Some(v),
1541            _ => None,
1542        }
1543    }
1544}
1545
/// Union of multiple result sets.
///
/// In EXPLAIN output this is rendered with its branch count, followed by each
/// input as a child.
// NOTE(review): whether duplicates are kept (UNION ALL) or removed is not
// encoded in this struct; confirm where that is decided.
#[derive(Debug, Clone)]
pub struct UnionOp {
    /// Inputs to union together.
    pub inputs: Vec<LogicalOperator>,
}
1552
/// Set difference: rows in left that are not in right.
///
/// In EXPLAIN output this is rendered as `Except`, or `Except ALL` when `all`
/// is set.
#[derive(Debug, Clone)]
pub struct ExceptOp {
    /// Left input.
    pub left: Box<LogicalOperator>,
    /// Right input (rows to exclude).
    pub right: Box<LogicalOperator>,
    /// If true, preserve duplicates (EXCEPT ALL); if false, deduplicate (EXCEPT DISTINCT).
    pub all: bool,
}
1563
/// Set intersection: rows common to both inputs.
///
/// In EXPLAIN output this is rendered as `Intersect`, or `Intersect ALL` when
/// `all` is set.
#[derive(Debug, Clone)]
pub struct IntersectOp {
    /// Left input.
    pub left: Box<LogicalOperator>,
    /// Right input.
    pub right: Box<LogicalOperator>,
    /// If true, preserve duplicates (INTERSECT ALL); if false, deduplicate (INTERSECT DISTINCT).
    pub all: bool,
}
1574
/// Fallback operator: use left result if non-empty, otherwise use right.
///
/// `left` is the preferred branch; `right` only contributes when `left`
/// yields no rows.
#[derive(Debug, Clone)]
pub struct OtherwiseOp {
    /// Primary input (preferred).
    pub left: Box<LogicalOperator>,
    /// Fallback input (used only if left produces zero rows).
    pub right: Box<LogicalOperator>,
}
1583
/// Apply (lateral join): evaluate a subplan for each row of the outer input.
///
/// The subplan can reference variables bound by the outer input. Results are
/// concatenated (cross-product per row).
///
/// In EXPLAIN output `input` is printed as the first child and `subplan` as
/// the second.
#[derive(Debug, Clone)]
pub struct ApplyOp {
    /// Outer input providing rows.
    pub input: Box<LogicalOperator>,
    /// Subplan to evaluate per outer row.
    pub subplan: Box<LogicalOperator>,
    /// Variables imported from the outer scope into the inner plan.
    /// When non-empty, the planner injects these via `ParameterState`.
    pub shared_variables: Vec<String>,
    /// When true, uses left-join semantics: outer rows with no matching inner
    /// rows are emitted with NULLs for the inner columns (OPTIONAL CALL).
    pub optional: bool,
}
1601
/// Parameter scan: leaf operator for correlated subquery inner plans.
///
/// Emits a single row containing the values injected from the outer Apply.
/// Column names correspond to the outer variables imported via WITH.
///
/// Being a leaf, it has no input operator field.
#[derive(Debug, Clone)]
pub struct ParameterScanOp {
    /// Column names for the injected parameters.
    pub columns: Vec<String>,
}
1611
/// Left outer join for OPTIONAL patterns.
///
/// In EXPLAIN output the condition, when present, is shown via its `Debug`
/// representation; the left input is printed before the right one.
#[derive(Debug, Clone)]
pub struct LeftJoinOp {
    /// Left (required) input.
    pub left: Box<LogicalOperator>,
    /// Right (optional) input.
    pub right: Box<LogicalOperator>,
    /// Optional filter condition.
    pub condition: Option<LogicalExpression>,
}
1622
/// Anti-join for MINUS patterns.
///
/// Keeps rows of `left` that have no match in `right`.
// NOTE(review): the struct carries no explicit join keys; matching is
// presumably done on variables shared by both sides. Confirm against the planner.
#[derive(Debug, Clone)]
pub struct AntiJoinOp {
    /// Left input (results to keep if no match on right).
    pub left: Box<LogicalOperator>,
    /// Right input (patterns to exclude).
    pub right: Box<LogicalOperator>,
}
1631
/// Bind a variable to an expression.
///
/// The result of `expression` is bound to `variable`. In EXPLAIN output only
/// the variable name is shown.
#[derive(Debug, Clone)]
pub struct BindOp {
    /// Expression to compute.
    pub expression: LogicalExpression,
    /// Variable to bind the result to.
    pub variable: String,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1642
/// Unwind a list into individual rows.
///
/// For each input row, evaluates the expression (which should return a list)
/// and emits one row for each element in the list.
///
/// The element is bound to `variable`; its position can additionally be bound
/// through `ordinality_var` (1-based) and/or `offset_var` (0-based).
#[derive(Debug, Clone)]
pub struct UnwindOp {
    /// The list expression to unwind.
    pub expression: LogicalExpression,
    /// The variable name for each element.
    pub variable: String,
    /// Optional variable for 1-based element position (ORDINALITY).
    pub ordinality_var: Option<String>,
    /// Optional variable for 0-based element position (OFFSET).
    pub offset_var: Option<String>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1660
/// Collect grouped key-value rows into a single Map value.
/// Used for Gremlin `groupCount()` semantics.
///
/// In EXPLAIN output this is rendered as `MapCollect (key_var -> value_var AS alias)`.
#[derive(Debug, Clone)]
pub struct MapCollectOp {
    /// Variable holding the map key.
    pub key_var: String,
    /// Variable holding the map value.
    pub value_var: String,
    /// Output variable alias.
    pub alias: String,
    /// Input operator (typically a grouped aggregate).
    pub input: Box<LogicalOperator>,
}
1674
/// Merge a pattern (match or create).
///
/// MERGE tries to match a pattern in the graph. If found, returns the existing
/// elements (optionally applying ON MATCH SET). If not found, creates the pattern
/// (optionally applying ON CREATE SET).
///
/// Every property list is a vector of `(property name, value expression)`
/// pairs. This operator describes a node merge; relationship merges use
/// [`MergeRelationshipOp`].
#[derive(Debug, Clone)]
pub struct MergeOp {
    /// The node to merge.
    pub variable: String,
    /// Labels to match/create.
    pub labels: Vec<String>,
    /// Properties that must match (used for both matching and creation).
    pub match_properties: Vec<(String, LogicalExpression)>,
    /// Properties to set on CREATE.
    pub on_create: Vec<(String, LogicalExpression)>,
    /// Properties to set on MATCH.
    pub on_match: Vec<(String, LogicalExpression)>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1695
/// Merge a relationship pattern (match or create between two bound nodes).
///
/// MERGE on a relationship tries to find an existing relationship of the given type
/// between the source and target nodes. If found, returns the existing relationship
/// (optionally applying ON MATCH SET). If not found, creates it (optionally applying
/// ON CREATE SET).
///
/// Every property list is a vector of `(property name, value expression)`
/// pairs. Exactly one relationship type is carried (`edge_type`).
#[derive(Debug, Clone)]
pub struct MergeRelationshipOp {
    /// Variable to bind the relationship to.
    pub variable: String,
    /// Source node variable (must already be bound).
    pub source_variable: String,
    /// Target node variable (must already be bound).
    pub target_variable: String,
    /// Relationship type.
    pub edge_type: String,
    /// Properties that must match (used for both matching and creation).
    pub match_properties: Vec<(String, LogicalExpression)>,
    /// Properties to set on CREATE.
    pub on_create: Vec<(String, LogicalExpression)>,
    /// Properties to set on MATCH.
    pub on_match: Vec<(String, LogicalExpression)>,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
1721
/// Find shortest path between two nodes.
///
/// This operator uses Dijkstra's algorithm to find the shortest path(s)
/// between a source node and a target node, optionally filtered by edge type.
///
/// The endpoints are referenced by variable name (`source_var`, `target_var`)
/// and come from `input`; the resulting path is bound to `path_alias`.
#[derive(Debug, Clone)]
pub struct ShortestPathOp {
    /// Input operator providing source/target nodes.
    pub input: Box<LogicalOperator>,
    /// Variable name for the source node.
    pub source_var: String,
    /// Variable name for the target node.
    pub target_var: String,
    /// Edge type filter (empty = match all types, multiple = match any).
    pub edge_types: Vec<String>,
    /// Direction of edge traversal.
    pub direction: ExpandDirection,
    /// Variable name to bind the path result.
    pub path_alias: String,
    /// Whether to find all shortest paths (vs. just one).
    pub all_paths: bool,
}
1743
1744// ==================== SPARQL Update Operators ====================
1745
/// Insert RDF triples.
///
/// Describes one subject/predicate/object triple, an optional named graph,
/// and an optional input operator that supplies variable bindings.
#[derive(Debug, Clone)]
pub struct InsertTripleOp {
    /// Subject of the triple.
    pub subject: TripleComponent,
    /// Predicate of the triple.
    pub predicate: TripleComponent,
    /// Object of the triple.
    pub object: TripleComponent,
    /// Named graph (optional).
    pub graph: Option<String>,
    /// Input operator (provides variable bindings).
    pub input: Option<Box<LogicalOperator>>,
}
1760
/// Delete RDF triples.
///
/// Describes one subject/predicate/object pattern, an optional named graph,
/// and an optional input operator that supplies variable bindings. The field
/// layout mirrors [`InsertTripleOp`].
#[derive(Debug, Clone)]
pub struct DeleteTripleOp {
    /// Subject pattern.
    pub subject: TripleComponent,
    /// Predicate pattern.
    pub predicate: TripleComponent,
    /// Object pattern.
    pub object: TripleComponent,
    /// Named graph (optional).
    pub graph: Option<String>,
    /// Input operator (provides variable bindings).
    pub input: Option<Box<LogicalOperator>>,
}
1775
/// SPARQL MODIFY operation (DELETE/INSERT WHERE).
///
/// Per SPARQL 1.1 Update spec, this operator:
/// 1. Evaluates the WHERE clause once to get bindings
/// 2. Applies DELETE templates using those bindings
/// 3. Applies INSERT templates using the SAME bindings
///
/// This ensures DELETE and INSERT see consistent data.
///
/// Unlike the single-triple insert/delete operators, the WHERE clause is
/// mandatory here (`where_clause` is not an `Option`), and both template
/// lists may hold any number of [`TripleTemplate`]s.
#[derive(Debug, Clone)]
pub struct ModifyOp {
    /// DELETE triple templates (patterns with variables).
    pub delete_templates: Vec<TripleTemplate>,
    /// INSERT triple templates (patterns with variables).
    pub insert_templates: Vec<TripleTemplate>,
    /// WHERE clause that provides variable bindings.
    pub where_clause: Box<LogicalOperator>,
    /// Named graph context (for WITH clause).
    pub graph: Option<String>,
}
1795
/// A triple template for DELETE/INSERT operations.
///
/// Used as the element type of the template lists in [`ModifyOp`] and
/// [`ConstructOp`]. Each position is a [`TripleComponent`], so it may be a
/// variable to be filled in from bindings.
#[derive(Debug, Clone)]
pub struct TripleTemplate {
    /// Subject (may be a variable).
    pub subject: TripleComponent,
    /// Predicate (may be a variable).
    pub predicate: TripleComponent,
    /// Object (may be a variable or literal).
    pub object: TripleComponent,
    /// Named graph (optional).
    pub graph: Option<String>,
}
1808
/// SPARQL CONSTRUCT: evaluate WHERE, substitute bindings into template.
///
/// Produces rows with columns `subject`, `predicate`, `object` by instantiating
/// the template once per binding from the WHERE clause.
///
/// The WHERE clause evaluation is represented by `input`; `templates` may
/// hold any number of [`TripleTemplate`]s.
#[derive(Debug, Clone)]
pub struct ConstructOp {
    /// Triple templates to instantiate.
    pub templates: Vec<TripleTemplate>,
    /// Input operator (WHERE clause evaluation).
    pub input: Box<LogicalOperator>,
}
1820
/// Clear all triples from a graph.
///
/// The target is encoded in `graph`; note that the empty string is a
/// sentinel meaning "all named graphs" rather than a graph IRI.
#[derive(Debug, Clone)]
pub struct ClearGraphOp {
    /// Target graph (None = default graph, Some("") = all named, Some(iri) = specific graph).
    pub graph: Option<String>,
    /// Whether to silently ignore errors.
    pub silent: bool,
}
1829
/// Create a new named graph.
///
/// The graph IRI is mandatory (`graph` is a plain `String`, not an `Option`).
#[derive(Debug, Clone)]
pub struct CreateGraphOp {
    /// IRI of the graph to create.
    pub graph: String,
    /// Whether to silently ignore if graph already exists.
    pub silent: bool,
}
1838
/// Drop (remove) a named graph.
///
/// A `graph` of `None` targets the default graph.
#[derive(Debug, Clone)]
pub struct DropGraphOp {
    /// Target graph (None = default graph).
    pub graph: Option<String>,
    /// Whether to silently ignore errors.
    pub silent: bool,
}
1847
/// Load data from a URL into a graph.
///
/// `source` is always required; a `destination` of `None` targets the
/// default graph.
#[derive(Debug, Clone)]
pub struct LoadGraphOp {
    /// Source URL to load data from.
    pub source: String,
    /// Destination graph (None = default graph).
    pub destination: Option<String>,
    /// Whether to silently ignore errors.
    pub silent: bool,
}
1858
/// Copy triples from one graph to another.
///
/// Both endpoints are optional graph names.
// NOTE(review): `None` presumably denotes the default graph, as it does on
// `DropGraphOp`/`LoadGraphOp` — confirm against the translator/executor.
#[derive(Debug, Clone)]
pub struct CopyGraphOp {
    /// Source graph.
    pub source: Option<String>,
    /// Destination graph.
    pub destination: Option<String>,
    /// Whether to silently ignore errors.
    pub silent: bool,
}
1869
/// Move triples from one graph to another.
///
/// Both endpoints are optional graph names.
// NOTE(review): `None` presumably denotes the default graph, as it does on
// `DropGraphOp`/`LoadGraphOp` — confirm against the translator/executor.
#[derive(Debug, Clone)]
pub struct MoveGraphOp {
    /// Source graph.
    pub source: Option<String>,
    /// Destination graph.
    pub destination: Option<String>,
    /// Whether to silently ignore errors.
    pub silent: bool,
}
1880
/// Add (merge) triples from one graph to another.
///
/// Both endpoints are optional graph names.
// NOTE(review): `None` presumably denotes the default graph, as it does on
// `DropGraphOp`/`LoadGraphOp` — confirm against the translator/executor.
#[derive(Debug, Clone)]
pub struct AddGraphOp {
    /// Source graph.
    pub source: Option<String>,
    /// Destination graph.
    pub destination: Option<String>,
    /// Whether to silently ignore errors.
    pub silent: bool,
}
1891
1892// ==================== Vector Search Operators ====================
1893
/// Vector similarity scan operation.
///
/// Performs approximate nearest neighbor search using a vector index (HNSW)
/// or brute-force search for small datasets. Returns nodes/edges whose
/// embeddings are similar to the query vector.
///
/// Result selection is controlled by three independent optional knobs:
/// `k` (top-k), `min_similarity`, and `max_distance`. The operator can be a
/// leaf (`input: None`) or sit on top of another operator for hybrid queries.
///
/// # Example GQL
///
/// ```gql
/// MATCH (m:Movie)
/// WHERE vector_similarity(m.embedding, $query_vector) > 0.8
/// RETURN m.title
/// ```
#[derive(Debug, Clone)]
pub struct VectorScanOp {
    /// Variable name to bind matching entities to.
    pub variable: String,
    /// Name of the vector index to use (None = brute-force).
    pub index_name: Option<String>,
    /// Property containing the vector embedding.
    pub property: String,
    /// Optional label filter (scan only nodes with this label).
    pub label: Option<String>,
    /// The query vector expression.
    pub query_vector: LogicalExpression,
    /// Number of nearest neighbors to return (None = threshold mode only).
    pub k: Option<usize>,
    /// Distance metric (None = use index default, typically cosine).
    pub metric: Option<VectorMetric>,
    /// Minimum similarity threshold (filters results below this).
    pub min_similarity: Option<f32>,
    /// Maximum distance threshold (filters results above this).
    pub max_distance: Option<f32>,
    /// Input operator (for hybrid queries combining graph + vector).
    pub input: Option<Box<LogicalOperator>>,
}
1930
/// Vector distance/similarity metric for vector scan operations.
///
/// A fieldless, `Copy` enum. It is `#[non_exhaustive]`, so matches outside
/// this crate need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum VectorMetric {
    /// Cosine similarity (1 - cosine_distance). Best for normalized embeddings.
    Cosine,
    /// Euclidean (L2) distance. Best when magnitude matters.
    Euclidean,
    /// Dot product. Best for maximum inner product search.
    DotProduct,
    /// Manhattan (L1) distance. Less sensitive to outliers.
    Manhattan,
}
1944
/// Join graph patterns with vector similarity search.
///
/// This operator takes entities from the left input and computes vector
/// similarity against a query vector, outputting (entity, distance) pairs.
///
/// Unlike [`VectorScanOp`], the input operator and `k` are both mandatory
/// here (neither is wrapped in `Option`).
///
/// # Use Cases
///
/// 1. **Hybrid graph + vector queries**: Find similar nodes after graph traversal
/// 2. **Aggregated embeddings**: Use AVG(embeddings) as query vector
/// 3. **Filtering by similarity**: Join with threshold-based filtering
///
/// # Example
///
/// ```gql
/// // Find movies similar to what the user liked
/// MATCH (u:User {id: $user_id})-[:LIKED]->(liked:Movie)
/// WITH avg(liked.embedding) AS user_taste
/// VECTOR JOIN (m:Movie) ON m.embedding
/// WHERE vector_similarity(m.embedding, user_taste) > 0.7
/// RETURN m.title
/// ```
#[derive(Debug, Clone)]
pub struct VectorJoinOp {
    /// Input operator providing entities to match against.
    pub input: Box<LogicalOperator>,
    /// Variable from input to extract vectors from (for entity-to-entity similarity).
    /// If None, uses `query_vector` directly.
    pub left_vector_variable: Option<String>,
    /// Property containing the left vector (used with `left_vector_variable`).
    pub left_property: Option<String>,
    /// The query vector expression (constant or computed).
    pub query_vector: LogicalExpression,
    /// Variable name to bind the right-side matching entities.
    pub right_variable: String,
    /// Property containing the right-side vector embeddings.
    pub right_property: String,
    /// Optional label filter for right-side entities.
    pub right_label: Option<String>,
    /// Name of vector index on right side (None = brute-force).
    pub index_name: Option<String>,
    /// Number of nearest neighbors per left-side entity.
    pub k: usize,
    /// Distance metric.
    pub metric: Option<VectorMetric>,
    /// Minimum similarity threshold.
    pub min_similarity: Option<f32>,
    /// Maximum distance threshold.
    pub max_distance: Option<f32>,
    /// Variable to bind the distance/similarity score.
    pub score_variable: Option<String>,
}
1996
/// Text search scan using BM25 inverted index.
///
/// This operator has no input operator field. Result selection is driven by
/// `k` and/or `threshold`; when both are `None` the default top-100 path
/// applies (see the field docs).
#[derive(Debug, Clone)]
pub struct TextScanOp {
    /// Variable to bind matched nodes.
    pub variable: String,
    /// Label of nodes to search.
    pub label: String,
    /// Property holding the text to search.
    pub property: String,
    /// The search query expression (must resolve to a string).
    pub query: LogicalExpression,
    /// Top-k limit (None = threshold mode or default 100).
    pub k: Option<usize>,
    /// Minimum score threshold (None = top-k mode).
    pub threshold: Option<f64>,
    /// Optional column name to bind the BM25 score.
    pub score_column: Option<String>,
}
2015
/// Return results (terminal operator).
///
/// Projects `items` over the rows of `input`, optionally de-duplicating them
/// when `distinct` is set.
#[derive(Debug, Clone)]
pub struct ReturnOp {
    /// Items to return.
    pub items: Vec<ReturnItem>,
    /// Whether to return distinct results.
    pub distinct: bool,
    /// Input operator.
    pub input: Box<LogicalOperator>,
}
2026
/// A single return item.
///
/// An expression plus an optional alias naming its result column; used as
/// the element type of [`ReturnOp::items`].
#[derive(Debug, Clone)]
pub struct ReturnItem {
    /// Expression to return.
    pub expression: LogicalExpression,
    /// Alias for the result column.
    pub alias: Option<String>,
}
2035
/// Define a property graph schema (SQL/PGQ DDL).
///
/// Bundles the graph name with its node-table and edge-table definitions.
#[derive(Debug, Clone)]
pub struct CreatePropertyGraphOp {
    /// Graph name.
    pub name: String,
    /// Node table schemas (label name + column definitions).
    pub node_tables: Vec<PropertyGraphNodeTable>,
    /// Edge table schemas (type name + column definitions + references).
    pub edge_tables: Vec<PropertyGraphEdgeTable>,
}
2046
/// A node table in a property graph definition.
///
/// Column types are carried as plain strings (type names), not as a typed
/// enum.
#[derive(Debug, Clone)]
pub struct PropertyGraphNodeTable {
    /// Table name (maps to a node label).
    pub name: String,
    /// Column definitions as (name, type_name) pairs.
    pub columns: Vec<(String, String)>,
}
2055
/// An edge table in a property graph definition.
///
/// In addition to its own columns, an edge table names the node tables at
/// its two endpoints. Column types are carried as plain strings (type names).
#[derive(Debug, Clone)]
pub struct PropertyGraphEdgeTable {
    /// Table name (maps to an edge type).
    pub name: String,
    /// Column definitions as (name, type_name) pairs.
    pub columns: Vec<(String, String)>,
    /// Source node table name.
    pub source_table: String,
    /// Target node table name.
    pub target_table: String,
}
2068
2069// ==================== Procedure Call Types ====================
2070
/// A CALL procedure operation.
///
/// ```text
/// CALL grafeo.pagerank({damping: 0.85}) YIELD nodeId, score
/// ```
///
/// The procedure name is stored pre-split into its dotted segments. A
/// `yield_items` of `None` means the call carried no YIELD clause.
#[derive(Debug, Clone)]
pub struct CallProcedureOp {
    /// Dotted procedure name, e.g. `["grafeo", "pagerank"]`.
    pub name: Vec<String>,
    /// Argument expressions (constants in Phase 1).
    pub arguments: Vec<LogicalExpression>,
    /// Optional YIELD clause: which columns to expose + aliases.
    pub yield_items: Option<Vec<ProcedureYield>>,
}
2085
/// A single YIELD item in a procedure call.
///
/// Names one column of the procedure result and, optionally, the alias it is
/// exposed under; used as the element type of [`CallProcedureOp::yield_items`].
#[derive(Debug, Clone)]
pub struct ProcedureYield {
    /// Column name from the procedure result.
    pub field_name: String,
    /// Optional alias (YIELD score AS rank).
    pub alias: Option<String>,
}
2094
2095/// Re-export format enum from the physical operator.
2096pub use grafeo_core::execution::operators::LoadDataFormat;
2097
/// LOAD DATA operator: reads a file and produces rows.
///
/// With headers (CSV), each row is bound as a `Value::Map` with column names as keys.
/// Without headers (CSV), each row is bound as a `Value::List` of string values.
/// JSONL always produces `Value::Map`. Parquet always produces `Value::Map`.
///
/// This operator has no input operator field; each row it produces is bound
/// to `variable`.
#[derive(Debug, Clone)]
pub struct LoadDataOp {
    /// File format.
    pub format: LoadDataFormat,
    /// Whether the file has a header row (CSV only, ignored for JSONL/Parquet).
    pub with_headers: bool,
    /// File path (local filesystem).
    pub path: String,
    /// Variable name to bind each row to.
    pub variable: String,
    /// Field separator character (CSV only, default: comma).
    pub field_terminator: Option<char>,
}
2116
/// A logical expression.
///
/// A recursive expression tree: nested sub-expressions are boxed, and the
/// subquery and pattern-comprehension variants embed a whole
/// [`LogicalOperator`] subplan. The enum is `#[non_exhaustive]`, so matches
/// outside this crate need a wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum LogicalExpression {
    /// A literal value.
    Literal(Value),

    /// A variable reference.
    Variable(String),

    /// Property access (e.g., `n.name`).
    Property {
        /// The variable to access.
        variable: String,
        /// The property name.
        property: String,
    },

    /// Binary operation.
    Binary {
        /// Left operand.
        left: Box<LogicalExpression>,
        /// Operator.
        op: BinaryOp,
        /// Right operand.
        right: Box<LogicalExpression>,
    },

    /// Unary operation.
    Unary {
        /// Operator.
        op: UnaryOp,
        /// Operand.
        operand: Box<LogicalExpression>,
    },

    /// Function call.
    FunctionCall {
        /// Function name.
        name: String,
        /// Arguments.
        args: Vec<LogicalExpression>,
        /// Whether DISTINCT is applied (e.g., `COUNT(DISTINCT x)`).
        distinct: bool,
    },

    /// List literal.
    List(Vec<LogicalExpression>),

    /// Map literal (e.g., `{name: 'Alix', age: 30}`), stored as ordered
    /// `(key, value expression)` pairs.
    Map(Vec<(String, LogicalExpression)>),

    /// Index access (e.g., `list[0]`).
    IndexAccess {
        /// The base expression (typically a list or string).
        base: Box<LogicalExpression>,
        /// The index expression.
        index: Box<LogicalExpression>,
    },

    /// Slice access (e.g., `list[1..3]`).
    SliceAccess {
        /// The base expression (typically a list or string).
        base: Box<LogicalExpression>,
        /// Start index (None means from beginning).
        start: Option<Box<LogicalExpression>>,
        /// End index (None means to end).
        end: Option<Box<LogicalExpression>>,
    },

    /// CASE expression.
    Case {
        /// Test expression (for simple CASE).
        operand: Option<Box<LogicalExpression>>,
        /// WHEN clauses.
        when_clauses: Vec<(LogicalExpression, LogicalExpression)>,
        /// ELSE clause.
        else_clause: Option<Box<LogicalExpression>>,
    },

    /// Parameter reference.
    Parameter(String),

    /// Labels of a node.
    Labels(String),

    /// Type of an edge.
    Type(String),

    /// ID of a node or edge.
    Id(String),

    /// List comprehension: `[x IN list WHERE predicate | expression]`.
    ListComprehension {
        /// Variable name for each element.
        variable: String,
        /// The source list expression.
        list_expr: Box<LogicalExpression>,
        /// Optional filter predicate.
        filter_expr: Option<Box<LogicalExpression>>,
        /// The mapping expression for each element.
        map_expr: Box<LogicalExpression>,
    },

    /// List predicate: `all/any/none/single(x IN list WHERE pred)`.
    ListPredicate {
        /// The kind of list predicate.
        kind: ListPredicateKind,
        /// The iteration variable name.
        variable: String,
        /// The source list expression.
        list_expr: Box<LogicalExpression>,
        /// The predicate to test for each element.
        predicate: Box<LogicalExpression>,
    },

    /// EXISTS subquery.
    ExistsSubquery(Box<LogicalOperator>),

    /// COUNT subquery.
    CountSubquery(Box<LogicalOperator>),

    /// VALUE subquery: returns scalar value from first row of inner query.
    ValueSubquery(Box<LogicalOperator>),

    /// Map projection: `node { .prop1, .prop2, key: expr, .* }`.
    MapProjection {
        /// The base variable name.
        base: String,
        /// Projection entries (property selectors, literal entries, all-properties).
        entries: Vec<MapProjectionEntry>,
    },

    /// reduce() accumulator: `reduce(acc = init, x IN list | expr)`.
    Reduce {
        /// Accumulator variable name.
        accumulator: String,
        /// Initial value for the accumulator.
        initial: Box<LogicalExpression>,
        /// Iteration variable name.
        variable: String,
        /// List to iterate over.
        list: Box<LogicalExpression>,
        /// Body expression evaluated per iteration (references both accumulator and variable).
        expression: Box<LogicalExpression>,
    },

    /// Pattern comprehension: `[(pattern) WHERE pred | expr]`.
    ///
    /// Executes the inner subplan, evaluates the projection for each row,
    /// and collects the results into a list.
    PatternComprehension {
        /// The subplan produced by translating the pattern (+optional WHERE).
        subplan: Box<LogicalOperator>,
        /// The projection expression evaluated for each match.
        projection: Box<LogicalExpression>,
    },
}
2275
/// An entry in a map projection.
///
/// Element type of the `entries` list in `LogicalExpression::MapProjection`.
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum MapProjectionEntry {
    /// `.propertyName`: shorthand for `propertyName: base.propertyName`.
    PropertySelector(String),
    /// `key: expression`: explicit key-value pair.
    LiteralEntry(String, LogicalExpression),
    /// `.*`: include all properties of the base entity.
    AllProperties,
}
2287
/// The kind of list predicate function.
///
/// Selects which quantifier a `LogicalExpression::ListPredicate` applies to
/// its per-element predicate.
///
/// This is a fieldless enum and derives `Copy`, matching the other operator
/// enums in this module (`BinaryOp`, `UnaryOp`, `VectorMetric`), so a kind
/// can be passed and stored by value without an explicit `.clone()`.
/// It is `#[non_exhaustive]`, so matches outside this crate need a wildcard
/// arm.
///
/// Note that the [`ListPredicateKind::None`] variant is unrelated to
/// `Option::None`; always refer to it by its qualified path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ListPredicateKind {
    /// all(x IN list WHERE pred): true if pred holds for every element.
    All,
    /// any(x IN list WHERE pred): true if pred holds for at least one element.
    Any,
    /// none(x IN list WHERE pred): true if pred holds for no element.
    None,
    /// single(x IN list WHERE pred): true if pred holds for exactly one element.
    Single,
}
2301
/// Binary operator.
///
/// Operator tag carried by `LogicalExpression::Binary`. A fieldless, `Copy`
/// enum covering comparison, boolean, arithmetic, string, and
/// membership/pattern operators. It is `#[non_exhaustive]`, so matches
/// outside this crate need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum BinaryOp {
    /// Equality comparison (=).
    Eq,
    /// Inequality comparison (<>).
    Ne,
    /// Less than (<).
    Lt,
    /// Less than or equal (<=).
    Le,
    /// Greater than (>).
    Gt,
    /// Greater than or equal (>=).
    Ge,

    /// Logical AND.
    And,
    /// Logical OR.
    Or,
    /// Logical XOR.
    Xor,

    /// Addition (+).
    Add,
    /// Subtraction (-).
    Sub,
    /// Multiplication (*).
    Mul,
    /// Division (/).
    Div,
    /// Modulo (%).
    Mod,

    /// String concatenation.
    Concat,
    /// String starts with.
    StartsWith,
    /// String ends with.
    EndsWith,
    /// String contains.
    Contains,

    /// Collection membership (IN).
    In,
    /// Pattern matching (LIKE).
    Like,
    /// Regex matching (=~).
    Regex,
    /// Power/exponentiation (^).
    Pow,
}
2355
/// Unary operator.
///
/// Operator tag carried by `LogicalExpression::Unary`. A fieldless, `Copy`
/// enum; it is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum UnaryOp {
    /// Logical NOT.
    Not,
    /// Numeric negation.
    Neg,
    /// IS NULL check.
    IsNull,
    /// IS NOT NULL check.
    IsNotNull,
}
2369
2370#[cfg(test)]
2371mod tests {
2372    use super::*;
2373
2374    #[test]
2375    fn test_simple_node_scan_plan() {
2376        let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2377            items: vec![ReturnItem {
2378                expression: LogicalExpression::Variable("n".into()),
2379                alias: None,
2380            }],
2381            distinct: false,
2382            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2383                variable: "n".into(),
2384                label: Some("Person".into()),
2385                input: None,
2386            })),
2387        }));
2388
2389        // Verify structure
2390        if let LogicalOperator::Return(ret) = &plan.root {
2391            assert_eq!(ret.items.len(), 1);
2392            assert!(!ret.distinct);
2393            if let LogicalOperator::NodeScan(scan) = ret.input.as_ref() {
2394                assert_eq!(scan.variable, "n");
2395                assert_eq!(scan.label, Some("Person".into()));
2396            } else {
2397                panic!("Expected NodeScan");
2398            }
2399        } else {
2400            panic!("Expected Return");
2401        }
2402    }
2403
2404    #[test]
2405    fn test_filter_plan() {
2406        let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2407            items: vec![ReturnItem {
2408                expression: LogicalExpression::Property {
2409                    variable: "n".into(),
2410                    property: "name".into(),
2411                },
2412                alias: Some("name".into()),
2413            }],
2414            distinct: false,
2415            input: Box::new(LogicalOperator::Filter(FilterOp {
2416                predicate: LogicalExpression::Binary {
2417                    left: Box::new(LogicalExpression::Property {
2418                        variable: "n".into(),
2419                        property: "age".into(),
2420                    }),
2421                    op: BinaryOp::Gt,
2422                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2423                },
2424                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2425                    variable: "n".into(),
2426                    label: Some("Person".into()),
2427                    input: None,
2428                })),
2429                pushdown_hint: None,
2430            })),
2431        }));
2432
2433        if let LogicalOperator::Return(ret) = &plan.root {
2434            if let LogicalOperator::Filter(filter) = ret.input.as_ref() {
2435                if let LogicalExpression::Binary { op, .. } = &filter.predicate {
2436                    assert_eq!(*op, BinaryOp::Gt);
2437                } else {
2438                    panic!("Expected Binary expression");
2439                }
2440            } else {
2441                panic!("Expected Filter");
2442            }
2443        } else {
2444            panic!("Expected Return");
2445        }
2446    }
2447
2448    // ========================================================================
2449    // has_mutations(): the index-scan operators carry an `input` subtree
2450    // (used to combine graph patterns with vector/text scoring) and must
2451    // recurse into it so a mutation buried under one is not misclassified
2452    // as read-only.
2453    // ========================================================================
2454
2455    fn read_only_scan() -> LogicalOperator {
2456        LogicalOperator::NodeScan(NodeScanOp {
2457            variable: "n".into(),
2458            label: Some("Article".into()),
2459            input: None,
2460        })
2461    }
2462
2463    fn mutating_create_node() -> LogicalOperator {
2464        LogicalOperator::CreateNode(CreateNodeOp {
2465            variable: "n".into(),
2466            labels: vec!["Article".into()],
2467            properties: vec![],
2468            input: None,
2469        })
2470    }
2471
2472    #[test]
2473    fn test_text_scan_is_leaf_no_mutations() {
2474        let op = LogicalOperator::TextScan(TextScanOp {
2475            variable: "doc".into(),
2476            label: "Article".into(),
2477            property: "body".into(),
2478            query: LogicalExpression::Literal(Value::String("rust".into())),
2479            k: Some(10),
2480            threshold: None,
2481            score_column: None,
2482        });
2483        assert!(!op.has_mutations(), "TextScan is a leaf and never mutates");
2484    }
2485
2486    #[test]
2487    fn test_vector_scan_no_input_no_mutations() {
2488        let op = LogicalOperator::VectorScan(VectorScanOp {
2489            variable: "doc".into(),
2490            index_name: None,
2491            property: "embedding".into(),
2492            label: Some("Article".into()),
2493            query_vector: LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into())),
2494            k: Some(10),
2495            metric: None,
2496            min_similarity: None,
2497            max_distance: None,
2498            input: None,
2499        });
2500        assert!(!op.has_mutations(), "VectorScan with no input is read-only");
2501    }
2502
2503    #[test]
2504    fn test_vector_scan_recurses_into_mutating_input() {
2505        let op = LogicalOperator::VectorScan(VectorScanOp {
2506            variable: "doc".into(),
2507            index_name: None,
2508            property: "embedding".into(),
2509            label: Some("Article".into()),
2510            query_vector: LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into())),
2511            k: Some(10),
2512            metric: None,
2513            min_similarity: None,
2514            max_distance: None,
2515            input: Some(Box::new(mutating_create_node())),
2516        });
2517        assert!(
2518            op.has_mutations(),
2519            "VectorScan must propagate mutations from its input subtree"
2520        );
2521    }
2522
2523    #[test]
2524    fn test_vector_scan_recurses_into_read_only_input() {
2525        let op = LogicalOperator::VectorScan(VectorScanOp {
2526            variable: "doc".into(),
2527            index_name: None,
2528            property: "embedding".into(),
2529            label: Some("Article".into()),
2530            query_vector: LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into())),
2531            k: Some(10),
2532            metric: None,
2533            min_similarity: None,
2534            max_distance: None,
2535            input: Some(Box::new(read_only_scan())),
2536        });
2537        assert!(
2538            !op.has_mutations(),
2539            "VectorScan with read-only input is read-only"
2540        );
2541    }
2542
2543    #[test]
2544    fn test_vector_join_recurses_into_mutating_input() {
2545        let op = LogicalOperator::VectorJoin(VectorJoinOp {
2546            input: Box::new(mutating_create_node()),
2547            left_vector_variable: None,
2548            left_property: None,
2549            query_vector: LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into())),
2550            right_variable: "m".into(),
2551            right_property: "embedding".into(),
2552            right_label: Some("Movie".into()),
2553            index_name: None,
2554            k: 10,
2555            metric: Some(VectorMetric::Cosine),
2556            min_similarity: None,
2557            max_distance: None,
2558            score_variable: None,
2559        });
2560        assert!(
2561            op.has_mutations(),
2562            "VectorJoin must recurse into input, was previously hard-coded false"
2563        );
2564    }
2565
2566    #[test]
2567    fn test_vector_join_with_read_only_input_is_read_only() {
2568        let op = LogicalOperator::VectorJoin(VectorJoinOp {
2569            input: Box::new(read_only_scan()),
2570            left_vector_variable: None,
2571            left_property: None,
2572            query_vector: LogicalExpression::Literal(Value::Vector(vec![0.5_f32].into())),
2573            right_variable: "m".into(),
2574            right_property: "embedding".into(),
2575            right_label: Some("Movie".into()),
2576            index_name: None,
2577            k: 10,
2578            metric: Some(VectorMetric::Cosine),
2579            min_similarity: None,
2580            max_distance: None,
2581            score_variable: None,
2582        });
2583        assert!(!op.has_mutations());
2584    }
2585
2586    // ========================================================================
2587    // TextScan EXPLAIN/fmt_tree labeling: distinguishes top-k, threshold, and
2588    // the default-top-100 path (when both k and threshold are None).
2589    // ========================================================================
2590
2591    fn text_scan_with_modes(k: Option<usize>, threshold: Option<f64>) -> String {
2592        let plan = LogicalPlan::new(LogicalOperator::TextScan(TextScanOp {
2593            variable: "doc".into(),
2594            label: "Article".into(),
2595            property: "body".into(),
2596            query: LogicalExpression::Literal(Value::String("rust".into())),
2597            k,
2598            threshold,
2599            score_column: None,
2600        }));
2601        let mut out = String::new();
2602        plan.root.fmt_tree(&mut out, 0);
2603        out
2604    }
2605
2606    #[test]
2607    fn test_text_scan_display_top_k_mode() {
2608        let out = text_scan_with_modes(Some(10), None);
2609        assert!(out.contains("top-10"), "expected top-10 in:\n{out}");
2610        assert!(
2611            !out.contains("threshold"),
2612            "top-k mode should not say threshold:\n{out}"
2613        );
2614    }
2615
2616    #[test]
2617    fn test_text_scan_display_threshold_mode() {
2618        let out = text_scan_with_modes(None, Some(0.5));
2619        assert!(
2620            out.contains("threshold>=0.5"),
2621            "expected threshold>=0.5 in:\n{out}"
2622        );
2623        assert!(
2624            !out.contains("top-"),
2625            "threshold mode should not say top-:\n{out}"
2626        );
2627    }
2628
2629    #[test]
2630    fn test_text_scan_display_default_mode_when_both_none() {
2631        let out = text_scan_with_modes(None, None);
2632        assert!(
2633            out.contains("default-top-100"),
2634            "expected default-top-100 (both k and threshold None) in:\n{out}"
2635        );
2636    }
2637
2638    #[test]
2639    fn test_text_scan_display_k_takes_precedence_over_threshold() {
2640        // When both are set, k wins (top-k mode is what the planner actually executes).
2641        let out = text_scan_with_modes(Some(5), Some(0.3));
2642        assert!(out.contains("top-5"), "expected top-5 in:\n{out}");
2643        assert!(
2644            !out.contains("threshold"),
2645            "k should take precedence over threshold:\n{out}"
2646        );
2647    }
2648
2649    /// EXPLAIN tree for Project(Filter(Expand(NodeScan))) includes each
2650    /// operator name, uses 2-space indentation per depth, and calls
2651    /// `display_label` semantics (labels appear in the tree).
2652    #[test]
2653    fn test_explain_tree_basic_operators() {
2654        let plan = LogicalOperator::Project(ProjectOp {
2655            projections: vec![Projection {
2656                expression: LogicalExpression::Property {
2657                    variable: "b".into(),
2658                    property: "name".into(),
2659                },
2660                alias: Some("name".into()),
2661            }],
2662            input: Box::new(LogicalOperator::Filter(FilterOp {
2663                predicate: LogicalExpression::Binary {
2664                    left: Box::new(LogicalExpression::Property {
2665                        variable: "b".into(),
2666                        property: "age".into(),
2667                    }),
2668                    op: BinaryOp::Gt,
2669                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2670                },
2671                input: Box::new(LogicalOperator::Expand(ExpandOp {
2672                    from_variable: "a".into(),
2673                    to_variable: "b".into(),
2674                    edge_variable: None,
2675                    direction: ExpandDirection::Outgoing,
2676                    edge_types: vec!["KNOWS".into()],
2677                    min_hops: 1,
2678                    max_hops: Some(1),
2679                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2680                        variable: "a".into(),
2681                        label: Some("Person".into()),
2682                        input: None,
2683                    })),
2684                    path_alias: None,
2685                    path_mode: PathMode::Walk,
2686                })),
2687                pushdown_hint: Some(PushdownHint::LabelFirst),
2688            })),
2689            pass_through_input: false,
2690        });
2691
2692        let tree = plan.explain_tree();
2693
2694        // Each operator appears with the expected name
2695        assert!(tree.contains("Project"), "missing Project in:\n{tree}");
2696        assert!(tree.contains("Filter"), "missing Filter in:\n{tree}");
2697        assert!(tree.contains("Expand"), "missing Expand in:\n{tree}");
2698        assert!(tree.contains("NodeScan"), "missing NodeScan in:\n{tree}");
2699
2700        // Indentation: Project at depth 0, Filter at depth 1 (2 spaces),
2701        // Expand at depth 2 (4 spaces), NodeScan at depth 3 (6 spaces).
2702        assert!(tree.starts_with("Project"));
2703        assert!(
2704            tree.contains("\n  Filter"),
2705            "Filter should be indented by 2 spaces"
2706        );
2707        assert!(
2708            tree.contains("\n    Expand"),
2709            "Expand should be indented by 4 spaces"
2710        );
2711        assert!(
2712            tree.contains("\n      NodeScan"),
2713            "NodeScan should be indented by 6 spaces"
2714        );
2715
2716        // Labels from display_label-style rendering appear: Person label,
2717        // KNOWS edge type, label-first pushdown hint, projection alias.
2718        assert!(tree.contains("Person"));
2719        assert!(tree.contains("KNOWS"));
2720        assert!(tree.contains("[label-first]"));
2721        assert!(tree.contains("AS name"));
2722    }
2723
2724    /// `has_mutations` recurses through Project/Filter into their inputs.
2725    #[test]
2726    fn test_has_mutations_recursive() {
2727        // Project(Filter(CreateNode)) ⇒ true
2728        let with_mutation = LogicalOperator::Project(ProjectOp {
2729            projections: vec![],
2730            input: Box::new(LogicalOperator::Filter(FilterOp {
2731                predicate: LogicalExpression::Literal(Value::Bool(true)),
2732                input: Box::new(LogicalOperator::CreateNode(CreateNodeOp {
2733                    variable: "n".into(),
2734                    labels: vec!["Person".into()],
2735                    properties: vec![],
2736                    input: None,
2737                })),
2738                pushdown_hint: None,
2739            })),
2740            pass_through_input: false,
2741        });
2742        assert!(with_mutation.has_mutations());
2743
2744        // Project(Filter(NodeScan)) ⇒ false
2745        let read_only = LogicalOperator::Project(ProjectOp {
2746            projections: vec![],
2747            input: Box::new(LogicalOperator::Filter(FilterOp {
2748                predicate: LogicalExpression::Literal(Value::Bool(true)),
2749                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2750                    variable: "n".into(),
2751                    label: None,
2752                    input: None,
2753                })),
2754                pushdown_hint: None,
2755            })),
2756            pass_through_input: false,
2757        });
2758        assert!(!read_only.has_mutations());
2759    }
2760
2761    /// Union returns all its branches in order via `children()`, and
2762    /// Apply returns both input and subplan.
2763    #[test]
2764    fn test_children_collection_for_union_and_apply() {
2765        let leaf = |label: &str| {
2766            LogicalOperator::NodeScan(NodeScanOp {
2767                variable: "n".into(),
2768                label: Some(label.into()),
2769                input: None,
2770            })
2771        };
2772
2773        let union = LogicalOperator::Union(UnionOp {
2774            inputs: vec![leaf("Amsterdam"), leaf("Berlin"), leaf("Prague")],
2775        });
2776        let children = union.children();
2777        assert_eq!(children.len(), 3);
2778        match children[0] {
2779            LogicalOperator::NodeScan(s) => assert_eq!(s.label.as_deref(), Some("Amsterdam")),
2780            _ => panic!("Expected NodeScan"),
2781        }
2782        match children[2] {
2783            LogicalOperator::NodeScan(s) => assert_eq!(s.label.as_deref(), Some("Prague")),
2784            _ => panic!("Expected NodeScan"),
2785        }
2786
2787        let apply = LogicalOperator::Apply(ApplyOp {
2788            input: Box::new(leaf("Person")),
2789            subplan: Box::new(leaf("Company")),
2790            shared_variables: vec![],
2791            optional: false,
2792        });
2793        let apply_children = apply.children();
2794        assert_eq!(apply_children.len(), 2);
2795        match apply_children[0] {
2796            LogicalOperator::NodeScan(s) => assert_eq!(s.label.as_deref(), Some("Person")),
2797            _ => panic!("Expected input NodeScan"),
2798        }
2799        match apply_children[1] {
2800            LogicalOperator::NodeScan(s) => assert_eq!(s.label.as_deref(), Some("Company")),
2801            _ => panic!("Expected subplan NodeScan"),
2802        }
2803    }
2804
2805    /// Unresolved `CountExpr::Parameter` falls back to a default estimate of 10.0.
2806    #[test]
2807    fn test_count_expr_parameter_default() {
2808        let param = CountExpr::Parameter("limit".to_string());
2809        assert!((param.estimate() - 10.0).abs() < f64::EPSILON);
2810
2811        let literal = CountExpr::Literal(42);
2812        assert!((literal.estimate() - 42.0).abs() < f64::EPSILON);
2813        assert_eq!(literal.value(), 42);
2814        assert_eq!(literal.try_value(), Ok(42));
2815
2816        // try_value returns an error for unresolved parameters,
2817        // preserving the parameter name in the message.
2818        let err = param.try_value().unwrap_err();
2819        assert!(err.contains("$limit"), "error should mention $limit: {err}");
2820
2821        // Display/Equality sanity
2822        assert_eq!(format!("{literal}"), "42");
2823        assert_eq!(format!("{param}"), "$limit");
2824        assert!(literal == 42usize);
2825    }
2826
2827    // ==================== CountExpr ====================
2828
2829    #[test]
2830    fn count_expr_literal_value() {
2831        let count = CountExpr::Literal(42);
2832        assert_eq!(count.value(), 42);
2833        assert_eq!(count.try_value(), Ok(42));
2834        assert!((count.estimate() - 42.0).abs() < f64::EPSILON);
2835    }
2836
2837    #[test]
2838    fn count_expr_parameter_try_value_errors() {
2839        let count = CountExpr::Parameter("limit".into());
2840        let err = count.try_value().unwrap_err();
2841        assert!(err.contains("$limit"));
2842        // Estimate falls back to default for unresolved parameters.
2843        assert!((count.estimate() - 10.0).abs() < f64::EPSILON);
2844    }
2845
2846    #[test]
2847    #[should_panic(expected = "Unresolved parameter: $rows")]
2848    fn count_expr_parameter_value_panics() {
2849        let count = CountExpr::Parameter("rows".into());
2850        let _ = count.value();
2851    }
2852
2853    #[test]
2854    fn count_expr_display_and_conversions() {
2855        assert_eq!(format!("{}", CountExpr::Literal(7)), "7");
2856        assert_eq!(format!("{}", CountExpr::Parameter("n".into())), "$n");
2857        let from_usize: CountExpr = 3usize.into();
2858        assert_eq!(from_usize, CountExpr::Literal(3));
2859        assert_eq!(CountExpr::Literal(5), 5usize);
2860        assert!(CountExpr::Parameter("x".into()) != 5usize);
2861    }
2862
2863    // ==================== LogicalPlan constructors ====================
2864
2865    #[test]
2866    fn logical_plan_constructors() {
2867        let leaf = || LogicalOperator::Empty;
2868
2869        let normal = LogicalPlan::new(leaf());
2870        assert!(!normal.explain);
2871        assert!(!normal.profile);
2872        assert!(normal.default_params.is_empty());
2873
2874        let explained = LogicalPlan::explain(leaf());
2875        assert!(explained.explain);
2876        assert!(!explained.profile);
2877
2878        let profiled = LogicalPlan::profile(leaf());
2879        assert!(!profiled.explain);
2880        assert!(profiled.profile);
2881    }
2882
2883    // ==================== Helpers for tests ====================
2884
2885    fn var(name: &str) -> LogicalExpression {
2886        LogicalExpression::Variable(name.into())
2887    }
2888
2889    fn leaf_empty() -> Box<LogicalOperator> {
2890        Box::new(LogicalOperator::Empty)
2891    }
2892
2893    fn leaf_node_scan(v: &str) -> Box<LogicalOperator> {
2894        Box::new(LogicalOperator::NodeScan(NodeScanOp {
2895            variable: v.into(),
2896            label: None,
2897            input: None,
2898        }))
2899    }
2900
2901    fn leaf_create_node(v: &str) -> Box<LogicalOperator> {
2902        Box::new(LogicalOperator::CreateNode(CreateNodeOp {
2903            variable: v.into(),
2904            labels: vec!["Person".into()],
2905            properties: vec![],
2906            input: None,
2907        }))
2908    }
2909
2910    // ==================== has_mutations ====================
2911
2912    #[test]
2913    fn has_mutations_direct_operators_are_mutating() {
2914        // A representative direct mutation operator.
2915        let op = LogicalOperator::CreateNode(CreateNodeOp {
2916            variable: "vincent".into(),
2917            labels: vec!["Person".into()],
2918            properties: vec![],
2919            input: None,
2920        });
2921        assert!(op.has_mutations());
2922
2923        let delete = LogicalOperator::DeleteNode(DeleteNodeOp {
2924            variable: "vincent".into(),
2925            detach: true,
2926            input: leaf_node_scan("vincent"),
2927        });
2928        assert!(delete.has_mutations());
2929
2930        let set_prop = LogicalOperator::SetProperty(SetPropertyOp {
2931            variable: "mia".into(),
2932            properties: vec![("city".into(), LogicalExpression::Literal(Value::Null))],
2933            replace: false,
2934            is_edge: false,
2935            input: leaf_node_scan("mia"),
2936        });
2937        assert!(set_prop.has_mutations());
2938
2939        let insert_triple = LogicalOperator::InsertTriple(InsertTripleOp {
2940            subject: TripleComponent::Iri("s".into()),
2941            predicate: TripleComponent::Iri("p".into()),
2942            object: TripleComponent::Iri("o".into()),
2943            graph: None,
2944            input: None,
2945        });
2946        assert!(insert_triple.has_mutations());
2947
2948        let clear = LogicalOperator::ClearGraph(ClearGraphOp {
2949            graph: None,
2950            silent: false,
2951        });
2952        assert!(clear.has_mutations());
2953
2954        let ddl = LogicalOperator::CreatePropertyGraph(CreatePropertyGraphOp {
2955            name: "g".into(),
2956            node_tables: vec![],
2957            edge_tables: vec![],
2958        });
2959        assert!(ddl.has_mutations());
2960    }
2961
2962    #[test]
2963    fn has_mutations_propagates_through_single_input_operators() {
2964        let base = || {
2965            LogicalOperator::SetProperty(SetPropertyOp {
2966                variable: "butch".into(),
2967                properties: vec![],
2968                replace: false,
2969                is_edge: false,
2970                input: leaf_node_scan("butch"),
2971            })
2972        };
2973
2974        // Filter, Project, Limit, Skip, Sort, Distinct, Unwind, Bind, MapCollect,
2975        // Return, HorizontalAggregate all wrap the input.
2976        let filter = LogicalOperator::Filter(FilterOp {
2977            predicate: var("x"),
2978            input: Box::new(base()),
2979            pushdown_hint: None,
2980        });
2981        assert!(filter.has_mutations());
2982
2983        let project = LogicalOperator::Project(ProjectOp {
2984            projections: vec![],
2985            input: Box::new(base()),
2986            pass_through_input: false,
2987        });
2988        assert!(project.has_mutations());
2989
2990        let agg = LogicalOperator::Aggregate(AggregateOp {
2991            group_by: vec![],
2992            aggregates: vec![],
2993            input: Box::new(base()),
2994            having: None,
2995        });
2996        assert!(agg.has_mutations());
2997
2998        let limit = LogicalOperator::Limit(LimitOp {
2999            count: CountExpr::Literal(10),
3000            input: Box::new(base()),
3001        });
3002        assert!(limit.has_mutations());
3003
3004        let skip = LogicalOperator::Skip(SkipOp {
3005            count: CountExpr::Literal(5),
3006            input: Box::new(base()),
3007        });
3008        assert!(skip.has_mutations());
3009
3010        let sort = LogicalOperator::Sort(SortOp {
3011            keys: vec![],
3012            input: Box::new(base()),
3013        });
3014        assert!(sort.has_mutations());
3015
3016        let distinct = LogicalOperator::Distinct(DistinctOp {
3017            input: Box::new(base()),
3018            columns: None,
3019        });
3020        assert!(distinct.has_mutations());
3021
3022        let unwind = LogicalOperator::Unwind(UnwindOp {
3023            expression: var("xs"),
3024            variable: "x".into(),
3025            ordinality_var: None,
3026            offset_var: None,
3027            input: Box::new(base()),
3028        });
3029        assert!(unwind.has_mutations());
3030
3031        let bind = LogicalOperator::Bind(BindOp {
3032            expression: var("x"),
3033            variable: "y".into(),
3034            input: Box::new(base()),
3035        });
3036        assert!(bind.has_mutations());
3037
3038        let map_collect = LogicalOperator::MapCollect(MapCollectOp {
3039            key_var: "k".into(),
3040            value_var: "v".into(),
3041            alias: "m".into(),
3042            input: Box::new(base()),
3043        });
3044        assert!(map_collect.has_mutations());
3045
3046        let ret = LogicalOperator::Return(ReturnOp {
3047            items: vec![],
3048            distinct: false,
3049            input: Box::new(base()),
3050        });
3051        assert!(ret.has_mutations());
3052
3053        let hagg = LogicalOperator::HorizontalAggregate(HorizontalAggregateOp {
3054            list_column: "_path".into(),
3055            entity_kind: EntityKind::Edge,
3056            function: AggregateFunction::Sum,
3057            property: "weight".into(),
3058            alias: "total".into(),
3059            input: Box::new(base()),
3060        });
3061        assert!(hagg.has_mutations());
3062
3063        let construct = LogicalOperator::Construct(ConstructOp {
3064            templates: vec![],
3065            input: Box::new(base()),
3066        });
3067        assert!(construct.has_mutations());
3068    }
3069
3070    #[test]
3071    fn has_mutations_vector_operators_are_readonly() {
3072        let vscan = LogicalOperator::VectorScan(VectorScanOp {
3073            variable: "m".into(),
3074            index_name: None,
3075            property: "embedding".into(),
3076            label: None,
3077            query_vector: LogicalExpression::Literal(Value::Null),
3078            k: Some(5),
3079            metric: Some(VectorMetric::Cosine),
3080            min_similarity: None,
3081            max_distance: None,
3082            input: None,
3083        });
3084        assert!(!vscan.has_mutations());
3085
3086        let vjoin = LogicalOperator::VectorJoin(VectorJoinOp {
3087            input: leaf_node_scan("m"),
3088            left_vector_variable: None,
3089            left_property: None,
3090            query_vector: LogicalExpression::Literal(Value::Null),
3091            right_variable: "n".into(),
3092            right_property: "embedding".into(),
3093            right_label: None,
3094            index_name: None,
3095            k: 3,
3096            metric: None,
3097            min_similarity: None,
3098            max_distance: None,
3099            score_variable: None,
3100        });
3101        assert!(!vjoin.has_mutations());
3102    }
3103
3104    #[test]
3105    fn has_mutations_two_children_and_union_apply() {
3106        let mutating = || *leaf_create_node("jules");
3107        let read = || *leaf_node_scan("jules");
3108
3109        let join_readonly = LogicalOperator::Join(JoinOp {
3110            left: Box::new(read()),
3111            right: Box::new(read()),
3112            join_type: JoinType::Inner,
3113            conditions: vec![],
3114        });
3115        assert!(!join_readonly.has_mutations());
3116
3117        let join_right_mutates = LogicalOperator::Join(JoinOp {
3118            left: Box::new(read()),
3119            right: Box::new(mutating()),
3120            join_type: JoinType::Left,
3121            conditions: vec![],
3122        });
3123        assert!(join_right_mutates.has_mutations());
3124
3125        let left_join = LogicalOperator::LeftJoin(LeftJoinOp {
3126            left: Box::new(mutating()),
3127            right: Box::new(read()),
3128            condition: None,
3129        });
3130        assert!(left_join.has_mutations());
3131
3132        let anti_join = LogicalOperator::AntiJoin(AntiJoinOp {
3133            left: Box::new(read()),
3134            right: Box::new(mutating()),
3135        });
3136        assert!(anti_join.has_mutations());
3137
3138        let except = LogicalOperator::Except(ExceptOp {
3139            left: Box::new(read()),
3140            right: Box::new(read()),
3141            all: true,
3142        });
3143        assert!(!except.has_mutations());
3144
3145        let intersect = LogicalOperator::Intersect(IntersectOp {
3146            left: Box::new(mutating()),
3147            right: Box::new(read()),
3148            all: false,
3149        });
3150        assert!(intersect.has_mutations());
3151
3152        let otherwise = LogicalOperator::Otherwise(OtherwiseOp {
3153            left: Box::new(read()),
3154            right: Box::new(mutating()),
3155        });
3156        assert!(otherwise.has_mutations());
3157
3158        let union = LogicalOperator::Union(UnionOp {
3159            inputs: vec![read(), mutating(), read()],
3160        });
3161        assert!(union.has_mutations());
3162
3163        let mwj = LogicalOperator::MultiWayJoin(MultiWayJoinOp {
3164            inputs: vec![read(), read()],
3165            conditions: vec![],
3166            shared_variables: vec!["a".into()],
3167        });
3168        assert!(!mwj.has_mutations());
3169
3170        let apply_readonly = LogicalOperator::Apply(ApplyOp {
3171            input: Box::new(read()),
3172            subplan: Box::new(read()),
3173            shared_variables: vec![],
3174            optional: false,
3175        });
3176        assert!(!apply_readonly.has_mutations());
3177
3178        let apply_inner_mutates = LogicalOperator::Apply(ApplyOp {
3179            input: Box::new(read()),
3180            subplan: Box::new(mutating()),
3181            shared_variables: vec![],
3182            optional: true,
3183        });
3184        assert!(apply_inner_mutates.has_mutations());
3185    }
3186
3187    #[test]
3188    fn has_mutations_leaf_operators_are_readonly() {
3189        assert!(!LogicalOperator::Empty.has_mutations());
3190        assert!(
3191            !LogicalOperator::ParameterScan(ParameterScanOp {
3192                columns: vec!["a".into()],
3193            })
3194            .has_mutations()
3195        );
3196        assert!(
3197            !LogicalOperator::CallProcedure(CallProcedureOp {
3198                name: vec!["grafeo".into(), "pagerank".into()],
3199                arguments: vec![],
3200                yield_items: None,
3201            })
3202            .has_mutations()
3203        );
3204        assert!(
3205            !LogicalOperator::LoadData(LoadDataOp {
3206                format: LoadDataFormat::Csv,
3207                with_headers: true,
3208                path: "/tmp/x.csv".into(),
3209                variable: "row".into(),
3210                field_terminator: None,
3211            })
3212            .has_mutations()
3213        );
3214        assert!(
3215            !LogicalOperator::TripleScan(TripleScanOp {
3216                subject: TripleComponent::Variable("s".into()),
3217                predicate: TripleComponent::Variable("p".into()),
3218                object: TripleComponent::Variable("o".into()),
3219                graph: None,
3220                input: None,
3221                dataset: None,
3222            })
3223            .has_mutations()
3224        );
3225    }
3226
3227    // ==================== children() ====================
3228
3229    #[test]
3230    fn children_of_leaf_operators() {
3231        assert!(LogicalOperator::Empty.children().is_empty());
3232        assert!(
3233            LogicalOperator::CallProcedure(CallProcedureOp {
3234                name: vec!["p".into()],
3235                arguments: vec![],
3236                yield_items: None,
3237            })
3238            .children()
3239            .is_empty()
3240        );
3241        assert!(
3242            LogicalOperator::CreateGraph(CreateGraphOp {
3243                graph: "g".into(),
3244                silent: false,
3245            })
3246            .children()
3247            .is_empty()
3248        );
3249        assert!(
3250            LogicalOperator::LoadData(LoadDataOp {
3251                format: LoadDataFormat::Jsonl,
3252                with_headers: false,
3253                path: "x.jsonl".into(),
3254                variable: "r".into(),
3255                field_terminator: None,
3256            })
3257            .children()
3258            .is_empty()
3259        );
3260    }
3261
3262    #[test]
3263    fn children_of_optional_input_operators() {
3264        let ns_no_input = LogicalOperator::NodeScan(NodeScanOp {
3265            variable: "n".into(),
3266            label: None,
3267            input: None,
3268        });
3269        assert_eq!(ns_no_input.children().len(), 0);
3270
3271        let ns_with_input = LogicalOperator::NodeScan(NodeScanOp {
3272            variable: "n".into(),
3273            label: None,
3274            input: Some(leaf_empty()),
3275        });
3276        assert_eq!(ns_with_input.children().len(), 1);
3277
3278        let edge_scan_in = LogicalOperator::EdgeScan(EdgeScanOp {
3279            variable: "e".into(),
3280            edge_types: vec![],
3281            input: Some(leaf_empty()),
3282        });
3283        assert_eq!(edge_scan_in.children().len(), 1);
3284    }
3285
3286    #[test]
3287    fn children_of_two_child_operators() {
3288        let join = LogicalOperator::Join(JoinOp {
3289            left: leaf_empty(),
3290            right: leaf_empty(),
3291            join_type: JoinType::Cross,
3292            conditions: vec![],
3293        });
3294        assert_eq!(join.children().len(), 2);
3295
3296        let apply = LogicalOperator::Apply(ApplyOp {
3297            input: leaf_empty(),
3298            subplan: leaf_empty(),
3299            shared_variables: vec![],
3300            optional: false,
3301        });
3302        assert_eq!(apply.children().len(), 2);
3303
3304        let union = LogicalOperator::Union(UnionOp {
3305            inputs: vec![*leaf_empty(), *leaf_empty(), *leaf_empty()],
3306        });
3307        assert_eq!(union.children().len(), 3);
3308    }
3309
3310    #[test]
3311    fn children_of_modify_returns_where_clause() {
3312        let modify = LogicalOperator::Modify(ModifyOp {
3313            delete_templates: vec![],
3314            insert_templates: vec![],
3315            where_clause: leaf_empty(),
3316            graph: None,
3317        });
3318        assert_eq!(modify.children().len(), 1);
3319    }
3320
3321    // ==================== display_label ====================
3322
3323    #[test]
3324    fn display_label_spot_checks() {
3325        let ns = LogicalOperator::NodeScan(NodeScanOp {
3326            variable: "vincent".into(),
3327            label: Some("Person".into()),
3328            input: None,
3329        });
3330        assert_eq!(ns.display_label(), "vincent:Person");
3331
3332        let ns_no_label = LogicalOperator::NodeScan(NodeScanOp {
3333            variable: "mia".into(),
3334            label: None,
3335            input: None,
3336        });
3337        assert_eq!(ns_no_label.display_label(), "mia:*");
3338
3339        let edge_scan = LogicalOperator::EdgeScan(EdgeScanOp {
3340            variable: "e".into(),
3341            edge_types: vec!["KNOWS".into(), "LIKES".into()],
3342            input: None,
3343        });
3344        assert_eq!(edge_scan.display_label(), "e:KNOWS|LIKES");
3345
3346        let edge_scan_any = LogicalOperator::EdgeScan(EdgeScanOp {
3347            variable: "e".into(),
3348            edge_types: vec![],
3349            input: None,
3350        });
3351        assert_eq!(edge_scan_any.display_label(), "e:*");
3352
3353        let expand = LogicalOperator::Expand(ExpandOp {
3354            from_variable: "a".into(),
3355            to_variable: "b".into(),
3356            edge_variable: None,
3357            direction: ExpandDirection::Outgoing,
3358            edge_types: vec!["KNOWS".into()],
3359            min_hops: 1,
3360            max_hops: Some(1),
3361            input: leaf_node_scan("a"),
3362            path_alias: None,
3363            path_mode: PathMode::Walk,
3364        });
3365        assert_eq!(expand.display_label(), "(a)->[:KNOWS]->(b)");
3366
3367        let expand_in = LogicalOperator::Expand(ExpandOp {
3368            from_variable: "a".into(),
3369            to_variable: "b".into(),
3370            edge_variable: None,
3371            direction: ExpandDirection::Incoming,
3372            edge_types: vec![],
3373            min_hops: 1,
3374            max_hops: Some(1),
3375            input: leaf_node_scan("a"),
3376            path_alias: None,
3377            path_mode: PathMode::Walk,
3378        });
3379        assert_eq!(expand_in.display_label(), "(a)<-[:*]<-(b)");
3380
3381        let expand_both = LogicalOperator::Expand(ExpandOp {
3382            from_variable: "a".into(),
3383            to_variable: "b".into(),
3384            edge_variable: None,
3385            direction: ExpandDirection::Both,
3386            edge_types: vec![],
3387            min_hops: 1,
3388            max_hops: Some(1),
3389            input: leaf_node_scan("a"),
3390            path_alias: None,
3391            path_mode: PathMode::Walk,
3392        });
3393        assert_eq!(expand_both.display_label(), "(a)--[:*]--(b)");
3394    }
3395
3396    #[test]
3397    fn display_label_filter_pushdown_hints() {
3398        let make = |hint: Option<PushdownHint>| {
3399            LogicalOperator::Filter(FilterOp {
3400                predicate: var("x"),
3401                input: leaf_empty(),
3402                pushdown_hint: hint,
3403            })
3404        };
3405
3406        let f_none = make(None);
3407        let s = f_none.display_label();
3408        assert!(!s.contains('['));
3409
3410        let f_index = make(Some(PushdownHint::IndexLookup {
3411            property: "name".into(),
3412        }));
3413        assert!(f_index.display_label().contains("[index: name]"));
3414
3415        let f_range = make(Some(PushdownHint::RangeScan {
3416            property: "age".into(),
3417        }));
3418        assert!(f_range.display_label().contains("[range: age]"));
3419
3420        let f_label = make(Some(PushdownHint::LabelFirst));
3421        assert!(f_label.display_label().contains("[label-first]"));
3422    }
3423
3424    #[test]
3425    fn display_label_projection_join_sort_return() {
3426        let proj = LogicalOperator::Project(ProjectOp {
3427            projections: vec![
3428                Projection {
3429                    expression: var("n"),
3430                    alias: Some("person".into()),
3431                },
3432                Projection {
3433                    expression: LogicalExpression::Property {
3434                        variable: "n".into(),
3435                        property: "city".into(),
3436                    },
3437                    alias: None,
3438                },
3439            ],
3440            input: leaf_empty(),
3441            pass_through_input: false,
3442        });
3443        let s = proj.display_label();
3444        assert!(s.contains("person"));
3445        assert!(s.contains("n.city"));
3446
3447        let join = LogicalOperator::Join(JoinOp {
3448            left: leaf_empty(),
3449            right: leaf_empty(),
3450            join_type: JoinType::Cross,
3451            conditions: vec![],
3452        });
3453        assert_eq!(join.display_label(), "Cross");
3454
3455        let agg = LogicalOperator::Aggregate(AggregateOp {
3456            group_by: vec![var("city")],
3457            aggregates: vec![],
3458            input: leaf_empty(),
3459            having: None,
3460        });
3461        assert_eq!(agg.display_label(), "group: [city]");
3462
3463        let limit = LogicalOperator::Limit(LimitOp {
3464            count: CountExpr::Literal(10),
3465            input: leaf_empty(),
3466        });
3467        assert_eq!(limit.display_label(), "10");
3468
3469        let skip = LogicalOperator::Skip(SkipOp {
3470            count: CountExpr::Parameter("off".into()),
3471            input: leaf_empty(),
3472        });
3473        assert_eq!(skip.display_label(), "$off");
3474
3475        let sort = LogicalOperator::Sort(SortOp {
3476            keys: vec![
3477                SortKey {
3478                    expression: var("a"),
3479                    order: SortOrder::Ascending,
3480                    nulls: None,
3481                },
3482                SortKey {
3483                    expression: var("b"),
3484                    order: SortOrder::Descending,
3485                    nulls: None,
3486                },
3487            ],
3488            input: leaf_empty(),
3489        });
3490        let s = sort.display_label();
3491        assert!(s.contains("a ASC"));
3492        assert!(s.contains("b DESC"));
3493
3494        let distinct = LogicalOperator::Distinct(DistinctOp {
3495            input: leaf_empty(),
3496            columns: None,
3497        });
3498        assert_eq!(distinct.display_label(), "");
3499
3500        let ret = LogicalOperator::Return(ReturnOp {
3501            items: vec![
3502                ReturnItem {
3503                    expression: var("n"),
3504                    alias: Some("node".into()),
3505                },
3506                ReturnItem {
3507                    expression: var("m"),
3508                    alias: None,
3509                },
3510            ],
3511            distinct: true,
3512            input: leaf_empty(),
3513        });
3514        let s = ret.display_label();
3515        assert!(s.contains("node"));
3516        assert!(s.contains('m'));
3517    }
3518
3519    #[test]
3520    fn display_label_remaining_operators() {
3521        let union = LogicalOperator::Union(UnionOp {
3522            inputs: vec![*leaf_empty(), *leaf_empty()],
3523        });
3524        assert_eq!(union.display_label(), "2 branches");
3525
3526        let mwj = LogicalOperator::MultiWayJoin(MultiWayJoinOp {
3527            inputs: vec![*leaf_empty(), *leaf_empty(), *leaf_empty()],
3528            conditions: vec![],
3529            shared_variables: vec![],
3530        });
3531        assert_eq!(mwj.display_label(), "3 inputs");
3532
3533        let lj = LogicalOperator::LeftJoin(LeftJoinOp {
3534            left: leaf_empty(),
3535            right: leaf_empty(),
3536            condition: None,
3537        });
3538        assert_eq!(lj.display_label(), "");
3539
3540        let aj = LogicalOperator::AntiJoin(AntiJoinOp {
3541            left: leaf_empty(),
3542            right: leaf_empty(),
3543        });
3544        assert_eq!(aj.display_label(), "");
3545
3546        let unwind = LogicalOperator::Unwind(UnwindOp {
3547            expression: var("xs"),
3548            variable: "item".into(),
3549            ordinality_var: None,
3550            offset_var: None,
3551            input: leaf_empty(),
3552        });
3553        assert_eq!(unwind.display_label(), "item");
3554
3555        let bind = LogicalOperator::Bind(BindOp {
3556            expression: var("x"),
3557            variable: "y".into(),
3558            input: leaf_empty(),
3559        });
3560        assert_eq!(bind.display_label(), "y");
3561
3562        let mapc = LogicalOperator::MapCollect(MapCollectOp {
3563            key_var: "k".into(),
3564            value_var: "v".into(),
3565            alias: "counts".into(),
3566            input: leaf_empty(),
3567        });
3568        assert_eq!(mapc.display_label(), "counts");
3569
3570        let sp = LogicalOperator::ShortestPath(ShortestPathOp {
3571            input: leaf_empty(),
3572            source_var: "a".into(),
3573            target_var: "b".into(),
3574            edge_types: vec![],
3575            direction: ExpandDirection::Outgoing,
3576            path_alias: "p".into(),
3577            all_paths: false,
3578        });
3579        assert_eq!(sp.display_label(), "a -> b");
3580
3581        let merge = LogicalOperator::Merge(MergeOp {
3582            variable: "django".into(),
3583            labels: vec![],
3584            match_properties: vec![],
3585            on_create: vec![],
3586            on_match: vec![],
3587            input: leaf_empty(),
3588        });
3589        assert_eq!(merge.display_label(), "django");
3590
3591        let merge_rel = LogicalOperator::MergeRelationship(MergeRelationshipOp {
3592            variable: "r".into(),
3593            source_variable: "a".into(),
3594            target_variable: "b".into(),
3595            edge_type: "KNOWS".into(),
3596            match_properties: vec![],
3597            on_create: vec![],
3598            on_match: vec![],
3599            input: leaf_empty(),
3600        });
3601        assert_eq!(merge_rel.display_label(), "r");
3602
3603        let cnode = LogicalOperator::CreateNode(CreateNodeOp {
3604            variable: "shosanna".into(),
3605            labels: vec!["Person".into(), "Hero".into()],
3606            properties: vec![],
3607            input: None,
3608        });
3609        assert_eq!(cnode.display_label(), "shosanna:Person:Hero");
3610
3611        let cedge_with = LogicalOperator::CreateEdge(CreateEdgeOp {
3612            variable: Some("r".into()),
3613            from_variable: "a".into(),
3614            to_variable: "b".into(),
3615            edge_type: "KNOWS".into(),
3616            properties: vec![],
3617            input: leaf_empty(),
3618        });
3619        assert_eq!(cedge_with.display_label(), "[r:KNOWS]");
3620
3621        let cedge_without = LogicalOperator::CreateEdge(CreateEdgeOp {
3622            variable: None,
3623            from_variable: "a".into(),
3624            to_variable: "b".into(),
3625            edge_type: "KNOWS".into(),
3626            properties: vec![],
3627            input: leaf_empty(),
3628        });
3629        assert_eq!(cedge_without.display_label(), "[?:KNOWS]");
3630
3631        let dnode = LogicalOperator::DeleteNode(DeleteNodeOp {
3632            variable: "hans".into(),
3633            detach: false,
3634            input: leaf_empty(),
3635        });
3636        assert_eq!(dnode.display_label(), "hans");
3637
3638        let dedge = LogicalOperator::DeleteEdge(DeleteEdgeOp {
3639            variable: "r".into(),
3640            input: leaf_empty(),
3641        });
3642        assert_eq!(dedge.display_label(), "r");
3643
3644        let set_prop = LogicalOperator::SetProperty(SetPropertyOp {
3645            variable: "beatrix".into(),
3646            properties: vec![],
3647            replace: false,
3648            is_edge: false,
3649            input: leaf_empty(),
3650        });
3651        assert_eq!(set_prop.display_label(), "beatrix");
3652
3653        let add_lbl = LogicalOperator::AddLabel(AddLabelOp {
3654            variable: "n".into(),
3655            labels: vec!["A".into(), "B".into()],
3656            input: leaf_empty(),
3657        });
3658        assert_eq!(add_lbl.display_label(), "n:A:B");
3659
3660        let rm_lbl = LogicalOperator::RemoveLabel(RemoveLabelOp {
3661            variable: "n".into(),
3662            labels: vec!["A".into()],
3663            input: leaf_empty(),
3664        });
3665        assert_eq!(rm_lbl.display_label(), "n:A");
3666
3667        let call = LogicalOperator::CallProcedure(CallProcedureOp {
3668            name: vec!["grafeo".into(), "pagerank".into()],
3669            arguments: vec![],
3670            yield_items: None,
3671        });
3672        assert_eq!(call.display_label(), "grafeo.pagerank");
3673
3674        let load = LogicalOperator::LoadData(LoadDataOp {
3675            format: LoadDataFormat::Csv,
3676            with_headers: true,
3677            path: "data.csv".into(),
3678            variable: "r".into(),
3679            field_terminator: None,
3680        });
3681        assert_eq!(load.display_label(), "data.csv AS r");
3682
3683        let apply = LogicalOperator::Apply(ApplyOp {
3684            input: leaf_empty(),
3685            subplan: leaf_empty(),
3686            shared_variables: vec![],
3687            optional: false,
3688        });
3689        assert_eq!(apply.display_label(), "");
3690
3691        let vscan = LogicalOperator::VectorScan(VectorScanOp {
3692            variable: "m".into(),
3693            index_name: None,
3694            property: "embedding".into(),
3695            label: None,
3696            query_vector: LogicalExpression::Literal(Value::Null),
3697            k: Some(5),
3698            metric: None,
3699            min_similarity: None,
3700            max_distance: None,
3701            input: None,
3702        });
3703        assert_eq!(vscan.display_label(), "m");
3704
3705        let vjoin = LogicalOperator::VectorJoin(VectorJoinOp {
3706            input: leaf_empty(),
3707            left_vector_variable: None,
3708            left_property: None,
3709            query_vector: LogicalExpression::Literal(Value::Null),
3710            right_variable: "t".into(),
3711            right_property: "emb".into(),
3712            right_label: None,
3713            index_name: None,
3714            k: 3,
3715            metric: None,
3716            min_similarity: None,
3717            max_distance: None,
3718            score_variable: None,
3719        });
3720        assert_eq!(vjoin.display_label(), "t");
3721
3722        // Empty / catch-all branch.
3723        assert_eq!(LogicalOperator::Empty.display_label(), "");
3724    }
3725
3726    // ==================== explain_tree / fmt_tree ====================
3727
3728    #[test]
3729    fn explain_tree_covers_all_common_arms() {
3730        // Build a deeply nested tree that exercises many arms.
3731        let ns = LogicalOperator::NodeScan(NodeScanOp {
3732            variable: "n".into(),
3733            label: Some("Person".into()),
3734            input: Some(Box::new(LogicalOperator::Empty)),
3735        });
3736        let out = ns.explain_tree();
3737        assert!(out.contains("NodeScan (n:Person)"));
3738        assert!(out.contains("Empty"));
3739
3740        let ns_star = LogicalOperator::NodeScan(NodeScanOp {
3741            variable: "n".into(),
3742            label: None,
3743            input: None,
3744        });
3745        assert!(ns_star.explain_tree().contains("NodeScan (n:*)"));
3746
3747        let es = LogicalOperator::EdgeScan(EdgeScanOp {
3748            variable: "e".into(),
3749            edge_types: vec![],
3750            input: None,
3751        });
3752        assert!(es.explain_tree().contains("EdgeScan (e:*)"));
3753    }
3754
3755    #[test]
3756    fn explain_tree_expand_variants() {
3757        let mk = |min, max, dir| {
3758            LogicalOperator::Expand(ExpandOp {
3759                from_variable: "a".into(),
3760                to_variable: "b".into(),
3761                edge_variable: None,
3762                direction: dir,
3763                edge_types: vec!["KNOWS".into()],
3764                min_hops: min,
3765                max_hops: max,
3766                input: leaf_node_scan("a"),
3767                path_alias: None,
3768                path_mode: PathMode::Walk,
3769            })
3770            .explain_tree()
3771        };
3772
3773        let s = mk(1, Some(1), ExpandDirection::Outgoing);
3774        assert!(s.contains("(a)->[:KNOWS]->(b)"));
3775        let s = mk(2, Some(2), ExpandDirection::Incoming);
3776        assert!(s.contains("*2"));
3777        assert!(s.contains("<-"));
3778        let s = mk(1, Some(3), ExpandDirection::Both);
3779        assert!(s.contains("*1..3"));
3780        assert!(s.contains("--"));
3781        let s = mk(2, None, ExpandDirection::Outgoing);
3782        assert!(s.contains("*2.."));
3783    }
3784
3785    #[test]
3786    fn explain_tree_filter_with_all_hints() {
3787        let base = || {
3788            LogicalOperator::Filter(FilterOp {
3789                predicate: LogicalExpression::Binary {
3790                    left: Box::new(LogicalExpression::Property {
3791                        variable: "n".into(),
3792                        property: "age".into(),
3793                    }),
3794                    op: BinaryOp::Eq,
3795                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3796                },
3797                input: leaf_node_scan("n"),
3798                pushdown_hint: None,
3799            })
3800        };
3801        let mut f = base();
3802        if let LogicalOperator::Filter(ref mut op) = f {
3803            op.pushdown_hint = Some(PushdownHint::IndexLookup {
3804                property: "age".into(),
3805            });
3806        }
3807        assert!(f.explain_tree().contains("[index: age]"));
3808
3809        if let LogicalOperator::Filter(ref mut op) = f {
3810            op.pushdown_hint = Some(PushdownHint::RangeScan {
3811                property: "age".into(),
3812            });
3813        }
3814        assert!(f.explain_tree().contains("[range: age]"));
3815
3816        if let LogicalOperator::Filter(ref mut op) = f {
3817            op.pushdown_hint = Some(PushdownHint::LabelFirst);
3818        }
3819        assert!(f.explain_tree().contains("[label-first]"));
3820    }
3821
3822    #[test]
3823    fn explain_tree_projection_aggregate_sort_return() {
3824        let proj = LogicalOperator::Project(ProjectOp {
3825            projections: vec![
3826                Projection {
3827                    expression: var("n"),
3828                    alias: Some("who".into()),
3829                },
3830                Projection {
3831                    expression: var("m"),
3832                    alias: None,
3833                },
3834            ],
3835            input: leaf_empty(),
3836            pass_through_input: true,
3837        });
3838        let s = proj.explain_tree();
3839        assert!(s.contains("Project"));
3840        assert!(s.contains("n AS who"));
3841
3842        let agg = LogicalOperator::Aggregate(AggregateOp {
3843            group_by: vec![var("city")],
3844            aggregates: vec![
3845                AggregateExpr {
3846                    function: AggregateFunction::Count,
3847                    expression: None,
3848                    expression2: None,
3849                    distinct: false,
3850                    alias: Some("c".into()),
3851                    percentile: None,
3852                    separator: None,
3853                },
3854                AggregateExpr {
3855                    function: AggregateFunction::Sum,
3856                    expression: Some(var("x")),
3857                    expression2: None,
3858                    distinct: false,
3859                    alias: None,
3860                    percentile: None,
3861                    separator: None,
3862                },
3863            ],
3864            input: leaf_empty(),
3865            having: None,
3866        });
3867        let s = agg.explain_tree();
3868        assert!(s.contains("Aggregate"));
3869        assert!(s.contains("count(...) AS c"));
3870        assert!(s.contains("sum(...)"));
3871
3872        let sort = LogicalOperator::Sort(SortOp {
3873            keys: vec![SortKey {
3874                expression: var("age"),
3875                order: SortOrder::Descending,
3876                nulls: None,
3877            }],
3878            input: leaf_empty(),
3879        });
3880        assert!(sort.explain_tree().contains("age DESC"));
3881
3882        let ret_distinct = LogicalOperator::Return(ReturnOp {
3883            items: vec![ReturnItem {
3884                expression: var("n"),
3885                alias: Some("who".into()),
3886            }],
3887            distinct: true,
3888            input: leaf_empty(),
3889        });
3890        let s = ret_distinct.explain_tree();
3891        assert!(s.contains("Return DISTINCT"));
3892        assert!(s.contains("n AS who"));
3893
3894        let limit = LogicalOperator::Limit(LimitOp {
3895            count: CountExpr::Literal(5),
3896            input: leaf_empty(),
3897        });
3898        assert!(limit.explain_tree().contains("Limit (5)"));
3899
3900        let skip = LogicalOperator::Skip(SkipOp {
3901            count: CountExpr::Literal(2),
3902            input: leaf_empty(),
3903        });
3904        assert!(skip.explain_tree().contains("Skip (2)"));
3905
3906        let distinct = LogicalOperator::Distinct(DistinctOp {
3907            input: leaf_empty(),
3908            columns: None,
3909        });
3910        assert!(distinct.explain_tree().contains("Distinct"));
3911    }
3912
3913    #[test]
3914    fn explain_tree_joins_and_set_ops() {
3915        let join = LogicalOperator::Join(JoinOp {
3916            left: leaf_empty(),
3917            right: leaf_empty(),
3918            join_type: JoinType::Inner,
3919            conditions: vec![],
3920        });
3921        assert!(join.explain_tree().contains("Join (Inner)"));
3922
3923        let left_join_cond = LogicalOperator::LeftJoin(LeftJoinOp {
3924            left: leaf_empty(),
3925            right: leaf_empty(),
3926            condition: Some(var("x")),
3927        });
3928        assert!(
3929            left_join_cond
3930                .explain_tree()
3931                .contains("LeftJoin (condition:")
3932        );
3933
3934        let left_join_none = LogicalOperator::LeftJoin(LeftJoinOp {
3935            left: leaf_empty(),
3936            right: leaf_empty(),
3937            condition: None,
3938        });
3939        let s = left_join_none.explain_tree();
3940        assert!(s.contains("LeftJoin"));
3941        assert!(!s.contains("condition:"));
3942
3943        let anti = LogicalOperator::AntiJoin(AntiJoinOp {
3944            left: leaf_empty(),
3945            right: leaf_empty(),
3946        });
3947        assert!(anti.explain_tree().contains("AntiJoin"));
3948
3949        let union = LogicalOperator::Union(UnionOp {
3950            inputs: vec![*leaf_empty(), *leaf_empty()],
3951        });
3952        assert!(union.explain_tree().contains("Union (2 branches)"));
3953
3954        let mwj = LogicalOperator::MultiWayJoin(MultiWayJoinOp {
3955            inputs: vec![*leaf_empty(), *leaf_empty()],
3956            conditions: vec![],
3957            shared_variables: vec!["a".into(), "b".into()],
3958        });
3959        let s = mwj.explain_tree();
3960        assert!(s.contains("MultiWayJoin"));
3961        assert!(s.contains("shared: [a, b]"));
3962
3963        let except_all = LogicalOperator::Except(ExceptOp {
3964            left: leaf_empty(),
3965            right: leaf_empty(),
3966            all: true,
3967        });
3968        assert!(except_all.explain_tree().contains("Except ALL"));
3969        let except = LogicalOperator::Except(ExceptOp {
3970            left: leaf_empty(),
3971            right: leaf_empty(),
3972            all: false,
3973        });
3974        assert!(except.explain_tree().contains("Except\n"));
3975
3976        let inter_all = LogicalOperator::Intersect(IntersectOp {
3977            left: leaf_empty(),
3978            right: leaf_empty(),
3979            all: true,
3980        });
3981        assert!(inter_all.explain_tree().contains("Intersect ALL"));
3982        let inter = LogicalOperator::Intersect(IntersectOp {
3983            left: leaf_empty(),
3984            right: leaf_empty(),
3985            all: false,
3986        });
3987        assert!(inter.explain_tree().contains("Intersect\n"));
3988
3989        let otherwise = LogicalOperator::Otherwise(OtherwiseOp {
3990            left: leaf_empty(),
3991            right: leaf_empty(),
3992        });
3993        assert!(otherwise.explain_tree().contains("Otherwise"));
3994    }
3995
3996    #[test]
3997    fn explain_tree_unwind_bind_mapcollect_apply_sp() {
3998        let unwind = LogicalOperator::Unwind(UnwindOp {
3999            expression: var("xs"),
4000            variable: "item".into(),
4001            ordinality_var: None,
4002            offset_var: None,
4003            input: leaf_empty(),
4004        });
4005        assert!(unwind.explain_tree().contains("Unwind (item)"));
4006
4007        let bind = LogicalOperator::Bind(BindOp {
4008            expression: var("x"),
4009            variable: "y".into(),
4010            input: leaf_empty(),
4011        });
4012        assert!(bind.explain_tree().contains("Bind (y)"));
4013
4014        let mapc = LogicalOperator::MapCollect(MapCollectOp {
4015            key_var: "k".into(),
4016            value_var: "v".into(),
4017            alias: "m".into(),
4018            input: leaf_empty(),
4019        });
4020        let s = mapc.explain_tree();
4021        assert!(s.contains("MapCollect"));
4022        assert!(s.contains("k -> v AS m"));
4023
4024        let apply = LogicalOperator::Apply(ApplyOp {
4025            input: leaf_empty(),
4026            subplan: leaf_empty(),
4027            shared_variables: vec!["a".into()],
4028            optional: true,
4029        });
4030        assert!(apply.explain_tree().contains("Apply"));
4031
4032        let sp = LogicalOperator::ShortestPath(ShortestPathOp {
4033            input: leaf_empty(),
4034            source_var: "a".into(),
4035            target_var: "b".into(),
4036            edge_types: vec![],
4037            direction: ExpandDirection::Outgoing,
4038            path_alias: "p".into(),
4039            all_paths: false,
4040        });
4041        assert!(sp.explain_tree().contains("ShortestPath (a -> b)"));
4042    }
4043
4044    #[test]
4045    fn explain_tree_mutations() {
4046        let merge = LogicalOperator::Merge(MergeOp {
4047            variable: "vincent".into(),
4048            labels: vec!["Person".into()],
4049            match_properties: vec![],
4050            on_create: vec![],
4051            on_match: vec![],
4052            input: leaf_empty(),
4053        });
4054        assert!(merge.explain_tree().contains("Merge (vincent)"));
4055
4056        let merge_rel = LogicalOperator::MergeRelationship(MergeRelationshipOp {
4057            variable: "r".into(),
4058            source_variable: "a".into(),
4059            target_variable: "b".into(),
4060            edge_type: "KNOWS".into(),
4061            match_properties: vec![],
4062            on_create: vec![],
4063            on_match: vec![],
4064            input: leaf_empty(),
4065        });
4066        assert!(merge_rel.explain_tree().contains("MergeRelationship (r)"));
4067
4068        let cnode = LogicalOperator::CreateNode(CreateNodeOp {
4069            variable: "mia".into(),
4070            labels: vec!["Person".into()],
4071            properties: vec![],
4072            input: Some(leaf_empty()),
4073        });
4074        let s = cnode.explain_tree();
4075        assert!(s.contains("CreateNode (mia:Person)"));
4076        assert!(s.contains("Empty"));
4077
4078        let cnode_no_input = LogicalOperator::CreateNode(CreateNodeOp {
4079            variable: "mia".into(),
4080            labels: vec![],
4081            properties: vec![],
4082            input: None,
4083        });
4084        assert!(cnode_no_input.explain_tree().contains("CreateNode (mia:)"));
4085
4086        let cedge = LogicalOperator::CreateEdge(CreateEdgeOp {
4087            variable: Some("r".into()),
4088            from_variable: "a".into(),
4089            to_variable: "b".into(),
4090            edge_type: "KNOWS".into(),
4091            properties: vec![],
4092            input: leaf_empty(),
4093        });
4094        assert!(
4095            cedge
4096                .explain_tree()
4097                .contains("CreateEdge (a)-[r:KNOWS]->(b)")
4098        );
4099
4100        let cedge_anon = LogicalOperator::CreateEdge(CreateEdgeOp {
4101            variable: None,
4102            from_variable: "a".into(),
4103            to_variable: "b".into(),
4104            edge_type: "KNOWS".into(),
4105            properties: vec![],
4106            input: leaf_empty(),
4107        });
4108        assert!(cedge_anon.explain_tree().contains("[?:KNOWS]"));
4109
4110        let dnode = LogicalOperator::DeleteNode(DeleteNodeOp {
4111            variable: "butch".into(),
4112            detach: true,
4113            input: leaf_empty(),
4114        });
4115        assert!(dnode.explain_tree().contains("DeleteNode (butch)"));
4116
4117        let dedge = LogicalOperator::DeleteEdge(DeleteEdgeOp {
4118            variable: "r".into(),
4119            input: leaf_empty(),
4120        });
4121        assert!(dedge.explain_tree().contains("DeleteEdge (r)"));
4122
4123        let set_prop = LogicalOperator::SetProperty(SetPropertyOp {
4124            variable: "n".into(),
4125            properties: vec![("name".into(), var("x")), ("age".into(), var("y"))],
4126            replace: false,
4127            is_edge: false,
4128            input: leaf_empty(),
4129        });
4130        let s = set_prop.explain_tree();
4131        assert!(s.contains("SetProperty"));
4132        assert!(s.contains("n.name"));
4133        assert!(s.contains("n.age"));
4134
4135        let add_lbl = LogicalOperator::AddLabel(AddLabelOp {
4136            variable: "n".into(),
4137            labels: vec!["A".into()],
4138            input: leaf_empty(),
4139        });
4140        assert!(add_lbl.explain_tree().contains("AddLabel (n:A)"));
4141
4142        let rm_lbl = LogicalOperator::RemoveLabel(RemoveLabelOp {
4143            variable: "n".into(),
4144            labels: vec!["A".into(), "B".into()],
4145            input: leaf_empty(),
4146        });
4147        assert!(rm_lbl.explain_tree().contains("RemoveLabel (n:A:B)"));
4148    }
4149
4150    #[test]
4151    fn explain_tree_call_and_load_data() {
4152        let call = LogicalOperator::CallProcedure(CallProcedureOp {
4153            name: vec!["grafeo".into(), "pagerank".into()],
4154            arguments: vec![],
4155            yield_items: None,
4156        });
4157        assert!(
4158            call.explain_tree()
4159                .contains("CallProcedure (grafeo.pagerank)")
4160        );
4161
4162        let csv = LogicalOperator::LoadData(LoadDataOp {
4163            format: LoadDataFormat::Csv,
4164            with_headers: true,
4165            path: "data.csv".into(),
4166            variable: "row".into(),
4167            field_terminator: None,
4168        });
4169        let s = csv.explain_tree();
4170        assert!(s.contains("LoadCsv"));
4171        assert!(s.contains("WITH HEADERS"));
4172        assert!(s.contains("data.csv"));
4173        assert!(s.contains("AS row"));
4174
4175        let csv_no_hdr = LogicalOperator::LoadData(LoadDataOp {
4176            format: LoadDataFormat::Csv,
4177            with_headers: false,
4178            path: "data.csv".into(),
4179            variable: "row".into(),
4180            field_terminator: None,
4181        });
4182        assert!(!csv_no_hdr.explain_tree().contains("WITH HEADERS"));
4183
4184        let jsonl = LogicalOperator::LoadData(LoadDataOp {
4185            format: LoadDataFormat::Jsonl,
4186            with_headers: false,
4187            path: "data.jsonl".into(),
4188            variable: "r".into(),
4189            field_terminator: None,
4190        });
4191        assert!(jsonl.explain_tree().contains("LoadJsonl"));
4192
4193        let parquet = LogicalOperator::LoadData(LoadDataOp {
4194            format: LoadDataFormat::Parquet,
4195            with_headers: false,
4196            path: "data.parquet".into(),
4197            variable: "r".into(),
4198            field_terminator: None,
4199        });
4200        assert!(parquet.explain_tree().contains("LoadParquet"));
4201    }
4202
4203    #[test]
4204    fn explain_tree_triple_scan_and_fallback() {
4205        let ts = LogicalOperator::TripleScan(TripleScanOp {
4206            subject: TripleComponent::Variable("s".into()),
4207            predicate: TripleComponent::Iri("http://ex/p".into()),
4208            object: TripleComponent::Literal(Value::Int64(5)),
4209            graph: None,
4210            input: Some(leaf_empty()),
4211            dataset: None,
4212        });
4213        let s = ts.explain_tree();
4214        assert!(s.contains("TripleScan"));
4215        assert!(s.contains("?s"));
4216        assert!(s.contains("<http://ex/p>"));
4217        assert!(s.contains("Empty"));
4218
4219        let ts_no_input = LogicalOperator::TripleScan(TripleScanOp {
4220            subject: TripleComponent::Variable("s".into()),
4221            predicate: TripleComponent::Variable("p".into()),
4222            object: TripleComponent::Variable("o".into()),
4223            graph: None,
4224            input: None,
4225            dataset: None,
4226        });
4227        assert!(ts_no_input.explain_tree().contains("TripleScan"));
4228
4229        // Fallback arm for operators without a specific formatter.
4230        let graph_op = LogicalOperator::CreateGraph(CreateGraphOp {
4231            graph: "g".into(),
4232            silent: false,
4233        });
4234        let out = graph_op.explain_tree();
4235        assert!(!out.is_empty());
4236    }
4237
4238    // ==================== fmt_expr helper ====================
4239
4240    #[test]
4241    fn fmt_expr_covers_common_variants() {
4242        let v = var("n");
4243        assert_eq!(fmt_expr(&v), "n");
4244
4245        let p = LogicalExpression::Property {
4246            variable: "n".into(),
4247            property: "age".into(),
4248        };
4249        assert_eq!(fmt_expr(&p), "n.age");
4250
4251        let lit = LogicalExpression::Literal(Value::Int64(42));
4252        assert_eq!(fmt_expr(&lit), "42");
4253
4254        let bin = LogicalExpression::Binary {
4255            left: Box::new(var("a")),
4256            op: BinaryOp::Eq,
4257            right: Box::new(LogicalExpression::Literal(Value::Int64(1))),
4258        };
4259        let s = fmt_expr(&bin);
4260        assert!(s.contains("Eq"));
4261        assert!(s.contains('a'));
4262
4263        let un = LogicalExpression::Unary {
4264            op: UnaryOp::Not,
4265            operand: Box::new(var("a")),
4266        };
4267        let s = fmt_expr(&un);
4268        assert!(s.contains("Not"));
4269
4270        let fc = LogicalExpression::FunctionCall {
4271            name: "toLower".into(),
4272            args: vec![var("name")],
4273            distinct: false,
4274        };
4275        assert_eq!(fmt_expr(&fc), "toLower(name)");
4276
4277        // Fallback arm: non-common variant hits the `_ => format!("{expr:?}")` path.
4278        let list = LogicalExpression::List(vec![var("a")]);
4279        let out = fmt_expr(&list);
4280        assert!(out.contains("List") || out.contains('['));
4281    }
4282
4283    // ==================== fmt_triple_component helper ====================
4284
4285    #[test]
4286    fn fmt_triple_component_variants() {
4287        assert_eq!(
4288            fmt_triple_component(&TripleComponent::Variable("s".into())),
4289            "?s"
4290        );
4291        assert_eq!(
4292            fmt_triple_component(&TripleComponent::Iri("http://ex/p".into())),
4293            "<http://ex/p>"
4294        );
4295        assert!(fmt_triple_component(&TripleComponent::Literal(Value::Int64(10))).contains("10"));
4296        assert_eq!(
4297            fmt_triple_component(&TripleComponent::LangLiteral {
4298                value: "hello".into(),
4299                lang: "en".into(),
4300            }),
4301            "\"hello\"@en"
4302        );
4303        assert_eq!(
4304            fmt_triple_component(&TripleComponent::BlankNode("b0".into())),
4305            "_:b0"
4306        );
4307    }
4308
4309    // ==================== TripleComponent::as_variable ====================
4310
4311    #[test]
4312    fn triple_component_as_variable() {
4313        assert_eq!(
4314            TripleComponent::Variable("s".into()).as_variable(),
4315            Some("s")
4316        );
4317        assert_eq!(
4318            TripleComponent::Iri("http://ex/p".into()).as_variable(),
4319            None
4320        );
4321        assert_eq!(
4322            TripleComponent::Literal(Value::Int64(1)).as_variable(),
4323            None
4324        );
4325        assert_eq!(TripleComponent::BlankNode("b".into()).as_variable(), None);
4326        assert_eq!(
4327            TripleComponent::LangLiteral {
4328                value: "v".into(),
4329                lang: "en".into(),
4330            }
4331            .as_variable(),
4332            None
4333        );
4334    }
4335}