Skip to main content

meshdb_executor/
ops.rs

1use crate::{
2    error::{Error, Result},
3    eval::{compare_values, eval_expr, row_key, to_bool, value_key, values_equal, EvalCtx},
4    procedures::ProcedureRegistry,
5    reader::GraphReader,
6    value::{ParamMap, Row, Value},
7    writer::GraphWriter,
8};
9use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
10use meshdb_cypher::{
11    AggregateArg, AggregateFn, AggregateSpec, BinaryOp, CallArgs, CompareOp, CreateEdgeSpec,
12    CreateNodeSpec, Direction, Expr, Literal, LogicalPlan, RemoveSpec, ReturnItem, SetAssignment,
13    SortItem, UnaryOp, YieldSpec,
14};
15use meshdb_storage::RocksDbStorageEngine;
16use std::cell::RefCell;
17use std::cmp::Ordering;
18use std::collections::{HashMap, HashSet};
19
20/// Shared tombstone set for nodes / edges that have been DELETEd
21/// earlier in the current query. Property / label / type / keys /
22/// id accesses consult this set and raise `DeletedEntityAccess`
23/// so `MATCH (n) DELETE n RETURN n.num` surfaces the error at
24/// runtime instead of reading off a stale clone.
25#[derive(Default)]
26pub struct Tombstones {
27    pub nodes: RefCell<HashSet<meshdb_core::NodeId>>,
28    pub edges: RefCell<HashSet<meshdb_core::EdgeId>>,
29}
30
31pub struct ExecCtx<'a> {
32    pub store: &'a dyn GraphReader,
33    pub writer: &'a dyn GraphWriter,
34    /// Per-query parameter bindings. Empty for the single-node and Raft
35    /// entry points that don't carry a Bolt RUN; populated with the
36    /// driver-supplied params on the Bolt path. Threaded into every
37    /// `eval_expr` call so `Expr::Parameter("name")` resolves.
38    pub params: &'a ParamMap,
39    /// Procedures known to this execution. Defaults to an empty
40    /// registry on call sites that don't care; the TCK harness and
41    /// any future server startup plug in a populated registry so
42    /// `CALL ns.name(...)` resolves.
43    pub procedures: &'a ProcedureRegistry,
44    /// Outer-scope rows contributed by enclosing operators. The
45    /// innermost slot (first entry) is the nearest outer. Set by
46    /// [`CartesianProductOp`] when running its right side so that
47    /// operators inside — typically `EdgeExpandOp` /
48    /// `VarLengthExpandOp` constraint lookups — can resolve
49    /// variables the right-side scan didn't bind directly. Empty
50    /// at the top level.
51    pub outer_rows: &'a [&'a Row],
52    /// Tombstones for entities deleted earlier in the same query.
53    /// Shared by reference so DeleteOp can insert and downstream
54    /// eval can check.
55    pub tombstones: &'a Tombstones,
56}
57
58pub(crate) struct NoOpWriter;
59impl GraphWriter for NoOpWriter {
60    fn put_node(&self, _: &Node) -> Result<()> {
61        Ok(())
62    }
63    fn put_edge(&self, _: &Edge) -> Result<()> {
64        Ok(())
65    }
66    fn delete_edge(&self, _: EdgeId) -> Result<()> {
67        Ok(())
68    }
69    fn detach_delete_node(&self, _: NodeId) -> Result<()> {
70        Ok(())
71    }
72}
73
74impl<'a> ExecCtx<'a> {
75    /// Build an `EvalCtx` wrapping the given row alongside this
76    /// exec context's params and reader. Used by operators to
77    /// bridge the per-operator context into the expression
78    /// evaluator without repeating the field list at every
79    /// `eval_expr` call site.
80    pub(crate) fn eval_ctx<'b>(&self, row: &'b Row) -> EvalCtx<'b>
81    where
82        'a: 'b,
83    {
84        EvalCtx {
85            row,
86            params: self.params,
87            reader: self.store,
88            procedures: self.procedures,
89            outer_rows: self.outer_rows,
90            tombstones: self.tombstones,
91        }
92    }
93
94    /// Look up a variable in `row` first, then in each outer-scope
95    /// row in order. Returns the nearest match. Used for constraint
96    /// lookups inside right-side operators of a `CartesianProduct`
97    /// so a fresh scan that references an outer binding
98    /// (`MATCH ()-[r]-() MATCH (n)-[r]-(m)`) can still resolve it.
99    pub(crate) fn lookup_binding<'r>(&'r self, row: &'r Row, name: &str) -> Option<&'r Value> {
100        if let Some(v) = row.get(name) {
101            return Some(v);
102        }
103        for outer in self.outer_rows {
104            if let Some(v) = outer.get(name) {
105                return Some(v);
106            }
107        }
108        None
109    }
110}
111
112pub trait Operator {
113    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>>;
114}
115
116/// Execute a plan using the given store for both reads and writes.
117/// Equivalent to [`execute_with_reader`] with the store acting as both
118/// and an empty parameter map.
119pub fn execute(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
120    let params = ParamMap::new();
121    execute_with_reader(
122        plan,
123        store as &dyn GraphReader,
124        store as &dyn GraphWriter,
125        &params,
126    )
127}
128
129/// Execute a plan against a local [`RocksDbStorageEngine`] for reads,
130/// sending mutations to a separate [`GraphWriter`]. Used by in-process
131/// setups where reads are always cheap-local (single node or
132/// full-replica Raft). No parameters.
133pub fn execute_with_writer(
134    plan: &LogicalPlan,
135    store: &RocksDbStorageEngine,
136    writer: &dyn GraphWriter,
137) -> Result<Vec<Row>> {
138    let params = ParamMap::new();
139    execute_with_reader(plan, store as &dyn GraphReader, writer, &params)
140}
141
142/// Execute a plan with an arbitrary [`GraphReader`] for reads and a
143/// parameter map for `$param` resolution. Routing mode uses a
144/// partitioned reader that fans out reads across peers; the Bolt
145/// listener supplies the driver-bound param map.
146pub fn explain(plan: &LogicalPlan) -> Vec<Row> {
147    let text = meshdb_cypher::format_plan(plan);
148    let mut row = Row::new();
149    row.insert("plan".to_string(), Value::Property(Property::String(text)));
150    vec![row]
151}
152
153pub fn profile(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
154    let result_rows = execute(plan, store)?;
155    let row_count = result_rows.len() as i64;
156    let plan_text = meshdb_cypher::format_plan(plan);
157    let summary = format!("{plan_text}\nRows: {row_count}");
158    let mut row = Row::new();
159    row.insert(
160        "profile".to_string(),
161        Value::Property(Property::String(summary)),
162    );
163    row.insert(
164        "rows".to_string(),
165        Value::Property(Property::Int64(row_count)),
166    );
167    Ok(vec![row])
168}
169
170pub fn execute_with_reader(
171    plan: &LogicalPlan,
172    reader: &dyn GraphReader,
173    writer: &dyn GraphWriter,
174    params: &ParamMap,
175) -> Result<Vec<Row>> {
176    let empty_procs = ProcedureRegistry::new();
177    execute_with_reader_and_procs(plan, reader, writer, params, &empty_procs)
178}
179
180/// Like [`execute_with_reader`] but with an explicit procedure
181/// registry in scope. Used by the TCK harness to mount mock
182/// procedures declared by `there exists a procedure ...` steps,
183/// and reserved for the server startup path once built-in
184/// procedures land.
185pub fn execute_with_reader_and_procs(
186    plan: &LogicalPlan,
187    reader: &dyn GraphReader,
188    writer: &dyn GraphWriter,
189    params: &ParamMap,
190    procedures: &ProcedureRegistry,
191) -> Result<Vec<Row>> {
192    // `datetime()`, `localtime()`, and friends cache "now" in a
193    // thread-local so multiple calls inside the same statement see
194    // the same instant. Clear it at the start of every execution so
195    // the next query gets a fresh reading.
196    crate::eval::reset_statement_time();
197    // Schema DDL never enters the operator pipeline — it's a
198    // side-effect on the backing store, not a row-producing query.
199    // Short-circuit here so `build_op` stays a pure plan-to-operator
200    // mapping and doesn't need DDL-specific cases.
201    if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
202        return Ok(rows);
203    }
204    let suppress_output = is_write_only_plan(plan);
205    let mut op = build_op(plan);
206    let tombstones = Tombstones::default();
207    let ctx = ExecCtx {
208        store: reader,
209        writer,
210        params,
211        procedures,
212        outer_rows: &[],
213        tombstones: &tombstones,
214    };
215    let mut rows = Vec::new();
216    while let Some(row) = op.next(&ctx)? {
217        rows.push(row);
218    }
219    if suppress_output {
220        Ok(Vec::new())
221    } else {
222        Ok(rows)
223    }
224}
225
226fn is_write_only_plan(plan: &LogicalPlan) -> bool {
227    // A plan is write-only when its top-level operator is a mutation
228    // with no projection (Project/Identity) wrapping it. Mutations
229    // with RETURN have a Project on top.
230    match plan {
231        LogicalPlan::CreatePath { .. }
232        | LogicalPlan::Delete { .. }
233        | LogicalPlan::SetProperty { .. }
234        | LogicalPlan::Remove { .. }
235        | LogicalPlan::Foreach { .. }
236        | LogicalPlan::MergeNode { .. }
237        | LogicalPlan::MergeEdge { .. } => true,
238        _ => false,
239    }
240}
241
242/// DDL dispatch. Returns `Ok(Some(rows))` when `plan` is a schema
243/// statement and was handled; `Ok(None)` when it's a regular graph
244/// operation that the operator pipeline should execute normally.
245///
246/// `CREATE INDEX` / `DROP INDEX` return one row carrying an `ok`
247/// value so Bolt clients see a non-empty RECORD stream rather than
248/// an empty SUCCESS — mirrors how Neo4j reports schema writes.
249/// `SHOW INDEXES` returns one row per registered index with
250/// `label`, `property`, and `state` columns.
251fn try_execute_ddl(
252    plan: &LogicalPlan,
253    reader: &dyn GraphReader,
254    writer: &dyn GraphWriter,
255) -> Result<Option<Vec<Row>>> {
256    match plan {
257        LogicalPlan::CreatePropertyIndex { label, property } => {
258            writer.create_property_index(label, property)?;
259            Ok(Some(vec![ddl_ack_row("created", label, property)]))
260        }
261        LogicalPlan::DropPropertyIndex { label, property } => {
262            writer.drop_property_index(label, property)?;
263            Ok(Some(vec![ddl_ack_row("dropped", label, property)]))
264        }
265        LogicalPlan::ShowPropertyIndexes => {
266            // SHOW is a pure read — source it from the reader so
267            // overlay/partitioned readers deliver the right view
268            // without round-tripping through a writer.
269            let specs = reader.list_property_indexes()?;
270            let rows = specs
271                .into_iter()
272                .map(|(label, property)| {
273                    let mut row = Row::default();
274                    row.insert("label".into(), Value::Property(Property::String(label)));
275                    row.insert(
276                        "property".into(),
277                        Value::Property(Property::String(property)),
278                    );
279                    row.insert(
280                        "state".into(),
281                        Value::Property(Property::String("online".into())),
282                    );
283                    row
284                })
285                .collect();
286            Ok(Some(rows))
287        }
288        _ => Ok(None),
289    }
290}
291
292fn ddl_ack_row(state: &str, label: &str, property: &str) -> Row {
293    let mut row = Row::default();
294    row.insert(
295        "state".into(),
296        Value::Property(Property::String(state.into())),
297    );
298    row.insert(
299        "label".into(),
300        Value::Property(Property::String(label.into())),
301    );
302    row.insert(
303        "property".into(),
304        Value::Property(Property::String(property.into())),
305    );
306    row
307}
308
309fn build_op(plan: &LogicalPlan) -> Box<dyn Operator> {
310    build_op_inner(plan, None)
311}
312
313pub(crate) fn build_op_inner(plan: &LogicalPlan, seed: Option<&Row>) -> Box<dyn Operator> {
314    macro_rules! child {
315        ($p:expr) => {
316            build_op_inner($p, seed)
317        };
318    }
319    match plan {
320        LogicalPlan::CreatePath {
321            input,
322            nodes,
323            edges,
324        } => Box::new(CreatePathOp::new(
325            input.as_ref().map(|p| child!(p)),
326            nodes.clone(),
327            edges.clone(),
328        )),
329        LogicalPlan::CartesianProduct { left, right } => {
330            Box::new(CartesianProductOp::new(child!(left), (**right).clone()))
331        }
332        LogicalPlan::Delete {
333            input,
334            detach,
335            vars,
336            exprs,
337        } => Box::new(DeleteOp::new(
338            child!(input),
339            *detach,
340            vars.clone(),
341            exprs.clone(),
342        )),
343        LogicalPlan::SetProperty { input, assignments } => {
344            Box::new(SetPropertyOp::new(child!(input), assignments.clone()))
345        }
346        LogicalPlan::Remove { input, items } => {
347            Box::new(RemoveOp::new(child!(input), items.clone()))
348        }
349        LogicalPlan::LoadCsv {
350            input,
351            path_expr,
352            var,
353            with_headers,
354        } => Box::new(LoadCsvOp::new(
355            input.as_ref().map(|p| child!(p)),
356            path_expr.clone(),
357            var.clone(),
358            *with_headers,
359        )),
360        LogicalPlan::Foreach {
361            input,
362            var,
363            list_expr,
364            set_assignments,
365            remove_items,
366        } => Box::new(ForeachOp::new(
367            child!(input),
368            var.clone(),
369            list_expr.clone(),
370            set_assignments.clone(),
371            remove_items.clone(),
372        )),
373        LogicalPlan::CallSubquery { input, body } => {
374            Box::new(CallSubqueryOp::new(child!(input), (**body).clone()))
375        }
376        LogicalPlan::OptionalApply {
377            input,
378            body,
379            null_vars,
380        } => Box::new(OptionalApplyOp::new(
381            child!(input),
382            (**body).clone(),
383            null_vars.clone(),
384        )),
385        LogicalPlan::ProcedureCall {
386            input,
387            qualified_name,
388            args,
389            yield_spec,
390            standalone,
391        } => Box::new(ProcedureCallOp::new(
392            input.as_ref().map(|p| child!(p)),
393            qualified_name.clone(),
394            args.clone(),
395            yield_spec.clone(),
396            *standalone,
397        )),
398        LogicalPlan::SeedRow => match seed {
399            Some(r) => Box::new(SeededRowOp {
400                row: Some(r.clone()),
401            }),
402            None => Box::new(SeedRowOp { done: false }),
403        },
404        LogicalPlan::NodeScanAll { var } => Box::new(NodeScanAllOp::new(var.clone())),
405        LogicalPlan::NodeScanByLabels { var, labels } => {
406            Box::new(NodeScanByLabelsOp::new(var.clone(), labels.clone()))
407        }
408        LogicalPlan::EdgeExpand {
409            input,
410            src_var,
411            edge_var,
412            dst_var,
413            dst_labels,
414            edge_properties,
415            edge_types,
416            direction,
417            edge_constraint_var,
418        } => Box::new(EdgeExpandOp::new(
419            child!(input),
420            src_var.clone(),
421            edge_var.clone(),
422            dst_var.clone(),
423            dst_labels.clone(),
424            edge_properties.clone(),
425            edge_types.clone(),
426            *direction,
427            edge_constraint_var.clone(),
428        )),
429        LogicalPlan::OptionalEdgeExpand {
430            input,
431            src_var,
432            edge_var,
433            dst_var,
434            dst_labels,
435            dst_properties,
436            edge_types,
437            direction,
438            dst_constraint_var,
439            edge_constraint_var,
440        } => Box::new(OptionalEdgeExpandOp::new(
441            child!(input),
442            src_var.clone(),
443            edge_var.clone(),
444            dst_var.clone(),
445            dst_labels.clone(),
446            dst_properties.clone(),
447            edge_types.clone(),
448            *direction,
449            dst_constraint_var.clone(),
450            edge_constraint_var.clone(),
451        )),
452        LogicalPlan::VarLengthExpand {
453            input,
454            src_var,
455            edge_var,
456            dst_var,
457            dst_labels,
458            edge_types,
459            edge_properties,
460            direction,
461            min_hops,
462            max_hops,
463            path_var,
464            optional,
465            dst_constraint_var,
466            bound_edge_list_var,
467            excluded_edge_vars,
468        } => Box::new(VarLengthExpandOp::new(
469            child!(input),
470            src_var.clone(),
471            edge_var.clone(),
472            dst_var.clone(),
473            dst_labels.clone(),
474            edge_types.clone(),
475            edge_properties.clone(),
476            *direction,
477            *min_hops,
478            *max_hops,
479            path_var.clone(),
480            *optional,
481            dst_constraint_var.clone(),
482            bound_edge_list_var.clone(),
483            excluded_edge_vars.clone(),
484        )),
485        LogicalPlan::Filter { input, predicate } => {
486            Box::new(FilterOp::new(child!(input), predicate.clone()))
487        }
488        LogicalPlan::Project { input, items } => {
489            Box::new(ProjectOp::new(child!(input), items.clone()))
490        }
491        LogicalPlan::Aggregate {
492            input,
493            group_keys,
494            aggregates,
495        } => Box::new(AggregateOp::new(
496            child!(input),
497            group_keys.clone(),
498            aggregates.clone(),
499        )),
500        LogicalPlan::Identity { input } => Box::new(IdentityOp::new(child!(input))),
501        LogicalPlan::CoalesceNullRow { input, null_vars } => {
502            Box::new(CoalesceNullRowOp::new(child!(input), null_vars.clone()))
503        }
504        LogicalPlan::Distinct { input } => Box::new(DistinctOp::new(child!(input))),
505        LogicalPlan::OrderBy { input, sort_items } => {
506            Box::new(OrderByOp::new(child!(input), sort_items.clone()))
507        }
508        LogicalPlan::Skip { input, count } => Box::new(SkipOp::new(child!(input), count.clone())),
509        LogicalPlan::Limit { input, count } => Box::new(LimitOp::new(child!(input), count.clone())),
510        LogicalPlan::MergeNode {
511            input,
512            var,
513            labels,
514            properties,
515            on_create,
516            on_match,
517        } => Box::new(MergeNodeOp::new(
518            input.as_ref().map(|p| child!(p)),
519            var.clone(),
520            labels.clone(),
521            properties.clone(),
522            on_create.clone(),
523            on_match.clone(),
524        )),
525        LogicalPlan::MergeEdge {
526            input,
527            edge_var,
528            src_var,
529            dst_var,
530            edge_type,
531            undirected,
532            properties,
533            on_create,
534            on_match,
535        } => Box::new(MergeEdgeOp::new(
536            child!(input),
537            edge_var.clone(),
538            src_var.clone(),
539            dst_var.clone(),
540            edge_type.clone(),
541            *undirected,
542            properties.clone(),
543            on_create.clone(),
544            on_match.clone(),
545        )),
546        LogicalPlan::Unwind { var, expr } => Box::new(UnwindOp::new(var.clone(), expr.clone())),
547        LogicalPlan::UnwindChain { input, var, expr } => {
548            Box::new(UnwindChainOp::new(child!(input), var.clone(), expr.clone()))
549        }
550        LogicalPlan::IndexSeek {
551            var,
552            label,
553            property,
554            value,
555        } => Box::new(IndexSeekOp::new(
556            var.clone(),
557            label.clone(),
558            property.clone(),
559            value.clone(),
560        )),
561        // DDL plans are handled by `try_execute_ddl` before the
562        // operator pipeline is built, so they should never reach
563        // this point. An assertion here catches any future
564        // refactor that forgets to gate them.
565        LogicalPlan::Union { branches, all } => {
566            let branch_ops: Vec<Box<dyn Operator>> = branches.iter().map(|b| child!(b)).collect();
567            Box::new(UnionOp::new(branch_ops, *all))
568        }
569        LogicalPlan::BindPath {
570            input,
571            path_var,
572            node_vars,
573            edge_vars,
574        } => Box::new(BindPathOp::new(
575            child!(input),
576            path_var.clone(),
577            node_vars.clone(),
578            edge_vars.clone(),
579        )),
580        LogicalPlan::ShortestPath {
581            input,
582            src_var,
583            dst_var,
584            path_var,
585            edge_types,
586            direction,
587            max_hops,
588            kind,
589        } => Box::new(ShortestPathOp::new(
590            child!(input),
591            src_var.clone(),
592            dst_var.clone(),
593            path_var.clone(),
594            edge_types.clone(),
595            *direction,
596            *max_hops,
597            *kind,
598        )),
599        LogicalPlan::CreatePropertyIndex { .. }
600        | LogicalPlan::DropPropertyIndex { .. }
601        | LogicalPlan::ShowPropertyIndexes => {
602            panic!("schema DDL must be dispatched via try_execute_ddl before build_op")
603        }
604    }
605}
606
607struct UnwindOp {
608    var: String,
609    expr: Expr,
610    items: Option<Vec<Value>>,
611    cursor: usize,
612}
613
614impl UnwindOp {
615    fn new(var: String, expr: Expr) -> Self {
616        Self {
617            var,
618            expr,
619            items: None,
620            cursor: 0,
621        }
622    }
623}
624
625impl Operator for UnwindOp {
626    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
627        if self.items.is_none() {
628            let empty = Row::new();
629            let ectx = EvalCtx {
630                row: &empty,
631                params: ctx.params,
632                reader: ctx.store,
633                procedures: ctx.procedures,
634                outer_rows: ctx.outer_rows,
635                tombstones: ctx.tombstones,
636            };
637            let val = eval_expr(&self.expr, &ectx)?;
638            self.items = Some(coerce_unwind_list(val)?);
639        }
640        let items = self.items.as_ref().unwrap();
641        if self.cursor < items.len() {
642            let v = items[self.cursor].clone();
643            self.cursor += 1;
644            let mut row = Row::new();
645            row.insert(self.var.clone(), v);
646            Ok(Some(row))
647        } else {
648            Ok(None)
649        }
650    }
651}
652
653/// Per-row UNWIND: pulls one input row, evaluates `expr` against it
654/// to produce a list, and emits one output row per list element.
655/// Each output row inherits every binding from the input row plus
656/// a new `var` binding. Empty / null lists drop the input row and
657/// the operator immediately pulls the next input row.
658struct UnwindChainOp {
659    input: Box<dyn Operator>,
660    var: String,
661    expr: Expr,
662    current_row: Option<Row>,
663    items: Vec<Value>,
664    cursor: usize,
665}
666
667impl UnwindChainOp {
668    fn new(input: Box<dyn Operator>, var: String, expr: Expr) -> Self {
669        Self {
670            input,
671            var,
672            expr,
673            current_row: None,
674            items: Vec::new(),
675            cursor: 0,
676        }
677    }
678}
679
680impl Operator for UnwindChainOp {
681    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
682        loop {
683            if let Some(base) = &self.current_row {
684                if self.cursor < self.items.len() {
685                    let v = self.items[self.cursor].clone();
686                    self.cursor += 1;
687                    let mut row = base.clone();
688                    row.insert(self.var.clone(), v);
689                    return Ok(Some(row));
690                }
691                self.current_row = None;
692                self.items.clear();
693                self.cursor = 0;
694            }
695            let base = match self.input.next(ctx)? {
696                Some(r) => r,
697                None => return Ok(None),
698            };
699            let ectx = EvalCtx {
700                row: &base,
701                params: ctx.params,
702                reader: ctx.store,
703                procedures: ctx.procedures,
704                outer_rows: ctx.outer_rows,
705                tombstones: ctx.tombstones,
706            };
707            let val = eval_expr(&self.expr, &ectx)?;
708            self.items = coerce_unwind_list(val)?;
709            self.current_row = Some(base);
710        }
711    }
712}
713
714/// Shared list-coercion used by both UNWIND operators. Accepts a
715/// native executor `Value::List`, a property-list
716/// `Property::List`, or `Null` (treated as an empty list). Any
717/// other shape is a type error — UNWIND is defined over lists.
718fn coerce_unwind_list(val: Value) -> Result<Vec<Value>> {
719    match val {
720        Value::List(items) => Ok(items),
721        Value::Property(Property::List(props)) => {
722            Ok(props.into_iter().map(Value::Property).collect())
723        }
724        Value::Null => Ok(Vec::new()),
725        _ => Err(Error::TypeMismatch),
726    }
727}
728
729struct CreatePathOp {
730    input: Option<Box<dyn Operator>>,
731    nodes: Vec<CreateNodeSpec>,
732    edges: Vec<CreateEdgeSpec>,
733    done: bool,
734    buffered: Option<Vec<Row>>,
735    cursor: usize,
736}
737
738impl CreatePathOp {
739    fn new(
740        input: Option<Box<dyn Operator>>,
741        nodes: Vec<CreateNodeSpec>,
742        edges: Vec<CreateEdgeSpec>,
743    ) -> Self {
744        Self {
745            input,
746            nodes,
747            edges,
748            done: false,
749            buffered: None,
750            cursor: 0,
751        }
752    }
753
754    fn apply(&self, ctx: &ExecCtx, row: &Row) -> Result<Row> {
755        let mut out = row.clone();
756        let mut node_ids: Vec<NodeId> = Vec::with_capacity(self.nodes.len());
757        for spec in &self.nodes {
758            match spec {
759                CreateNodeSpec::New {
760                    var,
761                    labels,
762                    properties,
763                } => {
764                    let mut node = Node::new();
765                    for label in labels {
766                        node.labels.push(label.clone());
767                    }
768                    // Property values are `Expr` (literal | parameter
769                    // per the grammar). Evaluate against the current row
770                    // + params and convert to a stored Property via the
771                    // existing helper, which rejects Node/Edge values.
772                    // Null-valued properties aren't stored — openCypher
773                    // treats `{k: null}` as "no such property," so
774                    // `keys(n)` and `n.k IS NOT NULL` both skip it.
775                    for (k, expr) in properties {
776                        let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
777                        let prop = value_to_property(value)?;
778                        if matches!(prop, Property::Null) {
779                            continue;
780                        }
781                        node.properties.insert(k.clone(), prop);
782                    }
783                    ctx.writer.put_node(&node)?;
784                    node_ids.push(node.id);
785                    if let Some(v) = var {
786                        out.insert(v.clone(), Value::Node(node));
787                    }
788                }
789                CreateNodeSpec::Reference(name) => {
790                    let id = match out.get(name) {
791                        Some(Value::Node(n)) => n.id,
792                        _ => return Err(Error::UnboundVariable(name.clone())),
793                    };
794                    node_ids.push(id);
795                }
796            }
797        }
798        for spec in &self.edges {
799            let src = node_ids[spec.src_idx];
800            let dst = node_ids[spec.dst_idx];
801            let mut edge = Edge::new(spec.edge_type.clone(), src, dst);
802            for (k, expr) in &spec.properties {
803                let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
804                let prop = value_to_property(value)?;
805                if matches!(prop, Property::Null) {
806                    continue;
807                }
808                edge.properties.insert(k.clone(), prop);
809            }
810            ctx.writer.put_edge(&edge)?;
811            if let Some(v) = &spec.var {
812                out.insert(v.clone(), Value::Edge(edge));
813            }
814        }
815        Ok(out)
816    }
817}
818
819impl Operator for CreatePathOp {
820    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
821        if self.input.is_some() {
822            // Drain the whole input first, then replay sequentially. Draining up
823            // front avoids aliasing the source scan's cursor while we're writing
824            // to the store (node-label scans cache ids lazily on first call).
825            if let Some(buffered) = self.buffered.as_mut() {
826                if self.cursor < buffered.len() {
827                    let row = buffered[self.cursor].clone();
828                    self.cursor += 1;
829                    return Ok(Some(self.apply(ctx, &row)?));
830                }
831                return Ok(None);
832            }
833            let mut rows: Vec<Row> = Vec::new();
834            {
835                let input = self.input.as_mut().unwrap();
836                while let Some(row) = input.next(ctx)? {
837                    rows.push(row);
838                }
839            }
840            self.buffered = Some(rows);
841            self.cursor = 0;
842            // Fall through to next call via recursion.
843            self.next(ctx)
844        } else {
845            if self.done {
846                return Ok(None);
847            }
848            self.done = true;
849            let empty = Row::new();
850            Ok(Some(self.apply(ctx, &empty)?))
851        }
852    }
853}
854
855struct CartesianProductOp {
856    left: Box<dyn Operator>,
857    right_plan: LogicalPlan,
858    left_row: Option<Row>,
859    right_op: Option<Box<dyn Operator>>,
860}
861
862impl CartesianProductOp {
863    fn new(left: Box<dyn Operator>, right_plan: LogicalPlan) -> Self {
864        Self {
865            left,
866            right_plan,
867            left_row: None,
868            right_op: None,
869        }
870    }
871}
872
873impl Operator for CartesianProductOp {
874    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
875        loop {
876            if self.left_row.is_none() {
877                match self.left.next(ctx)? {
878                    None => return Ok(None),
879                    Some(row) => {
880                        self.left_row = Some(row);
881                        self.right_op = Some(build_op(&self.right_plan));
882                    }
883                }
884            }
885            let right_op = self.right_op.as_mut().expect("right_op set");
886            // Expose the current left row to right-side operators
887            // via a shadow context so constraint lookups (for
888            // bound edges / bound edge lists) can find vars that
889            // the right-side scan itself doesn't bind.
890            let left_ref = self.left_row.as_ref().unwrap();
891            let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
892            stacked.push(left_ref);
893            stacked.extend_from_slice(ctx.outer_rows);
894            let inner_ctx = ExecCtx {
895                store: ctx.store,
896                writer: ctx.writer,
897                params: ctx.params,
898                procedures: ctx.procedures,
899                outer_rows: &stacked,
900                tombstones: ctx.tombstones,
901            };
902            match right_op.next(&inner_ctx)? {
903                Some(right_row) => {
904                    let mut combined = left_ref.clone();
905                    for (k, v) in right_row {
906                        combined.insert(k, v);
907                    }
908                    return Ok(Some(combined));
909                }
910                None => {
911                    self.left_row = None;
912                    self.right_op = None;
913                }
914            }
915        }
916    }
917}
918
919struct DeleteOp {
920    input: Box<dyn Operator>,
921    detach: bool,
922    #[allow(dead_code)]
923    vars: Vec<String>,
924    exprs: Vec<Expr>,
925    /// Rows drained from `input` before any deletes have run.
926    /// Populated on first call to `next`. openCypher treats DELETE
927    /// as a batch mutation: the whole preceding row set is
928    /// materialised, then the mutations happen, then rows are
929    /// forwarded. Without buffering, `MATCH (a)-[r]-(b) DELETE r,
930    /// a, b RETURN count(*)` deletes the first row's nodes before
931    /// the pipelined scan reaches the second row.
932    buffered: Option<Vec<Row>>,
933    cursor: usize,
934}
935
936impl DeleteOp {
937    fn new(input: Box<dyn Operator>, detach: bool, vars: Vec<String>, exprs: Vec<Expr>) -> Self {
938        Self {
939            input,
940            detach,
941            vars,
942            exprs,
943            buffered: None,
944            cursor: 0,
945        }
946    }
947
948    /// Gather every edge and node id referenced by the row's
949    /// DELETE expressions and run the delete in two phases —
950    /// edges first, then nodes. The two-phase ordering is what
951    /// lets `DELETE p1, p2` succeed when two paths share nodes
952    /// connected by each other's edges: by the time the node
953    /// phase runs, the in-batch edges are already gone. The
954    /// non-detach check probes the graph then filters out
955    /// any edge we just deleted, so the batch-level detachment
956    /// is counted as "no longer attached" rather than failing.
957    fn apply_deletes(
958        &self,
959        ctx: &ExecCtx,
960        row: &Row,
961        seen_edges: &mut HashSet<meshdb_core::EdgeId>,
962        seen_nodes: &mut HashSet<meshdb_core::NodeId>,
963    ) -> Result<()> {
964        let mut edge_ids: Vec<meshdb_core::EdgeId> = Vec::new();
965        let mut node_ids: Vec<meshdb_core::NodeId> = Vec::new();
966        for expr in &self.exprs {
967            let v = eval_expr(expr, &ctx.eval_ctx(row))?;
968            match v {
969                Value::Node(n) => node_ids.push(n.id),
970                Value::Edge(e) => edge_ids.push(e.id),
971                Value::Path { nodes, edges } => {
972                    for e in edges {
973                        edge_ids.push(e.id);
974                    }
975                    for n in nodes {
976                        node_ids.push(n.id);
977                    }
978                }
979                Value::Null | Value::Property(Property::Null) => continue,
980                _ => return Err(Error::TypeMismatch),
981            }
982        }
983        for eid in &edge_ids {
984            if seen_edges.insert(*eid) {
985                ctx.writer.delete_edge(*eid)?;
986                ctx.tombstones.edges.borrow_mut().insert(*eid);
987            }
988        }
989        for nid in &node_ids {
990            if !seen_nodes.insert(*nid) {
991                continue;
992            }
993            if self.detach {
994                // DETACH DELETE also removes every attached edge;
995                // tombstone them too so a later projection over a
996                // formerly-attached edge clone also raises rather
997                // than reading stale properties.
998                for (eid, _) in ctx.store.outgoing(*nid)? {
999                    ctx.tombstones.edges.borrow_mut().insert(eid);
1000                }
1001                for (eid, _) in ctx.store.incoming(*nid)? {
1002                    ctx.tombstones.edges.borrow_mut().insert(eid);
1003                }
1004                ctx.writer.detach_delete_node(*nid)?;
1005            } else {
1006                let out = ctx.store.outgoing(*nid)?;
1007                let inc = ctx.store.incoming(*nid)?;
1008                let still_attached = out
1009                    .iter()
1010                    .chain(inc.iter())
1011                    .any(|(eid, _)| !seen_edges.contains(eid));
1012                if still_attached {
1013                    return Err(Error::CannotDeleteAttachedNode);
1014                }
1015                ctx.writer.detach_delete_node(*nid)?;
1016            }
1017            ctx.tombstones.nodes.borrow_mut().insert(*nid);
1018        }
1019        Ok(())
1020    }
1021}
1022
1023impl Operator for DeleteOp {
1024    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1025        // First call: drain the input, run all deletes, then
1026        // start forwarding rows. Buffering protects against the
1027        // pipelined-scan-vs-mutation race: `MATCH (a)-[r]-(b)
1028        // DELETE r, a, b RETURN count(*)` must observe every
1029        // MATCH row before any node or edge disappears, otherwise
1030        // the undirected expansion hits an already-deleted source
1031        // on the second iteration.
1032        if self.buffered.is_none() {
1033            let mut rows: Vec<Row> = Vec::new();
1034            while let Some(row) = self.input.next(ctx)? {
1035                rows.push(row);
1036            }
1037            let mut seen_edges: HashSet<meshdb_core::EdgeId> = HashSet::new();
1038            let mut seen_nodes: HashSet<meshdb_core::NodeId> = HashSet::new();
1039            for row in &rows {
1040                self.apply_deletes(ctx, row, &mut seen_edges, &mut seen_nodes)?;
1041            }
1042            self.buffered = Some(rows);
1043            self.cursor = 0;
1044        }
1045        let rows = self.buffered.as_ref().unwrap();
1046        if self.cursor < rows.len() {
1047            let row = rows[self.cursor].clone();
1048            self.cursor += 1;
1049            return Ok(Some(row));
1050        }
1051        Ok(None)
1052    }
1053}
1054
1055struct SetPropertyOp {
1056    input: Box<dyn Operator>,
1057    assignments: Vec<SetAssignment>,
1058}
1059
1060impl SetPropertyOp {
1061    fn new(input: Box<dyn Operator>, assignments: Vec<SetAssignment>) -> Self {
1062        Self { input, assignments }
1063    }
1064}
1065
1066impl Operator for SetPropertyOp {
1067    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1068        match self.input.next(ctx)? {
1069            None => Ok(None),
1070            Some(mut row) => {
1071                // Phase 1: evaluate any RHSes against the original row bindings.
1072                enum Action {
1073                    SetKey {
1074                        var: String,
1075                        key: String,
1076                        prop: Property,
1077                    },
1078                    AddLabels {
1079                        var: String,
1080                        labels: Vec<String>,
1081                    },
1082                    Replace {
1083                        var: String,
1084                        props: Vec<(String, Property)>,
1085                    },
1086                    Merge {
1087                        var: String,
1088                        props: Vec<(String, Property)>,
1089                    },
1090                }
1091                let mut actions: Vec<Action> = Vec::with_capacity(self.assignments.len());
1092                for a in &self.assignments {
1093                    match a {
1094                        SetAssignment::Property { var, key, value } => {
1095                            let evaluated = eval_expr(value, &ctx.eval_ctx(&row))?;
1096                            let prop = value_to_property(evaluated)?;
1097                            actions.push(Action::SetKey {
1098                                var: var.clone(),
1099                                key: key.clone(),
1100                                prop,
1101                            });
1102                        }
1103                        SetAssignment::Labels { var, labels } => {
1104                            actions.push(Action::AddLabels {
1105                                var: var.clone(),
1106                                labels: labels.clone(),
1107                            });
1108                        }
1109                        SetAssignment::Replace { var, properties } => {
1110                            // Property values are now `Expr`. Evaluate
1111                            // each against the current row + params and
1112                            // convert via the shared helper, which
1113                            // surfaces InvalidSetValue for Node/Edge.
1114                            let props = properties
1115                                .iter()
1116                                .map(|(k, expr)| {
1117                                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1118                                    Ok((k.clone(), value_to_property(v)?))
1119                                })
1120                                .collect::<Result<Vec<(String, Property)>>>()?;
1121                            actions.push(Action::Replace {
1122                                var: var.clone(),
1123                                props,
1124                            });
1125                        }
1126                        SetAssignment::Merge { var, properties } => {
1127                            let props = properties
1128                                .iter()
1129                                .map(|(k, expr)| {
1130                                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1131                                    Ok((k.clone(), value_to_property(v)?))
1132                                })
1133                                .collect::<Result<Vec<(String, Property)>>>()?;
1134                            actions.push(Action::Merge {
1135                                var: var.clone(),
1136                                props,
1137                            });
1138                        }
1139                        SetAssignment::ReplaceFromExpr {
1140                            var,
1141                            source,
1142                            replace,
1143                        } => {
1144                            let v = eval_expr(source, &ctx.eval_ctx(&row))?;
1145                            let props = extract_property_map(&v)?;
1146                            if *replace {
1147                                actions.push(Action::Replace {
1148                                    var: var.clone(),
1149                                    props,
1150                                });
1151                            } else {
1152                                actions.push(Action::Merge {
1153                                    var: var.clone(),
1154                                    props,
1155                                });
1156                            }
1157                        }
1158                    }
1159                }
1160
1161                // Phase 2: apply updates in-place to the row bindings.
1162                let mut updated_nodes: HashSet<String> = HashSet::new();
1163                let mut updated_edges: HashSet<String> = HashSet::new();
1164                for action in actions {
1165                    match action {
1166                        Action::SetKey { var, key, prop } => match row.get_mut(&var) {
1167                            // openCypher: SET on a null target (from
1168                            // OPTIONAL MATCH that didn't bind) is a
1169                            // silent no-op rather than an error.
1170                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1171                                continue
1172                            }
1173                            // openCypher: SET a.key = null removes the
1174                            // property rather than storing a null value.
1175                            Some(Value::Node(n)) => {
1176                                if matches!(prop, Property::Null) {
1177                                    n.properties.remove(&key);
1178                                } else {
1179                                    n.properties.insert(key, prop);
1180                                }
1181                                updated_nodes.insert(var);
1182                            }
1183                            Some(Value::Edge(e)) => {
1184                                if matches!(prop, Property::Null) {
1185                                    e.properties.remove(&key);
1186                                } else {
1187                                    e.properties.insert(key, prop);
1188                                }
1189                                updated_edges.insert(var);
1190                            }
1191                            _ => return Err(Error::UnboundVariable(var)),
1192                        },
1193                        Action::AddLabels { var, labels } => match row.get_mut(&var) {
1194                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1195                                continue
1196                            }
1197                            Some(Value::Node(n)) => {
1198                                for label in labels {
1199                                    if !n.labels.contains(&label) {
1200                                        n.labels.push(label);
1201                                    }
1202                                }
1203                                updated_nodes.insert(var);
1204                            }
1205                            _ => return Err(Error::UnboundVariable(var)),
1206                        },
1207                        Action::Replace { var, props } => match row.get_mut(&var) {
1208                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1209                                continue
1210                            }
1211                            Some(Value::Node(n)) => {
1212                                n.properties.clear();
1213                                for (k, v) in props {
1214                                    if !matches!(v, Property::Null) {
1215                                        n.properties.insert(k, v);
1216                                    }
1217                                }
1218                                updated_nodes.insert(var);
1219                            }
1220                            Some(Value::Edge(e)) => {
1221                                e.properties.clear();
1222                                for (k, v) in props {
1223                                    if !matches!(v, Property::Null) {
1224                                        e.properties.insert(k, v);
1225                                    }
1226                                }
1227                                updated_edges.insert(var);
1228                            }
1229                            _ => return Err(Error::UnboundVariable(var)),
1230                        },
1231                        Action::Merge { var, props } => match row.get_mut(&var) {
1232                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1233                                continue
1234                            }
1235                            Some(Value::Node(n)) => {
1236                                for (k, v) in props {
1237                                    if matches!(v, Property::Null) {
1238                                        n.properties.remove(&k);
1239                                    } else {
1240                                        n.properties.insert(k, v);
1241                                    }
1242                                }
1243                                updated_nodes.insert(var);
1244                            }
1245                            Some(Value::Edge(e)) => {
1246                                for (k, v) in props {
1247                                    if matches!(v, Property::Null) {
1248                                        e.properties.remove(&k);
1249                                    } else {
1250                                        e.properties.insert(k, v);
1251                                    }
1252                                }
1253                                updated_edges.insert(var);
1254                            }
1255                            _ => return Err(Error::UnboundVariable(var)),
1256                        },
1257                    }
1258                }
1259
1260                // Phase 3: flush each mutated entity once to the writer.
1261                for var in &updated_nodes {
1262                    if let Some(Value::Node(n)) = row.get(var) {
1263                        ctx.writer.put_node(n)?;
1264                    }
1265                }
1266                for var in &updated_edges {
1267                    if let Some(Value::Edge(e)) = row.get(var) {
1268                        ctx.writer.put_edge(e)?;
1269                    }
1270                }
1271
1272                Ok(Some(row))
1273            }
1274        }
1275    }
1276}
1277
1278struct RemoveOp {
1279    input: Box<dyn Operator>,
1280    items: Vec<RemoveSpec>,
1281}
1282
1283impl RemoveOp {
1284    fn new(input: Box<dyn Operator>, items: Vec<RemoveSpec>) -> Self {
1285        Self { input, items }
1286    }
1287}
1288
1289impl Operator for RemoveOp {
1290    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1291        match self.input.next(ctx)? {
1292            None => Ok(None),
1293            Some(mut row) => {
1294                let mut updated_nodes: HashSet<String> = HashSet::new();
1295                let mut updated_edges: HashSet<String> = HashSet::new();
1296                for item in &self.items {
1297                    match item {
1298                        RemoveSpec::Property { var, key } => match row.get_mut(var) {
1299                            // Null target (from OPTIONAL MATCH) is a
1300                            // no-op, matching Neo4j's REMOVE semantics.
1301                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1302                                continue
1303                            }
1304                            Some(Value::Node(n)) => {
1305                                n.properties.remove(key);
1306                                updated_nodes.insert(var.clone());
1307                            }
1308                            Some(Value::Edge(e)) => {
1309                                e.properties.remove(key);
1310                                updated_edges.insert(var.clone());
1311                            }
1312                            _ => return Err(Error::UnboundVariable(var.clone())),
1313                        },
1314                        RemoveSpec::Labels { var, labels } => match row.get_mut(var) {
1315                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1316                                continue
1317                            }
1318                            Some(Value::Node(n)) => {
1319                                n.labels.retain(|l| !labels.contains(l));
1320                                updated_nodes.insert(var.clone());
1321                            }
1322                            _ => return Err(Error::UnboundVariable(var.clone())),
1323                        },
1324                    }
1325                }
1326                for var in &updated_nodes {
1327                    if let Some(Value::Node(n)) = row.get(var) {
1328                        ctx.writer.put_node(n)?;
1329                    }
1330                }
1331                for var in &updated_edges {
1332                    if let Some(Value::Edge(e)) = row.get(var) {
1333                        ctx.writer.put_edge(e)?;
1334                    }
1335                }
1336                Ok(Some(row))
1337            }
1338        }
1339    }
1340}
1341
1342struct LoadCsvOp {
1343    input: Option<Box<dyn Operator>>,
1344    path_expr: Expr,
1345    var: String,
1346    with_headers: bool,
1347    rows: Option<Vec<Value>>,
1348    cursor: usize,
1349}
1350
1351impl LoadCsvOp {
1352    fn new(
1353        input: Option<Box<dyn Operator>>,
1354        path_expr: Expr,
1355        var: String,
1356        with_headers: bool,
1357    ) -> Self {
1358        Self {
1359            input,
1360            path_expr,
1361            var,
1362            with_headers,
1363            rows: None,
1364            cursor: 0,
1365        }
1366    }
1367
1368    fn load(&mut self, ctx: &ExecCtx, base_row: &Row) -> Result<()> {
1369        let ectx = ctx.eval_ctx(base_row);
1370        let path_val = eval_expr(&self.path_expr, &ectx)?;
1371        let path = match path_val {
1372            Value::Property(Property::String(s)) => s,
1373            _ => return Err(Error::TypeMismatch),
1374        };
1375        let content = std::fs::read_to_string(&path).map_err(|e| {
1376            Error::Unsupported(format!("LOAD CSV: cannot read file '{}': {}", path, e))
1377        })?;
1378        let mut lines = content.lines();
1379        let headers: Option<Vec<String>> = if self.with_headers {
1380            lines
1381                .next()
1382                .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
1383        } else {
1384            None
1385        };
1386        let mut csv_rows = Vec::new();
1387        for line in lines {
1388            if line.trim().is_empty() {
1389                continue;
1390            }
1391            let fields: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
1392            if let Some(hdrs) = &headers {
1393                let mut map = std::collections::HashMap::new();
1394                for (i, h) in hdrs.iter().enumerate() {
1395                    let val = fields.get(i).cloned().unwrap_or_default();
1396                    map.insert(h.clone(), Property::String(val));
1397                }
1398                csv_rows.push(Value::Property(Property::Map(map)));
1399            } else {
1400                let list: Vec<Value> = fields
1401                    .into_iter()
1402                    .map(|f| Value::Property(Property::String(f)))
1403                    .collect();
1404                csv_rows.push(Value::List(list));
1405            }
1406        }
1407        self.rows = Some(csv_rows);
1408        self.cursor = 0;
1409        Ok(())
1410    }
1411}
1412
1413impl Operator for LoadCsvOp {
1414    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1415        if self.rows.is_none() {
1416            let base = if let Some(input) = &mut self.input {
1417                match input.next(ctx)? {
1418                    Some(r) => r,
1419                    None => return Ok(None),
1420                }
1421            } else {
1422                Row::new()
1423            };
1424            self.load(ctx, &base)?;
1425        }
1426        let rows = self.rows.as_ref().unwrap();
1427        if self.cursor < rows.len() {
1428            let val = rows[self.cursor].clone();
1429            self.cursor += 1;
1430            let mut row = Row::new();
1431            row.insert(self.var.clone(), val);
1432            Ok(Some(row))
1433        } else {
1434            Ok(None)
1435        }
1436    }
1437}
1438
1439struct ForeachOp {
1440    input: Box<dyn Operator>,
1441    var: String,
1442    list_expr: Expr,
1443    set_assignments: Vec<SetAssignment>,
1444    remove_items: Vec<RemoveSpec>,
1445}
1446
1447impl ForeachOp {
1448    fn new(
1449        input: Box<dyn Operator>,
1450        var: String,
1451        list_expr: Expr,
1452        set_assignments: Vec<SetAssignment>,
1453        remove_items: Vec<RemoveSpec>,
1454    ) -> Self {
1455        Self {
1456            input,
1457            var,
1458            list_expr,
1459            set_assignments,
1460            remove_items,
1461        }
1462    }
1463}
1464
1465impl Operator for ForeachOp {
1466    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1467        let Some(row) = self.input.next(ctx)? else {
1468            return Ok(None);
1469        };
1470        let ectx = ctx.eval_ctx(&row);
1471        let list_val = eval_expr(&self.list_expr, &ectx)?;
1472        let items = match list_val {
1473            Value::List(items) => items,
1474            Value::Property(Property::List(props)) => {
1475                props.into_iter().map(Value::Property).collect()
1476            }
1477            Value::Null | Value::Property(Property::Null) => Vec::new(),
1478            _ => return Err(Error::TypeMismatch),
1479        };
1480        for item in items {
1481            let mut scratch = row.clone();
1482            scratch.insert(self.var.clone(), item);
1483            for a in &self.set_assignments {
1484                match a {
1485                    SetAssignment::Property { var, key, value } => {
1486                        let evaluated = eval_expr(value, &ctx.eval_ctx(&scratch))?;
1487                        let prop = value_to_property(evaluated)?;
1488                        match scratch.get_mut(var) {
1489                            Some(Value::Node(n)) => {
1490                                n.properties.insert(key.clone(), prop);
1491                            }
1492                            Some(Value::Edge(e)) => {
1493                                e.properties.insert(key.clone(), prop);
1494                            }
1495                            _ => return Err(Error::UnboundVariable(var.clone())),
1496                        }
1497                    }
1498                    SetAssignment::Labels { var, labels } => {
1499                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
1500                            for l in labels {
1501                                if !n.labels.contains(l) {
1502                                    n.labels.push(l.clone());
1503                                }
1504                            }
1505                        }
1506                    }
1507                    _ => {}
1508                }
1509            }
1510            for ri in &self.remove_items {
1511                match ri {
1512                    RemoveSpec::Property { var, key } => {
1513                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
1514                            n.properties.remove(key);
1515                        } else if let Some(Value::Edge(e)) = scratch.get_mut(var) {
1516                            e.properties.remove(key);
1517                        }
1518                    }
1519                    RemoveSpec::Labels { var, labels } => {
1520                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
1521                            n.labels.retain(|l| !labels.contains(l));
1522                        }
1523                    }
1524                }
1525            }
1526            // Flush mutated entities for each iteration
1527            for (_, val) in scratch.iter() {
1528                match val {
1529                    Value::Node(n) => ctx.writer.put_node(n)?,
1530                    Value::Edge(e) => ctx.writer.put_edge(e)?,
1531                    _ => {}
1532                }
1533            }
1534        }
1535        Ok(Some(row))
1536    }
1537}
1538
1539struct SeedRowOp {
1540    done: bool,
1541}
1542
1543impl Operator for SeedRowOp {
1544    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1545        if self.done {
1546            return Ok(None);
1547        }
1548        self.done = true;
1549        Ok(Some(Row::new()))
1550    }
1551}
1552
1553struct SeededRowOp {
1554    row: Option<Row>,
1555}
1556
1557impl Operator for SeededRowOp {
1558    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1559        Ok(self.row.take())
1560    }
1561}
1562
1563struct CallSubqueryOp {
1564    input: Box<dyn Operator>,
1565    body_plan: LogicalPlan,
1566    pending: Vec<Row>,
1567    pending_idx: usize,
1568}
1569
1570impl CallSubqueryOp {
1571    fn new(input: Box<dyn Operator>, body_plan: LogicalPlan) -> Self {
1572        Self {
1573            input,
1574            body_plan,
1575            pending: Vec::new(),
1576            pending_idx: 0,
1577        }
1578    }
1579}
1580
1581impl Operator for CallSubqueryOp {
1582    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1583        loop {
1584            if self.pending_idx < self.pending.len() {
1585                let row = self.pending[self.pending_idx].clone();
1586                self.pending_idx += 1;
1587                return Ok(Some(row));
1588            }
1589            let outer_row = match self.input.next(ctx)? {
1590                Some(r) => r,
1591                None => return Ok(None),
1592            };
1593            let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1594            let mut results = Vec::new();
1595            while let Some(body_row) = body_op.next(ctx)? {
1596                let mut merged = outer_row.clone();
1597                for (k, v) in body_row {
1598                    merged.insert(k, v);
1599                }
1600                results.push(merged);
1601            }
1602            if results.is_empty() {
1603                continue;
1604            }
1605            self.pending = results;
1606            self.pending_idx = 0;
1607        }
1608    }
1609}
1610
1611/// Per-input-row left-join driver (see
1612/// `LogicalPlan::OptionalApply`). Replays `body_plan` once per
1613/// outer row with the row as its seed; forwards all emitted rows
1614/// merged with the outer bindings, and emits one null-fallback
1615/// row (outer bindings plus `null_vars` bound to Null) only when
1616/// the body produced zero rows for that input.
1617struct OptionalApplyOp {
1618    input: Box<dyn Operator>,
1619    body_plan: LogicalPlan,
1620    null_vars: Vec<String>,
1621    pending: Vec<Row>,
1622    pending_idx: usize,
1623}
1624
1625impl OptionalApplyOp {
1626    fn new(input: Box<dyn Operator>, body_plan: LogicalPlan, null_vars: Vec<String>) -> Self {
1627        Self {
1628            input,
1629            body_plan,
1630            null_vars,
1631            pending: Vec::new(),
1632            pending_idx: 0,
1633        }
1634    }
1635}
1636
1637impl Operator for OptionalApplyOp {
1638    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1639        loop {
1640            if self.pending_idx < self.pending.len() {
1641                let row = self.pending[self.pending_idx].clone();
1642                self.pending_idx += 1;
1643                return Ok(Some(row));
1644            }
1645            let outer_row = match self.input.next(ctx)? {
1646                Some(r) => r,
1647                None => return Ok(None),
1648            };
1649            let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1650            // Push the outer row onto `outer_rows` so Filter /
1651            // eval_expr inside the body can resolve outer
1652            // bindings (`OPTIONAL MATCH ... WHERE outer_var = x`).
1653            let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1654            stacked.push(&outer_row);
1655            stacked.extend_from_slice(ctx.outer_rows);
1656            let inner_ctx = ExecCtx {
1657                store: ctx.store,
1658                writer: ctx.writer,
1659                params: ctx.params,
1660                procedures: ctx.procedures,
1661                outer_rows: &stacked,
1662                tombstones: ctx.tombstones,
1663            };
1664            let mut results = Vec::new();
1665            while let Some(body_row) = body_op.next(&inner_ctx)? {
1666                let mut merged = outer_row.clone();
1667                for (k, v) in body_row {
1668                    merged.insert(k, v);
1669                }
1670                results.push(merged);
1671            }
1672            if results.is_empty() {
1673                let mut fallback = outer_row;
1674                for v in &self.null_vars {
1675                    fallback.insert(v.clone(), Value::Null);
1676                }
1677                return Ok(Some(fallback));
1678            }
1679            self.pending = results;
1680            self.pending_idx = 0;
1681        }
1682    }
1683}
1684
1685/// Runtime operator for `CALL ns.name[(args)] [YIELD ...]`.
1686///
1687/// Resolves the procedure against the [`ProcedureRegistry`] at
1688/// `next()` time, validates arity / types / form (implicit args,
1689/// `YIELD *`, in-query YIELD requirement), then scans the registered
1690/// data-table for rows whose input cells match the call arguments.
1691/// Each matching row is projected according to the YIELD spec and
1692/// merged into the upstream row.
1693///
1694/// Two shapes are supported at once:
1695/// * Standalone (`input = None`): the op is the full plan. It
1696///   runs the procedure exactly once (against a synthetic empty row)
1697///   and emits the matching rows directly.
1698/// * In-query (`input = Some(...)`): for each upstream row, runs
1699///   the procedure with args evaluated against that row and emits
1700///   one merged row per match. Procedures with zero declared output
1701///   columns act as a pass-through — the upstream row is forwarded
1702///   unchanged, matching Neo4j's side-effect-only semantics.
1703struct ProcedureCallOp {
1704    input: Option<Box<dyn Operator>>,
1705    qualified_name: Vec<String>,
1706    args: Option<Vec<Expr>>,
1707    yield_spec: Option<YieldSpec>,
1708    standalone: bool,
1709    buffered: Vec<Row>,
1710    buffered_idx: usize,
1711    // Only set for the standalone form, which drives itself off a
1712    // synthetic seed row exactly once.
1713    done: bool,
1714}
1715
1716impl ProcedureCallOp {
1717    fn new(
1718        input: Option<Box<dyn Operator>>,
1719        qualified_name: Vec<String>,
1720        args: Option<Vec<Expr>>,
1721        yield_spec: Option<YieldSpec>,
1722        standalone: bool,
1723    ) -> Self {
1724        Self {
1725            input,
1726            qualified_name,
1727            args,
1728            yield_spec,
1729            standalone,
1730            buffered: Vec::new(),
1731            buffered_idx: 0,
1732            done: false,
1733        }
1734    }
1735
1736    /// Resolve the projection list `(source_column, output_alias)`
1737    /// from the procedure signature and this op's yield spec. Also
1738    /// validates the spec — unknown YIELD columns, duplicate
1739    /// aliases, and `YIELD *` / no-YIELD in disallowed contexts all
1740    /// surface as `Error::Procedure`.
1741    fn resolve_projection(
1742        &self,
1743        proc: &crate::procedures::Procedure,
1744    ) -> Result<Vec<(String, String)>> {
1745        match &self.yield_spec {
1746            None => {
1747                if !self.standalone {
1748                    // In-query CALL with no YIELD: legal only for
1749                    // side-effect-only procedures (zero declared
1750                    // outputs). A procedure with outputs but no
1751                    // YIELD leaves those outputs unbound, so any
1752                    // downstream RETURN that references them would
1753                    // see `UndefinedVariable`; reject here so the
1754                    // error surfaces instead of silently emitting
1755                    // nulls.
1756                    if proc.outputs.is_empty() {
1757                        return Ok(Vec::new());
1758                    }
1759                    return Err(Error::Procedure(format!(
1760                        "procedure '{}' has outputs but no YIELD clause",
1761                        self.qualified_name.join(".")
1762                    )));
1763                }
1764                Ok(proc
1765                    .outputs
1766                    .iter()
1767                    .map(|o| (o.name.clone(), o.name.clone()))
1768                    .collect())
1769            }
1770            Some(YieldSpec::Star) => {
1771                if !self.standalone {
1772                    return Err(Error::Procedure(
1773                        "YIELD * is only allowed on standalone CALL".into(),
1774                    ));
1775                }
1776                Ok(proc
1777                    .outputs
1778                    .iter()
1779                    .map(|o| (o.name.clone(), o.name.clone()))
1780                    .collect())
1781            }
1782            Some(YieldSpec::Items(items)) => {
1783                let mut projection = Vec::with_capacity(items.len());
1784                let mut seen_aliases: std::collections::HashSet<String> =
1785                    std::collections::HashSet::new();
1786                for yi in items {
1787                    if !proc.outputs.iter().any(|o| o.name == yi.column) {
1788                        return Err(Error::Procedure(format!(
1789                            "procedure '{}' has no output column '{}'",
1790                            self.qualified_name.join("."),
1791                            yi.column
1792                        )));
1793                    }
1794                    let alias = yi.alias.clone().unwrap_or_else(|| yi.column.clone());
1795                    if !seen_aliases.insert(alias.clone()) {
1796                        return Err(Error::Procedure(format!(
1797                            "variable '{alias}' already bound by YIELD"
1798                        )));
1799                    }
1800                    projection.push((yi.column.clone(), alias));
1801                }
1802                Ok(projection)
1803            }
1804        }
1805    }
1806
1807    /// Evaluate the call's argument list against `row`. For the
1808    /// implicit-args form (`args = None`), each declared input
1809    /// column's value comes from the per-query parameter map
1810    /// (keyed by the input-column name). Returns the argument
1811    /// values in declaration order. Raises `ProcedureError` on
1812    /// arity mismatch, type mismatch, or missing parameter.
1813    fn evaluate_args(
1814        &self,
1815        ctx: &ExecCtx,
1816        row: &Row,
1817        proc: &crate::procedures::Procedure,
1818    ) -> Result<Vec<Value>> {
1819        match &self.args {
1820            Some(exprs) => {
1821                if exprs.len() != proc.inputs.len() {
1822                    return Err(Error::Procedure(format!(
1823                        "procedure '{}' expects {} argument(s), got {}",
1824                        self.qualified_name.join("."),
1825                        proc.inputs.len(),
1826                        exprs.len()
1827                    )));
1828                }
1829                let eval_ctx = ctx.eval_ctx(row);
1830                let mut values = Vec::with_capacity(exprs.len());
1831                for (expr, spec) in exprs.iter().zip(proc.inputs.iter()) {
1832                    let v = eval_expr(expr, &eval_ctx)?;
1833                    if !spec.ty.accepts(&v) {
1834                        return Err(Error::Procedure(format!(
1835                            "argument '{}' has wrong type for procedure '{}'",
1836                            spec.name,
1837                            self.qualified_name.join(".")
1838                        )));
1839                    }
1840                    values.push(coerce_arg(v, spec.ty));
1841                }
1842                Ok(values)
1843            }
1844            None => {
1845                // Implicit-arg form only valid standalone.
1846                if !self.standalone {
1847                    return Err(Error::Procedure(
1848                        "in-query CALL requires explicit argument list".into(),
1849                    ));
1850                }
1851                let mut values = Vec::with_capacity(proc.inputs.len());
1852                for spec in &proc.inputs {
1853                    let v = ctx.params.get(&spec.name).cloned().ok_or_else(|| {
1854                        Error::Procedure(format!(
1855                            "missing parameter ${} for procedure '{}'",
1856                            spec.name,
1857                            self.qualified_name.join(".")
1858                        ))
1859                    })?;
1860                    if !spec.ty.accepts(&v) {
1861                        return Err(Error::Procedure(format!(
1862                            "parameter '{}' has wrong type",
1863                            spec.name
1864                        )));
1865                    }
1866                    values.push(coerce_arg(v, spec.ty));
1867                }
1868                Ok(values)
1869            }
1870        }
1871    }
1872
1873    /// Invoke the procedure once for `input_row` and emit zero or
1874    /// more output rows into `out`. Handles the "zero-output-column
1875    /// pass-through" case that keeps `MATCH (n) CALL test.doNothing()
1876    /// RETURN n.name` from filtering the match rows.
1877    fn invoke_once(
1878        &self,
1879        ctx: &ExecCtx,
1880        input_row: &Row,
1881        proc: &crate::procedures::Procedure,
1882        projection: &[(String, String)],
1883        out: &mut Vec<Row>,
1884    ) -> Result<()> {
1885        // Zero-output-column procedures are side-effect-only in
1886        // the TCK; they either suppress rows entirely (standalone)
1887        // or pass the input row through unchanged (in-query).
1888        if proc.outputs.is_empty() {
1889            if !self.standalone {
1890                out.push(input_row.clone());
1891            }
1892            return Ok(());
1893        }
1894        let args = self.evaluate_args(ctx, input_row, proc)?;
1895        for proc_row in &proc.rows {
1896            if !proc.row_matches(proc_row, &args) {
1897                continue;
1898            }
1899            let mut merged = if self.standalone {
1900                Row::new()
1901            } else {
1902                input_row.clone()
1903            };
1904            for (src, alias) in projection {
1905                let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
1906                merged.insert(alias.clone(), v);
1907            }
1908            out.push(merged);
1909        }
1910        Ok(())
1911    }
1912}
1913
1914/// Cast an int to a float when the declared type is `FLOAT`. Other
1915/// declared types leave the value as-is (accept() has already
1916/// gated on kind) so the comparison in `row_matches` sees a
1917/// consistent shape.
1918fn coerce_arg(v: Value, ty: crate::procedures::ProcType) -> Value {
1919    use crate::procedures::ProcType;
1920    if matches!(ty, ProcType::Float) {
1921        if let Value::Property(Property::Int64(n)) = v {
1922            return Value::Property(Property::Float64(n as f64));
1923        }
1924    }
1925    v
1926}
1927
1928impl Operator for ProcedureCallOp {
1929    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1930        loop {
1931            if self.buffered_idx < self.buffered.len() {
1932                let row = self.buffered[self.buffered_idx].clone();
1933                self.buffered_idx += 1;
1934                return Ok(Some(row));
1935            }
1936            self.buffered.clear();
1937            self.buffered_idx = 0;
1938
1939            let proc = match ctx.procedures.get(&self.qualified_name) {
1940                Some(p) => p,
1941                None => {
1942                    return Err(Error::Procedure(format!(
1943                        "procedure '{}' not found",
1944                        self.qualified_name.join(".")
1945                    )));
1946                }
1947            };
1948            let projection = self.resolve_projection(proc)?;
1949
1950            let input_row = match &mut self.input {
1951                Some(inp) => match inp.next(ctx)? {
1952                    Some(r) => r,
1953                    None => return Ok(None),
1954                },
1955                None => {
1956                    if self.done {
1957                        return Ok(None);
1958                    }
1959                    self.done = true;
1960                    Row::new()
1961                }
1962            };
1963
1964            let mut produced = Vec::new();
1965            self.invoke_once(ctx, &input_row, proc, &projection, &mut produced)?;
1966            if produced.is_empty() {
1967                if self.input.is_some() {
1968                    continue;
1969                }
1970                return Ok(None);
1971            }
1972            self.buffered = produced;
1973        }
1974    }
1975}
1976
1977/// Pull a property map out of a value. Supports node / edge
1978/// (uses their live property map) and map values (parameter or
1979/// map literal). Null propagates as an empty map — `SET x = null`
1980/// is spec'd as a property clear / no-op and SET = <null-binding>
1981/// (from an unmatched OPTIONAL MATCH) matches that shape.
1982fn extract_property_map(v: &Value) -> Result<Vec<(String, Property)>> {
1983    match v {
1984        Value::Node(n) => Ok(n.properties.clone().into_iter().collect()),
1985        Value::Edge(e) => Ok(e.properties.clone().into_iter().collect()),
1986        Value::Map(pairs) => pairs
1987            .iter()
1988            .map(|(k, vv)| Ok((k.clone(), value_to_property(vv.clone())?)))
1989            .collect(),
1990        Value::Property(Property::Map(entries)) => Ok(entries
1991            .iter()
1992            .map(|(k, p)| (k.clone(), p.clone()))
1993            .collect()),
1994        Value::Null | Value::Property(Property::Null) => Ok(Vec::new()),
1995        _ => Err(Error::InvalidSetValue),
1996    }
1997}
1998
1999fn value_to_property(v: Value) -> Result<Property> {
2000    match v {
2001        Value::Property(Property::Map(_)) => Err(Error::InvalidSetValue),
2002        Value::Property(p) => Ok(p),
2003        Value::Null => Ok(Property::Null),
2004        Value::List(items) => {
2005            let props: Vec<Property> = items
2006                .into_iter()
2007                .map(value_to_property)
2008                .collect::<Result<_>>()?;
2009            Ok(Property::List(props))
2010        }
2011        // Graph-aware `Value::Map` and graph elements can't be
2012        // stored as node / edge property values; SET will reject
2013        // them.
2014        Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
2015            Err(Error::InvalidSetValue)
2016        }
2017    }
2018}
2019
2020struct NodeScanAllOp {
2021    var: String,
2022    ids: Option<Vec<NodeId>>,
2023    cursor: usize,
2024}
2025
2026impl NodeScanAllOp {
2027    fn new(var: String) -> Self {
2028        Self {
2029            var,
2030            ids: None,
2031            cursor: 0,
2032        }
2033    }
2034}
2035
2036impl Operator for NodeScanAllOp {
2037    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2038        if self.ids.is_none() {
2039            self.ids = Some(ctx.store.all_node_ids()?);
2040        }
2041        let ids = self.ids.as_ref().unwrap();
2042        while self.cursor < ids.len() {
2043            let id = ids[self.cursor];
2044            self.cursor += 1;
2045            if let Some(node) = ctx.store.get_node(id)? {
2046                let mut row = Row::new();
2047                row.insert(self.var.clone(), Value::Node(node));
2048                return Ok(Some(row));
2049            }
2050        }
2051        Ok(None)
2052    }
2053}
2054
2055struct NodeScanByLabelsOp {
2056    var: String,
2057    labels: Vec<String>,
2058    ids: Option<Vec<NodeId>>,
2059    cursor: usize,
2060}
2061
2062impl NodeScanByLabelsOp {
2063    fn new(var: String, labels: Vec<String>) -> Self {
2064        Self {
2065            var,
2066            labels,
2067            ids: None,
2068            cursor: 0,
2069        }
2070    }
2071}
2072
2073impl Operator for NodeScanByLabelsOp {
2074    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2075        if self.ids.is_none() {
2076            // Use the first label for the index scan, filter the rest per-node.
2077            let primary = self
2078                .labels
2079                .first()
2080                .expect("NodeScanByLabels must have at least one label");
2081            self.ids = Some(ctx.store.nodes_by_label(primary)?);
2082        }
2083        let ids = self.ids.as_ref().unwrap();
2084        while self.cursor < ids.len() {
2085            let id = ids[self.cursor];
2086            self.cursor += 1;
2087            if let Some(node) = ctx.store.get_node(id)? {
2088                if has_all_labels(&node, &self.labels) {
2089                    let mut row = Row::new();
2090                    row.insert(self.var.clone(), Value::Node(node));
2091                    return Ok(Some(row));
2092                }
2093            }
2094        }
2095        Ok(None)
2096    }
2097}
2098
2099fn has_all_labels(node: &Node, labels: &[String]) -> bool {
2100    labels.iter().all(|l| node.labels.contains(l))
2101}
2102
2103/// Equality-lookup operator backed by a property index. Evaluates
2104/// the value expression lazily on the first `next()` — crucially
2105/// so parameters resolve against the per-query `ExecCtx::params`
2106/// map, not against a literal baked in at plan-construction time.
2107///
2108/// Unindexable value types (Float, List, Map, Null, or a Node/Edge
2109/// that slipped through) surface as `Error::InvalidSetValue`. The
2110/// planner only emits this op for indexes that do exist, so the
2111/// reader call should find a populated CF unless a concurrent DROP
2112/// raced us (in which case the result is just empty, which matches
2113/// how Neo4j's planner handles the race).
2114struct IndexSeekOp {
2115    var: String,
2116    label: String,
2117    property: String,
2118    value_expr: Expr,
2119    results: Option<Vec<NodeId>>,
2120    cursor: usize,
2121}
2122
2123impl IndexSeekOp {
2124    fn new(var: String, label: String, property: String, value_expr: Expr) -> Self {
2125        Self {
2126            var,
2127            label,
2128            property,
2129            value_expr,
2130            results: None,
2131            cursor: 0,
2132        }
2133    }
2134}
2135
2136impl Operator for IndexSeekOp {
2137    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2138        if self.results.is_none() {
2139            let empty = Row::new();
2140            let value = eval_expr(&self.value_expr, &ctx.eval_ctx(&empty))?;
2141            let property = match value {
2142                Value::Property(p) => p,
2143                Value::Null => Property::Null,
2144                Value::Node(_)
2145                | Value::Edge(_)
2146                | Value::List(_)
2147                | Value::Map(_)
2148                | Value::Path { .. } => {
2149                    return Err(Error::InvalidSetValue);
2150                }
2151            };
2152            let ids = ctx
2153                .store
2154                .nodes_by_property(&self.label, &self.property, &property)?;
2155            self.results = Some(ids);
2156        }
2157        let ids = self.results.as_ref().unwrap();
2158        while self.cursor < ids.len() {
2159            let id = ids[self.cursor];
2160            self.cursor += 1;
2161            if let Some(node) = ctx.store.get_node(id)? {
2162                let mut row = Row::new();
2163                row.insert(self.var.clone(), Value::Node(node));
2164                return Ok(Some(row));
2165            }
2166        }
2167        Ok(None)
2168    }
2169}
2170
2171fn matches_pattern_props(node: &Node, props: &[(String, Property)]) -> bool {
2172    props.iter().all(|(k, v)| {
2173        node.properties
2174            .get(k)
2175            .map(|stored| stored == v)
2176            .unwrap_or(false)
2177    })
2178}
2179
2180struct MergeNodeOp {
2181    var: String,
2182    labels: Vec<String>,
2183    /// Pattern property expressions as they came from the planner. These
2184    /// stay as `Expr` because evaluation needs `ExecCtx::params`, which
2185    /// isn't available until `next()` is called.
2186    properties: Vec<(String, Expr)>,
2187    /// `ON CREATE SET ...` assignments — applied only when the
2188    /// merge took the create branch. Evaluated against a row
2189    /// `{var → Node}` so the value expressions can reference the
2190    /// just-created node.
2191    on_create: Vec<SetAssignment>,
2192    /// `ON MATCH SET ...` assignments — applied to every matched
2193    /// node when the merge took the match branch. Same row shape
2194    /// as `on_create`.
2195    on_match: Vec<SetAssignment>,
2196    /// Optional upstream operator. `None` means this is a
2197    /// top-level producer (`MERGE (n) RETURN n`) and emits
2198    /// rows with a fresh empty base. `Some` means this is a
2199    /// mid-chain clause (`MATCH (a) MERGE (b) RETURN a, b`)
2200    /// and each emitted row is a cross-join between an input
2201    /// row and a merge-result node.
2202    input: Option<Box<dyn Operator>>,
2203    /// Cached merge result. Populated on the first `next()`
2204    /// call by running the scan + maybe-create logic *once*,
2205    /// then reused for every input row. Running the merge
2206    /// exactly once sidesteps the read-after-write issue in
2207    /// buffered-writer mode (a node created by the first input
2208    /// row wouldn't be visible to a re-scan on the second).
2209    merged_nodes: Vec<Node>,
2210    /// Whether `merged_nodes` has been populated. The merge
2211    /// logic runs lazily on the first `next()` so
2212    /// `ExecCtx::params` is available.
2213    merge_done: bool,
2214    /// Current upstream row — held between calls while we drain
2215    /// `merged_nodes` against it. `None` for the top-level
2216    /// case (no input).
2217    current_input_row: Option<Row>,
2218    cursor: usize,
2219}
2220
2221impl MergeNodeOp {
2222    fn new(
2223        input: Option<Box<dyn Operator>>,
2224        var: String,
2225        labels: Vec<String>,
2226        properties: Vec<(String, Expr)>,
2227        on_create: Vec<SetAssignment>,
2228        on_match: Vec<SetAssignment>,
2229    ) -> Self {
2230        Self {
2231            var,
2232            labels,
2233            properties,
2234            on_create,
2235            on_match,
2236            input,
2237            merged_nodes: Vec::new(),
2238            merge_done: false,
2239            current_input_row: None,
2240            cursor: 0,
2241        }
2242    }
2243
2244    /// Run the MERGE logic exactly once: scan the store for
2245    /// existing matches, apply ON MATCH SET to each; or create
2246    /// a fresh node and apply ON CREATE SET; persist everything
2247    /// via `ctx.writer`; stash the resulting nodes in
2248    /// `self.merged_nodes`. Idempotent — subsequent calls are
2249    /// no-ops once `self.merge_done` is set.
2250    /// Resolve the pattern properties against `base`, scan the
2251    /// store, and either match existing nodes or create a fresh
2252    /// one. Returns the resulting node set — can be called
2253    /// multiple times with different `base` rows for
2254    /// input-driven merges.
2255    fn run_merge_for(&mut self, ctx: &ExecCtx, base: &Row) -> Result<Vec<Node>> {
2256        let resolved_props: Vec<(String, Property)> = self
2257            .properties
2258            .iter()
2259            .map(|(k, expr)| {
2260                let v = eval_expr(expr, &ctx.eval_ctx(base))?;
2261                Ok((k.clone(), value_to_property(v)?))
2262            })
2263            .collect::<Result<Vec<_>>>()?;
2264
2265        let candidate_ids: Vec<NodeId> = if let Some(primary) = self.labels.first() {
2266            ctx.store.nodes_by_label(primary)?
2267        } else {
2268            ctx.store.all_node_ids()?
2269        };
2270        let mut merged_nodes: Vec<Node> = Vec::new();
2271        for id in candidate_ids {
2272            if let Some(node) = ctx.store.get_node(id)? {
2273                if has_all_labels(&node, &self.labels)
2274                    && matches_pattern_props(&node, &resolved_props)
2275                {
2276                    merged_nodes.push(node);
2277                }
2278            }
2279        }
2280
2281        if merged_nodes.is_empty() {
2282            let mut node = Node::new();
2283            for label in &self.labels {
2284                node.labels.push(label.clone());
2285            }
2286            for (k, prop) in resolved_props {
2287                node.properties.insert(k, prop);
2288            }
2289            apply_merge_actions(&mut node, &self.on_create, &self.var, ctx, base)?;
2290            ctx.writer.put_node(&node)?;
2291            merged_nodes.push(node);
2292        } else if !self.on_match.is_empty() {
2293            for node in merged_nodes.iter_mut() {
2294                apply_merge_actions(node, &self.on_match, &self.var, ctx, base)?;
2295                ctx.writer.put_node(node)?;
2296            }
2297        }
2298        Ok(merged_nodes)
2299    }
2300}
2301
2302impl Operator for MergeNodeOp {
2303    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2304        // Top-level producer: no upstream context, so the pattern
2305        // properties can only reference literals / parameters.
2306        // Run the merge once against an empty row, then emit
2307        // each result.
2308        if self.input.is_none() {
2309            if !self.merge_done {
2310                let empty = Row::new();
2311                let nodes = self.run_merge_for(ctx, &empty)?;
2312                self.merged_nodes = nodes;
2313                self.merge_done = true;
2314            }
2315            if self.cursor < self.merged_nodes.len() {
2316                let node = self.merged_nodes[self.cursor].clone();
2317                self.cursor += 1;
2318                let mut row = Row::new();
2319                row.insert(self.var.clone(), Value::Node(node));
2320                return Ok(Some(row));
2321            }
2322            return Ok(None);
2323        }
2324
2325        // Input-driven case: evaluate pattern properties *per
2326        // input row* so references like `MERGE (:City {name:
2327        // person.bornIn})` resolve against the bound `person`.
2328        // Each incoming row produces its own merged-node set,
2329        // which is then cross-joined with that row before
2330        // emission.
2331        loop {
2332            if let Some(base) = self.current_input_row.as_ref() {
2333                if self.cursor < self.merged_nodes.len() {
2334                    let node = self.merged_nodes[self.cursor].clone();
2335                    self.cursor += 1;
2336                    let mut row = base.clone();
2337                    row.insert(self.var.clone(), Value::Node(node));
2338                    return Ok(Some(row));
2339                }
2340            }
2341            match self.input.as_mut().unwrap().next(ctx)? {
2342                None => return Ok(None),
2343                Some(row) => {
2344                    let nodes = self.run_merge_for(ctx, &row)?;
2345                    self.merged_nodes = nodes;
2346                    self.cursor = 0;
2347                    self.current_input_row = Some(row);
2348                }
2349            }
2350        }
2351    }
2352}
2353
2354/// Apply MERGE-conditional SET assignments (`ON CREATE` or
2355/// Find-or-create executor for edge MERGE
2356/// (`MERGE (a)-[r:KNOWS]->(b)`).
2357///
2358/// For every row pulled from `input`, looks up the `src_var`
2359/// and `dst_var` bindings (which must be `Value::Node` — the
2360/// planner enforces that they came from a prior MATCH or
2361/// MERGE), scans `src`'s outgoing edges, and either:
2362///
2363/// - Picks the first edge of type `edge_type` whose target is
2364///   `dst` and applies `on_match` to it, or
2365/// - Creates a fresh `Edge::new(edge_type, src, dst)`, applies
2366///   `on_create`, and persists it via `ctx.writer.put_edge`.
2367///
2368/// Either way, the resulting edge is bound into `edge_var` in
2369/// the output row and the row is emitted. v1 restrictions:
2370/// single directed hop, both endpoints already bound,
2371/// explicit relationship type.
2372struct MergeEdgeOp {
2373    input: Box<dyn Operator>,
2374    edge_var: String,
2375    src_var: String,
2376    dst_var: String,
2377    edge_type: String,
2378    undirected: bool,
2379    /// Inline edge property filter from the MERGE pattern
2380    /// (`[r:T {k: v}]`). Matched edges must satisfy every entry;
2381    /// the create branch stamps them onto the new edge.
2382    properties: Vec<(String, Expr)>,
2383    on_create: Vec<SetAssignment>,
2384    on_match: Vec<SetAssignment>,
2385    /// Rows buffered from the current input row's MERGE result —
2386    /// one per existing matched edge (or a single synthesized edge
2387    /// if the create branch fired). Drained before the next
2388    /// `input.next()` call so multi-match semantics stay correct:
2389    /// `MATCH (a:A),(b:B) MERGE (a)-[r:T]->(b)` against two pre-
2390    /// existing edges has to yield two rows, not one.
2391    pending: std::collections::VecDeque<Row>,
2392}
2393
2394impl MergeEdgeOp {
2395    #[allow(clippy::too_many_arguments)]
2396    fn new(
2397        input: Box<dyn Operator>,
2398        edge_var: String,
2399        src_var: String,
2400        dst_var: String,
2401        edge_type: String,
2402        undirected: bool,
2403        properties: Vec<(String, Expr)>,
2404        on_create: Vec<SetAssignment>,
2405        on_match: Vec<SetAssignment>,
2406    ) -> Self {
2407        Self {
2408            input,
2409            edge_var,
2410            src_var,
2411            dst_var,
2412            edge_type,
2413            undirected,
2414            properties,
2415            on_create,
2416            on_match,
2417            pending: std::collections::VecDeque::new(),
2418        }
2419    }
2420}
2421
2422impl Operator for MergeEdgeOp {
2423    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2424        loop {
2425            if let Some(row) = self.pending.pop_front() {
2426                return Ok(Some(row));
2427            }
2428            let Some(row) = self.input.next(ctx)? else {
2429                return Ok(None);
2430            };
2431            // Resolve src/dst. Both must be Value::Node — the
2432            // planner enforces that the variables came from an
2433            // earlier producer, so anything else is a bug or a
2434            // later-added feature that didn't update the check.
2435            let src_node = match row.get(&self.src_var) {
2436                Some(Value::Node(n)) => n.clone(),
2437                _ => return Err(Error::UnboundVariable(self.src_var.clone())),
2438            };
2439            let dst_node = match row.get(&self.dst_var) {
2440                Some(Value::Node(n)) => n.clone(),
2441                _ => return Err(Error::UnboundVariable(self.dst_var.clone())),
2442            };
2443
2444            // Evaluate the inline edge property filter once per
2445            // input row. These are AST expressions so they can
2446            // reference outer bindings (`MERGE (a)-[r:T {k: a.v}]->(b)`).
2447            let required_props: Vec<(String, Property)> = self
2448                .properties
2449                .iter()
2450                .map(|(k, expr)| {
2451                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
2452                    Ok((k.clone(), value_to_property(v)?))
2453                })
2454                .collect::<Result<Vec<_>>>()?;
2455            let edge_matches = |edge: &Edge| -> bool {
2456                required_props.iter().all(|(k, want)| {
2457                    edge.properties
2458                        .get(k)
2459                        .map(|have| have == want)
2460                        .unwrap_or(false)
2461                })
2462            };
2463
2464            // Collect every edge of type `edge_type` from src to
2465            // dst (and, for undirected patterns, dst to src) that
2466            // also satisfies the inline property filter. If any
2467            // exist we take the match branch and yield one row per
2468            // match. If none exist we synthesize one and yield a
2469            // single row from the create branch.
2470            let mut matched: Vec<Edge> = Vec::new();
2471            for (edge_id, neighbor_id) in ctx.store.outgoing(src_node.id)? {
2472                if neighbor_id != dst_node.id {
2473                    continue;
2474                }
2475                if let Some(edge) = ctx.store.get_edge(edge_id)? {
2476                    if edge.edge_type == self.edge_type && edge_matches(&edge) {
2477                        matched.push(edge);
2478                    }
2479                }
2480            }
2481            if self.undirected {
2482                for (edge_id, neighbor_id) in ctx.store.incoming(src_node.id)? {
2483                    if neighbor_id != dst_node.id {
2484                        continue;
2485                    }
2486                    if let Some(edge) = ctx.store.get_edge(edge_id)? {
2487                        if edge.edge_type == self.edge_type && edge_matches(&edge) {
2488                            matched.push(edge);
2489                        }
2490                    }
2491                }
2492            }
2493
2494            if matched.is_empty() {
2495                let mut new_edge = Edge::new(&self.edge_type, src_node.id, dst_node.id);
2496                for (k, p) in &required_props {
2497                    new_edge.properties.insert(k.clone(), p.clone());
2498                }
2499                let mut row_out = row.clone();
2500                apply_merge_edge_actions(
2501                    &mut new_edge,
2502                    &self.on_create,
2503                    &self.edge_var,
2504                    ctx,
2505                    &mut row_out,
2506                )?;
2507                ctx.writer.put_edge(&new_edge)?;
2508                row_out.insert(self.edge_var.clone(), Value::Edge(new_edge));
2509                self.pending.push_back(row_out);
2510            } else {
2511                for mut existing in matched {
2512                    let mut row_out = row.clone();
2513                    if !self.on_match.is_empty() {
2514                        apply_merge_edge_actions(
2515                            &mut existing,
2516                            &self.on_match,
2517                            &self.edge_var,
2518                            ctx,
2519                            &mut row_out,
2520                        )?;
2521                        ctx.writer.put_edge(&existing)?;
2522                    }
2523                    row_out.insert(self.edge_var.clone(), Value::Edge(existing));
2524                    self.pending.push_back(row_out);
2525                }
2526            }
2527        }
2528    }
2529}
2530
2531/// Edge-side counterpart of [`apply_merge_actions`]. Evaluates
2532/// each `SetAssignment` against the outer `row` (augmented with
2533/// the current edge binding) and mutates either the edge itself
2534/// or a non-edge target node from the outer row. MERGE's ON
2535/// CREATE / ON MATCH clauses are scoped against the whole input
2536/// row, so `MERGE (a)-[:T]->(b) ON CREATE SET b.k = 1` has to
2537/// reach outside the edge-local binding. Non-edge mutations
2538/// are persisted via `ctx.writer.put_node` so the change is
2539/// visible to later clauses.
2540fn apply_merge_edge_actions(
2541    edge: &mut Edge,
2542    actions: &[SetAssignment],
2543    var: &str,
2544    exec_ctx: &ExecCtx,
2545    outer: &mut Row,
2546) -> Result<()> {
2547    if actions.is_empty() {
2548        return Ok(());
2549    }
2550    // Edge binding is live in `outer` while we evaluate — RHS can
2551    // reference both the edge and any sibling node binding.
2552    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2553    for action in actions {
2554        match action {
2555            SetAssignment::Property {
2556                var: target,
2557                key,
2558                value,
2559            } => {
2560                let sub_ctx = exec_ctx.eval_ctx(outer);
2561                let evaluated = eval_expr(value, &sub_ctx)?;
2562                let prop = value_to_property(evaluated)?;
2563                if target == var {
2564                    if matches!(prop, Property::Null) {
2565                        edge.properties.remove(key);
2566                    } else {
2567                        edge.properties.insert(key.clone(), prop);
2568                    }
2569                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2570                } else {
2571                    apply_set_prop_to_outer(outer, exec_ctx, target, key, prop)?;
2572                }
2573            }
2574            SetAssignment::Merge {
2575                var: target,
2576                properties,
2577            } => {
2578                let sub_ctx = exec_ctx.eval_ctx(outer);
2579                let resolved: Vec<(String, Property)> = properties
2580                    .iter()
2581                    .map(|(k, expr)| {
2582                        let v = eval_expr(expr, &sub_ctx)?;
2583                        Ok((k.clone(), value_to_property(v)?))
2584                    })
2585                    .collect::<Result<Vec<_>>>()?;
2586                if target == var {
2587                    for (k, p) in resolved {
2588                        edge.properties.insert(k, p);
2589                    }
2590                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2591                } else {
2592                    apply_set_map_to_outer(outer, exec_ctx, target, resolved, false)?;
2593                }
2594            }
2595            SetAssignment::Replace {
2596                var: target,
2597                properties,
2598            } => {
2599                let sub_ctx = exec_ctx.eval_ctx(outer);
2600                let resolved: Vec<(String, Property)> = properties
2601                    .iter()
2602                    .map(|(k, expr)| {
2603                        let v = eval_expr(expr, &sub_ctx)?;
2604                        Ok((k.clone(), value_to_property(v)?))
2605                    })
2606                    .collect::<Result<Vec<_>>>()?;
2607                if target == var {
2608                    edge.properties.clear();
2609                    for (k, p) in resolved {
2610                        edge.properties.insert(k, p);
2611                    }
2612                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2613                } else {
2614                    apply_set_map_to_outer(outer, exec_ctx, target, resolved, true)?;
2615                }
2616            }
2617            SetAssignment::Labels {
2618                var: target,
2619                labels,
2620            } => {
2621                if target == var {
2622                    // Edges don't carry labels.
2623                    return Err(Error::UnboundVariable(target.clone()));
2624                }
2625                apply_set_labels_to_outer(outer, exec_ctx, target, labels)?;
2626            }
2627            SetAssignment::ReplaceFromExpr {
2628                var: target,
2629                source,
2630                replace,
2631            } => {
2632                let sub_ctx = exec_ctx.eval_ctx(outer);
2633                let v = eval_expr(source, &sub_ctx)?;
2634                let props = extract_property_map(&v)?;
2635                if target == var {
2636                    if *replace {
2637                        edge.properties.clear();
2638                    }
2639                    for (k, p) in props {
2640                        edge.properties.insert(k, p);
2641                    }
2642                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2643                } else {
2644                    apply_set_map_to_outer(outer, exec_ctx, target, props, *replace)?;
2645                }
2646            }
2647        }
2648    }
2649    Ok(())
2650}
2651
2652/// Apply a single `SET target.key = prop` to a node or edge bound
2653/// in the outer row. Used by MERGE's ON CREATE / ON MATCH when
2654/// the target isn't the merge edge itself (the common case being
2655/// `MERGE (a)-[:R]->(b) ON CREATE SET b.k = v`).
2656fn apply_set_prop_to_outer(
2657    outer: &mut Row,
2658    exec_ctx: &ExecCtx,
2659    target: &str,
2660    key: &str,
2661    prop: Property,
2662) -> Result<()> {
2663    match outer.get_mut(target) {
2664        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
2665            // SET on a null target (typically an unmatched OPTIONAL
2666            // MATCH binding) is a silent no-op in openCypher.
2667            return Ok(());
2668        }
2669        Some(Value::Node(n)) => {
2670            if matches!(prop, Property::Null) {
2671                n.properties.remove(key);
2672            } else {
2673                n.properties.insert(key.to_string(), prop);
2674            }
2675            exec_ctx.writer.put_node(n)?;
2676        }
2677        Some(Value::Edge(e)) => {
2678            if matches!(prop, Property::Null) {
2679                e.properties.remove(key);
2680            } else {
2681                e.properties.insert(key.to_string(), prop);
2682            }
2683            exec_ctx.writer.put_edge(e)?;
2684        }
2685        _ => return Err(Error::UnboundVariable(target.to_string())),
2686    }
2687    Ok(())
2688}
2689
2690/// Apply a property-map assignment (`SET target = {..}` when
2691/// `replace`, or `SET target += {..}` when not) to a node or
2692/// edge bound in the outer row.
2693fn apply_set_map_to_outer(
2694    outer: &mut Row,
2695    exec_ctx: &ExecCtx,
2696    target: &str,
2697    props: Vec<(String, Property)>,
2698    replace: bool,
2699) -> Result<()> {
2700    match outer.get_mut(target) {
2701        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
2702        Some(Value::Node(n)) => {
2703            if replace {
2704                n.properties.clear();
2705            }
2706            for (k, p) in props {
2707                if replace || !matches!(p, Property::Null) {
2708                    n.properties.insert(k, p);
2709                } else {
2710                    n.properties.remove(&k);
2711                }
2712            }
2713            exec_ctx.writer.put_node(n)?;
2714            Ok(())
2715        }
2716        Some(Value::Edge(e)) => {
2717            if replace {
2718                e.properties.clear();
2719            }
2720            for (k, p) in props {
2721                if replace || !matches!(p, Property::Null) {
2722                    e.properties.insert(k, p);
2723                } else {
2724                    e.properties.remove(&k);
2725                }
2726            }
2727            exec_ctx.writer.put_edge(e)?;
2728            Ok(())
2729        }
2730        _ => Err(Error::UnboundVariable(target.to_string())),
2731    }
2732}
2733
2734/// Apply a labels assignment to a node bound in the outer row.
2735fn apply_set_labels_to_outer(
2736    outer: &mut Row,
2737    exec_ctx: &ExecCtx,
2738    target: &str,
2739    labels: &[String],
2740) -> Result<()> {
2741    match outer.get_mut(target) {
2742        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
2743        Some(Value::Node(n)) => {
2744            for label in labels {
2745                if !n.labels.contains(label) {
2746                    n.labels.push(label.clone());
2747                }
2748            }
2749            exec_ctx.writer.put_node(n)?;
2750            Ok(())
2751        }
2752        _ => Err(Error::UnboundVariable(target.to_string())),
2753    }
2754}
2755
2756/// `ON MATCH`) to `node` in place. Mirrors the `SetPropertyOp`
2757/// dispatch but specialized to a single bound variable so we
2758/// don't have to materialize a full row dispatcher.
2759///
2760/// Value expressions are evaluated against a temporary row
2761/// `{var → Node(node.clone())}` so the RHS can reference the
2762/// node's existing properties — `MERGE (n) ON MATCH SET n.hits = n.hits + 1`
2763/// works the same as if the SET ran in a SetPropertyOp pipeline.
2764fn apply_merge_actions(
2765    node: &mut Node,
2766    actions: &[SetAssignment],
2767    var: &str,
2768    exec_ctx: &ExecCtx,
2769    base_row: &Row,
2770) -> Result<()> {
2771    if actions.is_empty() {
2772        return Ok(());
2773    }
2774    // Start from the upstream row so `ON CREATE SET n.prop = other.field`
2775    // can resolve `other` — then overlay the merged node under `var`.
2776    let mut row = base_row.clone();
2777    row.insert(var.to_string(), Value::Node(node.clone()));
2778    for action in actions {
2779        let sub_ctx = exec_ctx.eval_ctx(&row);
2780        match action {
2781            SetAssignment::Property {
2782                var: target,
2783                key,
2784                value,
2785            } => {
2786                if target != var {
2787                    return Err(Error::UnboundVariable(target.clone()));
2788                }
2789                let evaluated = eval_expr(value, &sub_ctx)?;
2790                let prop = value_to_property(evaluated)?;
2791                node.properties.insert(key.clone(), prop);
2792                row.insert(var.to_string(), Value::Node(node.clone()));
2793            }
2794            SetAssignment::Labels {
2795                var: target,
2796                labels,
2797            } => {
2798                if target != var {
2799                    return Err(Error::UnboundVariable(target.clone()));
2800                }
2801                for label in labels {
2802                    if !node.labels.contains(label) {
2803                        node.labels.push(label.clone());
2804                    }
2805                }
2806                row.insert(var.to_string(), Value::Node(node.clone()));
2807            }
2808            SetAssignment::Replace {
2809                var: target,
2810                properties,
2811            } => {
2812                if target != var {
2813                    return Err(Error::UnboundVariable(target.clone()));
2814                }
2815                let resolved: Vec<(String, Property)> = properties
2816                    .iter()
2817                    .map(|(k, expr)| {
2818                        let v = eval_expr(expr, &sub_ctx)?;
2819                        Ok((k.clone(), value_to_property(v)?))
2820                    })
2821                    .collect::<Result<Vec<_>>>()?;
2822                node.properties.clear();
2823                for (k, p) in resolved {
2824                    node.properties.insert(k, p);
2825                }
2826                row.insert(var.to_string(), Value::Node(node.clone()));
2827            }
2828            SetAssignment::Merge {
2829                var: target,
2830                properties,
2831            } => {
2832                if target != var {
2833                    return Err(Error::UnboundVariable(target.clone()));
2834                }
2835                let resolved: Vec<(String, Property)> = properties
2836                    .iter()
2837                    .map(|(k, expr)| {
2838                        let v = eval_expr(expr, &sub_ctx)?;
2839                        Ok((k.clone(), value_to_property(v)?))
2840                    })
2841                    .collect::<Result<Vec<_>>>()?;
2842                for (k, p) in resolved {
2843                    node.properties.insert(k, p);
2844                }
2845                row.insert(var.to_string(), Value::Node(node.clone()));
2846            }
2847            SetAssignment::ReplaceFromExpr {
2848                var: target,
2849                source,
2850                replace,
2851            } => {
2852                if target != var {
2853                    return Err(Error::UnboundVariable(target.clone()));
2854                }
2855                let v = eval_expr(source, &sub_ctx)?;
2856                let props = extract_property_map(&v)?;
2857                if *replace {
2858                    node.properties.clear();
2859                }
2860                for (k, p) in props {
2861                    node.properties.insert(k, p);
2862                }
2863                row.insert(var.to_string(), Value::Node(node.clone()));
2864            }
2865        }
2866    }
2867    Ok(())
2868}
2869
2870struct EdgeExpandOp {
2871    input: Box<dyn Operator>,
2872    src_var: String,
2873    edge_var: Option<String>,
2874    dst_var: String,
2875    dst_labels: Vec<String>,
2876    edge_properties: Vec<(String, Expr)>,
2877    edge_types: Vec<String>,
2878    direction: Direction,
2879    /// When set, only the specific edge whose id matches the
2880    /// row-bound value counts as a match. Used by fresh-scan
2881    /// MATCH patterns that reuse an edge variable from a prior
2882    /// clause so the new hop stays an existence check instead of
2883    /// rebinding and clobbering the outer edge.
2884    edge_constraint_var: Option<String>,
2885    current_row: Option<Row>,
2886    pending: Vec<(EdgeId, NodeId)>,
2887    pending_idx: usize,
2888}
2889
2890impl EdgeExpandOp {
2891    #[allow(clippy::too_many_arguments)]
2892    fn new(
2893        input: Box<dyn Operator>,
2894        src_var: String,
2895        edge_var: Option<String>,
2896        dst_var: String,
2897        dst_labels: Vec<String>,
2898        edge_properties: Vec<(String, Expr)>,
2899        edge_types: Vec<String>,
2900        direction: Direction,
2901        edge_constraint_var: Option<String>,
2902    ) -> Self {
2903        Self {
2904            input,
2905            src_var,
2906            edge_var,
2907            dst_var,
2908            dst_labels,
2909            edge_properties,
2910            edge_types,
2911            direction,
2912            edge_constraint_var,
2913            current_row: None,
2914            pending: Vec::new(),
2915            pending_idx: 0,
2916        }
2917    }
2918}
2919
impl Operator for EdgeExpandOp {
    /// Yields the next `(src)-[edge]-(dst)` expansion row.
    ///
    /// Adjacency entries buffered for `current_row` are consumed one at a
    /// time. Each entry is filtered by, in order:
    /// - edge type;
    /// - the optional pre-bound edge constraint;
    /// - the inline edge property map;
    /// - the neighbor's labels.
    ///
    /// The first survivor is emitted as a copy of the input row with
    /// `edge_var` (if named) and `dst_var` bound. When the buffer is
    /// exhausted, the next input row is pulled and its adjacency list is
    /// loaded.
    ///
    /// # Errors
    /// Returns `Error::UnboundVariable` when `src_var` is bound to
    /// something other than a node or null. Store and evaluation errors
    /// are propagated.
    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
        loop {
            // Consume adjacency entries buffered for `current_row`.
            while self.pending_idx < self.pending.len() {
                let (edge_id, neighbor_id) = self.pending[self.pending_idx];
                self.pending_idx += 1;

                // `get_edge` returned no edge for this id: skip the entry.
                let edge = match ctx.store.get_edge(edge_id)? {
                    Some(e) => e,
                    None => continue,
                };
                // An empty `edge_types` list accepts every type.
                if !self.edge_types.is_empty()
                    && !self.edge_types.iter().any(|t| t == &edge.edge_type)
                {
                    continue;
                }
                // Pre-bound edge constraint: only accept the edge
                // whose id matches the row-bound value. Falls
                // through to outer scopes so a fresh right-side
                // scan of a CartesianProduct can still see the
                // bound edge from the left side. Non-edge / null
                // bindings trigger no matches at all — the
                // expansion yields nothing for that input row.
                if let Some(constraint_var) = &self.edge_constraint_var {
                    let base = self
                        .current_row
                        .as_ref()
                        .expect("pending edges without source row");
                    let expected = match ctx.lookup_binding(base, constraint_var) {
                        Some(Value::Edge(e)) => Some(e.id),
                        _ => None,
                    };
                    match expected {
                        Some(id) if id != edge.id => continue,
                        None => continue,
                        _ => {}
                    }
                }
                // Inline edge property filter: every (key, value)
                // must match the traversed edge's property of the
                // same name via `=` equality. Missing keys fail the
                // check (matching Neo4j).
                if !self.edge_properties.is_empty() {
                    let base = self
                        .current_row
                        .as_ref()
                        .expect("pending edges without source row");
                    let ectx = ctx.eval_ctx(base);
                    let mut ok = true;
                    // Expressions are re-evaluated for each candidate edge.
                    for (key, expr) in &self.edge_properties {
                        let expected = eval_expr(expr, &ectx)?;
                        let actual = match edge.properties.get(key) {
                            Some(v) => Value::Property(v.clone()),
                            None => {
                                ok = false;
                                break;
                            }
                        };
                        if !values_equal(&actual, &expected) {
                            ok = false;
                            break;
                        }
                    }
                    if !ok {
                        continue;
                    }
                }

                // `get_node` returned no node for the neighbor id: skip it.
                let neighbor = match ctx.store.get_node(neighbor_id)? {
                    Some(n) => n,
                    None => continue,
                };
                if !has_all_labels(&neighbor, &self.dst_labels) {
                    continue;
                }

                // Survivor: extend a copy of the input row with the
                // new bindings.
                let base = self
                    .current_row
                    .as_ref()
                    .expect("pending edges without source row");
                let mut out = base.clone();
                if let Some(ev) = &self.edge_var {
                    out.insert(ev.clone(), Value::Edge(edge));
                }
                out.insert(self.dst_var.clone(), Value::Node(neighbor));
                return Ok(Some(out));
            }

            // Buffer exhausted: pull the next input row and load its
            // adjacency list.
            match self.input.next(ctx)? {
                None => return Ok(None),
                Some(row) => {
                    let src_id = match row.get(&self.src_var) {
                        Some(Value::Node(n)) => n.id,
                        // A null source (e.g. from OPTIONAL MATCH
                        // that matched nothing) drops the input
                        // row — `MATCH (a)-->(b)` against a null
                        // `a` is just empty, not an error.
                        Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
                            continue
                        }
                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
                    };
                    self.pending = match self.direction {
                        Direction::Outgoing => ctx.store.outgoing(src_id)?,
                        Direction::Incoming => ctx.store.incoming(src_id)?,
                        Direction::Both => {
                            // For undirected traversal, a self-loop
                            // appears once as outgoing and once as
                            // incoming. The Cypher spec visits each
                            // physical edge once, so dedupe by
                            // edge_id before emitting.
                            let mut all = ctx.store.outgoing(src_id)?;
                            let mut seen: std::collections::HashSet<EdgeId> =
                                all.iter().map(|(e, _)| *e).collect();
                            for (e, n) in ctx.store.incoming(src_id)? {
                                if seen.insert(e) {
                                    all.push((e, n));
                                }
                            }
                            all
                        }
                    };
                    // Reset the cursor for the freshly loaded adjacency list.
                    self.pending_idx = 0;
                    self.current_row = Some(row);
                }
            }
        }
    }
}
3049
3050/// Left-join variant of [`EdgeExpandOp`]. For each input row,
3051/// expands the adjacency in the configured direction and
3052/// filters by `edge_type` / `dst_labels` — if **any** neighbor
3053/// survives the filters, emits rows exactly like
3054/// `EdgeExpandOp`. If **zero** neighbors survive, emits one row
3055/// that carries the input row's bindings plus `edge_var` /
3056/// `dst_var` set to `Value::Null`, preserving the input row in
3057/// the output stream. This is the left-outer-join semantics
3058/// OPTIONAL MATCH needs.
3059///
3060/// Tracks per-input-row whether any output was produced so the
3061/// fallback Null row is only emitted after the pending buffer
3062/// drains without yielding anything. The `yielded_for_current`
3063/// flag is reset whenever a new input row is pulled.
3064struct OptionalEdgeExpandOp {
3065    input: Box<dyn Operator>,
3066    src_var: String,
3067    edge_var: Option<String>,
3068    dst_var: String,
3069    dst_labels: Vec<String>,
3070    dst_properties: Vec<(String, Expr)>,
3071    edge_types: Vec<String>,
3072    direction: Direction,
3073    /// When set, edges whose target id differs from the node
3074    /// bound at this variable in the current row are skipped
3075    /// inside the expansion loop. A row whose outgoing edges all
3076    /// fail the constraint then triggers the same null-fallback
3077    /// as having no edges at all.
3078    dst_constraint_var: Option<String>,
3079    /// When set, the expansion only considers the single edge
3080    /// whose id matches the edge already bound at this row
3081    /// variable — used when the OPTIONAL MATCH pattern reuses
3082    /// an edge variable from a prior clause.
3083    edge_constraint_var: Option<String>,
3084    current_row: Option<Row>,
3085    pending: Vec<(EdgeId, NodeId)>,
3086    pending_idx: usize,
3087    yielded_for_current: bool,
3088}
3089
3090impl OptionalEdgeExpandOp {
3091    #[allow(clippy::too_many_arguments)]
3092    fn new(
3093        input: Box<dyn Operator>,
3094        src_var: String,
3095        edge_var: Option<String>,
3096        dst_var: String,
3097        dst_labels: Vec<String>,
3098        dst_properties: Vec<(String, Expr)>,
3099        edge_types: Vec<String>,
3100        direction: Direction,
3101        dst_constraint_var: Option<String>,
3102        edge_constraint_var: Option<String>,
3103    ) -> Self {
3104        Self {
3105            input,
3106            src_var,
3107            edge_var,
3108            dst_var,
3109            dst_labels,
3110            dst_properties,
3111            edge_types,
3112            direction,
3113            dst_constraint_var,
3114            edge_constraint_var,
3115            current_row: None,
3116            pending: Vec::new(),
3117            pending_idx: 0,
3118            yielded_for_current: false,
3119        }
3120    }
3121}
3122
3123impl Operator for OptionalEdgeExpandOp {
3124    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3125        loop {
3126            while self.pending_idx < self.pending.len() {
3127                let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3128                self.pending_idx += 1;
3129
3130                let edge = match ctx.store.get_edge(edge_id)? {
3131                    Some(e) => e,
3132                    None => continue,
3133                };
3134                if !self.edge_types.is_empty()
3135                    && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3136                {
3137                    continue;
3138                }
3139                // Pre-bound edge constraint: only the specific
3140                // edge whose id matches the row-bound value
3141                // counts as a match. Falls through to outer
3142                // scopes so the constraint works for fresh
3143                // scans on the right side of a CartesianProduct.
3144                if let Some(constraint_var) = &self.edge_constraint_var {
3145                    let base = self
3146                        .current_row
3147                        .as_ref()
3148                        .expect("pending without source row");
3149                    let expected = match ctx.lookup_binding(base, constraint_var) {
3150                        Some(Value::Edge(e)) => Some(e.id),
3151                        _ => None,
3152                    };
3153                    match expected {
3154                        Some(id) if id != edge.id => continue,
3155                        None => continue,
3156                        _ => {}
3157                    }
3158                }
3159
3160                let neighbor = match ctx.store.get_node(neighbor_id)? {
3161                    Some(n) => n,
3162                    None => continue,
3163                };
3164                if !has_all_labels(&neighbor, &self.dst_labels) {
3165                    continue;
3166                }
3167                // Bound-endpoint constraint: when the declared
3168                // target is already bound in the row, only edges
3169                // that lead to that exact node count as a match.
3170                // Edges failing the constraint are silently
3171                // skipped — if every candidate fails, the
3172                // per-row left-join fallback below still fires.
3173                if let Some(constraint_var) = &self.dst_constraint_var {
3174                    let base = self
3175                        .current_row
3176                        .as_ref()
3177                        .expect("pending without source row");
3178                    let bound_id = match base.get(constraint_var) {
3179                        Some(Value::Node(n)) => Some(n.id),
3180                        Some(Value::Null)
3181                        | Some(Value::Property(meshdb_core::Property::Null))
3182                        | None => None,
3183                        _ => None,
3184                    };
3185                    match bound_id {
3186                        Some(id) if id != neighbor.id => continue,
3187                        None => continue,
3188                        _ => {}
3189                    }
3190                }
3191                if !self.dst_properties.is_empty() {
3192                    let base = self
3193                        .current_row
3194                        .as_ref()
3195                        .expect("pending without source row");
3196                    let ectx = ctx.eval_ctx(base);
3197                    let mut props_ok = true;
3198                    for (key, expr) in &self.dst_properties {
3199                        let expected = eval_expr(expr, &ectx)?;
3200                        let actual = neighbor
3201                            .properties
3202                            .get(key)
3203                            .cloned()
3204                            .map(Value::Property)
3205                            .unwrap_or(Value::Null);
3206                        if !values_equal(&expected, &actual) {
3207                            props_ok = false;
3208                            break;
3209                        }
3210                    }
3211                    if !props_ok {
3212                        continue;
3213                    }
3214                }
3215
3216                let base = self
3217                    .current_row
3218                    .as_ref()
3219                    .expect("pending edges without source row");
3220                let mut out = base.clone();
3221                if let Some(ev) = &self.edge_var {
3222                    out.insert(ev.clone(), Value::Edge(edge));
3223                }
3224                out.insert(self.dst_var.clone(), Value::Node(neighbor));
3225                self.yielded_for_current = true;
3226                return Ok(Some(out));
3227            }
3228
3229            // Pending drained for the current row. If nothing was
3230            // yielded, emit the left-join fallback: preserve the
3231            // input row with the optional variables set to Null.
3232            //
3233            // Exception: when an edge / dst constraint names a
3234            // variable that was pre-bound in the input row, the
3235            // fallback must NOT clobber that variable — the
3236            // constraint turns the expansion into an existence
3237            // check and the outer value should survive through.
3238            if let Some(base) = self.current_row.take() {
3239                if !self.yielded_for_current {
3240                    let mut out = base;
3241                    if let Some(ev) = &self.edge_var {
3242                        let preserve = self
3243                            .edge_constraint_var
3244                            .as_ref()
3245                            .map(|c| c == ev)
3246                            .unwrap_or(false);
3247                        if !preserve {
3248                            out.insert(ev.clone(), Value::Null);
3249                        }
3250                    }
3251                    let preserve_dst = self
3252                        .dst_constraint_var
3253                        .as_ref()
3254                        .map(|c| c == &self.dst_var)
3255                        .unwrap_or(false);
3256                    if !preserve_dst {
3257                        out.insert(self.dst_var.clone(), Value::Null);
3258                    }
3259                    self.yielded_for_current = true;
3260                    return Ok(Some(out));
3261                }
3262            }
3263
3264            match self.input.next(ctx)? {
3265                None => return Ok(None),
3266                Some(row) => {
3267                    let src_id = match row.get(&self.src_var) {
3268                        Some(Value::Node(n)) => n.id,
3269                        // src_var is Null (because a prior
3270                        // OPTIONAL MATCH chained before this one
3271                        // Null-bound it). Skip adjacency entirely
3272                        // and fall through to the fallback Null
3273                        // row so downstream clauses see the
3274                        // preserved input.
3275                        Some(Value::Null) => {
3276                            self.pending = Vec::new();
3277                            self.pending_idx = 0;
3278                            self.yielded_for_current = false;
3279                            self.current_row = Some(row);
3280                            continue;
3281                        }
3282                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3283                    };
3284                    self.pending = match self.direction {
3285                        Direction::Outgoing => ctx.store.outgoing(src_id)?,
3286                        Direction::Incoming => ctx.store.incoming(src_id)?,
3287                        Direction::Both => {
3288                            // For undirected traversal, a self-loop
3289                            // appears once as outgoing and once as
3290                            // incoming. The Cypher spec visits each
3291                            // physical edge once, so dedupe by
3292                            // edge_id before emitting.
3293                            let mut all = ctx.store.outgoing(src_id)?;
3294                            let mut seen: std::collections::HashSet<EdgeId> =
3295                                all.iter().map(|(e, _)| *e).collect();
3296                            for (e, n) in ctx.store.incoming(src_id)? {
3297                                if seen.insert(e) {
3298                                    all.push((e, n));
3299                                }
3300                            }
3301                            all
3302                        }
3303                    };
3304                    self.pending_idx = 0;
3305                    self.yielded_for_current = false;
3306                    self.current_row = Some(row);
3307                }
3308            }
3309        }
3310    }
3311}
3312
3313struct VarLengthExpandOp {
3314    input: Box<dyn Operator>,
3315    src_var: String,
3316    edge_var: Option<String>,
3317    dst_var: String,
3318    dst_labels: Vec<String>,
3319    edge_types: Vec<String>,
3320    /// Per-edge property filter — every edge along the walked
3321    /// path must have these `(key, value)` pairs. Mirrors the
3322    /// inline filter on `EdgeExpandOp`; applied during DFS so
3323    /// failing edges prune the branch instead of generating
3324    /// wrong-length results.
3325    edge_properties: Vec<(String, Expr)>,
3326    direction: Direction,
3327    min_hops: u64,
3328    max_hops: u64,
3329    path_var: Option<String>,
3330    /// Per-row left-join mode: when `true`, an input row that
3331    /// produces no matching paths still emits one row with the
3332    /// expansion's output vars bound to Null. Set for
3333    /// `OPTIONAL MATCH (a)-[*]->(b)` so the outer row survives
3334    /// even when the path search is empty.
3335    optional: bool,
3336    /// When set, paths whose terminal node id differs from the
3337    /// node bound at this variable in the current row are
3338    /// filtered out before counting as a match. Combined with
3339    /// `optional`, an input row whose candidate paths all miss
3340    /// the bound target triggers the null-fallback instead of
3341    /// silently dropping.
3342    dst_constraint_var: Option<String>,
3343    /// Replay mode: read the walked edge sequence from the list
3344    /// already bound at this row variable instead of doing DFS.
3345    /// Used by openCypher's "walk a pre-bound edge list" form.
3346    bound_edge_list_var: Option<String>,
3347    /// Row variables whose bound edges (or edge lists) must not
3348    /// appear in the walked path — enforces openCypher's
3349    /// relationship-uniqueness rule across hops within the same
3350    /// MATCH pattern. Each entry resolves against the current row
3351    /// (falling through to outer-scope rows) at run time; the
3352    /// union of every referenced edge id becomes the DFS
3353    /// exclusion set.
3354    excluded_edge_vars: Vec<String>,
3355    current_row: Option<Row>,
3356    pending_paths: Vec<Vec<Edge>>,
3357    pending_node_paths: Vec<Vec<NodeId>>,
3358    pending_targets: Vec<NodeId>,
3359    pending_idx: usize,
3360}
3361
3362impl VarLengthExpandOp {
3363    #[allow(clippy::too_many_arguments)]
3364    fn new(
3365        input: Box<dyn Operator>,
3366        src_var: String,
3367        edge_var: Option<String>,
3368        dst_var: String,
3369        dst_labels: Vec<String>,
3370        edge_types: Vec<String>,
3371        edge_properties: Vec<(String, Expr)>,
3372        direction: Direction,
3373        min_hops: u64,
3374        max_hops: u64,
3375        path_var: Option<String>,
3376        optional: bool,
3377        dst_constraint_var: Option<String>,
3378        bound_edge_list_var: Option<String>,
3379        excluded_edge_vars: Vec<String>,
3380    ) -> Self {
3381        Self {
3382            input,
3383            src_var,
3384            edge_var,
3385            dst_var,
3386            dst_labels,
3387            edge_types,
3388            edge_properties,
3389            direction,
3390            min_hops,
3391            max_hops,
3392            path_var,
3393            optional,
3394            dst_constraint_var,
3395            bound_edge_list_var,
3396            excluded_edge_vars,
3397            current_row: None,
3398            pending_paths: Vec::new(),
3399            pending_node_paths: Vec::new(),
3400            pending_targets: Vec::new(),
3401            pending_idx: 0,
3402        }
3403    }
3404
3405    fn enumerate(
3406        &self,
3407        ctx: &ExecCtx,
3408        start: NodeId,
3409        input_row: &Row,
3410    ) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
3411        let mut paths: Vec<Vec<Edge>> = Vec::new();
3412        let mut node_paths: Vec<Vec<NodeId>> = Vec::new();
3413        let mut targets: Vec<NodeId> = Vec::new();
3414        let mut edge_buf: Vec<Edge> = Vec::new();
3415        let mut node_buf: Vec<NodeId> = vec![start];
3416        // Seed `used` with edges from outer-scope bindings the
3417        // pattern flags as exclusions (e.g. the other hops'
3418        // edge / edge-list vars). A fresh walk can't revisit an
3419        // edge that's already bound as a relationship variable
3420        // elsewhere in the MATCH — that's openCypher's
3421        // relationship-uniqueness rule.
3422        let mut used: HashSet<EdgeId> = HashSet::new();
3423        for var in &self.excluded_edge_vars {
3424            match ctx.lookup_binding(input_row, var) {
3425                Some(Value::Edge(e)) => {
3426                    used.insert(e.id);
3427                }
3428                Some(Value::List(items)) => {
3429                    for item in items {
3430                        if let Value::Edge(e) = item {
3431                            used.insert(e.id);
3432                        }
3433                    }
3434                }
3435                _ => {}
3436            }
3437        }
3438        // Evaluate edge-property expected values once per input row
3439        // — they may reference row bindings or `$`-parameters, but
3440        // don't vary per walked edge.
3441        let expected_edge_props: Vec<(String, Value)> = if self.edge_properties.is_empty() {
3442            Vec::new()
3443        } else {
3444            let ectx = ctx.eval_ctx(input_row);
3445            self.edge_properties
3446                .iter()
3447                .map(|(k, expr)| eval_expr(expr, &ectx).map(|v| (k.clone(), v)))
3448                .collect::<Result<Vec<_>>>()?
3449        };
3450        self.dfs(
3451            ctx,
3452            start,
3453            &expected_edge_props,
3454            &mut edge_buf,
3455            &mut node_buf,
3456            &mut used,
3457            &mut paths,
3458            &mut node_paths,
3459            &mut targets,
3460        )?;
3461        Ok((paths, node_paths, targets))
3462    }
3463
3464    #[allow(clippy::too_many_arguments)]
3465    fn dfs(
3466        &self,
3467        ctx: &ExecCtx,
3468        current_node: NodeId,
3469        expected_edge_props: &[(String, Value)],
3470        edge_buf: &mut Vec<Edge>,
3471        node_buf: &mut Vec<NodeId>,
3472        used: &mut HashSet<EdgeId>,
3473        out_paths: &mut Vec<Vec<Edge>>,
3474        out_node_paths: &mut Vec<Vec<NodeId>>,
3475        out_targets: &mut Vec<NodeId>,
3476    ) -> Result<()> {
3477        let depth = edge_buf.len() as u64;
3478
3479        if depth >= self.min_hops && depth <= self.max_hops {
3480            let terminal_ok = match ctx.store.get_node(current_node)? {
3481                Some(node) => has_all_labels(&node, &self.dst_labels),
3482                None => false,
3483            };
3484            if terminal_ok {
3485                out_paths.push(edge_buf.clone());
3486                out_node_paths.push(node_buf.clone());
3487                out_targets.push(current_node);
3488            }
3489        }
3490
3491        if depth >= self.max_hops {
3492            return Ok(());
3493        }
3494
3495        let neighbors = match self.direction {
3496            Direction::Outgoing => ctx.store.outgoing(current_node)?,
3497            Direction::Incoming => ctx.store.incoming(current_node)?,
3498            Direction::Both => {
3499                let mut all = ctx.store.outgoing(current_node)?;
3500                all.extend(ctx.store.incoming(current_node)?);
3501                all
3502            }
3503        };
3504
3505        for (eid, neighbor_id) in neighbors {
3506            if used.contains(&eid) {
3507                continue;
3508            }
3509            let edge = match ctx.store.get_edge(eid)? {
3510                Some(e) => e,
3511                None => continue,
3512            };
3513            if !self.edge_types.is_empty() && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3514            {
3515                continue;
3516            }
3517            // Inline edge property filter: skip edges whose
3518            // properties don't equal the expected values computed
3519            // per input row. A missing property fails the check,
3520            // matching `EdgeExpandOp`.
3521            if !expected_edge_props.is_empty() {
3522                let mut ok = true;
3523                for (key, expected) in expected_edge_props {
3524                    let actual = match edge.properties.get(key) {
3525                        Some(v) => Value::Property(v.clone()),
3526                        None => {
3527                            ok = false;
3528                            break;
3529                        }
3530                    };
3531                    if !values_equal(&actual, expected) {
3532                        ok = false;
3533                        break;
3534                    }
3535                }
3536                if !ok {
3537                    continue;
3538                }
3539            }
3540            used.insert(eid);
3541            edge_buf.push(edge);
3542            node_buf.push(neighbor_id);
3543            self.dfs(
3544                ctx,
3545                neighbor_id,
3546                expected_edge_props,
3547                edge_buf,
3548                node_buf,
3549                used,
3550                out_paths,
3551                out_node_paths,
3552                out_targets,
3553            )?;
3554            edge_buf.pop();
3555            node_buf.pop();
3556            used.remove(&eid);
3557        }
3558
3559        Ok(())
3560    }
3561}
3562
3563/// Replay the edge sequence stored at `list_var` in `row` as a
3564/// var-length walk starting from `src_id`. Returns `(paths,
3565/// node_paths, targets)` in the same shape `enumerate` produces
3566/// — either one entry when the list reads as a valid connected
3567/// walk in `direction`, or empty when it doesn't.
3568///
3569/// Validation per step:
3570/// * the list element is a `Value::Edge`,
3571/// * the edge's optional type filter matches `edge_types`,
3572/// * the edge actually touches the current node and proceeds to
3573///   the other endpoint in the requested direction (undirected
3574///   accepts either).
3575///
3576/// A null / missing / non-list value, a null source, or any
3577/// failed step all produce an empty result — which, when the
3578/// caller is in optional mode, triggers the left-join null
3579/// fallback.
3580fn replay_edge_list(
3581    ctx: &ExecCtx,
3582    row: &Row,
3583    list_var: &str,
3584    src_id: Option<NodeId>,
3585    direction: Direction,
3586    edge_types: &[String],
3587) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
3588    let start = match src_id {
3589        Some(id) => id,
3590        None => return Ok((Vec::new(), Vec::new(), Vec::new())),
3591    };
3592    let list = match ctx.lookup_binding(row, list_var) {
3593        Some(Value::List(items)) => items.clone(),
3594        Some(Value::Property(meshdb_core::Property::List(items))) => items
3595            .iter()
3596            .cloned()
3597            .map(Value::Property)
3598            .collect::<Vec<_>>(),
3599        _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
3600    };
3601    let mut edge_buf: Vec<Edge> = Vec::with_capacity(list.len());
3602    let mut node_buf: Vec<NodeId> = Vec::with_capacity(list.len() + 1);
3603    node_buf.push(start);
3604    let mut current = start;
3605    for item in list {
3606        let edge = match item {
3607            Value::Edge(e) => e,
3608            _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
3609        };
3610        if !edge_types.is_empty() && !edge_types.iter().any(|t| t == &edge.edge_type) {
3611            return Ok((Vec::new(), Vec::new(), Vec::new()));
3612        }
3613        let next_node = match direction {
3614            Direction::Outgoing => {
3615                if edge.source != current {
3616                    return Ok((Vec::new(), Vec::new(), Vec::new()));
3617                }
3618                edge.target
3619            }
3620            Direction::Incoming => {
3621                if edge.target != current {
3622                    return Ok((Vec::new(), Vec::new(), Vec::new()));
3623                }
3624                edge.source
3625            }
3626            Direction::Both => {
3627                if edge.source == current {
3628                    edge.target
3629                } else if edge.target == current {
3630                    edge.source
3631                } else {
3632                    return Ok((Vec::new(), Vec::new(), Vec::new()));
3633                }
3634            }
3635        };
3636        // The stored edge shape should still exist in the graph;
3637        // a missing node mid-walk means the snapshot shifted and
3638        // the list is no longer valid.
3639        if ctx.store.get_node(next_node)?.is_none() {
3640            return Ok((Vec::new(), Vec::new(), Vec::new()));
3641        }
3642        edge_buf.push(edge);
3643        node_buf.push(next_node);
3644        current = next_node;
3645    }
3646    Ok((vec![edge_buf], vec![node_buf], vec![current]))
3647}
3648
3649impl Operator for VarLengthExpandOp {
3650    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3651        loop {
3652            while self.pending_idx < self.pending_paths.len() {
3653                let i = self.pending_idx;
3654                self.pending_idx += 1;
3655
3656                let target_id = self.pending_targets[i];
3657                let target = match ctx.store.get_node(target_id)? {
3658                    Some(n) => n,
3659                    None => continue,
3660                };
3661
3662                let base = self
3663                    .current_row
3664                    .as_ref()
3665                    .expect("pending without source row");
3666                let mut out = base.clone();
3667                out.insert(self.dst_var.clone(), Value::Node(target.clone()));
3668                if let Some(ev) = &self.edge_var {
3669                    let edges: Vec<Value> = self.pending_paths[i]
3670                        .iter()
3671                        .cloned()
3672                        .map(Value::Edge)
3673                        .collect();
3674                    out.insert(ev.clone(), Value::List(edges));
3675                }
3676                if let Some(pv) = &self.path_var {
3677                    let mut nodes = Vec::with_capacity(self.pending_node_paths[i].len());
3678                    for nid in &self.pending_node_paths[i] {
3679                        match ctx.store.get_node(*nid)? {
3680                            Some(n) => nodes.push(n),
3681                            None => continue,
3682                        }
3683                    }
3684                    let edges = self.pending_paths[i].clone();
3685                    out.insert(pv.clone(), Value::Path { nodes, edges });
3686                }
3687                return Ok(Some(out));
3688            }
3689
3690            match self.input.next(ctx)? {
3691                None => return Ok(None),
3692                Some(row) => {
3693                    let src_id = match row.get(&self.src_var) {
3694                        Some(Value::Node(n)) => Some(n.id),
3695                        // Null source → no paths. Same
3696                        // null-propagating semantics as
3697                        // `EdgeExpandOp`. In optional mode we
3698                        // still emit the left-join fallback;
3699                        // otherwise we just skip.
3700                        Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
3701                            None
3702                        }
3703                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3704                    };
3705                    // Replay path: when the hop names an already-
3706                    // bound edge list (`MATCH (a)-[rs*]->(b)` with
3707                    // `rs = [r1, r2]` from an earlier WITH), skip
3708                    // DFS entirely and verify the listed edges
3709                    // form a connected walk from `src_id` in the
3710                    // required direction. Produces at most one
3711                    // path — the exact list that was supplied.
3712                    let (mut paths, mut node_paths, mut targets) =
3713                        if let Some(list_var) = &self.bound_edge_list_var {
3714                            replay_edge_list(
3715                                ctx,
3716                                &row,
3717                                list_var,
3718                                src_id,
3719                                self.direction,
3720                                &self.edge_types,
3721                            )?
3722                        } else {
3723                            match src_id {
3724                                Some(id) => self.enumerate(ctx, id, &row)?,
3725                                None => (Vec::new(), Vec::new(), Vec::new()),
3726                            }
3727                        };
3728                    // Bound-endpoint constraint: drop paths whose
3729                    // terminal node doesn't match the node already
3730                    // bound at the constraint var. When combined
3731                    // with `optional`, an input whose paths are
3732                    // all filtered out still triggers the
3733                    // null-fallback below.
3734                    if let Some(constraint_var) = &self.dst_constraint_var {
3735                        let target_id = match row.get(constraint_var) {
3736                            Some(Value::Node(n)) => Some(n.id),
3737                            _ => None,
3738                        };
3739                        match target_id {
3740                            Some(id) => {
3741                                let mut kept_paths = Vec::new();
3742                                let mut kept_node_paths = Vec::new();
3743                                let mut kept_targets = Vec::new();
3744                                for ((p, np), t) in paths
3745                                    .drain(..)
3746                                    .zip(node_paths.drain(..))
3747                                    .zip(targets.drain(..))
3748                                {
3749                                    if t == id {
3750                                        kept_paths.push(p);
3751                                        kept_node_paths.push(np);
3752                                        kept_targets.push(t);
3753                                    }
3754                                }
3755                                paths = kept_paths;
3756                                node_paths = kept_node_paths;
3757                                targets = kept_targets;
3758                            }
3759                            None => {
3760                                paths.clear();
3761                                node_paths.clear();
3762                                targets.clear();
3763                            }
3764                        }
3765                    }
3766                    if paths.is_empty() && self.optional {
3767                        // Left-join fallback: emit one row with
3768                        // the expansion's output vars set to Null
3769                        // so the outer OPTIONAL MATCH preserves
3770                        // this input row.
3771                        let mut out = row;
3772                        if let Some(ev) = &self.edge_var {
3773                            out.insert(ev.clone(), Value::Null);
3774                        }
3775                        out.insert(self.dst_var.clone(), Value::Null);
3776                        if let Some(pv) = &self.path_var {
3777                            out.insert(pv.clone(), Value::Null);
3778                        }
3779                        return Ok(Some(out));
3780                    }
3781                    self.pending_paths = paths;
3782                    self.pending_node_paths = node_paths;
3783                    self.pending_targets = targets;
3784                    self.pending_idx = 0;
3785                    self.current_row = Some(row);
3786                }
3787            }
3788        }
3789    }
3790}
3791
3792struct FilterOp {
3793    input: Box<dyn Operator>,
3794    predicate: Expr,
3795}
3796
3797impl FilterOp {
3798    fn new(input: Box<dyn Operator>, predicate: Expr) -> Self {
3799        Self { input, predicate }
3800    }
3801}
3802
3803impl Operator for FilterOp {
3804    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3805        while let Some(row) = self.input.next(ctx)? {
3806            let v = match eval_expr(&self.predicate, &ctx.eval_ctx(&row)) {
3807                Ok(v) => v,
3808                // Type mismatches and non-boolean errors in filter predicates
3809                // are treated as false (row filtered out), not hard errors
3810                Err(Error::TypeMismatch) | Err(Error::NotBoolean) => Value::Null,
3811                Err(e) => return Err(e),
3812            };
3813            if to_bool(&v).unwrap_or(false) {
3814                return Ok(Some(row));
3815            }
3816        }
3817        Ok(None)
3818    }
3819}
3820
3821/// Pass-through operator for `RETURN *` / `WITH *`. Forwards every
3822/// row from the input unchanged.
3823struct IdentityOp {
3824    input: Box<dyn Operator>,
3825}
3826
3827impl IdentityOp {
3828    fn new(input: Box<dyn Operator>) -> Self {
3829        Self { input }
3830    }
3831}
3832
3833impl Operator for IdentityOp {
3834    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3835        self.input.next(ctx)
3836    }
3837}
3838
3839/// Passes input rows through; if the input produces zero rows,
3840/// emits exactly one row with `null_vars` bound to Value::Null.
3841/// Implements standalone OPTIONAL MATCH semantics (e.g.
3842/// `OPTIONAL MATCH (n) RETURN n` on an empty graph yields one
3843/// row with n=null rather than the empty result set).
3844struct CoalesceNullRowOp {
3845    input: Box<dyn Operator>,
3846    null_vars: Vec<String>,
3847    produced_any: bool,
3848    done: bool,
3849}
3850
3851impl CoalesceNullRowOp {
3852    fn new(input: Box<dyn Operator>, null_vars: Vec<String>) -> Self {
3853        Self {
3854            input,
3855            null_vars,
3856            produced_any: false,
3857            done: false,
3858        }
3859    }
3860}
3861
3862impl Operator for CoalesceNullRowOp {
3863    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3864        if self.done {
3865            return Ok(None);
3866        }
3867        match self.input.next(ctx)? {
3868            Some(row) => {
3869                self.produced_any = true;
3870                Ok(Some(row))
3871            }
3872            None => {
3873                self.done = true;
3874                if self.produced_any {
3875                    Ok(None)
3876                } else {
3877                    let mut row = Row::new();
3878                    for v in &self.null_vars {
3879                        row.insert(v.clone(), Value::Null);
3880                    }
3881                    Ok(Some(row))
3882                }
3883            }
3884        }
3885    }
3886}
3887
3888struct ProjectOp {
3889    input: Box<dyn Operator>,
3890    items: Vec<ReturnItem>,
3891}
3892
3893impl ProjectOp {
3894    fn new(input: Box<dyn Operator>, items: Vec<ReturnItem>) -> Self {
3895        Self { input, items }
3896    }
3897}
3898
3899impl Operator for ProjectOp {
3900    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3901        match self.input.next(ctx)? {
3902            Some(row) => {
3903                let mut out = Row::new();
3904                for (i, item) in self.items.iter().enumerate() {
3905                    let name = item.alias.clone().unwrap_or_else(|| {
3906                        item.raw_text
3907                            .clone()
3908                            .unwrap_or_else(|| default_name(&item.expr, i))
3909                    });
3910                    let value = eval_expr(&item.expr, &ctx.eval_ctx(&row))?;
3911                    out.insert(name, value);
3912                }
3913                Ok(Some(out))
3914            }
3915            None => Ok(None),
3916        }
3917    }
3918}
3919
3920fn default_name(expr: &Expr, idx: usize) -> String {
3921    render_expr_name(expr).unwrap_or_else(|| format!("col{}", idx))
3922}
3923
3924fn render_expr_name(expr: &Expr) -> Option<String> {
3925    Some(match expr {
3926        Expr::Identifier(s) => s.clone(),
3927        Expr::Property { var, key } => format!("{var}.{key}"),
3928        Expr::PropertyAccess { base, key } => {
3929            // Match the source syntax's parenthesisation of a
3930            // bracketed base: `(list[1]).k` round-trips, while a
3931            // plain identifier base stays bare (`a.b`).
3932            if matches!(
3933                base.as_ref(),
3934                Expr::IndexAccess { .. } | Expr::SliceAccess { .. }
3935            ) {
3936                format!("({}).{key}", render_expr_name(base)?)
3937            } else {
3938                format!("{}.{key}", render_expr_name(base)?)
3939            }
3940        }
3941        Expr::Parameter(name) => format!("${name}"),
3942        Expr::Literal(Literal::String(s)) => format!("'{s}'"),
3943        Expr::Literal(Literal::Integer(i)) => i.to_string(),
3944        Expr::Literal(Literal::Float(f)) => f.to_string(),
3945        Expr::Literal(Literal::Boolean(b)) => b.to_string(),
3946        Expr::Literal(Literal::Null) => "NULL".into(),
3947        Expr::Call { name, args } => {
3948            let arg_str = match args {
3949                CallArgs::Star => "*".into(),
3950                CallArgs::Exprs(es) | CallArgs::DistinctExprs(es) => {
3951                    let prefix = if matches!(args, CallArgs::DistinctExprs(_)) {
3952                        "DISTINCT "
3953                    } else {
3954                        ""
3955                    };
3956                    let inner: Vec<String> = es.iter().filter_map(render_expr_name).collect();
3957                    if inner.len() != es.len() {
3958                        return None;
3959                    }
3960                    format!("{prefix}{}", inner.join(", "))
3961                }
3962            };
3963            format!("{name}({arg_str})")
3964        }
3965        Expr::BinaryOp { op, left, right } => {
3966            let op_str = match op {
3967                BinaryOp::Add => " + ",
3968                BinaryOp::Sub => " - ",
3969                BinaryOp::Mul => " * ",
3970                BinaryOp::Div => " / ",
3971                BinaryOp::Mod => " % ",
3972                BinaryOp::Pow => " ^ ",
3973            };
3974            format!(
3975                "{}{op_str}{}",
3976                render_expr_name(left)?,
3977                render_expr_name(right)?
3978            )
3979        }
3980        Expr::UnaryOp { op, operand } => {
3981            let op_str = match op {
3982                UnaryOp::Neg => "-",
3983            };
3984            format!("{op_str}{}", render_expr_name(operand)?)
3985        }
3986        Expr::Not(inner) => format!("NOT {}", render_expr_name(inner)?),
3987        Expr::IsNull { negated, inner } => {
3988            if *negated {
3989                format!("{} IS NOT NULL", render_expr_name(inner)?)
3990            } else {
3991                format!("{} IS NULL", render_expr_name(inner)?)
3992            }
3993        }
3994        Expr::Compare { op, left, right } => {
3995            let op_str = match op {
3996                CompareOp::Eq => " = ",
3997                CompareOp::Ne => " <> ",
3998                CompareOp::Lt => " < ",
3999                CompareOp::Le => " <= ",
4000                CompareOp::Gt => " > ",
4001                CompareOp::Ge => " >= ",
4002                CompareOp::StartsWith => " STARTS WITH ",
4003                CompareOp::EndsWith => " ENDS WITH ",
4004                CompareOp::Contains => " CONTAINS ",
4005                CompareOp::RegexMatch => " =~ ",
4006            };
4007            format!(
4008                "{}{op_str}{}",
4009                render_expr_name(left)?,
4010                render_expr_name(right)?
4011            )
4012        }
4013        Expr::List(items) => {
4014            let inner: Vec<String> = items.iter().filter_map(render_expr_name).collect();
4015            if inner.len() != items.len() {
4016                return None;
4017            }
4018            format!("[{}]", inner.join(", "))
4019        }
4020        Expr::Map(entries) => {
4021            let inner: Vec<String> = entries
4022                .iter()
4023                .map(|(k, v)| render_expr_name(v).map(|vn| format!("{k}: {vn}")))
4024                .collect::<Option<Vec<_>>>()?;
4025            format!("{{{}}}", inner.join(", "))
4026        }
4027        Expr::IndexAccess { base, index } => {
4028            format!("{}[{}]", render_expr_name(base)?, render_expr_name(index)?)
4029        }
4030        Expr::InList { element, list } => {
4031            format!(
4032                "{} IN {}",
4033                render_expr_name(element)?,
4034                render_expr_name(list)?
4035            )
4036        }
4037        Expr::HasLabels { expr, labels } => {
4038            let mut s = format!("({}", render_expr_name(expr)?);
4039            for l in labels {
4040                s.push(':');
4041                s.push_str(l);
4042            }
4043            s.push(')');
4044            s
4045        }
4046        _ => return None,
4047    })
4048}
4049
4050struct DistinctOp {
4051    input: Box<dyn Operator>,
4052    seen: HashSet<String>,
4053}
4054
4055impl DistinctOp {
4056    fn new(input: Box<dyn Operator>) -> Self {
4057        Self {
4058            input,
4059            seen: HashSet::new(),
4060        }
4061    }
4062}
4063
4064impl Operator for DistinctOp {
4065    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4066        while let Some(row) = self.input.next(ctx)? {
4067            let key = row_key(&row);
4068            if self.seen.insert(key) {
4069                return Ok(Some(row));
4070            }
4071        }
4072        Ok(None)
4073    }
4074}
4075
4076/// Assemble a `Value::Path` from a row's already-bound node and
4077/// edge variables. Emitted by `plan_pattern` when the source
4078/// pattern carries `path_var`. The operator pulls `node_vars[i]`
4079/// and `edge_vars[i]` out of every row, walks them in order, and
4080/// inserts the result into the row under `path_var` before
4081/// forwarding downstream.
4082///
4083/// Rows where any referenced variable is missing or not the
4084/// expected Node/Edge shape get a `Value::Null` at `path_var` —
4085/// matching how `OPTIONAL MATCH` flows null bindings through the
4086/// downstream projection without hard-erroring. Missing bindings
4087/// shouldn't normally happen (the planner fills in synthetic
4088/// names via `ensure_path_bindings`), but the null fallback
4089/// means an unexpected upstream shape degrades gracefully
4090/// instead of crashing the whole query.
4091struct BindPathOp {
4092    input: Box<dyn Operator>,
4093    path_var: String,
4094    node_vars: Vec<String>,
4095    edge_vars: Vec<String>,
4096}
4097
4098impl BindPathOp {
4099    fn new(
4100        input: Box<dyn Operator>,
4101        path_var: String,
4102        node_vars: Vec<String>,
4103        edge_vars: Vec<String>,
4104    ) -> Self {
4105        Self {
4106            input,
4107            path_var,
4108            node_vars,
4109            edge_vars,
4110        }
4111    }
4112}
4113
4114impl Operator for BindPathOp {
4115    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4116        let Some(mut row) = self.input.next(ctx)? else {
4117            return Ok(None);
4118        };
4119        // Extract the ordered node + edge sequence from the row.
4120        // Any missing or wrong-shaped entry collapses the whole
4121        // path binding to null and falls through.
4122        let mut nodes: Vec<meshdb_core::Node> = Vec::new();
4123        let mut edges: Vec<meshdb_core::Edge> = Vec::new();
4124        let mut abort = false;
4125        // Interleave node/edge vars. For each hop i:
4126        //   node_vars[i] = start/intermediate node
4127        //   edge_vars[i] = edge (or sub-path for var-length)
4128        //   node_vars[i+1] = target node
4129        // For var-length hops, edge_vars[i] may contain a
4130        // Value::Path — splice its interior into the running path.
4131        if let Some(Value::Node(n)) = row.get(&self.node_vars[0]) {
4132            nodes.push(n.clone());
4133        } else {
4134            abort = true;
4135        }
4136        if !abort {
4137            for (i, ev) in self.edge_vars.iter().enumerate() {
4138                match row.get(ev) {
4139                    Some(Value::Edge(e)) => {
4140                        edges.push(e.clone());
4141                        match row.get(&self.node_vars[i + 1]) {
4142                            Some(Value::Node(n)) => nodes.push(n.clone()),
4143                            _ => {
4144                                abort = true;
4145                                break;
4146                            }
4147                        }
4148                    }
4149                    Some(Value::Path {
4150                        nodes: sub_nodes,
4151                        edges: sub_edges,
4152                    }) => {
4153                        // Splice the sub-path. The sub-path's first
4154                        // node is the same as nodes.last() (already
4155                        // pushed), so skip it. All sub-edges go in.
4156                        // Sub-path interior nodes go in. The sub-path's
4157                        // last node is the target for this hop.
4158                        edges.extend(sub_edges.iter().cloned());
4159                        if sub_nodes.len() > 1 {
4160                            nodes.extend(sub_nodes[1..].iter().cloned());
4161                        }
4162                    }
4163                    _ => {
4164                        abort = true;
4165                        break;
4166                    }
4167                }
4168            }
4169        }
4170        if abort {
4171            row.insert(self.path_var.clone(), Value::Null);
4172        } else {
4173            row.insert(self.path_var.clone(), Value::Path { nodes, edges });
4174        }
4175        Ok(Some(row))
4176    }
4177}
4178
4179/// `MATCH p = shortestPath((a)-[:R*..N]->(b))`. For each input
4180/// row (which must already bind both `src_var` and `dst_var`
4181/// to `Value::Node`), runs a breadth-first search from the
4182/// source node toward the target, filtering edges by
4183/// `edge_type` and walking up to `max_hops` steps. Emits one
4184/// row per successful search with `path_var` set to a
4185/// `Value::Path` carrying the traversed node/edge sequence;
4186/// rows where BFS finds no path are dropped entirely (matching
4187/// Cypher's `MATCH` semantics for an unsatisfiable pattern).
4188///
4189/// The BFS uses classic parent-pointer reconstruction: each
4190/// visited node's `(parent_node_id, edge_id)` pair is stored
4191/// in a hashmap, and once the target is reached we walk back
4192/// to the source, reversing the accumulated edges to produce
4193/// a forward-order path. Cycle detection is a side-effect of
4194/// the visited set — the first time BFS sees a node is
4195/// necessarily via a shortest path, so later visits are
4196/// ignored.
4197struct ShortestPathOp {
4198    input: Box<dyn Operator>,
4199    src_var: String,
4200    dst_var: String,
4201    path_var: String,
4202    edge_types: Vec<String>,
4203    direction: meshdb_cypher::Direction,
4204    max_hops: u64,
4205    kind: meshdb_cypher::ShortestKind,
4206    /// Buffered `(base_row, path)` pairs from the current
4207    /// input row, used only by `AllShortest` mode where a
4208    /// single input can expand into multiple shortest paths
4209    /// of the same length. Each call to `next` drains one
4210    /// entry before pulling a fresh input row; in `Shortest`
4211    /// mode the buffer is always empty or has one element.
4212    pending: std::collections::VecDeque<(Row, Value)>,
4213}
4214
4215impl ShortestPathOp {
4216    #[allow(clippy::too_many_arguments)]
4217    fn new(
4218        input: Box<dyn Operator>,
4219        src_var: String,
4220        dst_var: String,
4221        path_var: String,
4222        edge_types: Vec<String>,
4223        direction: meshdb_cypher::Direction,
4224        max_hops: u64,
4225        kind: meshdb_cypher::ShortestKind,
4226    ) -> Self {
4227        Self {
4228            input,
4229            src_var,
4230            dst_var,
4231            path_var,
4232            edge_types,
4233            direction,
4234            max_hops,
4235            kind,
4236            pending: std::collections::VecDeque::new(),
4237        }
4238    }
4239}
4240
4241impl Operator for ShortestPathOp {
4242    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4243        loop {
4244            // Drain buffered paths from a prior input row
4245            // before pulling the next one. Only reachable in
4246            // `AllShortest` mode; `Shortest` mode's buffer
4247            // never accumulates more than one entry.
4248            if let Some((mut row, path)) = self.pending.pop_front() {
4249                row.insert(self.path_var.clone(), path);
4250                return Ok(Some(row));
4251            }
4252            let Some(row) = self.input.next(ctx)? else {
4253                return Ok(None);
4254            };
4255            let src = match row.get(&self.src_var) {
4256                Some(Value::Node(n)) => n.clone(),
4257                _ => continue,
4258            };
4259            let dst = match row.get(&self.dst_var) {
4260                Some(Value::Node(n)) => n.clone(),
4261                _ => continue,
4262            };
4263            let paths = bfs_shortest_paths(
4264                &src,
4265                &dst,
4266                &self.edge_types,
4267                self.direction,
4268                self.max_hops,
4269                self.kind,
4270                ctx.store,
4271            )?;
4272            if paths.is_empty() {
4273                // No path of length ≤ max_hops — drop the row.
4274                continue;
4275            }
4276            for path in paths {
4277                self.pending.push_back((row.clone(), path));
4278            }
4279        }
4280    }
4281}
4282
4283/// Layered breadth-first search from `src` toward `dst`,
4284/// constrained by edge type, direction, and max hop count.
4285/// Returns every path of minimum length when `kind` is
4286/// `AllShortest`, or just the first discovered shortest path
4287/// when `kind` is `Shortest`. An empty vector means no path
4288/// of length ≤ `max_hops` exists.
4289///
4290/// `src == dst` is a zero-hop special case that always
4291/// returns a singleton path regardless of `max_hops`.
4292///
4293/// The algorithm builds a parent DAG as it expands each
4294/// level: every node that was first reached at depth `d+1`
4295/// records every `(parent, edge)` pair from frontier[d] that
4296/// reaches it, not just the first one. This lets the
4297/// reconstruction walk enumerate all source-to-target paths
4298/// of the discovered shortest length. For `Shortest` mode
4299/// the reconstruction short-circuits on the first complete
4300/// path.
4301fn bfs_shortest_paths(
4302    src: &Node,
4303    dst: &Node,
4304    edge_types: &[String],
4305    direction: meshdb_cypher::Direction,
4306    max_hops: u64,
4307    kind: meshdb_cypher::ShortestKind,
4308    reader: &dyn crate::reader::GraphReader,
4309) -> Result<Vec<Value>> {
4310    use meshdb_cypher::Direction;
4311
4312    if src.id == dst.id {
4313        return Ok(vec![Value::Path {
4314            nodes: vec![src.clone()],
4315            edges: vec![],
4316        }]);
4317    }
4318
4319    // `dist` tracks the shortest distance from `src` to each
4320    // reached node. `parents` maps each reached node id to
4321    // every `(parent_id, edge_id)` pair that reached it at
4322    // that shortest distance — a node may have multiple
4323    // parents in the parent DAG for `AllShortest`.
4324    let mut dist: HashMap<NodeId, u64> = HashMap::new();
4325    dist.insert(src.id, 0);
4326    let mut parents: HashMap<NodeId, Vec<(NodeId, EdgeId)>> = HashMap::new();
4327
4328    let mut frontier: Vec<NodeId> = vec![src.id];
4329    let mut depth: u64 = 0;
4330    let mut found = false;
4331
4332    while !frontier.is_empty() && depth < max_hops && !found {
4333        let mut next_frontier: Vec<NodeId> = Vec::new();
4334        for node_id in &frontier {
4335            let neighbors = match direction {
4336                Direction::Outgoing => reader.outgoing(*node_id)?,
4337                Direction::Incoming => reader.incoming(*node_id)?,
4338                Direction::Both => {
4339                    let mut out = reader.outgoing(*node_id)?;
4340                    out.extend(reader.incoming(*node_id)?);
4341                    out
4342                }
4343            };
4344            for (edge_id, neighbor_id) in neighbors {
4345                // Edge-type filter. Only fetch the edge record
4346                // when a type constraint is present.
4347                if !edge_types.is_empty() {
4348                    let edge = match reader.get_edge(edge_id)? {
4349                        Some(e) => e,
4350                        None => continue,
4351                    };
4352                    if !edge_types.iter().any(|t| t == &edge.edge_type) {
4353                        continue;
4354                    }
4355                }
4356                match dist.get(&neighbor_id) {
4357                    Some(&d) if d == depth + 1 => {
4358                        // Alternate parent at the same
4359                        // shortest-path level — record the
4360                        // additional edge so the reconstruction
4361                        // can enumerate it, but don't re-add to
4362                        // the next frontier.
4363                        parents
4364                            .entry(neighbor_id)
4365                            .or_default()
4366                            .push((*node_id, edge_id));
4367                    }
4368                    Some(_) => {
4369                        // Already reached at a strictly shorter
4370                        // depth; this edge isn't on a shortest
4371                        // path.
4372                    }
4373                    None => {
4374                        dist.insert(neighbor_id, depth + 1);
4375                        parents
4376                            .entry(neighbor_id)
4377                            .or_default()
4378                            .push((*node_id, edge_id));
4379                        if neighbor_id == dst.id {
4380                            found = true;
4381                        } else {
4382                            next_frontier.push(neighbor_id);
4383                        }
4384                    }
4385                }
4386            }
4387        }
4388        depth += 1;
4389        if !found {
4390            frontier = next_frontier;
4391        }
4392    }
4393
4394    if !found {
4395        return Ok(Vec::new());
4396    }
4397
4398    // Enumerate all shortest paths from the parent DAG.
4399    // `Shortest` mode short-circuits after the first complete
4400    // walk; `AllShortest` collects every distinct path.
4401    let mut out: Vec<Value> = Vec::new();
4402    let mut nodes_rev: Vec<Node> = Vec::new();
4403    let mut edges_rev: Vec<Edge> = Vec::new();
4404    let only_first = matches!(kind, meshdb_cypher::ShortestKind::Shortest);
4405    collect_shortest_paths(
4406        src,
4407        dst,
4408        &parents,
4409        reader,
4410        &mut nodes_rev,
4411        &mut edges_rev,
4412        &mut out,
4413        only_first,
4414    )?;
4415    Ok(out)
4416}
4417
4418/// Depth-first walk through the parent DAG built by
4419/// `bfs_shortest_paths`, collecting every source-to-target
4420/// path of the BFS-determined shortest length. The recursion
4421/// carries two scratch stacks (`nodes_rev`, `edges_rev`)
4422/// representing the partially-reconstructed path in reverse
4423/// order; on reaching `src` we copy the reversed accumulators
4424/// into a forward-order `Value::Path` and push it into `out`.
4425///
4426/// `only_first = true` short-circuits after the first
4427/// complete path, giving `Shortest` mode the optimal-case
4428/// early exit.
4429#[allow(clippy::too_many_arguments)]
4430fn collect_shortest_paths(
4431    src: &Node,
4432    current: &Node,
4433    parents: &HashMap<NodeId, Vec<(NodeId, EdgeId)>>,
4434    reader: &dyn crate::reader::GraphReader,
4435    nodes_rev: &mut Vec<Node>,
4436    edges_rev: &mut Vec<Edge>,
4437    out: &mut Vec<Value>,
4438    only_first: bool,
4439) -> Result<()> {
4440    if current.id == src.id {
4441        // Complete walk: `nodes_rev` holds dst, ..., (last
4442        // node before src) in reverse; prepend src and
4443        // reverse to produce the forward-order node list.
4444        // Edges are similarly reversed.
4445        let mut nodes: Vec<Node> = Vec::with_capacity(nodes_rev.len() + 1);
4446        nodes.push(src.clone());
4447        nodes.extend(nodes_rev.iter().rev().cloned());
4448        let edges: Vec<Edge> = edges_rev.iter().rev().cloned().collect();
4449        out.push(Value::Path { nodes, edges });
4450        return Ok(());
4451    }
4452    let Some(parent_edges) = parents.get(&current.id) else {
4453        // Unreachable in normal flow — BFS only inserts dst
4454        // into `parents` when it found at least one incoming
4455        // edge — but handled defensively.
4456        return Ok(());
4457    };
4458    for (parent_id, edge_id) in parent_edges {
4459        if only_first && !out.is_empty() {
4460            return Ok(());
4461        }
4462        let edge = reader
4463            .get_edge(*edge_id)?
4464            .expect("BFS inserted this edge id; it must still exist");
4465        let parent_node = reader
4466            .get_node(*parent_id)?
4467            .expect("BFS visited this node id; it must still exist");
4468        nodes_rev.push(current.clone());
4469        edges_rev.push(edge);
4470        collect_shortest_paths(
4471            src,
4472            &parent_node,
4473            parents,
4474            reader,
4475            nodes_rev,
4476            edges_rev,
4477            out,
4478            only_first,
4479        )?;
4480        nodes_rev.pop();
4481        edges_rev.pop();
4482    }
4483    Ok(())
4484}
4485
4486/// `UNION` / `UNION ALL`. Drains each branch in order, streaming
4487/// its rows through. For plain `UNION` (`all = false`) we
4488/// deduplicate across the combined stream using the same
4489/// `row_key` the `DistinctOp` uses, so the semantics match
4490/// Neo4j's "set union" shape regardless of whether duplicates
4491/// sit inside a single branch or straddle branches. For
4492/// `UNION ALL` the `seen` set stays `None` and every produced
4493/// row is forwarded as-is.
4494struct UnionOp {
4495    branches: Vec<Box<dyn Operator>>,
4496    current: usize,
4497    seen: Option<HashSet<String>>,
4498}
4499
4500impl UnionOp {
4501    fn new(branches: Vec<Box<dyn Operator>>, all: bool) -> Self {
4502        Self {
4503            branches,
4504            current: 0,
4505            seen: if all { None } else { Some(HashSet::new()) },
4506        }
4507    }
4508}
4509
4510impl Operator for UnionOp {
4511    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4512        while self.current < self.branches.len() {
4513            match self.branches[self.current].next(ctx)? {
4514                Some(row) => {
4515                    if let Some(seen) = self.seen.as_mut() {
4516                        let key = row_key(&row);
4517                        if !seen.insert(key) {
4518                            continue;
4519                        }
4520                    }
4521                    return Ok(Some(row));
4522                }
4523                None => {
4524                    self.current += 1;
4525                }
4526            }
4527        }
4528        Ok(None)
4529    }
4530}
4531
4532struct OrderByOp {
4533    input: Box<dyn Operator>,
4534    sort_items: Vec<SortItem>,
4535    sorted: Option<Vec<Row>>,
4536    cursor: usize,
4537}
4538
4539impl OrderByOp {
4540    fn new(input: Box<dyn Operator>, sort_items: Vec<SortItem>) -> Self {
4541        Self {
4542            input,
4543            sort_items,
4544            sorted: None,
4545            cursor: 0,
4546        }
4547    }
4548}
4549
4550impl Operator for OrderByOp {
4551    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4552        if self.sorted.is_none() {
4553            let mut rows: Vec<Row> = Vec::new();
4554            while let Some(row) = self.input.next(ctx)? {
4555                rows.push(row);
4556            }
4557            let mut keyed: Vec<(Vec<Value>, Row)> = Vec::with_capacity(rows.len());
4558            for row in rows {
4559                let mut keys = Vec::with_capacity(self.sort_items.len());
4560                for item in &self.sort_items {
4561                    keys.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
4562                }
4563                keyed.push((keys, row));
4564            }
4565            let descs: Vec<bool> = self.sort_items.iter().map(|s| s.descending).collect();
4566            keyed.sort_by(|a, b| {
4567                for (i, (va, vb)) in a.0.iter().zip(b.0.iter()).enumerate() {
4568                    let ord = compare_values(va, vb);
4569                    let ord = if descs[i] { ord.reverse() } else { ord };
4570                    if ord != Ordering::Equal {
4571                        return ord;
4572                    }
4573                }
4574                Ordering::Equal
4575            });
4576            self.sorted = Some(keyed.into_iter().map(|(_, r)| r).collect());
4577        }
4578        let rows = self.sorted.as_ref().unwrap();
4579        if self.cursor < rows.len() {
4580            let row = rows[self.cursor].clone();
4581            self.cursor += 1;
4582            Ok(Some(row))
4583        } else {
4584            Ok(None)
4585        }
4586    }
4587}
4588
4589struct AggregateOp {
4590    input: Box<dyn Operator>,
4591    group_keys: Vec<ReturnItem>,
4592    aggregates: Vec<AggregateSpec>,
4593    results: Option<Vec<Row>>,
4594    cursor: usize,
4595}
4596
4597impl AggregateOp {
4598    fn new(
4599        input: Box<dyn Operator>,
4600        group_keys: Vec<ReturnItem>,
4601        aggregates: Vec<AggregateSpec>,
4602    ) -> Self {
4603        Self {
4604            input,
4605            group_keys,
4606            aggregates,
4607            results: None,
4608            cursor: 0,
4609        }
4610    }
4611
4612    fn compute(&mut self, ctx: &ExecCtx) -> Result<()> {
4613        let mut groups: HashMap<String, GroupState> = HashMap::new();
4614        let mut order: Vec<String> = Vec::new();
4615
4616        // If there are no input rows AND no group keys, we still emit one row
4617        // (e.g. `MATCH (n:Missing) RETURN count(*)` must yield one row with 0).
4618        let mut saw_any = false;
4619
4620        while let Some(row) = self.input.next(ctx)? {
4621            saw_any = true;
4622            let mut key_values = Vec::with_capacity(self.group_keys.len());
4623            for item in &self.group_keys {
4624                key_values.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
4625            }
4626            let mut hash_key = String::new();
4627            for v in &key_values {
4628                hash_key.push_str(&value_key(v));
4629                hash_key.push('|');
4630            }
4631            let entry = groups.entry(hash_key.clone()).or_insert_with(|| {
4632                order.push(hash_key.clone());
4633                GroupState {
4634                    key_values: key_values.clone(),
4635                    agg_states: self
4636                        .aggregates
4637                        .iter()
4638                        .map(|a| AggState::initial(a.function))
4639                        .collect(),
4640                    distinct_seen: self.aggregates.iter().map(|_| None).collect(),
4641                }
4642            });
4643            for (i, spec) in self.aggregates.iter().enumerate() {
4644                if let AggregateArg::DistinctExpr(expr) = &spec.arg {
4645                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
4646                    if matches!(v, Value::Null) {
4647                        continue;
4648                    }
4649                    let key = value_key(&v);
4650                    let seen = entry.distinct_seen[i].get_or_insert_with(HashSet::new);
4651                    if !seen.insert(key) {
4652                        continue;
4653                    }
4654                }
4655                entry.agg_states[i].update(&spec.arg, &ctx.eval_ctx(&row))?;
4656                // The percentile is a constant expression — evaluate
4657                // it once against the first row we see and stash it
4658                // in the state so finalize() has a number to use.
4659                if let Some(extra_expr) = &spec.extra_arg {
4660                    let need_resolve = matches!(
4661                        &entry.agg_states[i],
4662                        AggState::PercentileDisc {
4663                            percentile: None,
4664                            ..
4665                        } | AggState::PercentileCont {
4666                            percentile: None,
4667                            ..
4668                        }
4669                    );
4670                    if need_resolve {
4671                        let pv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
4672                        let p = match pv {
4673                            Value::Property(Property::Float64(f)) => f,
4674                            Value::Property(Property::Int64(i)) => i as f64,
4675                            _ => 0.0,
4676                        };
4677                        // openCypher requires the percentile to
4678                        // lie in [0.0, 1.0]; anything else is an
4679                        // `ArgumentError: NumberOutOfRange`.
4680                        if !(0.0..=1.0).contains(&p) || p.is_nan() {
4681                            return Err(Error::Procedure(format!("percentile out of range: {p}")));
4682                        }
4683                        match &mut entry.agg_states[i] {
4684                            AggState::PercentileDisc { percentile, .. }
4685                            | AggState::PercentileCont { percentile, .. } => {
4686                                *percentile = Some(p);
4687                            }
4688                            _ => {}
4689                        }
4690                    }
4691                }
4692            }
4693        }
4694
4695        let mut out = Vec::new();
4696        if !saw_any && self.group_keys.is_empty() && !self.aggregates.is_empty() {
4697            // Empty group, single aggregate row
4698            let mut row = Row::new();
4699            for spec in &self.aggregates {
4700                row.insert(
4701                    spec.alias.clone(),
4702                    AggState::initial(spec.function).finalize(),
4703                );
4704            }
4705            out.push(row);
4706        } else {
4707            for key in order {
4708                let state = groups.remove(&key).unwrap();
4709                let mut row = Row::new();
4710                for (i, item) in self.group_keys.iter().enumerate() {
4711                    let name = item
4712                        .alias
4713                        .clone()
4714                        .unwrap_or_else(|| default_name(&item.expr, i));
4715                    row.insert(name, state.key_values[i].clone());
4716                }
4717                for (i, spec) in self.aggregates.iter().enumerate() {
4718                    row.insert(spec.alias.clone(), state.agg_states[i].finalize());
4719                }
4720                out.push(row);
4721            }
4722        }
4723        self.results = Some(out);
4724        Ok(())
4725    }
4726}
4727
4728impl Operator for AggregateOp {
4729    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4730        if self.results.is_none() {
4731            self.compute(ctx)?;
4732        }
4733        let rows = self.results.as_ref().unwrap();
4734        if self.cursor < rows.len() {
4735            let row = rows[self.cursor].clone();
4736            self.cursor += 1;
4737            Ok(Some(row))
4738        } else {
4739            Ok(None)
4740        }
4741    }
4742}
4743
4744struct GroupState {
4745    key_values: Vec<Value>,
4746    agg_states: Vec<AggState>,
4747    distinct_seen: Vec<Option<HashSet<String>>>,
4748}
4749
4750enum AggState {
4751    Count(i64),
4752    Sum {
4753        int_part: i64,
4754        float_part: f64,
4755        is_float: bool,
4756    },
4757    Avg {
4758        total: f64,
4759        count: i64,
4760    },
4761    Min(Option<Value>),
4762    Max(Option<Value>),
4763    Collect(Vec<Value>),
4764    StDev {
4765        sum: f64,
4766        sum_sq: f64,
4767        count: i64,
4768    },
4769    StDevP {
4770        sum: f64,
4771        sum_sq: f64,
4772        count: i64,
4773    },
4774    PercentileDisc {
4775        items: Vec<Value>,
4776        percentile: Option<f64>,
4777    },
4778    PercentileCont {
4779        items: Vec<Value>,
4780        percentile: Option<f64>,
4781    },
4782}
4783
4784impl AggState {
4785    fn initial(func: AggregateFn) -> Self {
4786        match func {
4787            AggregateFn::Count => AggState::Count(0),
4788            AggregateFn::Sum => AggState::Sum {
4789                int_part: 0,
4790                float_part: 0.0,
4791                is_float: false,
4792            },
4793            AggregateFn::Avg => AggState::Avg {
4794                total: 0.0,
4795                count: 0,
4796            },
4797            AggregateFn::Min => AggState::Min(None),
4798            AggregateFn::Max => AggState::Max(None),
4799            AggregateFn::Collect => AggState::Collect(Vec::new()),
4800            AggregateFn::StDev => AggState::StDev {
4801                sum: 0.0,
4802                sum_sq: 0.0,
4803                count: 0,
4804            },
4805            AggregateFn::StDevP => AggState::StDevP {
4806                sum: 0.0,
4807                sum_sq: 0.0,
4808                count: 0,
4809            },
4810            AggregateFn::PercentileDisc => AggState::PercentileDisc {
4811                items: Vec::new(),
4812                percentile: None,
4813            },
4814            AggregateFn::PercentileCont => AggState::PercentileCont {
4815                items: Vec::new(),
4816                percentile: None,
4817            },
4818        }
4819    }
4820
4821    fn update(&mut self, arg: &AggregateArg, ctx: &EvalCtx) -> Result<()> {
4822        match self {
4823            AggState::Count(c) => match arg {
4824                AggregateArg::Star => *c += 1,
4825                AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => {
4826                    if !matches!(eval_expr(e, ctx)?, Value::Null) {
4827                        *c += 1;
4828                    }
4829                }
4830            },
4831            AggState::Sum {
4832                int_part,
4833                float_part,
4834                is_float,
4835            } => {
4836                let v = expr_arg_value(arg, ctx)?;
4837                match v {
4838                    Value::Null => {}
4839                    Value::Property(Property::Int64(i)) => *int_part += i,
4840                    Value::Property(Property::Float64(f)) => {
4841                        *float_part += f;
4842                        *is_float = true;
4843                    }
4844                    _ => return Err(Error::AggregateTypeError),
4845                }
4846            }
4847            AggState::Avg { total, count } => {
4848                let v = expr_arg_value(arg, ctx)?;
4849                match v {
4850                    Value::Null => {}
4851                    Value::Property(Property::Int64(i)) => {
4852                        *total += i as f64;
4853                        *count += 1;
4854                    }
4855                    Value::Property(Property::Float64(f)) => {
4856                        *total += f;
4857                        *count += 1;
4858                    }
4859                    _ => return Err(Error::AggregateTypeError),
4860                }
4861            }
4862            AggState::Min(slot) => {
4863                // `min` / `max` ignore null inputs and operate
4864                // over any `Value` (including lists, nodes etc.),
4865                // not just scalar Properties. The ordering is
4866                // `compare_values`: within a single type the
4867                // natural order, across types the
4868                // `type_order_value` rank.
4869                let v = expr_arg_value(arg, ctx)?;
4870                if matches!(v, Value::Null | Value::Property(Property::Null)) {
4871                    // skip
4872                } else {
4873                    match slot {
4874                        None => *slot = Some(v),
4875                        Some(cur) => {
4876                            if compare_values(&v, cur) == Ordering::Less {
4877                                *cur = v;
4878                            }
4879                        }
4880                    }
4881                }
4882            }
4883            AggState::Max(slot) => {
4884                let v = expr_arg_value(arg, ctx)?;
4885                if matches!(v, Value::Null | Value::Property(Property::Null)) {
4886                    // skip
4887                } else {
4888                    match slot {
4889                        None => *slot = Some(v),
4890                        Some(cur) => {
4891                            if compare_values(&v, cur) == Ordering::Greater {
4892                                *cur = v;
4893                            }
4894                        }
4895                    }
4896                }
4897            }
4898            AggState::Collect(items) => {
4899                let v = expr_arg_value(arg, ctx)?;
4900                if !matches!(v, Value::Null) {
4901                    items.push(v);
4902                }
4903            }
4904            AggState::PercentileDisc { items, .. } | AggState::PercentileCont { items, .. } => {
4905                let v = expr_arg_value(arg, ctx)?;
4906                if !matches!(v, Value::Null) {
4907                    items.push(v);
4908                }
4909            }
4910            AggState::StDev { sum, sum_sq, count } | AggState::StDevP { sum, sum_sq, count } => {
4911                let v = expr_arg_value(arg, ctx)?;
4912                match v {
4913                    Value::Null => {}
4914                    Value::Property(Property::Int64(i)) => {
4915                        let f = i as f64;
4916                        *sum += f;
4917                        *sum_sq += f * f;
4918                        *count += 1;
4919                    }
4920                    Value::Property(Property::Float64(f)) => {
4921                        *sum += f;
4922                        *sum_sq += f * f;
4923                        *count += 1;
4924                    }
4925                    _ => return Err(Error::AggregateTypeError),
4926                }
4927            }
4928        }
4929        Ok(())
4930    }
4931
4932    fn finalize(&self) -> Value {
4933        match self {
4934            AggState::Count(c) => Value::Property(Property::Int64(*c)),
4935            AggState::Sum {
4936                int_part,
4937                float_part,
4938                is_float,
4939            } => {
4940                if *is_float {
4941                    Value::Property(Property::Float64(*float_part + *int_part as f64))
4942                } else {
4943                    Value::Property(Property::Int64(*int_part))
4944                }
4945            }
4946            AggState::Avg { total, count } => {
4947                if *count == 0 {
4948                    Value::Null
4949                } else {
4950                    Value::Property(Property::Float64(*total / *count as f64))
4951                }
4952            }
4953            AggState::Min(slot) | AggState::Max(slot) => match slot {
4954                Some(v) => v.clone(),
4955                None => Value::Null,
4956            },
4957            AggState::Collect(items) => Value::List(items.clone()),
4958            AggState::StDevP { sum, sum_sq, count } => {
4959                if *count == 0 {
4960                    Value::Property(Property::Float64(0.0))
4961                } else {
4962                    let n = *count as f64;
4963                    let variance = *sum_sq / n - (*sum / n).powi(2);
4964                    Value::Property(Property::Float64(variance.max(0.0).sqrt()))
4965                }
4966            }
4967            AggState::StDev { sum, sum_sq, count } => {
4968                if *count < 2 {
4969                    Value::Property(Property::Float64(0.0))
4970                } else {
4971                    let n = *count as f64;
4972                    let variance = (*sum_sq - *sum * *sum / n) / (n - 1.0);
4973                    Value::Property(Property::Float64(variance.max(0.0).sqrt()))
4974                }
4975            }
4976            AggState::PercentileDisc { items, percentile } => {
4977                percentile_disc(items, percentile.unwrap_or(0.0))
4978            }
4979            AggState::PercentileCont { items, percentile } => {
4980                percentile_cont(items, percentile.unwrap_or(0.0))
4981            }
4982        }
4983    }
4984}
4985
4986fn expr_arg_value(arg: &AggregateArg, ctx: &EvalCtx) -> Result<Value> {
4987    match arg {
4988        AggregateArg::Star => Err(Error::AggregateTypeError),
4989        AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => eval_expr(e, ctx),
4990    }
4991}
4992
4993/// Coerce a collected aggregate value into an f64 for percentile
4994/// math. Unhandled types fall back to NaN; the caller sorts NaN
4995/// out of the stream before computing the percentile.
4996fn value_to_f64(v: &Value) -> f64 {
4997    match v {
4998        Value::Property(Property::Int64(i)) => *i as f64,
4999        Value::Property(Property::Float64(f)) => *f,
5000        _ => f64::NAN,
5001    }
5002}
5003
5004/// `percentileDisc(expr, p)` — discrete percentile. Returns the
5005/// smallest value at or above the `p`-ranked position. Numbers only;
5006/// non-numeric values get sorted to the end and are effectively
5007/// ignored unless the percentile lands on one.
5008fn percentile_disc(items: &[Value], p: f64) -> Value {
5009    let mut nums: Vec<(f64, Value)> = items
5010        .iter()
5011        .map(|v| (value_to_f64(v), v.clone()))
5012        .filter(|(f, _)| !f.is_nan())
5013        .collect();
5014    if nums.is_empty() {
5015        return Value::Null;
5016    }
5017    nums.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
5018    let p = p.clamp(0.0, 1.0);
5019    let n = nums.len();
5020    // Neo4j spec: ceil(p * n) - 1, clamped at 0.
5021    let idx = ((p * n as f64).ceil() as isize - 1).max(0) as usize;
5022    nums[idx.min(n - 1)].1.clone()
5023}
5024
5025/// `percentileCont(expr, p)` — continuous percentile. Linearly
5026/// interpolates between the two ranks that bracket the fractional
5027/// position `p * (n - 1)`. Returns a Float64.
5028fn percentile_cont(items: &[Value], p: f64) -> Value {
5029    let mut nums: Vec<f64> = items
5030        .iter()
5031        .map(value_to_f64)
5032        .filter(|f| !f.is_nan())
5033        .collect();
5034    if nums.is_empty() {
5035        return Value::Null;
5036    }
5037    nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
5038    let p = p.clamp(0.0, 1.0);
5039    let n = nums.len();
5040    if n == 1 {
5041        return Value::Property(Property::Float64(nums[0]));
5042    }
5043    let pos = p * (n as f64 - 1.0);
5044    let lo = pos.floor() as usize;
5045    let hi = pos.ceil() as usize;
5046    let frac = pos - lo as f64;
5047    let v = nums[lo] + (nums[hi] - nums[lo]) * frac;
5048    Value::Property(Property::Float64(v))
5049}
5050
5051struct SkipOp {
5052    input: Box<dyn Operator>,
5053    count_expr: Expr,
5054    remaining: Option<i64>,
5055}
5056
5057impl SkipOp {
5058    fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5059        Self {
5060            input,
5061            count_expr,
5062            remaining: None,
5063        }
5064    }
5065}
5066
5067impl Operator for SkipOp {
5068    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5069        if self.remaining.is_none() {
5070            let empty = Row::new();
5071            let ectx = ctx.eval_ctx(&empty);
5072            let val = eval_expr(&self.count_expr, &ectx)?;
5073            self.remaining = Some(expr_to_count(val)?);
5074        }
5075        let rem = self.remaining.as_mut().unwrap();
5076        while *rem > 0 {
5077            if self.input.next(ctx)?.is_none() {
5078                return Ok(None);
5079            }
5080            *rem -= 1;
5081        }
5082        self.input.next(ctx)
5083    }
5084}
5085
5086struct LimitOp {
5087    input: Box<dyn Operator>,
5088    count_expr: Expr,
5089    remaining: Option<i64>,
5090}
5091
5092impl LimitOp {
5093    fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5094        Self {
5095            input,
5096            count_expr,
5097            remaining: None,
5098        }
5099    }
5100}
5101
5102impl Operator for LimitOp {
5103    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5104        if self.remaining.is_none() {
5105            let empty = Row::new();
5106            let ectx = ctx.eval_ctx(&empty);
5107            let val = eval_expr(&self.count_expr, &ectx)?;
5108            self.remaining = Some(expr_to_count(val)?);
5109        }
5110        let rem = self.remaining.as_mut().unwrap();
5111        if *rem <= 0 {
5112            return Ok(None);
5113        }
5114        match self.input.next(ctx)? {
5115            Some(row) => {
5116                *rem -= 1;
5117                Ok(Some(row))
5118            }
5119            None => Ok(None),
5120        }
5121    }
5122}
5123
5124fn expr_to_count(val: Value) -> Result<i64> {
5125    match val {
5126        Value::Null | Value::Property(Property::Null) => Ok(0),
5127        Value::Property(Property::Int64(n)) if n >= 0 => Ok(n),
5128        // openCypher: SKIP/LIMIT require an integer. Float values
5129        // (including integer-valued floats) are rejected as
5130        // InvalidArgumentType — they'd only be valid after an
5131        // explicit cast, which the user must write themselves.
5132        _ => Err(Error::TypeMismatch),
5133    }
5134}