Skip to main content

meshdb_executor/
ops.rs

1use crate::{
2    error::{Error, Result},
3    eval::{compare_values, eval_expr, row_key, to_bool, value_key, values_equal, EvalCtx},
4    procedures::ProcedureRegistry,
5    reader::GraphReader,
6    value::{ParamMap, Row, Value},
7    writer::GraphWriter,
8};
9use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
10use meshdb_cypher::{
11    AggregateArg, AggregateFn, AggregateSpec, BinaryOp, CallArgs, CompareOp, ConstraintKind,
12    ConstraintScope as CypherConstraintScope, CreateEdgeSpec, CreateNodeSpec, Direction, Expr,
13    Literal, LogicalPlan, PropertyType as CypherPropertyType, RemoveSpec, ReturnItem,
14    SetAssignment, SortItem, UnaryOp, YieldSpec,
15};
16use meshdb_storage::{
17    ConstraintScope as StorageConstraintScope, PropertyConstraintKind,
18    PropertyType as StoragePropertyType, RocksDbStorageEngine,
19};
20use std::cell::RefCell;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23
24/// Shared tombstone set for nodes / edges that have been DELETEd
25/// earlier in the current query. Property / label / type / keys /
26/// id accesses consult this set and raise `DeletedEntityAccess`
27/// so `MATCH (n) DELETE n RETURN n.num` surfaces the error at
28/// runtime instead of reading off a stale clone.
29#[derive(Default)]
30pub struct Tombstones {
31    pub nodes: RefCell<HashSet<meshdb_core::NodeId>>,
32    pub edges: RefCell<HashSet<meshdb_core::EdgeId>>,
33}
34
35pub struct ExecCtx<'a> {
36    pub store: &'a dyn GraphReader,
37    pub writer: &'a dyn GraphWriter,
38    /// Per-query parameter bindings. Empty for the single-node and Raft
39    /// entry points that don't carry a Bolt RUN; populated with the
40    /// driver-supplied params on the Bolt path. Threaded into every
41    /// `eval_expr` call so `Expr::Parameter("name")` resolves.
42    pub params: &'a ParamMap,
43    /// Procedures known to this execution. Defaults to an empty
44    /// registry on call sites that don't care; the TCK harness and
45    /// any future server startup plug in a populated registry so
46    /// `CALL ns.name(...)` resolves.
47    pub procedures: &'a ProcedureRegistry,
48    /// Outer-scope rows contributed by enclosing operators. The
49    /// innermost slot (first entry) is the nearest outer. Set by
50    /// [`CartesianProductOp`] when running its right side so that
51    /// operators inside — typically `EdgeExpandOp` /
52    /// `VarLengthExpandOp` constraint lookups — can resolve
53    /// variables the right-side scan didn't bind directly. Empty
54    /// at the top level.
55    pub outer_rows: &'a [&'a Row],
56    /// Tombstones for entities deleted earlier in the same query.
57    /// Shared by reference so DeleteOp can insert and downstream
58    /// eval can check.
59    pub tombstones: &'a Tombstones,
60}
61
62pub(crate) struct NoOpWriter;
63impl GraphWriter for NoOpWriter {
64    fn put_node(&self, _: &Node) -> Result<()> {
65        Ok(())
66    }
67    fn put_edge(&self, _: &Edge) -> Result<()> {
68        Ok(())
69    }
70    fn delete_edge(&self, _: EdgeId) -> Result<()> {
71        Ok(())
72    }
73    fn detach_delete_node(&self, _: NodeId) -> Result<()> {
74        Ok(())
75    }
76}
77
78impl<'a> ExecCtx<'a> {
79    /// Build an `EvalCtx` wrapping the given row alongside this
80    /// exec context's params and reader. Used by operators to
81    /// bridge the per-operator context into the expression
82    /// evaluator without repeating the field list at every
83    /// `eval_expr` call site.
84    pub(crate) fn eval_ctx<'b>(&self, row: &'b Row) -> EvalCtx<'b>
85    where
86        'a: 'b,
87    {
88        EvalCtx {
89            row,
90            params: self.params,
91            reader: self.store,
92            procedures: self.procedures,
93            outer_rows: self.outer_rows,
94            tombstones: self.tombstones,
95        }
96    }
97
98    /// Look up a variable in `row` first, then in each outer-scope
99    /// row in order. Returns the nearest match. Used for constraint
100    /// lookups inside right-side operators of a `CartesianProduct`
101    /// so a fresh scan that references an outer binding
102    /// (`MATCH ()-[r]-() MATCH (n)-[r]-(m)`) can still resolve it.
103    pub(crate) fn lookup_binding<'r>(&'r self, row: &'r Row, name: &str) -> Option<&'r Value> {
104        if let Some(v) = row.get(name) {
105            return Some(v);
106        }
107        for outer in self.outer_rows {
108            if let Some(v) = outer.get(name) {
109                return Some(v);
110            }
111        }
112        None
113    }
114}
115
116pub trait Operator {
117    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>>;
118}
119
120/// Execute a plan using the given store for both reads and writes.
121/// Equivalent to [`execute_with_reader`] with the store acting as both
122/// and an empty parameter map.
123pub fn execute(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
124    let params = ParamMap::new();
125    execute_with_reader(
126        plan,
127        store as &dyn GraphReader,
128        store as &dyn GraphWriter,
129        &params,
130    )
131}
132
133/// Execute a plan against a local [`RocksDbStorageEngine`] for reads,
134/// sending mutations to a separate [`GraphWriter`]. Used by in-process
135/// setups where reads are always cheap-local (single node or
136/// full-replica Raft). No parameters.
137pub fn execute_with_writer(
138    plan: &LogicalPlan,
139    store: &RocksDbStorageEngine,
140    writer: &dyn GraphWriter,
141) -> Result<Vec<Row>> {
142    let params = ParamMap::new();
143    execute_with_reader(plan, store as &dyn GraphReader, writer, &params)
144}
145
146/// Execute a plan with an arbitrary [`GraphReader`] for reads and a
147/// parameter map for `$param` resolution. Routing mode uses a
148/// partitioned reader that fans out reads across peers; the Bolt
149/// listener supplies the driver-bound param map.
150pub fn explain(plan: &LogicalPlan) -> Vec<Row> {
151    let text = meshdb_cypher::format_plan(plan);
152    let mut row = Row::new();
153    row.insert("plan".to_string(), Value::Property(Property::String(text)));
154    vec![row]
155}
156
157pub fn profile(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
158    let result_rows = execute(plan, store)?;
159    let row_count = result_rows.len() as i64;
160    let plan_text = meshdb_cypher::format_plan(plan);
161    let summary = format!("{plan_text}\nRows: {row_count}");
162    let mut row = Row::new();
163    row.insert(
164        "profile".to_string(),
165        Value::Property(Property::String(summary)),
166    );
167    row.insert(
168        "rows".to_string(),
169        Value::Property(Property::Int64(row_count)),
170    );
171    Ok(vec![row])
172}
173
174pub fn execute_with_reader(
175    plan: &LogicalPlan,
176    reader: &dyn GraphReader,
177    writer: &dyn GraphWriter,
178    params: &ParamMap,
179) -> Result<Vec<Row>> {
180    let empty_procs = ProcedureRegistry::new();
181    execute_with_reader_and_procs(plan, reader, writer, params, &empty_procs)
182}
183
184/// Like [`execute_with_reader`] but with an explicit procedure
185/// registry in scope. Used by the TCK harness to mount mock
186/// procedures declared by `there exists a procedure ...` steps,
187/// and reserved for the server startup path once built-in
188/// procedures land.
189pub fn execute_with_reader_and_procs(
190    plan: &LogicalPlan,
191    reader: &dyn GraphReader,
192    writer: &dyn GraphWriter,
193    params: &ParamMap,
194    procedures: &ProcedureRegistry,
195) -> Result<Vec<Row>> {
196    // `datetime()`, `localtime()`, and friends cache "now" in a
197    // thread-local so multiple calls inside the same statement see
198    // the same instant. Clear it at the start of every execution so
199    // the next query gets a fresh reading.
200    crate::eval::reset_statement_time();
201    // Schema DDL never enters the operator pipeline — it's a
202    // side-effect on the backing store, not a row-producing query.
203    // Short-circuit here so `build_op` stays a pure plan-to-operator
204    // mapping and doesn't need DDL-specific cases.
205    if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
206        return Ok(rows);
207    }
208    let suppress_output = is_write_only_plan(plan);
209    let mut op = build_op(plan);
210    let tombstones = Tombstones::default();
211    let ctx = ExecCtx {
212        store: reader,
213        writer,
214        params,
215        procedures,
216        outer_rows: &[],
217        tombstones: &tombstones,
218    };
219    let mut rows = Vec::new();
220    while let Some(row) = op.next(&ctx)? {
221        rows.push(row);
222    }
223    if suppress_output {
224        Ok(Vec::new())
225    } else {
226        Ok(rows)
227    }
228}
229
230fn is_write_only_plan(plan: &LogicalPlan) -> bool {
231    // A plan is write-only when its top-level operator is a mutation
232    // with no projection (Project/Identity) wrapping it. Mutations
233    // with RETURN have a Project on top.
234    match plan {
235        LogicalPlan::CreatePath { .. }
236        | LogicalPlan::Delete { .. }
237        | LogicalPlan::SetProperty { .. }
238        | LogicalPlan::Remove { .. }
239        | LogicalPlan::Foreach { .. }
240        | LogicalPlan::MergeNode { .. }
241        | LogicalPlan::MergeEdge { .. } => true,
242        _ => false,
243    }
244}
245
246/// DDL dispatch. Returns `Ok(Some(rows))` when `plan` is a schema
247/// statement and was handled; `Ok(None)` when it's a regular graph
248/// operation that the operator pipeline should execute normally.
249///
250/// `CREATE INDEX` / `DROP INDEX` return one row carrying an `ok`
251/// value so Bolt clients see a non-empty RECORD stream rather than
252/// an empty SUCCESS — mirrors how Neo4j reports schema writes.
253/// `SHOW INDEXES` returns one row per registered index with
254/// `label`, `property`, and `state` columns.
255fn try_execute_ddl(
256    plan: &LogicalPlan,
257    reader: &dyn GraphReader,
258    writer: &dyn GraphWriter,
259) -> Result<Option<Vec<Row>>> {
260    match plan {
261        LogicalPlan::CreatePropertyIndex { label, property } => {
262            writer.create_property_index(label, property)?;
263            Ok(Some(vec![ddl_ack_row("created", label, property)]))
264        }
265        LogicalPlan::DropPropertyIndex { label, property } => {
266            writer.drop_property_index(label, property)?;
267            Ok(Some(vec![ddl_ack_row("dropped", label, property)]))
268        }
269        LogicalPlan::ShowPropertyIndexes => {
270            // SHOW is a pure read — source it from the reader so
271            // overlay/partitioned readers deliver the right view
272            // without round-tripping through a writer.
273            let specs = reader.list_property_indexes()?;
274            let rows = specs
275                .into_iter()
276                .map(|(label, property)| {
277                    let mut row = Row::default();
278                    row.insert("label".into(), Value::Property(Property::String(label)));
279                    row.insert(
280                        "property".into(),
281                        Value::Property(Property::String(property)),
282                    );
283                    row.insert(
284                        "state".into(),
285                        Value::Property(Property::String("online".into())),
286                    );
287                    row
288                })
289                .collect();
290            Ok(Some(rows))
291        }
292        LogicalPlan::CreatePropertyConstraint {
293            name,
294            scope,
295            properties,
296            kind,
297            if_not_exists,
298        } => {
299            let storage_kind = match kind {
300                ConstraintKind::Unique => PropertyConstraintKind::Unique,
301                ConstraintKind::NotNull => PropertyConstraintKind::NotNull,
302                ConstraintKind::NodeKey => PropertyConstraintKind::NodeKey,
303                ConstraintKind::PropertyType(t) => {
304                    PropertyConstraintKind::PropertyType(cypher_to_storage_property_type(*t))
305                }
306            };
307            let storage_scope = cypher_to_storage_scope(scope);
308            let spec = writer.create_property_constraint(
309                name.as_deref(),
310                &storage_scope,
311                properties,
312                storage_kind,
313                *if_not_exists,
314            )?;
315            Ok(Some(vec![constraint_ack_row("created", &spec)]))
316        }
317        LogicalPlan::DropPropertyConstraint { name, if_exists } => {
318            writer.drop_property_constraint(name, *if_exists)?;
319            let mut row = Row::default();
320            row.insert(
321                "state".into(),
322                Value::Property(Property::String("dropped".into())),
323            );
324            row.insert(
325                "name".into(),
326                Value::Property(Property::String(name.clone())),
327            );
328            Ok(Some(vec![row]))
329        }
330        LogicalPlan::ShowPropertyConstraints => {
331            let specs = reader.list_property_constraints()?;
332            let rows = specs.into_iter().map(constraint_show_row).collect();
333            Ok(Some(rows))
334        }
335        _ => Ok(None),
336    }
337}
338
339/// One-row acknowledgement emitted after `CREATE CONSTRAINT`. Carries
340/// the resolved spec so callers see the final name when they omitted
341/// it. Columns match `SHOW CONSTRAINTS` (plus `state`) so Bolt clients
342/// can uniformly format DDL responses.
343fn constraint_ack_row(state: &str, spec: &meshdb_storage::PropertyConstraintSpec) -> Row {
344    let mut row = constraint_show_row(spec.clone());
345    row.insert(
346        "state".into(),
347        Value::Property(Property::String(state.into())),
348    );
349    row
350}
351
352/// Row shape for `SHOW CONSTRAINTS` and `db.constraints()`. Columns:
353/// `name`, `scope` ("NODE" / "RELATIONSHIP"), `label` (label or edge
354/// type, depending on scope), `properties` (list), and `type`.
355/// Single-property constraints still carry a one-element `properties`
356/// list; composite kinds (e.g. `NodeKey`) carry the full tuple.
357fn constraint_show_row(spec: meshdb_storage::PropertyConstraintSpec) -> Row {
358    let mut row = Row::default();
359    row.insert("name".into(), Value::Property(Property::String(spec.name)));
360    let (scope_tag, target) = match spec.scope {
361        meshdb_storage::ConstraintScope::Node(l) => ("NODE", l),
362        meshdb_storage::ConstraintScope::Relationship(t) => ("RELATIONSHIP", t),
363    };
364    row.insert(
365        "scope".into(),
366        Value::Property(Property::String(scope_tag.into())),
367    );
368    // `label` stays for backwards compatibility — it carries the
369    // scope's target regardless of node vs relationship, matching
370    // the index DDL row's column for historical consistency. A
371    // separate `scope` column tells you what kind of target it is.
372    row.insert("label".into(), Value::Property(Property::String(target)));
373    let props: Vec<Property> = spec.properties.into_iter().map(Property::String).collect();
374    row.insert("properties".into(), Value::Property(Property::List(props)));
375    row.insert(
376        "type".into(),
377        Value::Property(Property::String(spec.kind.as_string())),
378    );
379    row
380}
381
382/// Convert the Cypher AST's `ConstraintScope` into the storage-layer
383/// enum. Narrow bridge — same shape on both sides; kept in
384/// different crates to preserve the dependency direction.
385fn cypher_to_storage_scope(scope: &CypherConstraintScope) -> StorageConstraintScope {
386    match scope {
387        CypherConstraintScope::Node(l) => StorageConstraintScope::Node(l.clone()),
388        CypherConstraintScope::Relationship(t) => StorageConstraintScope::Relationship(t.clone()),
389    }
390}
391
392/// Convert the Cypher AST's `PropertyType` into the storage-layer
393/// enum. Narrow bridge — the two enums carry the same four variants
394/// but live in different crates to keep the dependency direction
395/// clean (cypher → executor → storage, never the reverse).
396fn cypher_to_storage_property_type(t: CypherPropertyType) -> StoragePropertyType {
397    match t {
398        CypherPropertyType::String => StoragePropertyType::String,
399        CypherPropertyType::Integer => StoragePropertyType::Integer,
400        CypherPropertyType::Float => StoragePropertyType::Float,
401        CypherPropertyType::Boolean => StoragePropertyType::Boolean,
402    }
403}
404
405fn ddl_ack_row(state: &str, label: &str, property: &str) -> Row {
406    let mut row = Row::default();
407    row.insert(
408        "state".into(),
409        Value::Property(Property::String(state.into())),
410    );
411    row.insert(
412        "label".into(),
413        Value::Property(Property::String(label.into())),
414    );
415    row.insert(
416        "property".into(),
417        Value::Property(Property::String(property.into())),
418    );
419    row
420}
421
422fn build_op(plan: &LogicalPlan) -> Box<dyn Operator> {
423    build_op_inner(plan, None)
424}
425
426pub(crate) fn build_op_inner(plan: &LogicalPlan, seed: Option<&Row>) -> Box<dyn Operator> {
427    macro_rules! child {
428        ($p:expr) => {
429            build_op_inner($p, seed)
430        };
431    }
432    match plan {
433        LogicalPlan::CreatePath {
434            input,
435            nodes,
436            edges,
437        } => Box::new(CreatePathOp::new(
438            input.as_ref().map(|p| child!(p)),
439            nodes.clone(),
440            edges.clone(),
441        )),
442        LogicalPlan::CartesianProduct { left, right } => {
443            Box::new(CartesianProductOp::new(child!(left), (**right).clone()))
444        }
445        LogicalPlan::Delete {
446            input,
447            detach,
448            vars,
449            exprs,
450        } => Box::new(DeleteOp::new(
451            child!(input),
452            *detach,
453            vars.clone(),
454            exprs.clone(),
455        )),
456        LogicalPlan::SetProperty { input, assignments } => {
457            Box::new(SetPropertyOp::new(child!(input), assignments.clone()))
458        }
459        LogicalPlan::Remove { input, items } => {
460            Box::new(RemoveOp::new(child!(input), items.clone()))
461        }
462        LogicalPlan::LoadCsv {
463            input,
464            path_expr,
465            var,
466            with_headers,
467        } => Box::new(LoadCsvOp::new(
468            input.as_ref().map(|p| child!(p)),
469            path_expr.clone(),
470            var.clone(),
471            *with_headers,
472        )),
473        LogicalPlan::Foreach {
474            input,
475            var,
476            list_expr,
477            set_assignments,
478            remove_items,
479        } => Box::new(ForeachOp::new(
480            child!(input),
481            var.clone(),
482            list_expr.clone(),
483            set_assignments.clone(),
484            remove_items.clone(),
485        )),
486        LogicalPlan::CallSubquery { input, body } => {
487            Box::new(CallSubqueryOp::new(child!(input), (**body).clone()))
488        }
489        LogicalPlan::OptionalApply {
490            input,
491            body,
492            null_vars,
493        } => Box::new(OptionalApplyOp::new(
494            child!(input),
495            (**body).clone(),
496            null_vars.clone(),
497        )),
498        LogicalPlan::ProcedureCall {
499            input,
500            qualified_name,
501            args,
502            yield_spec,
503            standalone,
504        } => Box::new(ProcedureCallOp::new(
505            input.as_ref().map(|p| child!(p)),
506            qualified_name.clone(),
507            args.clone(),
508            yield_spec.clone(),
509            *standalone,
510        )),
511        LogicalPlan::SeedRow => match seed {
512            Some(r) => Box::new(SeededRowOp {
513                row: Some(r.clone()),
514            }),
515            None => Box::new(SeedRowOp { done: false }),
516        },
517        LogicalPlan::NodeScanAll { var } => Box::new(NodeScanAllOp::new(var.clone())),
518        LogicalPlan::NodeScanByLabels { var, labels } => {
519            Box::new(NodeScanByLabelsOp::new(var.clone(), labels.clone()))
520        }
521        LogicalPlan::EdgeExpand {
522            input,
523            src_var,
524            edge_var,
525            dst_var,
526            dst_labels,
527            edge_properties,
528            edge_types,
529            direction,
530            edge_constraint_var,
531        } => Box::new(EdgeExpandOp::new(
532            child!(input),
533            src_var.clone(),
534            edge_var.clone(),
535            dst_var.clone(),
536            dst_labels.clone(),
537            edge_properties.clone(),
538            edge_types.clone(),
539            *direction,
540            edge_constraint_var.clone(),
541        )),
542        LogicalPlan::OptionalEdgeExpand {
543            input,
544            src_var,
545            edge_var,
546            dst_var,
547            dst_labels,
548            dst_properties,
549            edge_types,
550            direction,
551            dst_constraint_var,
552            edge_constraint_var,
553        } => Box::new(OptionalEdgeExpandOp::new(
554            child!(input),
555            src_var.clone(),
556            edge_var.clone(),
557            dst_var.clone(),
558            dst_labels.clone(),
559            dst_properties.clone(),
560            edge_types.clone(),
561            *direction,
562            dst_constraint_var.clone(),
563            edge_constraint_var.clone(),
564        )),
565        LogicalPlan::VarLengthExpand {
566            input,
567            src_var,
568            edge_var,
569            dst_var,
570            dst_labels,
571            edge_types,
572            edge_properties,
573            direction,
574            min_hops,
575            max_hops,
576            path_var,
577            optional,
578            dst_constraint_var,
579            bound_edge_list_var,
580            excluded_edge_vars,
581        } => Box::new(VarLengthExpandOp::new(
582            child!(input),
583            src_var.clone(),
584            edge_var.clone(),
585            dst_var.clone(),
586            dst_labels.clone(),
587            edge_types.clone(),
588            edge_properties.clone(),
589            *direction,
590            *min_hops,
591            *max_hops,
592            path_var.clone(),
593            *optional,
594            dst_constraint_var.clone(),
595            bound_edge_list_var.clone(),
596            excluded_edge_vars.clone(),
597        )),
598        LogicalPlan::Filter { input, predicate } => {
599            Box::new(FilterOp::new(child!(input), predicate.clone()))
600        }
601        LogicalPlan::Project { input, items } => {
602            Box::new(ProjectOp::new(child!(input), items.clone()))
603        }
604        LogicalPlan::Aggregate {
605            input,
606            group_keys,
607            aggregates,
608        } => Box::new(AggregateOp::new(
609            child!(input),
610            group_keys.clone(),
611            aggregates.clone(),
612        )),
613        LogicalPlan::Identity { input } => Box::new(IdentityOp::new(child!(input))),
614        LogicalPlan::CoalesceNullRow { input, null_vars } => {
615            Box::new(CoalesceNullRowOp::new(child!(input), null_vars.clone()))
616        }
617        LogicalPlan::Distinct { input } => Box::new(DistinctOp::new(child!(input))),
618        LogicalPlan::OrderBy { input, sort_items } => {
619            Box::new(OrderByOp::new(child!(input), sort_items.clone()))
620        }
621        LogicalPlan::Skip { input, count } => Box::new(SkipOp::new(child!(input), count.clone())),
622        LogicalPlan::Limit { input, count } => Box::new(LimitOp::new(child!(input), count.clone())),
623        LogicalPlan::MergeNode {
624            input,
625            var,
626            labels,
627            properties,
628            on_create,
629            on_match,
630        } => Box::new(MergeNodeOp::new(
631            input.as_ref().map(|p| child!(p)),
632            var.clone(),
633            labels.clone(),
634            properties.clone(),
635            on_create.clone(),
636            on_match.clone(),
637        )),
638        LogicalPlan::MergeEdge {
639            input,
640            edge_var,
641            src_var,
642            dst_var,
643            edge_type,
644            undirected,
645            properties,
646            on_create,
647            on_match,
648        } => Box::new(MergeEdgeOp::new(
649            child!(input),
650            edge_var.clone(),
651            src_var.clone(),
652            dst_var.clone(),
653            edge_type.clone(),
654            *undirected,
655            properties.clone(),
656            on_create.clone(),
657            on_match.clone(),
658        )),
659        LogicalPlan::Unwind { var, expr } => Box::new(UnwindOp::new(var.clone(), expr.clone())),
660        LogicalPlan::UnwindChain { input, var, expr } => {
661            Box::new(UnwindChainOp::new(child!(input), var.clone(), expr.clone()))
662        }
663        LogicalPlan::IndexSeek {
664            var,
665            label,
666            property,
667            value,
668        } => Box::new(IndexSeekOp::new(
669            var.clone(),
670            label.clone(),
671            property.clone(),
672            value.clone(),
673        )),
674        // DDL plans are handled by `try_execute_ddl` before the
675        // operator pipeline is built, so they should never reach
676        // this point. An assertion here catches any future
677        // refactor that forgets to gate them.
678        LogicalPlan::Union { branches, all } => {
679            let branch_ops: Vec<Box<dyn Operator>> = branches.iter().map(|b| child!(b)).collect();
680            Box::new(UnionOp::new(branch_ops, *all))
681        }
682        LogicalPlan::BindPath {
683            input,
684            path_var,
685            node_vars,
686            edge_vars,
687        } => Box::new(BindPathOp::new(
688            child!(input),
689            path_var.clone(),
690            node_vars.clone(),
691            edge_vars.clone(),
692        )),
693        LogicalPlan::ShortestPath {
694            input,
695            src_var,
696            dst_var,
697            path_var,
698            edge_types,
699            direction,
700            max_hops,
701            kind,
702        } => Box::new(ShortestPathOp::new(
703            child!(input),
704            src_var.clone(),
705            dst_var.clone(),
706            path_var.clone(),
707            edge_types.clone(),
708            *direction,
709            *max_hops,
710            *kind,
711        )),
712        LogicalPlan::CreatePropertyIndex { .. }
713        | LogicalPlan::DropPropertyIndex { .. }
714        | LogicalPlan::ShowPropertyIndexes
715        | LogicalPlan::CreatePropertyConstraint { .. }
716        | LogicalPlan::DropPropertyConstraint { .. }
717        | LogicalPlan::ShowPropertyConstraints => {
718            panic!("schema DDL must be dispatched via try_execute_ddl before build_op")
719        }
720    }
721}
722
723struct UnwindOp {
724    var: String,
725    expr: Expr,
726    items: Option<Vec<Value>>,
727    cursor: usize,
728}
729
730impl UnwindOp {
731    fn new(var: String, expr: Expr) -> Self {
732        Self {
733            var,
734            expr,
735            items: None,
736            cursor: 0,
737        }
738    }
739}
740
741impl Operator for UnwindOp {
742    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
743        if self.items.is_none() {
744            let empty = Row::new();
745            let ectx = EvalCtx {
746                row: &empty,
747                params: ctx.params,
748                reader: ctx.store,
749                procedures: ctx.procedures,
750                outer_rows: ctx.outer_rows,
751                tombstones: ctx.tombstones,
752            };
753            let val = eval_expr(&self.expr, &ectx)?;
754            self.items = Some(coerce_unwind_list(val)?);
755        }
756        let items = self.items.as_ref().unwrap();
757        if self.cursor < items.len() {
758            let v = items[self.cursor].clone();
759            self.cursor += 1;
760            let mut row = Row::new();
761            row.insert(self.var.clone(), v);
762            Ok(Some(row))
763        } else {
764            Ok(None)
765        }
766    }
767}
768
769/// Per-row UNWIND: pulls one input row, evaluates `expr` against it
770/// to produce a list, and emits one output row per list element.
771/// Each output row inherits every binding from the input row plus
772/// a new `var` binding. Empty / null lists drop the input row and
773/// the operator immediately pulls the next input row.
774struct UnwindChainOp {
775    input: Box<dyn Operator>,
776    var: String,
777    expr: Expr,
778    current_row: Option<Row>,
779    items: Vec<Value>,
780    cursor: usize,
781}
782
783impl UnwindChainOp {
784    fn new(input: Box<dyn Operator>, var: String, expr: Expr) -> Self {
785        Self {
786            input,
787            var,
788            expr,
789            current_row: None,
790            items: Vec::new(),
791            cursor: 0,
792        }
793    }
794}
795
796impl Operator for UnwindChainOp {
797    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
798        loop {
799            if let Some(base) = &self.current_row {
800                if self.cursor < self.items.len() {
801                    let v = self.items[self.cursor].clone();
802                    self.cursor += 1;
803                    let mut row = base.clone();
804                    row.insert(self.var.clone(), v);
805                    return Ok(Some(row));
806                }
807                self.current_row = None;
808                self.items.clear();
809                self.cursor = 0;
810            }
811            let base = match self.input.next(ctx)? {
812                Some(r) => r,
813                None => return Ok(None),
814            };
815            let ectx = EvalCtx {
816                row: &base,
817                params: ctx.params,
818                reader: ctx.store,
819                procedures: ctx.procedures,
820                outer_rows: ctx.outer_rows,
821                tombstones: ctx.tombstones,
822            };
823            let val = eval_expr(&self.expr, &ectx)?;
824            self.items = coerce_unwind_list(val)?;
825            self.current_row = Some(base);
826        }
827    }
828}
829
830/// Shared list-coercion used by both UNWIND operators. Accepts a
831/// native executor `Value::List`, a property-list
832/// `Property::List`, or `Null` (treated as an empty list). Any
833/// other shape is a type error — UNWIND is defined over lists.
834fn coerce_unwind_list(val: Value) -> Result<Vec<Value>> {
835    match val {
836        Value::List(items) => Ok(items),
837        Value::Property(Property::List(props)) => {
838            Ok(props.into_iter().map(Value::Property).collect())
839        }
840        Value::Null => Ok(Vec::new()),
841        _ => Err(Error::TypeMismatch),
842    }
843}
844
845struct CreatePathOp {
846    input: Option<Box<dyn Operator>>,
847    nodes: Vec<CreateNodeSpec>,
848    edges: Vec<CreateEdgeSpec>,
849    done: bool,
850    buffered: Option<Vec<Row>>,
851    cursor: usize,
852}
853
854impl CreatePathOp {
855    fn new(
856        input: Option<Box<dyn Operator>>,
857        nodes: Vec<CreateNodeSpec>,
858        edges: Vec<CreateEdgeSpec>,
859    ) -> Self {
860        Self {
861            input,
862            nodes,
863            edges,
864            done: false,
865            buffered: None,
866            cursor: 0,
867        }
868    }
869
870    fn apply(&self, ctx: &ExecCtx, row: &Row) -> Result<Row> {
871        let mut out = row.clone();
872        let mut node_ids: Vec<NodeId> = Vec::with_capacity(self.nodes.len());
873        for spec in &self.nodes {
874            match spec {
875                CreateNodeSpec::New {
876                    var,
877                    labels,
878                    properties,
879                } => {
880                    let mut node = Node::new();
881                    for label in labels {
882                        node.labels.push(label.clone());
883                    }
884                    // Property values are `Expr` (literal | parameter
885                    // per the grammar). Evaluate against the current row
886                    // + params and convert to a stored Property via the
887                    // existing helper, which rejects Node/Edge values.
888                    // Null-valued properties aren't stored — openCypher
889                    // treats `{k: null}` as "no such property," so
890                    // `keys(n)` and `n.k IS NOT NULL` both skip it.
891                    for (k, expr) in properties {
892                        let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
893                        let prop = value_to_property(value)?;
894                        if matches!(prop, Property::Null) {
895                            continue;
896                        }
897                        node.properties.insert(k.clone(), prop);
898                    }
899                    ctx.writer.put_node(&node)?;
900                    node_ids.push(node.id);
901                    if let Some(v) = var {
902                        out.insert(v.clone(), Value::Node(node));
903                    }
904                }
905                CreateNodeSpec::Reference(name) => {
906                    let id = match out.get(name) {
907                        Some(Value::Node(n)) => n.id,
908                        _ => return Err(Error::UnboundVariable(name.clone())),
909                    };
910                    node_ids.push(id);
911                }
912            }
913        }
914        for spec in &self.edges {
915            let src = node_ids[spec.src_idx];
916            let dst = node_ids[spec.dst_idx];
917            let mut edge = Edge::new(spec.edge_type.clone(), src, dst);
918            for (k, expr) in &spec.properties {
919                let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
920                let prop = value_to_property(value)?;
921                if matches!(prop, Property::Null) {
922                    continue;
923                }
924                edge.properties.insert(k.clone(), prop);
925            }
926            ctx.writer.put_edge(&edge)?;
927            if let Some(v) = &spec.var {
928                out.insert(v.clone(), Value::Edge(edge));
929            }
930        }
931        Ok(out)
932    }
933}
934
935impl Operator for CreatePathOp {
936    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
937        if self.input.is_some() {
938            // Drain the whole input first, then replay sequentially. Draining up
939            // front avoids aliasing the source scan's cursor while we're writing
940            // to the store (node-label scans cache ids lazily on first call).
941            if let Some(buffered) = self.buffered.as_mut() {
942                if self.cursor < buffered.len() {
943                    let row = buffered[self.cursor].clone();
944                    self.cursor += 1;
945                    return Ok(Some(self.apply(ctx, &row)?));
946                }
947                return Ok(None);
948            }
949            let mut rows: Vec<Row> = Vec::new();
950            {
951                let input = self.input.as_mut().unwrap();
952                while let Some(row) = input.next(ctx)? {
953                    rows.push(row);
954                }
955            }
956            self.buffered = Some(rows);
957            self.cursor = 0;
958            // Fall through to next call via recursion.
959            self.next(ctx)
960        } else {
961            if self.done {
962                return Ok(None);
963            }
964            self.done = true;
965            let empty = Row::new();
966            Ok(Some(self.apply(ctx, &empty)?))
967        }
968    }
969}
970
971struct CartesianProductOp {
972    left: Box<dyn Operator>,
973    right_plan: LogicalPlan,
974    left_row: Option<Row>,
975    right_op: Option<Box<dyn Operator>>,
976}
977
978impl CartesianProductOp {
979    fn new(left: Box<dyn Operator>, right_plan: LogicalPlan) -> Self {
980        Self {
981            left,
982            right_plan,
983            left_row: None,
984            right_op: None,
985        }
986    }
987}
988
989impl Operator for CartesianProductOp {
990    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
991        loop {
992            if self.left_row.is_none() {
993                match self.left.next(ctx)? {
994                    None => return Ok(None),
995                    Some(row) => {
996                        self.left_row = Some(row);
997                        self.right_op = Some(build_op(&self.right_plan));
998                    }
999                }
1000            }
1001            let right_op = self.right_op.as_mut().expect("right_op set");
1002            // Expose the current left row to right-side operators
1003            // via a shadow context so constraint lookups (for
1004            // bound edges / bound edge lists) can find vars that
1005            // the right-side scan itself doesn't bind.
1006            let left_ref = self.left_row.as_ref().unwrap();
1007            let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1008            stacked.push(left_ref);
1009            stacked.extend_from_slice(ctx.outer_rows);
1010            let inner_ctx = ExecCtx {
1011                store: ctx.store,
1012                writer: ctx.writer,
1013                params: ctx.params,
1014                procedures: ctx.procedures,
1015                outer_rows: &stacked,
1016                tombstones: ctx.tombstones,
1017            };
1018            match right_op.next(&inner_ctx)? {
1019                Some(right_row) => {
1020                    let mut combined = left_ref.clone();
1021                    for (k, v) in right_row {
1022                        combined.insert(k, v);
1023                    }
1024                    return Ok(Some(combined));
1025                }
1026                None => {
1027                    self.left_row = None;
1028                    self.right_op = None;
1029                }
1030            }
1031        }
1032    }
1033}
1034
1035struct DeleteOp {
1036    input: Box<dyn Operator>,
1037    detach: bool,
1038    #[allow(dead_code)]
1039    vars: Vec<String>,
1040    exprs: Vec<Expr>,
1041    /// Rows drained from `input` before any deletes have run.
1042    /// Populated on first call to `next`. openCypher treats DELETE
1043    /// as a batch mutation: the whole preceding row set is
1044    /// materialised, then the mutations happen, then rows are
1045    /// forwarded. Without buffering, `MATCH (a)-[r]-(b) DELETE r,
1046    /// a, b RETURN count(*)` deletes the first row's nodes before
1047    /// the pipelined scan reaches the second row.
1048    buffered: Option<Vec<Row>>,
1049    cursor: usize,
1050}
1051
1052impl DeleteOp {
1053    fn new(input: Box<dyn Operator>, detach: bool, vars: Vec<String>, exprs: Vec<Expr>) -> Self {
1054        Self {
1055            input,
1056            detach,
1057            vars,
1058            exprs,
1059            buffered: None,
1060            cursor: 0,
1061        }
1062    }
1063
1064    /// Gather every edge and node id referenced by the row's
1065    /// DELETE expressions and run the delete in two phases —
1066    /// edges first, then nodes. The two-phase ordering is what
1067    /// lets `DELETE p1, p2` succeed when two paths share nodes
1068    /// connected by each other's edges: by the time the node
1069    /// phase runs, the in-batch edges are already gone. The
1070    /// non-detach check probes the graph then filters out
1071    /// any edge we just deleted, so the batch-level detachment
1072    /// is counted as "no longer attached" rather than failing.
1073    fn apply_deletes(
1074        &self,
1075        ctx: &ExecCtx,
1076        row: &Row,
1077        seen_edges: &mut HashSet<meshdb_core::EdgeId>,
1078        seen_nodes: &mut HashSet<meshdb_core::NodeId>,
1079    ) -> Result<()> {
1080        let mut edge_ids: Vec<meshdb_core::EdgeId> = Vec::new();
1081        let mut node_ids: Vec<meshdb_core::NodeId> = Vec::new();
1082        for expr in &self.exprs {
1083            let v = eval_expr(expr, &ctx.eval_ctx(row))?;
1084            match v {
1085                Value::Node(n) => node_ids.push(n.id),
1086                Value::Edge(e) => edge_ids.push(e.id),
1087                Value::Path { nodes, edges } => {
1088                    for e in edges {
1089                        edge_ids.push(e.id);
1090                    }
1091                    for n in nodes {
1092                        node_ids.push(n.id);
1093                    }
1094                }
1095                Value::Null | Value::Property(Property::Null) => continue,
1096                _ => return Err(Error::TypeMismatch),
1097            }
1098        }
1099        for eid in &edge_ids {
1100            if seen_edges.insert(*eid) {
1101                ctx.writer.delete_edge(*eid)?;
1102                ctx.tombstones.edges.borrow_mut().insert(*eid);
1103            }
1104        }
1105        for nid in &node_ids {
1106            if !seen_nodes.insert(*nid) {
1107                continue;
1108            }
1109            if self.detach {
1110                // DETACH DELETE also removes every attached edge;
1111                // tombstone them too so a later projection over a
1112                // formerly-attached edge clone also raises rather
1113                // than reading stale properties.
1114                for (eid, _) in ctx.store.outgoing(*nid)? {
1115                    ctx.tombstones.edges.borrow_mut().insert(eid);
1116                }
1117                for (eid, _) in ctx.store.incoming(*nid)? {
1118                    ctx.tombstones.edges.borrow_mut().insert(eid);
1119                }
1120                ctx.writer.detach_delete_node(*nid)?;
1121            } else {
1122                let out = ctx.store.outgoing(*nid)?;
1123                let inc = ctx.store.incoming(*nid)?;
1124                let still_attached = out
1125                    .iter()
1126                    .chain(inc.iter())
1127                    .any(|(eid, _)| !seen_edges.contains(eid));
1128                if still_attached {
1129                    return Err(Error::CannotDeleteAttachedNode);
1130                }
1131                ctx.writer.detach_delete_node(*nid)?;
1132            }
1133            ctx.tombstones.nodes.borrow_mut().insert(*nid);
1134        }
1135        Ok(())
1136    }
1137}
1138
1139impl Operator for DeleteOp {
1140    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1141        // First call: drain the input, run all deletes, then
1142        // start forwarding rows. Buffering protects against the
1143        // pipelined-scan-vs-mutation race: `MATCH (a)-[r]-(b)
1144        // DELETE r, a, b RETURN count(*)` must observe every
1145        // MATCH row before any node or edge disappears, otherwise
1146        // the undirected expansion hits an already-deleted source
1147        // on the second iteration.
1148        if self.buffered.is_none() {
1149            let mut rows: Vec<Row> = Vec::new();
1150            while let Some(row) = self.input.next(ctx)? {
1151                rows.push(row);
1152            }
1153            let mut seen_edges: HashSet<meshdb_core::EdgeId> = HashSet::new();
1154            let mut seen_nodes: HashSet<meshdb_core::NodeId> = HashSet::new();
1155            for row in &rows {
1156                self.apply_deletes(ctx, row, &mut seen_edges, &mut seen_nodes)?;
1157            }
1158            self.buffered = Some(rows);
1159            self.cursor = 0;
1160        }
1161        let rows = self.buffered.as_ref().unwrap();
1162        if self.cursor < rows.len() {
1163            let row = rows[self.cursor].clone();
1164            self.cursor += 1;
1165            return Ok(Some(row));
1166        }
1167        Ok(None)
1168    }
1169}
1170
1171struct SetPropertyOp {
1172    input: Box<dyn Operator>,
1173    assignments: Vec<SetAssignment>,
1174}
1175
1176impl SetPropertyOp {
1177    fn new(input: Box<dyn Operator>, assignments: Vec<SetAssignment>) -> Self {
1178        Self { input, assignments }
1179    }
1180}
1181
1182impl Operator for SetPropertyOp {
1183    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1184        match self.input.next(ctx)? {
1185            None => Ok(None),
1186            Some(mut row) => {
1187                // Phase 1: evaluate any RHSes against the original row bindings.
1188                enum Action {
1189                    SetKey {
1190                        var: String,
1191                        key: String,
1192                        prop: Property,
1193                    },
1194                    AddLabels {
1195                        var: String,
1196                        labels: Vec<String>,
1197                    },
1198                    Replace {
1199                        var: String,
1200                        props: Vec<(String, Property)>,
1201                    },
1202                    Merge {
1203                        var: String,
1204                        props: Vec<(String, Property)>,
1205                    },
1206                }
1207                let mut actions: Vec<Action> = Vec::with_capacity(self.assignments.len());
1208                for a in &self.assignments {
1209                    match a {
1210                        SetAssignment::Property { var, key, value } => {
1211                            let evaluated = eval_expr(value, &ctx.eval_ctx(&row))?;
1212                            let prop = value_to_property(evaluated)?;
1213                            actions.push(Action::SetKey {
1214                                var: var.clone(),
1215                                key: key.clone(),
1216                                prop,
1217                            });
1218                        }
1219                        SetAssignment::Labels { var, labels } => {
1220                            actions.push(Action::AddLabels {
1221                                var: var.clone(),
1222                                labels: labels.clone(),
1223                            });
1224                        }
1225                        SetAssignment::Replace { var, properties } => {
1226                            // Property values are now `Expr`. Evaluate
1227                            // each against the current row + params and
1228                            // convert via the shared helper, which
1229                            // surfaces InvalidSetValue for Node/Edge.
1230                            let props = properties
1231                                .iter()
1232                                .map(|(k, expr)| {
1233                                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1234                                    Ok((k.clone(), value_to_property(v)?))
1235                                })
1236                                .collect::<Result<Vec<(String, Property)>>>()?;
1237                            actions.push(Action::Replace {
1238                                var: var.clone(),
1239                                props,
1240                            });
1241                        }
1242                        SetAssignment::Merge { var, properties } => {
1243                            let props = properties
1244                                .iter()
1245                                .map(|(k, expr)| {
1246                                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1247                                    Ok((k.clone(), value_to_property(v)?))
1248                                })
1249                                .collect::<Result<Vec<(String, Property)>>>()?;
1250                            actions.push(Action::Merge {
1251                                var: var.clone(),
1252                                props,
1253                            });
1254                        }
1255                        SetAssignment::ReplaceFromExpr {
1256                            var,
1257                            source,
1258                            replace,
1259                        } => {
1260                            let v = eval_expr(source, &ctx.eval_ctx(&row))?;
1261                            let props = extract_property_map(&v)?;
1262                            if *replace {
1263                                actions.push(Action::Replace {
1264                                    var: var.clone(),
1265                                    props,
1266                                });
1267                            } else {
1268                                actions.push(Action::Merge {
1269                                    var: var.clone(),
1270                                    props,
1271                                });
1272                            }
1273                        }
1274                    }
1275                }
1276
1277                // Phase 2: apply updates in-place to the row bindings.
1278                let mut updated_nodes: HashSet<String> = HashSet::new();
1279                let mut updated_edges: HashSet<String> = HashSet::new();
1280                for action in actions {
1281                    match action {
1282                        Action::SetKey { var, key, prop } => match row.get_mut(&var) {
1283                            // openCypher: SET on a null target (from
1284                            // OPTIONAL MATCH that didn't bind) is a
1285                            // silent no-op rather than an error.
1286                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1287                                continue
1288                            }
1289                            // openCypher: SET a.key = null removes the
1290                            // property rather than storing a null value.
1291                            Some(Value::Node(n)) => {
1292                                if matches!(prop, Property::Null) {
1293                                    n.properties.remove(&key);
1294                                } else {
1295                                    n.properties.insert(key, prop);
1296                                }
1297                                updated_nodes.insert(var);
1298                            }
1299                            Some(Value::Edge(e)) => {
1300                                if matches!(prop, Property::Null) {
1301                                    e.properties.remove(&key);
1302                                } else {
1303                                    e.properties.insert(key, prop);
1304                                }
1305                                updated_edges.insert(var);
1306                            }
1307                            _ => return Err(Error::UnboundVariable(var)),
1308                        },
1309                        Action::AddLabels { var, labels } => match row.get_mut(&var) {
1310                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1311                                continue
1312                            }
1313                            Some(Value::Node(n)) => {
1314                                for label in labels {
1315                                    if !n.labels.contains(&label) {
1316                                        n.labels.push(label);
1317                                    }
1318                                }
1319                                updated_nodes.insert(var);
1320                            }
1321                            _ => return Err(Error::UnboundVariable(var)),
1322                        },
1323                        Action::Replace { var, props } => match row.get_mut(&var) {
1324                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1325                                continue
1326                            }
1327                            Some(Value::Node(n)) => {
1328                                n.properties.clear();
1329                                for (k, v) in props {
1330                                    if !matches!(v, Property::Null) {
1331                                        n.properties.insert(k, v);
1332                                    }
1333                                }
1334                                updated_nodes.insert(var);
1335                            }
1336                            Some(Value::Edge(e)) => {
1337                                e.properties.clear();
1338                                for (k, v) in props {
1339                                    if !matches!(v, Property::Null) {
1340                                        e.properties.insert(k, v);
1341                                    }
1342                                }
1343                                updated_edges.insert(var);
1344                            }
1345                            _ => return Err(Error::UnboundVariable(var)),
1346                        },
1347                        Action::Merge { var, props } => match row.get_mut(&var) {
1348                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1349                                continue
1350                            }
1351                            Some(Value::Node(n)) => {
1352                                for (k, v) in props {
1353                                    if matches!(v, Property::Null) {
1354                                        n.properties.remove(&k);
1355                                    } else {
1356                                        n.properties.insert(k, v);
1357                                    }
1358                                }
1359                                updated_nodes.insert(var);
1360                            }
1361                            Some(Value::Edge(e)) => {
1362                                for (k, v) in props {
1363                                    if matches!(v, Property::Null) {
1364                                        e.properties.remove(&k);
1365                                    } else {
1366                                        e.properties.insert(k, v);
1367                                    }
1368                                }
1369                                updated_edges.insert(var);
1370                            }
1371                            _ => return Err(Error::UnboundVariable(var)),
1372                        },
1373                    }
1374                }
1375
1376                // Phase 3: flush each mutated entity once to the writer.
1377                for var in &updated_nodes {
1378                    if let Some(Value::Node(n)) = row.get(var) {
1379                        ctx.writer.put_node(n)?;
1380                    }
1381                }
1382                for var in &updated_edges {
1383                    if let Some(Value::Edge(e)) = row.get(var) {
1384                        ctx.writer.put_edge(e)?;
1385                    }
1386                }
1387
1388                Ok(Some(row))
1389            }
1390        }
1391    }
1392}
1393
1394struct RemoveOp {
1395    input: Box<dyn Operator>,
1396    items: Vec<RemoveSpec>,
1397}
1398
1399impl RemoveOp {
1400    fn new(input: Box<dyn Operator>, items: Vec<RemoveSpec>) -> Self {
1401        Self { input, items }
1402    }
1403}
1404
1405impl Operator for RemoveOp {
1406    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1407        match self.input.next(ctx)? {
1408            None => Ok(None),
1409            Some(mut row) => {
1410                let mut updated_nodes: HashSet<String> = HashSet::new();
1411                let mut updated_edges: HashSet<String> = HashSet::new();
1412                for item in &self.items {
1413                    match item {
1414                        RemoveSpec::Property { var, key } => match row.get_mut(var) {
1415                            // Null target (from OPTIONAL MATCH) is a
1416                            // no-op, matching Neo4j's REMOVE semantics.
1417                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1418                                continue
1419                            }
1420                            Some(Value::Node(n)) => {
1421                                n.properties.remove(key);
1422                                updated_nodes.insert(var.clone());
1423                            }
1424                            Some(Value::Edge(e)) => {
1425                                e.properties.remove(key);
1426                                updated_edges.insert(var.clone());
1427                            }
1428                            _ => return Err(Error::UnboundVariable(var.clone())),
1429                        },
1430                        RemoveSpec::Labels { var, labels } => match row.get_mut(var) {
1431                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1432                                continue
1433                            }
1434                            Some(Value::Node(n)) => {
1435                                n.labels.retain(|l| !labels.contains(l));
1436                                updated_nodes.insert(var.clone());
1437                            }
1438                            _ => return Err(Error::UnboundVariable(var.clone())),
1439                        },
1440                    }
1441                }
1442                for var in &updated_nodes {
1443                    if let Some(Value::Node(n)) = row.get(var) {
1444                        ctx.writer.put_node(n)?;
1445                    }
1446                }
1447                for var in &updated_edges {
1448                    if let Some(Value::Edge(e)) = row.get(var) {
1449                        ctx.writer.put_edge(e)?;
1450                    }
1451                }
1452                Ok(Some(row))
1453            }
1454        }
1455    }
1456}
1457
1458struct LoadCsvOp {
1459    input: Option<Box<dyn Operator>>,
1460    path_expr: Expr,
1461    var: String,
1462    with_headers: bool,
1463    rows: Option<Vec<Value>>,
1464    cursor: usize,
1465}
1466
1467impl LoadCsvOp {
1468    fn new(
1469        input: Option<Box<dyn Operator>>,
1470        path_expr: Expr,
1471        var: String,
1472        with_headers: bool,
1473    ) -> Self {
1474        Self {
1475            input,
1476            path_expr,
1477            var,
1478            with_headers,
1479            rows: None,
1480            cursor: 0,
1481        }
1482    }
1483
1484    fn load(&mut self, ctx: &ExecCtx, base_row: &Row) -> Result<()> {
1485        let ectx = ctx.eval_ctx(base_row);
1486        let path_val = eval_expr(&self.path_expr, &ectx)?;
1487        let path = match path_val {
1488            Value::Property(Property::String(s)) => s,
1489            _ => return Err(Error::TypeMismatch),
1490        };
1491        let content = std::fs::read_to_string(&path).map_err(|e| {
1492            Error::Unsupported(format!("LOAD CSV: cannot read file '{}': {}", path, e))
1493        })?;
1494        let mut lines = content.lines();
1495        let headers: Option<Vec<String>> = if self.with_headers {
1496            lines
1497                .next()
1498                .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
1499        } else {
1500            None
1501        };
1502        let mut csv_rows = Vec::new();
1503        for line in lines {
1504            if line.trim().is_empty() {
1505                continue;
1506            }
1507            let fields: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
1508            if let Some(hdrs) = &headers {
1509                let mut map = std::collections::HashMap::new();
1510                for (i, h) in hdrs.iter().enumerate() {
1511                    let val = fields.get(i).cloned().unwrap_or_default();
1512                    map.insert(h.clone(), Property::String(val));
1513                }
1514                csv_rows.push(Value::Property(Property::Map(map)));
1515            } else {
1516                let list: Vec<Value> = fields
1517                    .into_iter()
1518                    .map(|f| Value::Property(Property::String(f)))
1519                    .collect();
1520                csv_rows.push(Value::List(list));
1521            }
1522        }
1523        self.rows = Some(csv_rows);
1524        self.cursor = 0;
1525        Ok(())
1526    }
1527}
1528
1529impl Operator for LoadCsvOp {
1530    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1531        if self.rows.is_none() {
1532            let base = if let Some(input) = &mut self.input {
1533                match input.next(ctx)? {
1534                    Some(r) => r,
1535                    None => return Ok(None),
1536                }
1537            } else {
1538                Row::new()
1539            };
1540            self.load(ctx, &base)?;
1541        }
1542        let rows = self.rows.as_ref().unwrap();
1543        if self.cursor < rows.len() {
1544            let val = rows[self.cursor].clone();
1545            self.cursor += 1;
1546            let mut row = Row::new();
1547            row.insert(self.var.clone(), val);
1548            Ok(Some(row))
1549        } else {
1550            Ok(None)
1551        }
1552    }
1553}
1554
1555struct ForeachOp {
1556    input: Box<dyn Operator>,
1557    var: String,
1558    list_expr: Expr,
1559    set_assignments: Vec<SetAssignment>,
1560    remove_items: Vec<RemoveSpec>,
1561}
1562
1563impl ForeachOp {
1564    fn new(
1565        input: Box<dyn Operator>,
1566        var: String,
1567        list_expr: Expr,
1568        set_assignments: Vec<SetAssignment>,
1569        remove_items: Vec<RemoveSpec>,
1570    ) -> Self {
1571        Self {
1572            input,
1573            var,
1574            list_expr,
1575            set_assignments,
1576            remove_items,
1577        }
1578    }
1579}
1580
1581impl Operator for ForeachOp {
1582    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1583        let Some(row) = self.input.next(ctx)? else {
1584            return Ok(None);
1585        };
1586        let ectx = ctx.eval_ctx(&row);
1587        let list_val = eval_expr(&self.list_expr, &ectx)?;
1588        let items = match list_val {
1589            Value::List(items) => items,
1590            Value::Property(Property::List(props)) => {
1591                props.into_iter().map(Value::Property).collect()
1592            }
1593            Value::Null | Value::Property(Property::Null) => Vec::new(),
1594            _ => return Err(Error::TypeMismatch),
1595        };
1596        for item in items {
1597            let mut scratch = row.clone();
1598            scratch.insert(self.var.clone(), item);
1599            for a in &self.set_assignments {
1600                match a {
1601                    SetAssignment::Property { var, key, value } => {
1602                        let evaluated = eval_expr(value, &ctx.eval_ctx(&scratch))?;
1603                        let prop = value_to_property(evaluated)?;
1604                        match scratch.get_mut(var) {
1605                            Some(Value::Node(n)) => {
1606                                n.properties.insert(key.clone(), prop);
1607                            }
1608                            Some(Value::Edge(e)) => {
1609                                e.properties.insert(key.clone(), prop);
1610                            }
1611                            _ => return Err(Error::UnboundVariable(var.clone())),
1612                        }
1613                    }
1614                    SetAssignment::Labels { var, labels } => {
1615                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
1616                            for l in labels {
1617                                if !n.labels.contains(l) {
1618                                    n.labels.push(l.clone());
1619                                }
1620                            }
1621                        }
1622                    }
1623                    _ => {}
1624                }
1625            }
1626            for ri in &self.remove_items {
1627                match ri {
1628                    RemoveSpec::Property { var, key } => {
1629                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
1630                            n.properties.remove(key);
1631                        } else if let Some(Value::Edge(e)) = scratch.get_mut(var) {
1632                            e.properties.remove(key);
1633                        }
1634                    }
1635                    RemoveSpec::Labels { var, labels } => {
1636                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
1637                            n.labels.retain(|l| !labels.contains(l));
1638                        }
1639                    }
1640                }
1641            }
1642            // Flush mutated entities for each iteration
1643            for (_, val) in scratch.iter() {
1644                match val {
1645                    Value::Node(n) => ctx.writer.put_node(n)?,
1646                    Value::Edge(e) => ctx.writer.put_edge(e)?,
1647                    _ => {}
1648                }
1649            }
1650        }
1651        Ok(Some(row))
1652    }
1653}
1654
1655struct SeedRowOp {
1656    done: bool,
1657}
1658
1659impl Operator for SeedRowOp {
1660    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1661        if self.done {
1662            return Ok(None);
1663        }
1664        self.done = true;
1665        Ok(Some(Row::new()))
1666    }
1667}
1668
1669struct SeededRowOp {
1670    row: Option<Row>,
1671}
1672
1673impl Operator for SeededRowOp {
1674    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1675        Ok(self.row.take())
1676    }
1677}
1678
1679struct CallSubqueryOp {
1680    input: Box<dyn Operator>,
1681    body_plan: LogicalPlan,
1682    pending: Vec<Row>,
1683    pending_idx: usize,
1684}
1685
1686impl CallSubqueryOp {
1687    fn new(input: Box<dyn Operator>, body_plan: LogicalPlan) -> Self {
1688        Self {
1689            input,
1690            body_plan,
1691            pending: Vec::new(),
1692            pending_idx: 0,
1693        }
1694    }
1695}
1696
1697impl Operator for CallSubqueryOp {
1698    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1699        loop {
1700            if self.pending_idx < self.pending.len() {
1701                let row = self.pending[self.pending_idx].clone();
1702                self.pending_idx += 1;
1703                return Ok(Some(row));
1704            }
1705            let outer_row = match self.input.next(ctx)? {
1706                Some(r) => r,
1707                None => return Ok(None),
1708            };
1709            let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1710            let mut results = Vec::new();
1711            while let Some(body_row) = body_op.next(ctx)? {
1712                let mut merged = outer_row.clone();
1713                for (k, v) in body_row {
1714                    merged.insert(k, v);
1715                }
1716                results.push(merged);
1717            }
1718            if results.is_empty() {
1719                continue;
1720            }
1721            self.pending = results;
1722            self.pending_idx = 0;
1723        }
1724    }
1725}
1726
1727/// Per-input-row left-join driver (see
1728/// `LogicalPlan::OptionalApply`). Replays `body_plan` once per
1729/// outer row with the row as its seed; forwards all emitted rows
1730/// merged with the outer bindings, and emits one null-fallback
1731/// row (outer bindings plus `null_vars` bound to Null) only when
1732/// the body produced zero rows for that input.
1733struct OptionalApplyOp {
1734    input: Box<dyn Operator>,
1735    body_plan: LogicalPlan,
1736    null_vars: Vec<String>,
1737    pending: Vec<Row>,
1738    pending_idx: usize,
1739}
1740
1741impl OptionalApplyOp {
1742    fn new(input: Box<dyn Operator>, body_plan: LogicalPlan, null_vars: Vec<String>) -> Self {
1743        Self {
1744            input,
1745            body_plan,
1746            null_vars,
1747            pending: Vec::new(),
1748            pending_idx: 0,
1749        }
1750    }
1751}
1752
1753impl Operator for OptionalApplyOp {
1754    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1755        loop {
1756            if self.pending_idx < self.pending.len() {
1757                let row = self.pending[self.pending_idx].clone();
1758                self.pending_idx += 1;
1759                return Ok(Some(row));
1760            }
1761            let outer_row = match self.input.next(ctx)? {
1762                Some(r) => r,
1763                None => return Ok(None),
1764            };
1765            let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1766            // Push the outer row onto `outer_rows` so Filter /
1767            // eval_expr inside the body can resolve outer
1768            // bindings (`OPTIONAL MATCH ... WHERE outer_var = x`).
1769            let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1770            stacked.push(&outer_row);
1771            stacked.extend_from_slice(ctx.outer_rows);
1772            let inner_ctx = ExecCtx {
1773                store: ctx.store,
1774                writer: ctx.writer,
1775                params: ctx.params,
1776                procedures: ctx.procedures,
1777                outer_rows: &stacked,
1778                tombstones: ctx.tombstones,
1779            };
1780            let mut results = Vec::new();
1781            while let Some(body_row) = body_op.next(&inner_ctx)? {
1782                let mut merged = outer_row.clone();
1783                for (k, v) in body_row {
1784                    merged.insert(k, v);
1785                }
1786                results.push(merged);
1787            }
1788            if results.is_empty() {
1789                let mut fallback = outer_row;
1790                for v in &self.null_vars {
1791                    fallback.insert(v.clone(), Value::Null);
1792                }
1793                return Ok(Some(fallback));
1794            }
1795            self.pending = results;
1796            self.pending_idx = 0;
1797        }
1798    }
1799}
1800
1801/// Runtime operator for `CALL ns.name[(args)] [YIELD ...]`.
1802///
1803/// Resolves the procedure against the [`ProcedureRegistry`] at
1804/// `next()` time, validates arity / types / form (implicit args,
1805/// `YIELD *`, in-query YIELD requirement), then scans the registered
1806/// data-table for rows whose input cells match the call arguments.
1807/// Each matching row is projected according to the YIELD spec and
1808/// merged into the upstream row.
1809///
1810/// Two shapes are supported at once:
1811/// * Standalone (`input = None`): the op is the full plan. It
1812///   runs the procedure exactly once (against a synthetic empty row)
1813///   and emits the matching rows directly.
1814/// * In-query (`input = Some(...)`): for each upstream row, runs
1815///   the procedure with args evaluated against that row and emits
1816///   one merged row per match. Procedures with zero declared output
1817///   columns act as a pass-through — the upstream row is forwarded
1818///   unchanged, matching Neo4j's side-effect-only semantics.
1819struct ProcedureCallOp {
1820    input: Option<Box<dyn Operator>>,
1821    qualified_name: Vec<String>,
1822    args: Option<Vec<Expr>>,
1823    yield_spec: Option<YieldSpec>,
1824    standalone: bool,
1825    buffered: Vec<Row>,
1826    buffered_idx: usize,
1827    // Only set for the standalone form, which drives itself off a
1828    // synthetic seed row exactly once.
1829    done: bool,
1830}
1831
1832impl ProcedureCallOp {
1833    fn new(
1834        input: Option<Box<dyn Operator>>,
1835        qualified_name: Vec<String>,
1836        args: Option<Vec<Expr>>,
1837        yield_spec: Option<YieldSpec>,
1838        standalone: bool,
1839    ) -> Self {
1840        Self {
1841            input,
1842            qualified_name,
1843            args,
1844            yield_spec,
1845            standalone,
1846            buffered: Vec::new(),
1847            buffered_idx: 0,
1848            done: false,
1849        }
1850    }
1851
1852    /// Resolve the projection list `(source_column, output_alias)`
1853    /// from the procedure signature and this op's yield spec. Also
1854    /// validates the spec — unknown YIELD columns, duplicate
1855    /// aliases, and `YIELD *` / no-YIELD in disallowed contexts all
1856    /// surface as `Error::Procedure`.
1857    fn resolve_projection(
1858        &self,
1859        proc: &crate::procedures::Procedure,
1860    ) -> Result<Vec<(String, String)>> {
1861        match &self.yield_spec {
1862            None => {
1863                if !self.standalone {
1864                    // In-query CALL with no YIELD: legal only for
1865                    // side-effect-only procedures (zero declared
1866                    // outputs). A procedure with outputs but no
1867                    // YIELD leaves those outputs unbound, so any
1868                    // downstream RETURN that references them would
1869                    // see `UndefinedVariable`; reject here so the
1870                    // error surfaces instead of silently emitting
1871                    // nulls.
1872                    if proc.outputs.is_empty() {
1873                        return Ok(Vec::new());
1874                    }
1875                    return Err(Error::Procedure(format!(
1876                        "procedure '{}' has outputs but no YIELD clause",
1877                        self.qualified_name.join(".")
1878                    )));
1879                }
1880                Ok(proc
1881                    .outputs
1882                    .iter()
1883                    .map(|o| (o.name.clone(), o.name.clone()))
1884                    .collect())
1885            }
1886            Some(YieldSpec::Star) => {
1887                if !self.standalone {
1888                    return Err(Error::Procedure(
1889                        "YIELD * is only allowed on standalone CALL".into(),
1890                    ));
1891                }
1892                Ok(proc
1893                    .outputs
1894                    .iter()
1895                    .map(|o| (o.name.clone(), o.name.clone()))
1896                    .collect())
1897            }
1898            Some(YieldSpec::Items(items)) => {
1899                let mut projection = Vec::with_capacity(items.len());
1900                let mut seen_aliases: std::collections::HashSet<String> =
1901                    std::collections::HashSet::new();
1902                for yi in items {
1903                    if !proc.outputs.iter().any(|o| o.name == yi.column) {
1904                        return Err(Error::Procedure(format!(
1905                            "procedure '{}' has no output column '{}'",
1906                            self.qualified_name.join("."),
1907                            yi.column
1908                        )));
1909                    }
1910                    let alias = yi.alias.clone().unwrap_or_else(|| yi.column.clone());
1911                    if !seen_aliases.insert(alias.clone()) {
1912                        return Err(Error::Procedure(format!(
1913                            "variable '{alias}' already bound by YIELD"
1914                        )));
1915                    }
1916                    projection.push((yi.column.clone(), alias));
1917                }
1918                Ok(projection)
1919            }
1920        }
1921    }
1922
1923    /// Evaluate the call's argument list against `row`. For the
1924    /// implicit-args form (`args = None`), each declared input
1925    /// column's value comes from the per-query parameter map
1926    /// (keyed by the input-column name). Returns the argument
1927    /// values in declaration order. Raises `ProcedureError` on
1928    /// arity mismatch, type mismatch, or missing parameter.
1929    fn evaluate_args(
1930        &self,
1931        ctx: &ExecCtx,
1932        row: &Row,
1933        proc: &crate::procedures::Procedure,
1934    ) -> Result<Vec<Value>> {
1935        match &self.args {
1936            Some(exprs) => {
1937                if exprs.len() != proc.inputs.len() {
1938                    return Err(Error::Procedure(format!(
1939                        "procedure '{}' expects {} argument(s), got {}",
1940                        self.qualified_name.join("."),
1941                        proc.inputs.len(),
1942                        exprs.len()
1943                    )));
1944                }
1945                let eval_ctx = ctx.eval_ctx(row);
1946                let mut values = Vec::with_capacity(exprs.len());
1947                for (expr, spec) in exprs.iter().zip(proc.inputs.iter()) {
1948                    let v = eval_expr(expr, &eval_ctx)?;
1949                    if !spec.ty.accepts(&v) {
1950                        return Err(Error::Procedure(format!(
1951                            "argument '{}' has wrong type for procedure '{}'",
1952                            spec.name,
1953                            self.qualified_name.join(".")
1954                        )));
1955                    }
1956                    values.push(coerce_arg(v, spec.ty));
1957                }
1958                Ok(values)
1959            }
1960            None => {
1961                // Implicit-arg form only valid standalone.
1962                if !self.standalone {
1963                    return Err(Error::Procedure(
1964                        "in-query CALL requires explicit argument list".into(),
1965                    ));
1966                }
1967                let mut values = Vec::with_capacity(proc.inputs.len());
1968                for spec in &proc.inputs {
1969                    let v = ctx.params.get(&spec.name).cloned().ok_or_else(|| {
1970                        Error::Procedure(format!(
1971                            "missing parameter ${} for procedure '{}'",
1972                            spec.name,
1973                            self.qualified_name.join(".")
1974                        ))
1975                    })?;
1976                    if !spec.ty.accepts(&v) {
1977                        return Err(Error::Procedure(format!(
1978                            "parameter '{}' has wrong type",
1979                            spec.name
1980                        )));
1981                    }
1982                    values.push(coerce_arg(v, spec.ty));
1983                }
1984                Ok(values)
1985            }
1986        }
1987    }
1988
1989    /// Invoke the procedure once for `input_row` and emit zero or
1990    /// more output rows into `out`. Handles the "zero-output-column
1991    /// pass-through" case that keeps `MATCH (n) CALL test.doNothing()
1992    /// RETURN n.name` from filtering the match rows.
1993    fn invoke_once(
1994        &self,
1995        ctx: &ExecCtx,
1996        input_row: &Row,
1997        proc: &crate::procedures::Procedure,
1998        projection: &[(String, String)],
1999        out: &mut Vec<Row>,
2000    ) -> Result<()> {
2001        // Zero-output-column procedures are side-effect-only in
2002        // the TCK; they either suppress rows entirely (standalone)
2003        // or pass the input row through unchanged (in-query).
2004        if proc.outputs.is_empty() {
2005            if !self.standalone {
2006                out.push(input_row.clone());
2007            }
2008            return Ok(());
2009        }
2010        let args = self.evaluate_args(ctx, input_row, proc)?;
2011        let rows = proc.resolve_rows(ctx.store)?;
2012        for proc_row in &rows {
2013            if !proc.row_matches(proc_row, &args) {
2014                continue;
2015            }
2016            let mut merged = if self.standalone {
2017                Row::new()
2018            } else {
2019                input_row.clone()
2020            };
2021            for (src, alias) in projection {
2022                let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
2023                merged.insert(alias.clone(), v);
2024            }
2025            out.push(merged);
2026        }
2027        Ok(())
2028    }
2029}
2030
2031/// Cast an int to a float when the declared type is `FLOAT`. Other
2032/// declared types leave the value as-is (accept() has already
2033/// gated on kind) so the comparison in `row_matches` sees a
2034/// consistent shape.
2035fn coerce_arg(v: Value, ty: crate::procedures::ProcType) -> Value {
2036    use crate::procedures::ProcType;
2037    if matches!(ty, ProcType::Float) {
2038        if let Value::Property(Property::Int64(n)) = v {
2039            return Value::Property(Property::Float64(n as f64));
2040        }
2041    }
2042    v
2043}
2044
2045impl Operator for ProcedureCallOp {
2046    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2047        loop {
2048            if self.buffered_idx < self.buffered.len() {
2049                let row = self.buffered[self.buffered_idx].clone();
2050                self.buffered_idx += 1;
2051                return Ok(Some(row));
2052            }
2053            self.buffered.clear();
2054            self.buffered_idx = 0;
2055
2056            let proc = match ctx.procedures.get(&self.qualified_name) {
2057                Some(p) => p,
2058                None => {
2059                    return Err(Error::Procedure(format!(
2060                        "procedure '{}' not found",
2061                        self.qualified_name.join(".")
2062                    )));
2063                }
2064            };
2065            let projection = self.resolve_projection(proc)?;
2066
2067            let input_row = match &mut self.input {
2068                Some(inp) => match inp.next(ctx)? {
2069                    Some(r) => r,
2070                    None => return Ok(None),
2071                },
2072                None => {
2073                    if self.done {
2074                        return Ok(None);
2075                    }
2076                    self.done = true;
2077                    Row::new()
2078                }
2079            };
2080
2081            let mut produced = Vec::new();
2082            self.invoke_once(ctx, &input_row, proc, &projection, &mut produced)?;
2083            if produced.is_empty() {
2084                if self.input.is_some() {
2085                    continue;
2086                }
2087                return Ok(None);
2088            }
2089            self.buffered = produced;
2090        }
2091    }
2092}
2093
2094/// Pull a property map out of a value. Supports node / edge
2095/// (uses their live property map) and map values (parameter or
2096/// map literal). Null propagates as an empty map — `SET x = null`
2097/// is spec'd as a property clear / no-op and SET = <null-binding>
2098/// (from an unmatched OPTIONAL MATCH) matches that shape.
2099fn extract_property_map(v: &Value) -> Result<Vec<(String, Property)>> {
2100    match v {
2101        Value::Node(n) => Ok(n.properties.clone().into_iter().collect()),
2102        Value::Edge(e) => Ok(e.properties.clone().into_iter().collect()),
2103        Value::Map(pairs) => pairs
2104            .iter()
2105            .map(|(k, vv)| Ok((k.clone(), value_to_property(vv.clone())?)))
2106            .collect(),
2107        Value::Property(Property::Map(entries)) => Ok(entries
2108            .iter()
2109            .map(|(k, p)| (k.clone(), p.clone()))
2110            .collect()),
2111        Value::Null | Value::Property(Property::Null) => Ok(Vec::new()),
2112        _ => Err(Error::InvalidSetValue),
2113    }
2114}
2115
2116fn value_to_property(v: Value) -> Result<Property> {
2117    match v {
2118        Value::Property(Property::Map(_)) => Err(Error::InvalidSetValue),
2119        Value::Property(p) => Ok(p),
2120        Value::Null => Ok(Property::Null),
2121        Value::List(items) => {
2122            let props: Vec<Property> = items
2123                .into_iter()
2124                .map(value_to_property)
2125                .collect::<Result<_>>()?;
2126            Ok(Property::List(props))
2127        }
2128        // Graph-aware `Value::Map` and graph elements can't be
2129        // stored as node / edge property values; SET will reject
2130        // them.
2131        Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
2132            Err(Error::InvalidSetValue)
2133        }
2134    }
2135}
2136
2137struct NodeScanAllOp {
2138    var: String,
2139    ids: Option<Vec<NodeId>>,
2140    cursor: usize,
2141}
2142
2143impl NodeScanAllOp {
2144    fn new(var: String) -> Self {
2145        Self {
2146            var,
2147            ids: None,
2148            cursor: 0,
2149        }
2150    }
2151}
2152
2153impl Operator for NodeScanAllOp {
2154    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2155        if self.ids.is_none() {
2156            self.ids = Some(ctx.store.all_node_ids()?);
2157        }
2158        let ids = self.ids.as_ref().unwrap();
2159        while self.cursor < ids.len() {
2160            let id = ids[self.cursor];
2161            self.cursor += 1;
2162            if let Some(node) = ctx.store.get_node(id)? {
2163                let mut row = Row::new();
2164                row.insert(self.var.clone(), Value::Node(node));
2165                return Ok(Some(row));
2166            }
2167        }
2168        Ok(None)
2169    }
2170}
2171
2172struct NodeScanByLabelsOp {
2173    var: String,
2174    labels: Vec<String>,
2175    ids: Option<Vec<NodeId>>,
2176    cursor: usize,
2177}
2178
2179impl NodeScanByLabelsOp {
2180    fn new(var: String, labels: Vec<String>) -> Self {
2181        Self {
2182            var,
2183            labels,
2184            ids: None,
2185            cursor: 0,
2186        }
2187    }
2188}
2189
2190impl Operator for NodeScanByLabelsOp {
2191    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2192        if self.ids.is_none() {
2193            // Use the first label for the index scan, filter the rest per-node.
2194            let primary = self
2195                .labels
2196                .first()
2197                .expect("NodeScanByLabels must have at least one label");
2198            self.ids = Some(ctx.store.nodes_by_label(primary)?);
2199        }
2200        let ids = self.ids.as_ref().unwrap();
2201        while self.cursor < ids.len() {
2202            let id = ids[self.cursor];
2203            self.cursor += 1;
2204            if let Some(node) = ctx.store.get_node(id)? {
2205                if has_all_labels(&node, &self.labels) {
2206                    let mut row = Row::new();
2207                    row.insert(self.var.clone(), Value::Node(node));
2208                    return Ok(Some(row));
2209                }
2210            }
2211        }
2212        Ok(None)
2213    }
2214}
2215
2216fn has_all_labels(node: &Node, labels: &[String]) -> bool {
2217    labels.iter().all(|l| node.labels.contains(l))
2218}
2219
2220/// Equality-lookup operator backed by a property index. Evaluates
2221/// the value expression lazily on the first `next()` — crucially
2222/// so parameters resolve against the per-query `ExecCtx::params`
2223/// map, not against a literal baked in at plan-construction time.
2224///
2225/// Unindexable value types (Float, List, Map, Null, or a Node/Edge
2226/// that slipped through) surface as `Error::InvalidSetValue`. The
2227/// planner only emits this op for indexes that do exist, so the
2228/// reader call should find a populated CF unless a concurrent DROP
2229/// raced us (in which case the result is just empty, which matches
2230/// how Neo4j's planner handles the race).
2231struct IndexSeekOp {
2232    var: String,
2233    label: String,
2234    property: String,
2235    value_expr: Expr,
2236    results: Option<Vec<NodeId>>,
2237    cursor: usize,
2238}
2239
2240impl IndexSeekOp {
2241    fn new(var: String, label: String, property: String, value_expr: Expr) -> Self {
2242        Self {
2243            var,
2244            label,
2245            property,
2246            value_expr,
2247            results: None,
2248            cursor: 0,
2249        }
2250    }
2251}
2252
2253impl Operator for IndexSeekOp {
2254    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2255        if self.results.is_none() {
2256            let empty = Row::new();
2257            let value = eval_expr(&self.value_expr, &ctx.eval_ctx(&empty))?;
2258            let property = match value {
2259                Value::Property(p) => p,
2260                Value::Null => Property::Null,
2261                Value::Node(_)
2262                | Value::Edge(_)
2263                | Value::List(_)
2264                | Value::Map(_)
2265                | Value::Path { .. } => {
2266                    return Err(Error::InvalidSetValue);
2267                }
2268            };
2269            let ids = ctx
2270                .store
2271                .nodes_by_property(&self.label, &self.property, &property)?;
2272            self.results = Some(ids);
2273        }
2274        let ids = self.results.as_ref().unwrap();
2275        while self.cursor < ids.len() {
2276            let id = ids[self.cursor];
2277            self.cursor += 1;
2278            if let Some(node) = ctx.store.get_node(id)? {
2279                let mut row = Row::new();
2280                row.insert(self.var.clone(), Value::Node(node));
2281                return Ok(Some(row));
2282            }
2283        }
2284        Ok(None)
2285    }
2286}
2287
2288fn matches_pattern_props(node: &Node, props: &[(String, Property)]) -> bool {
2289    props.iter().all(|(k, v)| {
2290        node.properties
2291            .get(k)
2292            .map(|stored| stored == v)
2293            .unwrap_or(false)
2294    })
2295}
2296
2297struct MergeNodeOp {
2298    var: String,
2299    labels: Vec<String>,
2300    /// Pattern property expressions as they came from the planner. These
2301    /// stay as `Expr` because evaluation needs `ExecCtx::params`, which
2302    /// isn't available until `next()` is called.
2303    properties: Vec<(String, Expr)>,
2304    /// `ON CREATE SET ...` assignments — applied only when the
2305    /// merge took the create branch. Evaluated against a row
2306    /// `{var → Node}` so the value expressions can reference the
2307    /// just-created node.
2308    on_create: Vec<SetAssignment>,
2309    /// `ON MATCH SET ...` assignments — applied to every matched
2310    /// node when the merge took the match branch. Same row shape
2311    /// as `on_create`.
2312    on_match: Vec<SetAssignment>,
2313    /// Optional upstream operator. `None` means this is a
2314    /// top-level producer (`MERGE (n) RETURN n`) and emits
2315    /// rows with a fresh empty base. `Some` means this is a
2316    /// mid-chain clause (`MATCH (a) MERGE (b) RETURN a, b`)
2317    /// and each emitted row is a cross-join between an input
2318    /// row and a merge-result node.
2319    input: Option<Box<dyn Operator>>,
2320    /// Cached merge result. Populated on the first `next()`
2321    /// call by running the scan + maybe-create logic *once*,
2322    /// then reused for every input row. Running the merge
2323    /// exactly once sidesteps the read-after-write issue in
2324    /// buffered-writer mode (a node created by the first input
2325    /// row wouldn't be visible to a re-scan on the second).
2326    merged_nodes: Vec<Node>,
2327    /// Whether `merged_nodes` has been populated. The merge
2328    /// logic runs lazily on the first `next()` so
2329    /// `ExecCtx::params` is available.
2330    merge_done: bool,
2331    /// Current upstream row — held between calls while we drain
2332    /// `merged_nodes` against it. `None` for the top-level
2333    /// case (no input).
2334    current_input_row: Option<Row>,
2335    cursor: usize,
2336}
2337
2338impl MergeNodeOp {
2339    fn new(
2340        input: Option<Box<dyn Operator>>,
2341        var: String,
2342        labels: Vec<String>,
2343        properties: Vec<(String, Expr)>,
2344        on_create: Vec<SetAssignment>,
2345        on_match: Vec<SetAssignment>,
2346    ) -> Self {
2347        Self {
2348            var,
2349            labels,
2350            properties,
2351            on_create,
2352            on_match,
2353            input,
2354            merged_nodes: Vec::new(),
2355            merge_done: false,
2356            current_input_row: None,
2357            cursor: 0,
2358        }
2359    }
2360
2361    /// Run the MERGE logic exactly once: scan the store for
2362    /// existing matches, apply ON MATCH SET to each; or create
2363    /// a fresh node and apply ON CREATE SET; persist everything
2364    /// via `ctx.writer`; stash the resulting nodes in
2365    /// `self.merged_nodes`. Idempotent — subsequent calls are
2366    /// no-ops once `self.merge_done` is set.
2367    /// Resolve the pattern properties against `base`, scan the
2368    /// store, and either match existing nodes or create a fresh
2369    /// one. Returns the resulting node set — can be called
2370    /// multiple times with different `base` rows for
2371    /// input-driven merges.
2372    fn run_merge_for(&mut self, ctx: &ExecCtx, base: &Row) -> Result<Vec<Node>> {
2373        let resolved_props: Vec<(String, Property)> = self
2374            .properties
2375            .iter()
2376            .map(|(k, expr)| {
2377                let v = eval_expr(expr, &ctx.eval_ctx(base))?;
2378                Ok((k.clone(), value_to_property(v)?))
2379            })
2380            .collect::<Result<Vec<_>>>()?;
2381
2382        let candidate_ids: Vec<NodeId> = if let Some(primary) = self.labels.first() {
2383            ctx.store.nodes_by_label(primary)?
2384        } else {
2385            ctx.store.all_node_ids()?
2386        };
2387        let mut merged_nodes: Vec<Node> = Vec::new();
2388        for id in candidate_ids {
2389            if let Some(node) = ctx.store.get_node(id)? {
2390                if has_all_labels(&node, &self.labels)
2391                    && matches_pattern_props(&node, &resolved_props)
2392                {
2393                    merged_nodes.push(node);
2394                }
2395            }
2396        }
2397
2398        if merged_nodes.is_empty() {
2399            let mut node = Node::new();
2400            for label in &self.labels {
2401                node.labels.push(label.clone());
2402            }
2403            for (k, prop) in resolved_props {
2404                node.properties.insert(k, prop);
2405            }
2406            apply_merge_actions(&mut node, &self.on_create, &self.var, ctx, base)?;
2407            ctx.writer.put_node(&node)?;
2408            merged_nodes.push(node);
2409        } else if !self.on_match.is_empty() {
2410            for node in merged_nodes.iter_mut() {
2411                apply_merge_actions(node, &self.on_match, &self.var, ctx, base)?;
2412                ctx.writer.put_node(node)?;
2413            }
2414        }
2415        Ok(merged_nodes)
2416    }
2417}
2418
2419impl Operator for MergeNodeOp {
2420    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2421        // Top-level producer: no upstream context, so the pattern
2422        // properties can only reference literals / parameters.
2423        // Run the merge once against an empty row, then emit
2424        // each result.
2425        if self.input.is_none() {
2426            if !self.merge_done {
2427                let empty = Row::new();
2428                let nodes = self.run_merge_for(ctx, &empty)?;
2429                self.merged_nodes = nodes;
2430                self.merge_done = true;
2431            }
2432            if self.cursor < self.merged_nodes.len() {
2433                let node = self.merged_nodes[self.cursor].clone();
2434                self.cursor += 1;
2435                let mut row = Row::new();
2436                row.insert(self.var.clone(), Value::Node(node));
2437                return Ok(Some(row));
2438            }
2439            return Ok(None);
2440        }
2441
2442        // Input-driven case: evaluate pattern properties *per
2443        // input row* so references like `MERGE (:City {name:
2444        // person.bornIn})` resolve against the bound `person`.
2445        // Each incoming row produces its own merged-node set,
2446        // which is then cross-joined with that row before
2447        // emission.
2448        loop {
2449            if let Some(base) = self.current_input_row.as_ref() {
2450                if self.cursor < self.merged_nodes.len() {
2451                    let node = self.merged_nodes[self.cursor].clone();
2452                    self.cursor += 1;
2453                    let mut row = base.clone();
2454                    row.insert(self.var.clone(), Value::Node(node));
2455                    return Ok(Some(row));
2456                }
2457            }
2458            match self.input.as_mut().unwrap().next(ctx)? {
2459                None => return Ok(None),
2460                Some(row) => {
2461                    let nodes = self.run_merge_for(ctx, &row)?;
2462                    self.merged_nodes = nodes;
2463                    self.cursor = 0;
2464                    self.current_input_row = Some(row);
2465                }
2466            }
2467        }
2468    }
2469}
2470
/// Find-or-create executor for edge MERGE
/// (`MERGE (a)-[r:KNOWS]->(b)`).
///
/// For every row pulled from `input`, looks up the `src_var`
/// and `dst_var` bindings. Both must be `Value::Node`; anything else
/// is reported as `Error::UnboundVariable`, and the planner is
/// expected to bind them in a prior MATCH or MERGE. It then scans
/// `src`'s outgoing edges, plus its incoming edges when `undirected`
/// is set, and either:
///
/// - Collects every edge of type `edge_type` connecting to `dst`
///   that satisfies the inline `properties` filter. `on_match` (when
///   non-empty) is applied to each and the edge is re-persisted.
///   One row is emitted per matched edge. Or:
/// - Creates a fresh `Edge::new(edge_type, src, dst)` carrying the
///   inline properties, applies `on_create`, persists it via
///   `ctx.writer.put_edge`, and emits a single row.
///
/// Either way, the resulting edge is bound into `edge_var` in
/// the output row. Current restrictions: single hop, both
/// endpoints already bound, explicit relationship type.
struct MergeEdgeOp {
    /// Upstream row producer; each row drives one MERGE attempt.
    input: Box<dyn Operator>,
    /// Output binding for the matched or created edge.
    edge_var: String,
    /// Row variable holding the source node.
    src_var: String,
    /// Row variable holding the destination node.
    dst_var: String,
    /// Relationship type to match and to stamp on created edges.
    edge_type: String,
    /// When set, edges in either direction between `src` and `dst`
    /// count as matches; created edges still point `src -> dst`.
    undirected: bool,
    /// Inline edge property filter from the MERGE pattern
    /// (`[r:T {k: v}]`). Matched edges must satisfy every entry;
    /// the create branch stamps them onto the new edge.
    properties: Vec<(String, Expr)>,
    /// `ON CREATE SET` assignments, applied only to a freshly created edge.
    on_create: Vec<SetAssignment>,
    /// `ON MATCH SET` assignments, applied to every matched edge.
    on_match: Vec<SetAssignment>,
    /// Rows buffered from the current input row's MERGE result —
    /// one per existing matched edge (or a single synthesized edge
    /// if the create branch fired). Drained before the next
    /// `input.next()` call so multi-match semantics stay correct:
    /// `MATCH (a:A),(b:B) MERGE (a)-[r:T]->(b)` against two pre-
    /// existing edges has to yield two rows, not one.
    pending: std::collections::VecDeque<Row>,
}
2510
2511impl MergeEdgeOp {
2512    #[allow(clippy::too_many_arguments)]
2513    fn new(
2514        input: Box<dyn Operator>,
2515        edge_var: String,
2516        src_var: String,
2517        dst_var: String,
2518        edge_type: String,
2519        undirected: bool,
2520        properties: Vec<(String, Expr)>,
2521        on_create: Vec<SetAssignment>,
2522        on_match: Vec<SetAssignment>,
2523    ) -> Self {
2524        Self {
2525            input,
2526            edge_var,
2527            src_var,
2528            dst_var,
2529            edge_type,
2530            undirected,
2531            properties,
2532            on_create,
2533            on_match,
2534            pending: std::collections::VecDeque::new(),
2535        }
2536    }
2537}
2538
2539impl Operator for MergeEdgeOp {
2540    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2541        loop {
2542            if let Some(row) = self.pending.pop_front() {
2543                return Ok(Some(row));
2544            }
2545            let Some(row) = self.input.next(ctx)? else {
2546                return Ok(None);
2547            };
2548            // Resolve src/dst. Both must be Value::Node — the
2549            // planner enforces that the variables came from an
2550            // earlier producer, so anything else is a bug or a
2551            // later-added feature that didn't update the check.
2552            let src_node = match row.get(&self.src_var) {
2553                Some(Value::Node(n)) => n.clone(),
2554                _ => return Err(Error::UnboundVariable(self.src_var.clone())),
2555            };
2556            let dst_node = match row.get(&self.dst_var) {
2557                Some(Value::Node(n)) => n.clone(),
2558                _ => return Err(Error::UnboundVariable(self.dst_var.clone())),
2559            };
2560
2561            // Evaluate the inline edge property filter once per
2562            // input row. These are AST expressions so they can
2563            // reference outer bindings (`MERGE (a)-[r:T {k: a.v}]->(b)`).
2564            let required_props: Vec<(String, Property)> = self
2565                .properties
2566                .iter()
2567                .map(|(k, expr)| {
2568                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
2569                    Ok((k.clone(), value_to_property(v)?))
2570                })
2571                .collect::<Result<Vec<_>>>()?;
2572            let edge_matches = |edge: &Edge| -> bool {
2573                required_props.iter().all(|(k, want)| {
2574                    edge.properties
2575                        .get(k)
2576                        .map(|have| have == want)
2577                        .unwrap_or(false)
2578                })
2579            };
2580
2581            // Collect every edge of type `edge_type` from src to
2582            // dst (and, for undirected patterns, dst to src) that
2583            // also satisfies the inline property filter. If any
2584            // exist we take the match branch and yield one row per
2585            // match. If none exist we synthesize one and yield a
2586            // single row from the create branch.
2587            let mut matched: Vec<Edge> = Vec::new();
2588            for (edge_id, neighbor_id) in ctx.store.outgoing(src_node.id)? {
2589                if neighbor_id != dst_node.id {
2590                    continue;
2591                }
2592                if let Some(edge) = ctx.store.get_edge(edge_id)? {
2593                    if edge.edge_type == self.edge_type && edge_matches(&edge) {
2594                        matched.push(edge);
2595                    }
2596                }
2597            }
2598            if self.undirected {
2599                for (edge_id, neighbor_id) in ctx.store.incoming(src_node.id)? {
2600                    if neighbor_id != dst_node.id {
2601                        continue;
2602                    }
2603                    if let Some(edge) = ctx.store.get_edge(edge_id)? {
2604                        if edge.edge_type == self.edge_type && edge_matches(&edge) {
2605                            matched.push(edge);
2606                        }
2607                    }
2608                }
2609            }
2610
2611            if matched.is_empty() {
2612                let mut new_edge = Edge::new(&self.edge_type, src_node.id, dst_node.id);
2613                for (k, p) in &required_props {
2614                    new_edge.properties.insert(k.clone(), p.clone());
2615                }
2616                let mut row_out = row.clone();
2617                apply_merge_edge_actions(
2618                    &mut new_edge,
2619                    &self.on_create,
2620                    &self.edge_var,
2621                    ctx,
2622                    &mut row_out,
2623                )?;
2624                ctx.writer.put_edge(&new_edge)?;
2625                row_out.insert(self.edge_var.clone(), Value::Edge(new_edge));
2626                self.pending.push_back(row_out);
2627            } else {
2628                for mut existing in matched {
2629                    let mut row_out = row.clone();
2630                    if !self.on_match.is_empty() {
2631                        apply_merge_edge_actions(
2632                            &mut existing,
2633                            &self.on_match,
2634                            &self.edge_var,
2635                            ctx,
2636                            &mut row_out,
2637                        )?;
2638                        ctx.writer.put_edge(&existing)?;
2639                    }
2640                    row_out.insert(self.edge_var.clone(), Value::Edge(existing));
2641                    self.pending.push_back(row_out);
2642                }
2643            }
2644        }
2645    }
2646}
2647
2648/// Edge-side counterpart of [`apply_merge_actions`]. Evaluates
2649/// each `SetAssignment` against the outer `row` (augmented with
2650/// the current edge binding) and mutates either the edge itself
2651/// or a non-edge target node from the outer row. MERGE's ON
2652/// CREATE / ON MATCH clauses are scoped against the whole input
2653/// row, so `MERGE (a)-[:T]->(b) ON CREATE SET b.k = 1` has to
2654/// reach outside the edge-local binding. Non-edge mutations
2655/// are persisted via `ctx.writer.put_node` so the change is
2656/// visible to later clauses.
2657fn apply_merge_edge_actions(
2658    edge: &mut Edge,
2659    actions: &[SetAssignment],
2660    var: &str,
2661    exec_ctx: &ExecCtx,
2662    outer: &mut Row,
2663) -> Result<()> {
2664    if actions.is_empty() {
2665        return Ok(());
2666    }
2667    // Edge binding is live in `outer` while we evaluate — RHS can
2668    // reference both the edge and any sibling node binding.
2669    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2670    for action in actions {
2671        match action {
2672            SetAssignment::Property {
2673                var: target,
2674                key,
2675                value,
2676            } => {
2677                let sub_ctx = exec_ctx.eval_ctx(outer);
2678                let evaluated = eval_expr(value, &sub_ctx)?;
2679                let prop = value_to_property(evaluated)?;
2680                if target == var {
2681                    if matches!(prop, Property::Null) {
2682                        edge.properties.remove(key);
2683                    } else {
2684                        edge.properties.insert(key.clone(), prop);
2685                    }
2686                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2687                } else {
2688                    apply_set_prop_to_outer(outer, exec_ctx, target, key, prop)?;
2689                }
2690            }
2691            SetAssignment::Merge {
2692                var: target,
2693                properties,
2694            } => {
2695                let sub_ctx = exec_ctx.eval_ctx(outer);
2696                let resolved: Vec<(String, Property)> = properties
2697                    .iter()
2698                    .map(|(k, expr)| {
2699                        let v = eval_expr(expr, &sub_ctx)?;
2700                        Ok((k.clone(), value_to_property(v)?))
2701                    })
2702                    .collect::<Result<Vec<_>>>()?;
2703                if target == var {
2704                    for (k, p) in resolved {
2705                        edge.properties.insert(k, p);
2706                    }
2707                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2708                } else {
2709                    apply_set_map_to_outer(outer, exec_ctx, target, resolved, false)?;
2710                }
2711            }
2712            SetAssignment::Replace {
2713                var: target,
2714                properties,
2715            } => {
2716                let sub_ctx = exec_ctx.eval_ctx(outer);
2717                let resolved: Vec<(String, Property)> = properties
2718                    .iter()
2719                    .map(|(k, expr)| {
2720                        let v = eval_expr(expr, &sub_ctx)?;
2721                        Ok((k.clone(), value_to_property(v)?))
2722                    })
2723                    .collect::<Result<Vec<_>>>()?;
2724                if target == var {
2725                    edge.properties.clear();
2726                    for (k, p) in resolved {
2727                        edge.properties.insert(k, p);
2728                    }
2729                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2730                } else {
2731                    apply_set_map_to_outer(outer, exec_ctx, target, resolved, true)?;
2732                }
2733            }
2734            SetAssignment::Labels {
2735                var: target,
2736                labels,
2737            } => {
2738                if target == var {
2739                    // Edges don't carry labels.
2740                    return Err(Error::UnboundVariable(target.clone()));
2741                }
2742                apply_set_labels_to_outer(outer, exec_ctx, target, labels)?;
2743            }
2744            SetAssignment::ReplaceFromExpr {
2745                var: target,
2746                source,
2747                replace,
2748            } => {
2749                let sub_ctx = exec_ctx.eval_ctx(outer);
2750                let v = eval_expr(source, &sub_ctx)?;
2751                let props = extract_property_map(&v)?;
2752                if target == var {
2753                    if *replace {
2754                        edge.properties.clear();
2755                    }
2756                    for (k, p) in props {
2757                        edge.properties.insert(k, p);
2758                    }
2759                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2760                } else {
2761                    apply_set_map_to_outer(outer, exec_ctx, target, props, *replace)?;
2762                }
2763            }
2764        }
2765    }
2766    Ok(())
2767}
2768
2769/// Apply a single `SET target.key = prop` to a node or edge bound
2770/// in the outer row. Used by MERGE's ON CREATE / ON MATCH when
2771/// the target isn't the merge edge itself (the common case being
2772/// `MERGE (a)-[:R]->(b) ON CREATE SET b.k = v`).
2773fn apply_set_prop_to_outer(
2774    outer: &mut Row,
2775    exec_ctx: &ExecCtx,
2776    target: &str,
2777    key: &str,
2778    prop: Property,
2779) -> Result<()> {
2780    match outer.get_mut(target) {
2781        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
2782            // SET on a null target (typically an unmatched OPTIONAL
2783            // MATCH binding) is a silent no-op in openCypher.
2784            return Ok(());
2785        }
2786        Some(Value::Node(n)) => {
2787            if matches!(prop, Property::Null) {
2788                n.properties.remove(key);
2789            } else {
2790                n.properties.insert(key.to_string(), prop);
2791            }
2792            exec_ctx.writer.put_node(n)?;
2793        }
2794        Some(Value::Edge(e)) => {
2795            if matches!(prop, Property::Null) {
2796                e.properties.remove(key);
2797            } else {
2798                e.properties.insert(key.to_string(), prop);
2799            }
2800            exec_ctx.writer.put_edge(e)?;
2801        }
2802        _ => return Err(Error::UnboundVariable(target.to_string())),
2803    }
2804    Ok(())
2805}
2806
2807/// Apply a property-map assignment (`SET target = {..}` when
2808/// `replace`, or `SET target += {..}` when not) to a node or
2809/// edge bound in the outer row.
2810fn apply_set_map_to_outer(
2811    outer: &mut Row,
2812    exec_ctx: &ExecCtx,
2813    target: &str,
2814    props: Vec<(String, Property)>,
2815    replace: bool,
2816) -> Result<()> {
2817    match outer.get_mut(target) {
2818        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
2819        Some(Value::Node(n)) => {
2820            if replace {
2821                n.properties.clear();
2822            }
2823            for (k, p) in props {
2824                if replace || !matches!(p, Property::Null) {
2825                    n.properties.insert(k, p);
2826                } else {
2827                    n.properties.remove(&k);
2828                }
2829            }
2830            exec_ctx.writer.put_node(n)?;
2831            Ok(())
2832        }
2833        Some(Value::Edge(e)) => {
2834            if replace {
2835                e.properties.clear();
2836            }
2837            for (k, p) in props {
2838                if replace || !matches!(p, Property::Null) {
2839                    e.properties.insert(k, p);
2840                } else {
2841                    e.properties.remove(&k);
2842                }
2843            }
2844            exec_ctx.writer.put_edge(e)?;
2845            Ok(())
2846        }
2847        _ => Err(Error::UnboundVariable(target.to_string())),
2848    }
2849}
2850
2851/// Apply a labels assignment to a node bound in the outer row.
2852fn apply_set_labels_to_outer(
2853    outer: &mut Row,
2854    exec_ctx: &ExecCtx,
2855    target: &str,
2856    labels: &[String],
2857) -> Result<()> {
2858    match outer.get_mut(target) {
2859        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
2860        Some(Value::Node(n)) => {
2861            for label in labels {
2862                if !n.labels.contains(label) {
2863                    n.labels.push(label.clone());
2864                }
2865            }
2866            exec_ctx.writer.put_node(n)?;
2867            Ok(())
2868        }
2869        _ => Err(Error::UnboundVariable(target.to_string())),
2870    }
2871}
2872
2873/// `ON MATCH`) to `node` in place. Mirrors the `SetPropertyOp`
2874/// dispatch but specialized to a single bound variable so we
2875/// don't have to materialize a full row dispatcher.
2876///
2877/// Value expressions are evaluated against a temporary row
2878/// `{var → Node(node.clone())}` so the RHS can reference the
2879/// node's existing properties — `MERGE (n) ON MATCH SET n.hits = n.hits + 1`
2880/// works the same as if the SET ran in a SetPropertyOp pipeline.
2881fn apply_merge_actions(
2882    node: &mut Node,
2883    actions: &[SetAssignment],
2884    var: &str,
2885    exec_ctx: &ExecCtx,
2886    base_row: &Row,
2887) -> Result<()> {
2888    if actions.is_empty() {
2889        return Ok(());
2890    }
2891    // Start from the upstream row so `ON CREATE SET n.prop = other.field`
2892    // can resolve `other` — then overlay the merged node under `var`.
2893    let mut row = base_row.clone();
2894    row.insert(var.to_string(), Value::Node(node.clone()));
2895    for action in actions {
2896        let sub_ctx = exec_ctx.eval_ctx(&row);
2897        match action {
2898            SetAssignment::Property {
2899                var: target,
2900                key,
2901                value,
2902            } => {
2903                if target != var {
2904                    return Err(Error::UnboundVariable(target.clone()));
2905                }
2906                let evaluated = eval_expr(value, &sub_ctx)?;
2907                let prop = value_to_property(evaluated)?;
2908                node.properties.insert(key.clone(), prop);
2909                row.insert(var.to_string(), Value::Node(node.clone()));
2910            }
2911            SetAssignment::Labels {
2912                var: target,
2913                labels,
2914            } => {
2915                if target != var {
2916                    return Err(Error::UnboundVariable(target.clone()));
2917                }
2918                for label in labels {
2919                    if !node.labels.contains(label) {
2920                        node.labels.push(label.clone());
2921                    }
2922                }
2923                row.insert(var.to_string(), Value::Node(node.clone()));
2924            }
2925            SetAssignment::Replace {
2926                var: target,
2927                properties,
2928            } => {
2929                if target != var {
2930                    return Err(Error::UnboundVariable(target.clone()));
2931                }
2932                let resolved: Vec<(String, Property)> = properties
2933                    .iter()
2934                    .map(|(k, expr)| {
2935                        let v = eval_expr(expr, &sub_ctx)?;
2936                        Ok((k.clone(), value_to_property(v)?))
2937                    })
2938                    .collect::<Result<Vec<_>>>()?;
2939                node.properties.clear();
2940                for (k, p) in resolved {
2941                    node.properties.insert(k, p);
2942                }
2943                row.insert(var.to_string(), Value::Node(node.clone()));
2944            }
2945            SetAssignment::Merge {
2946                var: target,
2947                properties,
2948            } => {
2949                if target != var {
2950                    return Err(Error::UnboundVariable(target.clone()));
2951                }
2952                let resolved: Vec<(String, Property)> = properties
2953                    .iter()
2954                    .map(|(k, expr)| {
2955                        let v = eval_expr(expr, &sub_ctx)?;
2956                        Ok((k.clone(), value_to_property(v)?))
2957                    })
2958                    .collect::<Result<Vec<_>>>()?;
2959                for (k, p) in resolved {
2960                    node.properties.insert(k, p);
2961                }
2962                row.insert(var.to_string(), Value::Node(node.clone()));
2963            }
2964            SetAssignment::ReplaceFromExpr {
2965                var: target,
2966                source,
2967                replace,
2968            } => {
2969                if target != var {
2970                    return Err(Error::UnboundVariable(target.clone()));
2971                }
2972                let v = eval_expr(source, &sub_ctx)?;
2973                let props = extract_property_map(&v)?;
2974                if *replace {
2975                    node.properties.clear();
2976                }
2977                for (k, p) in props {
2978                    node.properties.insert(k, p);
2979                }
2980                row.insert(var.to_string(), Value::Node(node.clone()));
2981            }
2982        }
2983    }
2984    Ok(())
2985}
2986
/// Expands each input row along the adjacency of the node bound at
/// `src_var`. One output row is emitted per `(edge, neighbor)` pair
/// that survives the filters: relationship type, the optional
/// pre-bound edge constraint, the inline edge properties, and the
/// neighbor's labels.
struct EdgeExpandOp {
    /// Upstream row producer.
    input: Box<dyn Operator>,
    /// Variable holding the source node in each input row.
    src_var: String,
    /// Output binding for the traversed edge; `None` for an anonymous
    /// edge, in which case the edge is not written into the row.
    edge_var: Option<String>,
    /// Output binding for the neighbor node.
    dst_var: String,
    /// Labels the neighbor must carry (checked via `has_all_labels`).
    dst_labels: Vec<String>,
    /// Inline `{k: expr}` filter on the traversed edge; expressions are
    /// evaluated against the current input row.
    edge_properties: Vec<(String, Expr)>,
    /// Accepted relationship types; an empty list accepts every type.
    edge_types: Vec<String>,
    /// Which adjacency list(s) of the source node to walk.
    direction: Direction,
    /// When set, only the specific edge whose id matches the
    /// row-bound value counts as a match. Used by fresh-scan
    /// MATCH patterns that reuse an edge variable from a prior
    /// clause so the new hop stays an existence check instead of
    /// rebinding and clobbering the outer edge.
    edge_constraint_var: Option<String>,
    /// Input row whose adjacency is currently buffered in `pending`.
    current_row: Option<Row>,
    /// `(edge id, neighbor id)` pairs fetched for `current_row`.
    pending: Vec<(EdgeId, NodeId)>,
    /// Index of the next `pending` entry to examine.
    pending_idx: usize,
}
3006
3007impl EdgeExpandOp {
3008    #[allow(clippy::too_many_arguments)]
3009    fn new(
3010        input: Box<dyn Operator>,
3011        src_var: String,
3012        edge_var: Option<String>,
3013        dst_var: String,
3014        dst_labels: Vec<String>,
3015        edge_properties: Vec<(String, Expr)>,
3016        edge_types: Vec<String>,
3017        direction: Direction,
3018        edge_constraint_var: Option<String>,
3019    ) -> Self {
3020        Self {
3021            input,
3022            src_var,
3023            edge_var,
3024            dst_var,
3025            dst_labels,
3026            edge_properties,
3027            edge_types,
3028            direction,
3029            edge_constraint_var,
3030            current_row: None,
3031            pending: Vec::new(),
3032            pending_idx: 0,
3033        }
3034    }
3035}
3036
impl Operator for EdgeExpandOp {
    /// Emit the next `(input row, edge, neighbor)` combination.
    ///
    /// Runs as a two-phase loop:
    /// 1. Drain `pending` (the adjacency fetched for `current_row`),
    ///    skipping entries that fail any filter.
    /// 2. Once `pending` is exhausted, pull the next input row and
    ///    refill `pending` from the store.
    ///
    /// Returns `Ok(None)` when the input is exhausted.
    ///
    /// # Errors
    /// Returns `Error::UnboundVariable` when `src_var` is missing from
    /// the input row or bound to something other than a node or null.
    /// Store and evaluation errors are propagated.
    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
        loop {
            // Phase 1: examine buffered adjacency entries for the
            // current input row.
            while self.pending_idx < self.pending.len() {
                let (edge_id, neighbor_id) = self.pending[self.pending_idx];
                self.pending_idx += 1;

                // An id with no stored edge is skipped, not an error.
                let edge = match ctx.store.get_edge(edge_id)? {
                    Some(e) => e,
                    None => continue,
                };
                // Relationship-type filter; an empty list accepts
                // every type.
                if !self.edge_types.is_empty()
                    && !self.edge_types.iter().any(|t| t == &edge.edge_type)
                {
                    continue;
                }
                // Pre-bound edge constraint: only accept the edge
                // whose id matches the row-bound value. Falls
                // through to outer scopes so a fresh right-side
                // scan of a CartesianProduct can still see the
                // bound edge from the left side. Non-edge / null
                // bindings trigger no matches at all — the
                // expansion yields nothing for that input row.
                if let Some(constraint_var) = &self.edge_constraint_var {
                    let base = self
                        .current_row
                        .as_ref()
                        .expect("pending edges without source row");
                    let expected = match ctx.lookup_binding(base, constraint_var) {
                        Some(Value::Edge(e)) => Some(e.id),
                        _ => None,
                    };
                    match expected {
                        Some(id) if id != edge.id => continue,
                        None => continue,
                        _ => {}
                    }
                }
                // Inline edge property filter: every (key, value)
                // must match the traversed edge's property of the
                // same name via `=` equality. Missing keys fail the
                // check (matching Neo4j).
                if !self.edge_properties.is_empty() {
                    let base = self
                        .current_row
                        .as_ref()
                        .expect("pending edges without source row");
                    let ectx = ctx.eval_ctx(base);
                    let mut ok = true;
                    for (key, expr) in &self.edge_properties {
                        let expected = eval_expr(expr, &ectx)?;
                        let actual = match edge.properties.get(key) {
                            Some(v) => Value::Property(v.clone()),
                            None => {
                                ok = false;
                                break;
                            }
                        };
                        if !values_equal(&actual, &expected) {
                            ok = false;
                            break;
                        }
                    }
                    if !ok {
                        continue;
                    }
                }

                // The neighbor must still exist and pass the
                // destination label filter.
                let neighbor = match ctx.store.get_node(neighbor_id)? {
                    Some(n) => n,
                    None => continue,
                };
                if !has_all_labels(&neighbor, &self.dst_labels) {
                    continue;
                }

                // All filters passed: emit a copy of the input row
                // extended with the edge (if named) and the neighbor.
                let base = self
                    .current_row
                    .as_ref()
                    .expect("pending edges without source row");
                let mut out = base.clone();
                if let Some(ev) = &self.edge_var {
                    out.insert(ev.clone(), Value::Edge(edge));
                }
                out.insert(self.dst_var.clone(), Value::Node(neighbor));
                return Ok(Some(out));
            }

            // Phase 2: `pending` is drained — pull the next input row
            // and buffer its adjacency.
            match self.input.next(ctx)? {
                None => return Ok(None),
                Some(row) => {
                    let src_id = match row.get(&self.src_var) {
                        Some(Value::Node(n)) => n.id,
                        // A null source (e.g. from OPTIONAL MATCH
                        // that matched nothing) drops the input
                        // row — `MATCH (a)-->(b)` against a null
                        // `a` is just empty, not an error.
                        Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
                            continue
                        }
                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
                    };
                    self.pending = match self.direction {
                        Direction::Outgoing => ctx.store.outgoing(src_id)?,
                        Direction::Incoming => ctx.store.incoming(src_id)?,
                        Direction::Both => {
                            // For undirected traversal, a self-loop
                            // appears once as outgoing and once as
                            // incoming. The Cypher spec visits each
                            // physical edge once, so dedupe by
                            // edge_id before emitting.
                            let mut all = ctx.store.outgoing(src_id)?;
                            let mut seen: std::collections::HashSet<EdgeId> =
                                all.iter().map(|(e, _)| *e).collect();
                            for (e, n) in ctx.store.incoming(src_id)? {
                                if seen.insert(e) {
                                    all.push((e, n));
                                }
                            }
                            all
                        }
                    };
                    self.pending_idx = 0;
                    self.current_row = Some(row);
                }
            }
        }
    }
}
3166
3167/// Left-join variant of [`EdgeExpandOp`]. For each input row,
3168/// expands the adjacency in the configured direction and
3169/// filters by `edge_type` / `dst_labels` — if **any** neighbor
3170/// survives the filters, emits rows exactly like
3171/// `EdgeExpandOp`. If **zero** neighbors survive, emits one row
3172/// that carries the input row's bindings plus `edge_var` /
3173/// `dst_var` set to `Value::Null`, preserving the input row in
3174/// the output stream. This is the left-outer-join semantics
3175/// OPTIONAL MATCH needs.
3176///
3177/// Tracks per-input-row whether any output was produced so the
3178/// fallback Null row is only emitted after the pending buffer
3179/// drains without yielding anything. The `yielded_for_current`
3180/// flag is reset whenever a new input row is pulled.
3181struct OptionalEdgeExpandOp {
3182    input: Box<dyn Operator>,
3183    src_var: String,
3184    edge_var: Option<String>,
3185    dst_var: String,
3186    dst_labels: Vec<String>,
3187    dst_properties: Vec<(String, Expr)>,
3188    edge_types: Vec<String>,
3189    direction: Direction,
3190    /// When set, edges whose target id differs from the node
3191    /// bound at this variable in the current row are skipped
3192    /// inside the expansion loop. A row whose outgoing edges all
3193    /// fail the constraint then triggers the same null-fallback
3194    /// as having no edges at all.
3195    dst_constraint_var: Option<String>,
3196    /// When set, the expansion only considers the single edge
3197    /// whose id matches the edge already bound at this row
3198    /// variable — used when the OPTIONAL MATCH pattern reuses
3199    /// an edge variable from a prior clause.
3200    edge_constraint_var: Option<String>,
3201    current_row: Option<Row>,
3202    pending: Vec<(EdgeId, NodeId)>,
3203    pending_idx: usize,
3204    yielded_for_current: bool,
3205}
3206
3207impl OptionalEdgeExpandOp {
3208    #[allow(clippy::too_many_arguments)]
3209    fn new(
3210        input: Box<dyn Operator>,
3211        src_var: String,
3212        edge_var: Option<String>,
3213        dst_var: String,
3214        dst_labels: Vec<String>,
3215        dst_properties: Vec<(String, Expr)>,
3216        edge_types: Vec<String>,
3217        direction: Direction,
3218        dst_constraint_var: Option<String>,
3219        edge_constraint_var: Option<String>,
3220    ) -> Self {
3221        Self {
3222            input,
3223            src_var,
3224            edge_var,
3225            dst_var,
3226            dst_labels,
3227            dst_properties,
3228            edge_types,
3229            direction,
3230            dst_constraint_var,
3231            edge_constraint_var,
3232            current_row: None,
3233            pending: Vec::new(),
3234            pending_idx: 0,
3235            yielded_for_current: false,
3236        }
3237    }
3238}
3239
3240impl Operator for OptionalEdgeExpandOp {
3241    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3242        loop {
3243            while self.pending_idx < self.pending.len() {
3244                let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3245                self.pending_idx += 1;
3246
3247                let edge = match ctx.store.get_edge(edge_id)? {
3248                    Some(e) => e,
3249                    None => continue,
3250                };
3251                if !self.edge_types.is_empty()
3252                    && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3253                {
3254                    continue;
3255                }
3256                // Pre-bound edge constraint: only the specific
3257                // edge whose id matches the row-bound value
3258                // counts as a match. Falls through to outer
3259                // scopes so the constraint works for fresh
3260                // scans on the right side of a CartesianProduct.
3261                if let Some(constraint_var) = &self.edge_constraint_var {
3262                    let base = self
3263                        .current_row
3264                        .as_ref()
3265                        .expect("pending without source row");
3266                    let expected = match ctx.lookup_binding(base, constraint_var) {
3267                        Some(Value::Edge(e)) => Some(e.id),
3268                        _ => None,
3269                    };
3270                    match expected {
3271                        Some(id) if id != edge.id => continue,
3272                        None => continue,
3273                        _ => {}
3274                    }
3275                }
3276
3277                let neighbor = match ctx.store.get_node(neighbor_id)? {
3278                    Some(n) => n,
3279                    None => continue,
3280                };
3281                if !has_all_labels(&neighbor, &self.dst_labels) {
3282                    continue;
3283                }
3284                // Bound-endpoint constraint: when the declared
3285                // target is already bound in the row, only edges
3286                // that lead to that exact node count as a match.
3287                // Edges failing the constraint are silently
3288                // skipped — if every candidate fails, the
3289                // per-row left-join fallback below still fires.
3290                if let Some(constraint_var) = &self.dst_constraint_var {
3291                    let base = self
3292                        .current_row
3293                        .as_ref()
3294                        .expect("pending without source row");
3295                    let bound_id = match base.get(constraint_var) {
3296                        Some(Value::Node(n)) => Some(n.id),
3297                        Some(Value::Null)
3298                        | Some(Value::Property(meshdb_core::Property::Null))
3299                        | None => None,
3300                        _ => None,
3301                    };
3302                    match bound_id {
3303                        Some(id) if id != neighbor.id => continue,
3304                        None => continue,
3305                        _ => {}
3306                    }
3307                }
3308                if !self.dst_properties.is_empty() {
3309                    let base = self
3310                        .current_row
3311                        .as_ref()
3312                        .expect("pending without source row");
3313                    let ectx = ctx.eval_ctx(base);
3314                    let mut props_ok = true;
3315                    for (key, expr) in &self.dst_properties {
3316                        let expected = eval_expr(expr, &ectx)?;
3317                        let actual = neighbor
3318                            .properties
3319                            .get(key)
3320                            .cloned()
3321                            .map(Value::Property)
3322                            .unwrap_or(Value::Null);
3323                        if !values_equal(&expected, &actual) {
3324                            props_ok = false;
3325                            break;
3326                        }
3327                    }
3328                    if !props_ok {
3329                        continue;
3330                    }
3331                }
3332
3333                let base = self
3334                    .current_row
3335                    .as_ref()
3336                    .expect("pending edges without source row");
3337                let mut out = base.clone();
3338                if let Some(ev) = &self.edge_var {
3339                    out.insert(ev.clone(), Value::Edge(edge));
3340                }
3341                out.insert(self.dst_var.clone(), Value::Node(neighbor));
3342                self.yielded_for_current = true;
3343                return Ok(Some(out));
3344            }
3345
3346            // Pending drained for the current row. If nothing was
3347            // yielded, emit the left-join fallback: preserve the
3348            // input row with the optional variables set to Null.
3349            //
3350            // Exception: when an edge / dst constraint names a
3351            // variable that was pre-bound in the input row, the
3352            // fallback must NOT clobber that variable — the
3353            // constraint turns the expansion into an existence
3354            // check and the outer value should survive through.
3355            if let Some(base) = self.current_row.take() {
3356                if !self.yielded_for_current {
3357                    let mut out = base;
3358                    if let Some(ev) = &self.edge_var {
3359                        let preserve = self
3360                            .edge_constraint_var
3361                            .as_ref()
3362                            .map(|c| c == ev)
3363                            .unwrap_or(false);
3364                        if !preserve {
3365                            out.insert(ev.clone(), Value::Null);
3366                        }
3367                    }
3368                    let preserve_dst = self
3369                        .dst_constraint_var
3370                        .as_ref()
3371                        .map(|c| c == &self.dst_var)
3372                        .unwrap_or(false);
3373                    if !preserve_dst {
3374                        out.insert(self.dst_var.clone(), Value::Null);
3375                    }
3376                    self.yielded_for_current = true;
3377                    return Ok(Some(out));
3378                }
3379            }
3380
3381            match self.input.next(ctx)? {
3382                None => return Ok(None),
3383                Some(row) => {
3384                    let src_id = match row.get(&self.src_var) {
3385                        Some(Value::Node(n)) => n.id,
3386                        // src_var is Null (because a prior
3387                        // OPTIONAL MATCH chained before this one
3388                        // Null-bound it). Skip adjacency entirely
3389                        // and fall through to the fallback Null
3390                        // row so downstream clauses see the
3391                        // preserved input.
3392                        Some(Value::Null) => {
3393                            self.pending = Vec::new();
3394                            self.pending_idx = 0;
3395                            self.yielded_for_current = false;
3396                            self.current_row = Some(row);
3397                            continue;
3398                        }
3399                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3400                    };
3401                    self.pending = match self.direction {
3402                        Direction::Outgoing => ctx.store.outgoing(src_id)?,
3403                        Direction::Incoming => ctx.store.incoming(src_id)?,
3404                        Direction::Both => {
3405                            // For undirected traversal, a self-loop
3406                            // appears once as outgoing and once as
3407                            // incoming. The Cypher spec visits each
3408                            // physical edge once, so dedupe by
3409                            // edge_id before emitting.
3410                            let mut all = ctx.store.outgoing(src_id)?;
3411                            let mut seen: std::collections::HashSet<EdgeId> =
3412                                all.iter().map(|(e, _)| *e).collect();
3413                            for (e, n) in ctx.store.incoming(src_id)? {
3414                                if seen.insert(e) {
3415                                    all.push((e, n));
3416                                }
3417                            }
3418                            all
3419                        }
3420                    };
3421                    self.pending_idx = 0;
3422                    self.yielded_for_current = false;
3423                    self.current_row = Some(row);
3424                }
3425            }
3426        }
3427    }
3428}
3429
3430struct VarLengthExpandOp {
3431    input: Box<dyn Operator>,
3432    src_var: String,
3433    edge_var: Option<String>,
3434    dst_var: String,
3435    dst_labels: Vec<String>,
3436    edge_types: Vec<String>,
3437    /// Per-edge property filter — every edge along the walked
3438    /// path must have these `(key, value)` pairs. Mirrors the
3439    /// inline filter on `EdgeExpandOp`; applied during DFS so
3440    /// failing edges prune the branch instead of generating
3441    /// wrong-length results.
3442    edge_properties: Vec<(String, Expr)>,
3443    direction: Direction,
3444    min_hops: u64,
3445    max_hops: u64,
3446    path_var: Option<String>,
3447    /// Per-row left-join mode: when `true`, an input row that
3448    /// produces no matching paths still emits one row with the
3449    /// expansion's output vars bound to Null. Set for
3450    /// `OPTIONAL MATCH (a)-[*]->(b)` so the outer row survives
3451    /// even when the path search is empty.
3452    optional: bool,
3453    /// When set, paths whose terminal node id differs from the
3454    /// node bound at this variable in the current row are
3455    /// filtered out before counting as a match. Combined with
3456    /// `optional`, an input row whose candidate paths all miss
3457    /// the bound target triggers the null-fallback instead of
3458    /// silently dropping.
3459    dst_constraint_var: Option<String>,
3460    /// Replay mode: read the walked edge sequence from the list
3461    /// already bound at this row variable instead of doing DFS.
3462    /// Used by openCypher's "walk a pre-bound edge list" form.
3463    bound_edge_list_var: Option<String>,
3464    /// Row variables whose bound edges (or edge lists) must not
3465    /// appear in the walked path — enforces openCypher's
3466    /// relationship-uniqueness rule across hops within the same
3467    /// MATCH pattern. Each entry resolves against the current row
3468    /// (falling through to outer-scope rows) at run time; the
3469    /// union of every referenced edge id becomes the DFS
3470    /// exclusion set.
3471    excluded_edge_vars: Vec<String>,
3472    current_row: Option<Row>,
3473    pending_paths: Vec<Vec<Edge>>,
3474    pending_node_paths: Vec<Vec<NodeId>>,
3475    pending_targets: Vec<NodeId>,
3476    pending_idx: usize,
3477}
3478
3479impl VarLengthExpandOp {
3480    #[allow(clippy::too_many_arguments)]
3481    fn new(
3482        input: Box<dyn Operator>,
3483        src_var: String,
3484        edge_var: Option<String>,
3485        dst_var: String,
3486        dst_labels: Vec<String>,
3487        edge_types: Vec<String>,
3488        edge_properties: Vec<(String, Expr)>,
3489        direction: Direction,
3490        min_hops: u64,
3491        max_hops: u64,
3492        path_var: Option<String>,
3493        optional: bool,
3494        dst_constraint_var: Option<String>,
3495        bound_edge_list_var: Option<String>,
3496        excluded_edge_vars: Vec<String>,
3497    ) -> Self {
3498        Self {
3499            input,
3500            src_var,
3501            edge_var,
3502            dst_var,
3503            dst_labels,
3504            edge_types,
3505            edge_properties,
3506            direction,
3507            min_hops,
3508            max_hops,
3509            path_var,
3510            optional,
3511            dst_constraint_var,
3512            bound_edge_list_var,
3513            excluded_edge_vars,
3514            current_row: None,
3515            pending_paths: Vec::new(),
3516            pending_node_paths: Vec::new(),
3517            pending_targets: Vec::new(),
3518            pending_idx: 0,
3519        }
3520    }
3521
3522    fn enumerate(
3523        &self,
3524        ctx: &ExecCtx,
3525        start: NodeId,
3526        input_row: &Row,
3527    ) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
3528        let mut paths: Vec<Vec<Edge>> = Vec::new();
3529        let mut node_paths: Vec<Vec<NodeId>> = Vec::new();
3530        let mut targets: Vec<NodeId> = Vec::new();
3531        let mut edge_buf: Vec<Edge> = Vec::new();
3532        let mut node_buf: Vec<NodeId> = vec![start];
3533        // Seed `used` with edges from outer-scope bindings the
3534        // pattern flags as exclusions (e.g. the other hops'
3535        // edge / edge-list vars). A fresh walk can't revisit an
3536        // edge that's already bound as a relationship variable
3537        // elsewhere in the MATCH — that's openCypher's
3538        // relationship-uniqueness rule.
3539        let mut used: HashSet<EdgeId> = HashSet::new();
3540        for var in &self.excluded_edge_vars {
3541            match ctx.lookup_binding(input_row, var) {
3542                Some(Value::Edge(e)) => {
3543                    used.insert(e.id);
3544                }
3545                Some(Value::List(items)) => {
3546                    for item in items {
3547                        if let Value::Edge(e) = item {
3548                            used.insert(e.id);
3549                        }
3550                    }
3551                }
3552                _ => {}
3553            }
3554        }
3555        // Evaluate edge-property expected values once per input row
3556        // — they may reference row bindings or `$`-parameters, but
3557        // don't vary per walked edge.
3558        let expected_edge_props: Vec<(String, Value)> = if self.edge_properties.is_empty() {
3559            Vec::new()
3560        } else {
3561            let ectx = ctx.eval_ctx(input_row);
3562            self.edge_properties
3563                .iter()
3564                .map(|(k, expr)| eval_expr(expr, &ectx).map(|v| (k.clone(), v)))
3565                .collect::<Result<Vec<_>>>()?
3566        };
3567        self.dfs(
3568            ctx,
3569            start,
3570            &expected_edge_props,
3571            &mut edge_buf,
3572            &mut node_buf,
3573            &mut used,
3574            &mut paths,
3575            &mut node_paths,
3576            &mut targets,
3577        )?;
3578        Ok((paths, node_paths, targets))
3579    }
3580
3581    #[allow(clippy::too_many_arguments)]
3582    fn dfs(
3583        &self,
3584        ctx: &ExecCtx,
3585        current_node: NodeId,
3586        expected_edge_props: &[(String, Value)],
3587        edge_buf: &mut Vec<Edge>,
3588        node_buf: &mut Vec<NodeId>,
3589        used: &mut HashSet<EdgeId>,
3590        out_paths: &mut Vec<Vec<Edge>>,
3591        out_node_paths: &mut Vec<Vec<NodeId>>,
3592        out_targets: &mut Vec<NodeId>,
3593    ) -> Result<()> {
3594        let depth = edge_buf.len() as u64;
3595
3596        if depth >= self.min_hops && depth <= self.max_hops {
3597            let terminal_ok = match ctx.store.get_node(current_node)? {
3598                Some(node) => has_all_labels(&node, &self.dst_labels),
3599                None => false,
3600            };
3601            if terminal_ok {
3602                out_paths.push(edge_buf.clone());
3603                out_node_paths.push(node_buf.clone());
3604                out_targets.push(current_node);
3605            }
3606        }
3607
3608        if depth >= self.max_hops {
3609            return Ok(());
3610        }
3611
3612        let neighbors = match self.direction {
3613            Direction::Outgoing => ctx.store.outgoing(current_node)?,
3614            Direction::Incoming => ctx.store.incoming(current_node)?,
3615            Direction::Both => {
3616                let mut all = ctx.store.outgoing(current_node)?;
3617                all.extend(ctx.store.incoming(current_node)?);
3618                all
3619            }
3620        };
3621
3622        for (eid, neighbor_id) in neighbors {
3623            if used.contains(&eid) {
3624                continue;
3625            }
3626            let edge = match ctx.store.get_edge(eid)? {
3627                Some(e) => e,
3628                None => continue,
3629            };
3630            if !self.edge_types.is_empty() && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3631            {
3632                continue;
3633            }
3634            // Inline edge property filter: skip edges whose
3635            // properties don't equal the expected values computed
3636            // per input row. A missing property fails the check,
3637            // matching `EdgeExpandOp`.
3638            if !expected_edge_props.is_empty() {
3639                let mut ok = true;
3640                for (key, expected) in expected_edge_props {
3641                    let actual = match edge.properties.get(key) {
3642                        Some(v) => Value::Property(v.clone()),
3643                        None => {
3644                            ok = false;
3645                            break;
3646                        }
3647                    };
3648                    if !values_equal(&actual, expected) {
3649                        ok = false;
3650                        break;
3651                    }
3652                }
3653                if !ok {
3654                    continue;
3655                }
3656            }
3657            used.insert(eid);
3658            edge_buf.push(edge);
3659            node_buf.push(neighbor_id);
3660            self.dfs(
3661                ctx,
3662                neighbor_id,
3663                expected_edge_props,
3664                edge_buf,
3665                node_buf,
3666                used,
3667                out_paths,
3668                out_node_paths,
3669                out_targets,
3670            )?;
3671            edge_buf.pop();
3672            node_buf.pop();
3673            used.remove(&eid);
3674        }
3675
3676        Ok(())
3677    }
3678}
3679
3680/// Replay the edge sequence stored at `list_var` in `row` as a
3681/// var-length walk starting from `src_id`. Returns `(paths,
3682/// node_paths, targets)` in the same shape `enumerate` produces
3683/// — either one entry when the list reads as a valid connected
3684/// walk in `direction`, or empty when it doesn't.
3685///
3686/// Validation per step:
3687/// * the list element is a `Value::Edge`,
3688/// * the edge's optional type filter matches `edge_types`,
3689/// * the edge actually touches the current node and proceeds to
3690///   the other endpoint in the requested direction (undirected
3691///   accepts either).
3692///
3693/// A null / missing / non-list value, a null source, or any
3694/// failed step all produce an empty result — which, when the
3695/// caller is in optional mode, triggers the left-join null
3696/// fallback.
3697fn replay_edge_list(
3698    ctx: &ExecCtx,
3699    row: &Row,
3700    list_var: &str,
3701    src_id: Option<NodeId>,
3702    direction: Direction,
3703    edge_types: &[String],
3704) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
3705    let start = match src_id {
3706        Some(id) => id,
3707        None => return Ok((Vec::new(), Vec::new(), Vec::new())),
3708    };
3709    let list = match ctx.lookup_binding(row, list_var) {
3710        Some(Value::List(items)) => items.clone(),
3711        Some(Value::Property(meshdb_core::Property::List(items))) => items
3712            .iter()
3713            .cloned()
3714            .map(Value::Property)
3715            .collect::<Vec<_>>(),
3716        _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
3717    };
3718    let mut edge_buf: Vec<Edge> = Vec::with_capacity(list.len());
3719    let mut node_buf: Vec<NodeId> = Vec::with_capacity(list.len() + 1);
3720    node_buf.push(start);
3721    let mut current = start;
3722    for item in list {
3723        let edge = match item {
3724            Value::Edge(e) => e,
3725            _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
3726        };
3727        if !edge_types.is_empty() && !edge_types.iter().any(|t| t == &edge.edge_type) {
3728            return Ok((Vec::new(), Vec::new(), Vec::new()));
3729        }
3730        let next_node = match direction {
3731            Direction::Outgoing => {
3732                if edge.source != current {
3733                    return Ok((Vec::new(), Vec::new(), Vec::new()));
3734                }
3735                edge.target
3736            }
3737            Direction::Incoming => {
3738                if edge.target != current {
3739                    return Ok((Vec::new(), Vec::new(), Vec::new()));
3740                }
3741                edge.source
3742            }
3743            Direction::Both => {
3744                if edge.source == current {
3745                    edge.target
3746                } else if edge.target == current {
3747                    edge.source
3748                } else {
3749                    return Ok((Vec::new(), Vec::new(), Vec::new()));
3750                }
3751            }
3752        };
3753        // The stored edge shape should still exist in the graph;
3754        // a missing node mid-walk means the snapshot shifted and
3755        // the list is no longer valid.
3756        if ctx.store.get_node(next_node)?.is_none() {
3757            return Ok((Vec::new(), Vec::new(), Vec::new()));
3758        }
3759        edge_buf.push(edge);
3760        node_buf.push(next_node);
3761        current = next_node;
3762    }
3763    Ok((vec![edge_buf], vec![node_buf], vec![current]))
3764}
3765
3766impl Operator for VarLengthExpandOp {
3767    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3768        loop {
3769            while self.pending_idx < self.pending_paths.len() {
3770                let i = self.pending_idx;
3771                self.pending_idx += 1;
3772
3773                let target_id = self.pending_targets[i];
3774                let target = match ctx.store.get_node(target_id)? {
3775                    Some(n) => n,
3776                    None => continue,
3777                };
3778
3779                let base = self
3780                    .current_row
3781                    .as_ref()
3782                    .expect("pending without source row");
3783                let mut out = base.clone();
3784                out.insert(self.dst_var.clone(), Value::Node(target.clone()));
3785                if let Some(ev) = &self.edge_var {
3786                    let edges: Vec<Value> = self.pending_paths[i]
3787                        .iter()
3788                        .cloned()
3789                        .map(Value::Edge)
3790                        .collect();
3791                    out.insert(ev.clone(), Value::List(edges));
3792                }
3793                if let Some(pv) = &self.path_var {
3794                    let mut nodes = Vec::with_capacity(self.pending_node_paths[i].len());
3795                    for nid in &self.pending_node_paths[i] {
3796                        match ctx.store.get_node(*nid)? {
3797                            Some(n) => nodes.push(n),
3798                            None => continue,
3799                        }
3800                    }
3801                    let edges = self.pending_paths[i].clone();
3802                    out.insert(pv.clone(), Value::Path { nodes, edges });
3803                }
3804                return Ok(Some(out));
3805            }
3806
3807            match self.input.next(ctx)? {
3808                None => return Ok(None),
3809                Some(row) => {
3810                    let src_id = match row.get(&self.src_var) {
3811                        Some(Value::Node(n)) => Some(n.id),
3812                        // Null source → no paths. Same
3813                        // null-propagating semantics as
3814                        // `EdgeExpandOp`. In optional mode we
3815                        // still emit the left-join fallback;
3816                        // otherwise we just skip.
3817                        Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
3818                            None
3819                        }
3820                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3821                    };
3822                    // Replay path: when the hop names an already-
3823                    // bound edge list (`MATCH (a)-[rs*]->(b)` with
3824                    // `rs = [r1, r2]` from an earlier WITH), skip
3825                    // DFS entirely and verify the listed edges
3826                    // form a connected walk from `src_id` in the
3827                    // required direction. Produces at most one
3828                    // path — the exact list that was supplied.
3829                    let (mut paths, mut node_paths, mut targets) =
3830                        if let Some(list_var) = &self.bound_edge_list_var {
3831                            replay_edge_list(
3832                                ctx,
3833                                &row,
3834                                list_var,
3835                                src_id,
3836                                self.direction,
3837                                &self.edge_types,
3838                            )?
3839                        } else {
3840                            match src_id {
3841                                Some(id) => self.enumerate(ctx, id, &row)?,
3842                                None => (Vec::new(), Vec::new(), Vec::new()),
3843                            }
3844                        };
3845                    // Bound-endpoint constraint: drop paths whose
3846                    // terminal node doesn't match the node already
3847                    // bound at the constraint var. When combined
3848                    // with `optional`, an input whose paths are
3849                    // all filtered out still triggers the
3850                    // null-fallback below.
3851                    if let Some(constraint_var) = &self.dst_constraint_var {
3852                        let target_id = match row.get(constraint_var) {
3853                            Some(Value::Node(n)) => Some(n.id),
3854                            _ => None,
3855                        };
3856                        match target_id {
3857                            Some(id) => {
3858                                let mut kept_paths = Vec::new();
3859                                let mut kept_node_paths = Vec::new();
3860                                let mut kept_targets = Vec::new();
3861                                for ((p, np), t) in paths
3862                                    .drain(..)
3863                                    .zip(node_paths.drain(..))
3864                                    .zip(targets.drain(..))
3865                                {
3866                                    if t == id {
3867                                        kept_paths.push(p);
3868                                        kept_node_paths.push(np);
3869                                        kept_targets.push(t);
3870                                    }
3871                                }
3872                                paths = kept_paths;
3873                                node_paths = kept_node_paths;
3874                                targets = kept_targets;
3875                            }
3876                            None => {
3877                                paths.clear();
3878                                node_paths.clear();
3879                                targets.clear();
3880                            }
3881                        }
3882                    }
3883                    if paths.is_empty() && self.optional {
3884                        // Left-join fallback: emit one row with
3885                        // the expansion's output vars set to Null
3886                        // so the outer OPTIONAL MATCH preserves
3887                        // this input row.
3888                        let mut out = row;
3889                        if let Some(ev) = &self.edge_var {
3890                            out.insert(ev.clone(), Value::Null);
3891                        }
3892                        out.insert(self.dst_var.clone(), Value::Null);
3893                        if let Some(pv) = &self.path_var {
3894                            out.insert(pv.clone(), Value::Null);
3895                        }
3896                        return Ok(Some(out));
3897                    }
3898                    self.pending_paths = paths;
3899                    self.pending_node_paths = node_paths;
3900                    self.pending_targets = targets;
3901                    self.pending_idx = 0;
3902                    self.current_row = Some(row);
3903                }
3904            }
3905        }
3906    }
3907}
3908
3909struct FilterOp {
3910    input: Box<dyn Operator>,
3911    predicate: Expr,
3912}
3913
3914impl FilterOp {
3915    fn new(input: Box<dyn Operator>, predicate: Expr) -> Self {
3916        Self { input, predicate }
3917    }
3918}
3919
3920impl Operator for FilterOp {
3921    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3922        while let Some(row) = self.input.next(ctx)? {
3923            let v = match eval_expr(&self.predicate, &ctx.eval_ctx(&row)) {
3924                Ok(v) => v,
3925                // Type mismatches and non-boolean errors in filter predicates
3926                // are treated as false (row filtered out), not hard errors
3927                Err(Error::TypeMismatch) | Err(Error::NotBoolean) => Value::Null,
3928                Err(e) => return Err(e),
3929            };
3930            if to_bool(&v).unwrap_or(false) {
3931                return Ok(Some(row));
3932            }
3933        }
3934        Ok(None)
3935    }
3936}
3937
3938/// Pass-through operator for `RETURN *` / `WITH *`. Forwards every
3939/// row from the input unchanged.
3940struct IdentityOp {
3941    input: Box<dyn Operator>,
3942}
3943
3944impl IdentityOp {
3945    fn new(input: Box<dyn Operator>) -> Self {
3946        Self { input }
3947    }
3948}
3949
3950impl Operator for IdentityOp {
3951    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3952        self.input.next(ctx)
3953    }
3954}
3955
3956/// Passes input rows through; if the input produces zero rows,
3957/// emits exactly one row with `null_vars` bound to Value::Null.
3958/// Implements standalone OPTIONAL MATCH semantics (e.g.
3959/// `OPTIONAL MATCH (n) RETURN n` on an empty graph yields one
3960/// row with n=null rather than the empty result set).
3961struct CoalesceNullRowOp {
3962    input: Box<dyn Operator>,
3963    null_vars: Vec<String>,
3964    produced_any: bool,
3965    done: bool,
3966}
3967
3968impl CoalesceNullRowOp {
3969    fn new(input: Box<dyn Operator>, null_vars: Vec<String>) -> Self {
3970        Self {
3971            input,
3972            null_vars,
3973            produced_any: false,
3974            done: false,
3975        }
3976    }
3977}
3978
3979impl Operator for CoalesceNullRowOp {
3980    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3981        if self.done {
3982            return Ok(None);
3983        }
3984        match self.input.next(ctx)? {
3985            Some(row) => {
3986                self.produced_any = true;
3987                Ok(Some(row))
3988            }
3989            None => {
3990                self.done = true;
3991                if self.produced_any {
3992                    Ok(None)
3993                } else {
3994                    let mut row = Row::new();
3995                    for v in &self.null_vars {
3996                        row.insert(v.clone(), Value::Null);
3997                    }
3998                    Ok(Some(row))
3999                }
4000            }
4001        }
4002    }
4003}
4004
4005struct ProjectOp {
4006    input: Box<dyn Operator>,
4007    items: Vec<ReturnItem>,
4008}
4009
4010impl ProjectOp {
4011    fn new(input: Box<dyn Operator>, items: Vec<ReturnItem>) -> Self {
4012        Self { input, items }
4013    }
4014}
4015
4016impl Operator for ProjectOp {
4017    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4018        match self.input.next(ctx)? {
4019            Some(row) => {
4020                let mut out = Row::new();
4021                for (i, item) in self.items.iter().enumerate() {
4022                    let name = item.alias.clone().unwrap_or_else(|| {
4023                        item.raw_text
4024                            .clone()
4025                            .unwrap_or_else(|| default_name(&item.expr, i))
4026                    });
4027                    let value = eval_expr(&item.expr, &ctx.eval_ctx(&row))?;
4028                    out.insert(name, value);
4029                }
4030                Ok(Some(out))
4031            }
4032            None => Ok(None),
4033        }
4034    }
4035}
4036
4037fn default_name(expr: &Expr, idx: usize) -> String {
4038    render_expr_name(expr).unwrap_or_else(|| format!("col{}", idx))
4039}
4040
4041fn render_expr_name(expr: &Expr) -> Option<String> {
4042    Some(match expr {
4043        Expr::Identifier(s) => s.clone(),
4044        Expr::Property { var, key } => format!("{var}.{key}"),
4045        Expr::PropertyAccess { base, key } => {
4046            // Match the source syntax's parenthesisation of a
4047            // bracketed base: `(list[1]).k` round-trips, while a
4048            // plain identifier base stays bare (`a.b`).
4049            if matches!(
4050                base.as_ref(),
4051                Expr::IndexAccess { .. } | Expr::SliceAccess { .. }
4052            ) {
4053                format!("({}).{key}", render_expr_name(base)?)
4054            } else {
4055                format!("{}.{key}", render_expr_name(base)?)
4056            }
4057        }
4058        Expr::Parameter(name) => format!("${name}"),
4059        Expr::Literal(Literal::String(s)) => format!("'{s}'"),
4060        Expr::Literal(Literal::Integer(i)) => i.to_string(),
4061        Expr::Literal(Literal::Float(f)) => f.to_string(),
4062        Expr::Literal(Literal::Boolean(b)) => b.to_string(),
4063        Expr::Literal(Literal::Null) => "NULL".into(),
4064        Expr::Call { name, args } => {
4065            let arg_str = match args {
4066                CallArgs::Star => "*".into(),
4067                CallArgs::Exprs(es) | CallArgs::DistinctExprs(es) => {
4068                    let prefix = if matches!(args, CallArgs::DistinctExprs(_)) {
4069                        "DISTINCT "
4070                    } else {
4071                        ""
4072                    };
4073                    let inner: Vec<String> = es.iter().filter_map(render_expr_name).collect();
4074                    if inner.len() != es.len() {
4075                        return None;
4076                    }
4077                    format!("{prefix}{}", inner.join(", "))
4078                }
4079            };
4080            format!("{name}({arg_str})")
4081        }
4082        Expr::BinaryOp { op, left, right } => {
4083            let op_str = match op {
4084                BinaryOp::Add => " + ",
4085                BinaryOp::Sub => " - ",
4086                BinaryOp::Mul => " * ",
4087                BinaryOp::Div => " / ",
4088                BinaryOp::Mod => " % ",
4089                BinaryOp::Pow => " ^ ",
4090            };
4091            format!(
4092                "{}{op_str}{}",
4093                render_expr_name(left)?,
4094                render_expr_name(right)?
4095            )
4096        }
4097        Expr::UnaryOp { op, operand } => {
4098            let op_str = match op {
4099                UnaryOp::Neg => "-",
4100            };
4101            format!("{op_str}{}", render_expr_name(operand)?)
4102        }
4103        Expr::Not(inner) => format!("NOT {}", render_expr_name(inner)?),
4104        Expr::IsNull { negated, inner } => {
4105            if *negated {
4106                format!("{} IS NOT NULL", render_expr_name(inner)?)
4107            } else {
4108                format!("{} IS NULL", render_expr_name(inner)?)
4109            }
4110        }
4111        Expr::Compare { op, left, right } => {
4112            let op_str = match op {
4113                CompareOp::Eq => " = ",
4114                CompareOp::Ne => " <> ",
4115                CompareOp::Lt => " < ",
4116                CompareOp::Le => " <= ",
4117                CompareOp::Gt => " > ",
4118                CompareOp::Ge => " >= ",
4119                CompareOp::StartsWith => " STARTS WITH ",
4120                CompareOp::EndsWith => " ENDS WITH ",
4121                CompareOp::Contains => " CONTAINS ",
4122                CompareOp::RegexMatch => " =~ ",
4123            };
4124            format!(
4125                "{}{op_str}{}",
4126                render_expr_name(left)?,
4127                render_expr_name(right)?
4128            )
4129        }
4130        Expr::List(items) => {
4131            let inner: Vec<String> = items.iter().filter_map(render_expr_name).collect();
4132            if inner.len() != items.len() {
4133                return None;
4134            }
4135            format!("[{}]", inner.join(", "))
4136        }
4137        Expr::Map(entries) => {
4138            let inner: Vec<String> = entries
4139                .iter()
4140                .map(|(k, v)| render_expr_name(v).map(|vn| format!("{k}: {vn}")))
4141                .collect::<Option<Vec<_>>>()?;
4142            format!("{{{}}}", inner.join(", "))
4143        }
4144        Expr::IndexAccess { base, index } => {
4145            format!("{}[{}]", render_expr_name(base)?, render_expr_name(index)?)
4146        }
4147        Expr::InList { element, list } => {
4148            format!(
4149                "{} IN {}",
4150                render_expr_name(element)?,
4151                render_expr_name(list)?
4152            )
4153        }
4154        Expr::HasLabels { expr, labels } => {
4155            let mut s = format!("({}", render_expr_name(expr)?);
4156            for l in labels {
4157                s.push(':');
4158                s.push_str(l);
4159            }
4160            s.push(')');
4161            s
4162        }
4163        _ => return None,
4164    })
4165}
4166
4167struct DistinctOp {
4168    input: Box<dyn Operator>,
4169    seen: HashSet<String>,
4170}
4171
4172impl DistinctOp {
4173    fn new(input: Box<dyn Operator>) -> Self {
4174        Self {
4175            input,
4176            seen: HashSet::new(),
4177        }
4178    }
4179}
4180
4181impl Operator for DistinctOp {
4182    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4183        while let Some(row) = self.input.next(ctx)? {
4184            let key = row_key(&row);
4185            if self.seen.insert(key) {
4186                return Ok(Some(row));
4187            }
4188        }
4189        Ok(None)
4190    }
4191}
4192
4193/// Assemble a `Value::Path` from a row's already-bound node and
4194/// edge variables. Emitted by `plan_pattern` when the source
4195/// pattern carries `path_var`. The operator pulls `node_vars[i]`
4196/// and `edge_vars[i]` out of every row, walks them in order, and
4197/// inserts the result into the row under `path_var` before
4198/// forwarding downstream.
4199///
4200/// Rows where any referenced variable is missing or not the
4201/// expected Node/Edge shape get a `Value::Null` at `path_var` —
4202/// matching how `OPTIONAL MATCH` flows null bindings through the
4203/// downstream projection without hard-erroring. Missing bindings
4204/// shouldn't normally happen (the planner fills in synthetic
4205/// names via `ensure_path_bindings`), but the null fallback
4206/// means an unexpected upstream shape degrades gracefully
4207/// instead of crashing the whole query.
4208struct BindPathOp {
4209    input: Box<dyn Operator>,
4210    path_var: String,
4211    node_vars: Vec<String>,
4212    edge_vars: Vec<String>,
4213}
4214
4215impl BindPathOp {
4216    fn new(
4217        input: Box<dyn Operator>,
4218        path_var: String,
4219        node_vars: Vec<String>,
4220        edge_vars: Vec<String>,
4221    ) -> Self {
4222        Self {
4223            input,
4224            path_var,
4225            node_vars,
4226            edge_vars,
4227        }
4228    }
4229}
4230
4231impl Operator for BindPathOp {
4232    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4233        let Some(mut row) = self.input.next(ctx)? else {
4234            return Ok(None);
4235        };
4236        // Extract the ordered node + edge sequence from the row.
4237        // Any missing or wrong-shaped entry collapses the whole
4238        // path binding to null and falls through.
4239        let mut nodes: Vec<meshdb_core::Node> = Vec::new();
4240        let mut edges: Vec<meshdb_core::Edge> = Vec::new();
4241        let mut abort = false;
4242        // Interleave node/edge vars. For each hop i:
4243        //   node_vars[i] = start/intermediate node
4244        //   edge_vars[i] = edge (or sub-path for var-length)
4245        //   node_vars[i+1] = target node
4246        // For var-length hops, edge_vars[i] may contain a
4247        // Value::Path — splice its interior into the running path.
4248        if let Some(Value::Node(n)) = row.get(&self.node_vars[0]) {
4249            nodes.push(n.clone());
4250        } else {
4251            abort = true;
4252        }
4253        if !abort {
4254            for (i, ev) in self.edge_vars.iter().enumerate() {
4255                match row.get(ev) {
4256                    Some(Value::Edge(e)) => {
4257                        edges.push(e.clone());
4258                        match row.get(&self.node_vars[i + 1]) {
4259                            Some(Value::Node(n)) => nodes.push(n.clone()),
4260                            _ => {
4261                                abort = true;
4262                                break;
4263                            }
4264                        }
4265                    }
4266                    Some(Value::Path {
4267                        nodes: sub_nodes,
4268                        edges: sub_edges,
4269                    }) => {
4270                        // Splice the sub-path. The sub-path's first
4271                        // node is the same as nodes.last() (already
4272                        // pushed), so skip it. All sub-edges go in.
4273                        // Sub-path interior nodes go in. The sub-path's
4274                        // last node is the target for this hop.
4275                        edges.extend(sub_edges.iter().cloned());
4276                        if sub_nodes.len() > 1 {
4277                            nodes.extend(sub_nodes[1..].iter().cloned());
4278                        }
4279                    }
4280                    _ => {
4281                        abort = true;
4282                        break;
4283                    }
4284                }
4285            }
4286        }
4287        if abort {
4288            row.insert(self.path_var.clone(), Value::Null);
4289        } else {
4290            row.insert(self.path_var.clone(), Value::Path { nodes, edges });
4291        }
4292        Ok(Some(row))
4293    }
4294}
4295
4296/// `MATCH p = shortestPath((a)-[:R*..N]->(b))`. For each input
4297/// row (which must already bind both `src_var` and `dst_var`
4298/// to `Value::Node`), runs a breadth-first search from the
4299/// source node toward the target, filtering edges by
4300/// `edge_type` and walking up to `max_hops` steps. Emits one
4301/// row per successful search with `path_var` set to a
4302/// `Value::Path` carrying the traversed node/edge sequence;
4303/// rows where BFS finds no path are dropped entirely (matching
4304/// Cypher's `MATCH` semantics for an unsatisfiable pattern).
4305///
4306/// The BFS uses classic parent-pointer reconstruction: each
4307/// visited node's `(parent_node_id, edge_id)` pair is stored
4308/// in a hashmap, and once the target is reached we walk back
4309/// to the source, reversing the accumulated edges to produce
4310/// a forward-order path. Cycle detection is a side-effect of
4311/// the visited set — the first time BFS sees a node is
4312/// necessarily via a shortest path, so later visits are
4313/// ignored.
4314struct ShortestPathOp {
4315    input: Box<dyn Operator>,
4316    src_var: String,
4317    dst_var: String,
4318    path_var: String,
4319    edge_types: Vec<String>,
4320    direction: meshdb_cypher::Direction,
4321    max_hops: u64,
4322    kind: meshdb_cypher::ShortestKind,
4323    /// Buffered `(base_row, path)` pairs from the current
4324    /// input row, used only by `AllShortest` mode where a
4325    /// single input can expand into multiple shortest paths
4326    /// of the same length. Each call to `next` drains one
4327    /// entry before pulling a fresh input row; in `Shortest`
4328    /// mode the buffer is always empty or has one element.
4329    pending: std::collections::VecDeque<(Row, Value)>,
4330}
4331
4332impl ShortestPathOp {
4333    #[allow(clippy::too_many_arguments)]
4334    fn new(
4335        input: Box<dyn Operator>,
4336        src_var: String,
4337        dst_var: String,
4338        path_var: String,
4339        edge_types: Vec<String>,
4340        direction: meshdb_cypher::Direction,
4341        max_hops: u64,
4342        kind: meshdb_cypher::ShortestKind,
4343    ) -> Self {
4344        Self {
4345            input,
4346            src_var,
4347            dst_var,
4348            path_var,
4349            edge_types,
4350            direction,
4351            max_hops,
4352            kind,
4353            pending: std::collections::VecDeque::new(),
4354        }
4355    }
4356}
4357
4358impl Operator for ShortestPathOp {
4359    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4360        loop {
4361            // Drain buffered paths from a prior input row
4362            // before pulling the next one. Only reachable in
4363            // `AllShortest` mode; `Shortest` mode's buffer
4364            // never accumulates more than one entry.
4365            if let Some((mut row, path)) = self.pending.pop_front() {
4366                row.insert(self.path_var.clone(), path);
4367                return Ok(Some(row));
4368            }
4369            let Some(row) = self.input.next(ctx)? else {
4370                return Ok(None);
4371            };
4372            let src = match row.get(&self.src_var) {
4373                Some(Value::Node(n)) => n.clone(),
4374                _ => continue,
4375            };
4376            let dst = match row.get(&self.dst_var) {
4377                Some(Value::Node(n)) => n.clone(),
4378                _ => continue,
4379            };
4380            let paths = bfs_shortest_paths(
4381                &src,
4382                &dst,
4383                &self.edge_types,
4384                self.direction,
4385                self.max_hops,
4386                self.kind,
4387                ctx.store,
4388            )?;
4389            if paths.is_empty() {
4390                // No path of length ≤ max_hops — drop the row.
4391                continue;
4392            }
4393            for path in paths {
4394                self.pending.push_back((row.clone(), path));
4395            }
4396        }
4397    }
4398}
4399
/// Layered breadth-first search from `src` toward `dst`,
/// constrained by edge type, direction, and max hop count.
/// Returns every path of minimum length when `kind` is
/// `AllShortest`, or just the first discovered shortest path
/// when `kind` is `Shortest`. An empty vector means no path
/// of length ≤ `max_hops` exists.
///
/// `src == dst` is a zero-hop special case that always
/// returns a singleton path regardless of `max_hops`. Any other
/// search with `max_hops == 0` expands nothing and returns an
/// empty vector.
///
/// `edge_types` restricts traversal to edges whose `edge_type`
/// is listed; an empty slice means "any type". Only in the
/// filtered case are edge records fetched, and an edge whose
/// record `get_edge` cannot find is then skipped.
///
/// The algorithm builds a parent DAG as it expands each
/// level: every node that was first reached at depth `d+1`
/// records every `(parent, edge)` pair from frontier[d] that
/// reaches it, not just the first one. This lets the
/// reconstruction walk enumerate all source-to-target paths
/// of the discovered shortest length. For `Shortest` mode
/// the reconstruction short-circuits on the first complete
/// path.
///
/// Which path `Shortest` mode returns depends on the order in which
/// parents were recorded. That in turn follows the frontier order and
/// the order the reader lists neighbours; for `Both`, outgoing edges
/// are visited before incoming ones.
///
/// # Errors
/// Propagates errors from `reader.outgoing`, `reader.incoming`,
/// `reader.get_edge`, and from `collect_shortest_paths`.
fn bfs_shortest_paths(
    src: &Node,
    dst: &Node,
    edge_types: &[String],
    direction: meshdb_cypher::Direction,
    max_hops: u64,
    kind: meshdb_cypher::ShortestKind,
    reader: &dyn crate::reader::GraphReader,
) -> Result<Vec<Value>> {
    use meshdb_cypher::Direction;

    if src.id == dst.id {
        return Ok(vec![Value::Path {
            nodes: vec![src.clone()],
            edges: vec![],
        }]);
    }

    // `dist` tracks the shortest distance from `src` to each
    // reached node. `parents` maps each reached node id to
    // every `(parent_id, edge_id)` pair that reached it at
    // that shortest distance — a node may have multiple
    // parents in the parent DAG for `AllShortest`.
    let mut dist: HashMap<NodeId, u64> = HashMap::new();
    dist.insert(src.id, 0);
    let mut parents: HashMap<NodeId, Vec<(NodeId, EdgeId)>> = HashMap::new();

    let mut frontier: Vec<NodeId> = vec![src.id];
    let mut depth: u64 = 0;
    let mut found = false;

    // `found` is only consulted here, between levels. Once `dst` is
    // reached, the rest of the current level is still scanned so that
    // every alternate parent of `dst` at the same depth gets recorded.
    while !frontier.is_empty() && depth < max_hops && !found {
        let mut next_frontier: Vec<NodeId> = Vec::new();
        for node_id in &frontier {
            let neighbors = match direction {
                Direction::Outgoing => reader.outgoing(*node_id)?,
                Direction::Incoming => reader.incoming(*node_id)?,
                Direction::Both => {
                    let mut out = reader.outgoing(*node_id)?;
                    out.extend(reader.incoming(*node_id)?);
                    out
                }
            };
            for (edge_id, neighbor_id) in neighbors {
                // Edge-type filter. Only fetch the edge record
                // when a type constraint is present.
                if !edge_types.is_empty() {
                    let edge = match reader.get_edge(edge_id)? {
                        Some(e) => e,
                        None => continue,
                    };
                    if !edge_types.iter().any(|t| t == &edge.edge_type) {
                        continue;
                    }
                }
                match dist.get(&neighbor_id) {
                    Some(&d) if d == depth + 1 => {
                        // Alternate parent at the same
                        // shortest-path level — record the
                        // additional edge so the reconstruction
                        // can enumerate it, but don't re-add to
                        // the next frontier.
                        parents
                            .entry(neighbor_id)
                            .or_default()
                            .push((*node_id, edge_id));
                    }
                    Some(_) => {
                        // Already reached at a strictly shorter
                        // depth; this edge isn't on a shortest
                        // path.
                    }
                    None => {
                        dist.insert(neighbor_id, depth + 1);
                        parents
                            .entry(neighbor_id)
                            .or_default()
                            .push((*node_id, edge_id));
                        // `dst` itself is never expanded: no path
                        // through it can end at `dst` any sooner.
                        if neighbor_id == dst.id {
                            found = true;
                        } else {
                            next_frontier.push(neighbor_id);
                        }
                    }
                }
            }
        }
        depth += 1;
        // After a hit the loop exits on the next condition check, so
        // the (now unused) next level is not installed.
        if !found {
            frontier = next_frontier;
        }
    }

    if !found {
        return Ok(Vec::new());
    }

    // Enumerate all shortest paths from the parent DAG.
    // `Shortest` mode short-circuits after the first complete
    // walk; `AllShortest` collects every distinct path.
    let mut out: Vec<Value> = Vec::new();
    let mut nodes_rev: Vec<Node> = Vec::new();
    let mut edges_rev: Vec<Edge> = Vec::new();
    let only_first = matches!(kind, meshdb_cypher::ShortestKind::Shortest);
    collect_shortest_paths(
        src,
        dst,
        &parents,
        reader,
        &mut nodes_rev,
        &mut edges_rev,
        &mut out,
        only_first,
    )?;
    Ok(out)
}
4534
4535/// Depth-first walk through the parent DAG built by
4536/// `bfs_shortest_paths`, collecting every source-to-target
4537/// path of the BFS-determined shortest length. The recursion
4538/// carries two scratch stacks (`nodes_rev`, `edges_rev`)
4539/// representing the partially-reconstructed path in reverse
4540/// order; on reaching `src` we copy the reversed accumulators
4541/// into a forward-order `Value::Path` and push it into `out`.
4542///
4543/// `only_first = true` short-circuits after the first
4544/// complete path, giving `Shortest` mode the optimal-case
4545/// early exit.
4546#[allow(clippy::too_many_arguments)]
4547fn collect_shortest_paths(
4548    src: &Node,
4549    current: &Node,
4550    parents: &HashMap<NodeId, Vec<(NodeId, EdgeId)>>,
4551    reader: &dyn crate::reader::GraphReader,
4552    nodes_rev: &mut Vec<Node>,
4553    edges_rev: &mut Vec<Edge>,
4554    out: &mut Vec<Value>,
4555    only_first: bool,
4556) -> Result<()> {
4557    if current.id == src.id {
4558        // Complete walk: `nodes_rev` holds dst, ..., (last
4559        // node before src) in reverse; prepend src and
4560        // reverse to produce the forward-order node list.
4561        // Edges are similarly reversed.
4562        let mut nodes: Vec<Node> = Vec::with_capacity(nodes_rev.len() + 1);
4563        nodes.push(src.clone());
4564        nodes.extend(nodes_rev.iter().rev().cloned());
4565        let edges: Vec<Edge> = edges_rev.iter().rev().cloned().collect();
4566        out.push(Value::Path { nodes, edges });
4567        return Ok(());
4568    }
4569    let Some(parent_edges) = parents.get(&current.id) else {
4570        // Unreachable in normal flow — BFS only inserts dst
4571        // into `parents` when it found at least one incoming
4572        // edge — but handled defensively.
4573        return Ok(());
4574    };
4575    for (parent_id, edge_id) in parent_edges {
4576        if only_first && !out.is_empty() {
4577            return Ok(());
4578        }
4579        let edge = reader
4580            .get_edge(*edge_id)?
4581            .expect("BFS inserted this edge id; it must still exist");
4582        let parent_node = reader
4583            .get_node(*parent_id)?
4584            .expect("BFS visited this node id; it must still exist");
4585        nodes_rev.push(current.clone());
4586        edges_rev.push(edge);
4587        collect_shortest_paths(
4588            src,
4589            &parent_node,
4590            parents,
4591            reader,
4592            nodes_rev,
4593            edges_rev,
4594            out,
4595            only_first,
4596        )?;
4597        nodes_rev.pop();
4598        edges_rev.pop();
4599    }
4600    Ok(())
4601}
4602
4603/// `UNION` / `UNION ALL`. Drains each branch in order, streaming
4604/// its rows through. For plain `UNION` (`all = false`) we
4605/// deduplicate across the combined stream using the same
4606/// `row_key` the `DistinctOp` uses, so the semantics match
4607/// Neo4j's "set union" shape regardless of whether duplicates
4608/// sit inside a single branch or straddle branches. For
4609/// `UNION ALL` the `seen` set stays `None` and every produced
4610/// row is forwarded as-is.
4611struct UnionOp {
4612    branches: Vec<Box<dyn Operator>>,
4613    current: usize,
4614    seen: Option<HashSet<String>>,
4615}
4616
4617impl UnionOp {
4618    fn new(branches: Vec<Box<dyn Operator>>, all: bool) -> Self {
4619        Self {
4620            branches,
4621            current: 0,
4622            seen: if all { None } else { Some(HashSet::new()) },
4623        }
4624    }
4625}
4626
4627impl Operator for UnionOp {
4628    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4629        while self.current < self.branches.len() {
4630            match self.branches[self.current].next(ctx)? {
4631                Some(row) => {
4632                    if let Some(seen) = self.seen.as_mut() {
4633                        let key = row_key(&row);
4634                        if !seen.insert(key) {
4635                            continue;
4636                        }
4637                    }
4638                    return Ok(Some(row));
4639                }
4640                None => {
4641                    self.current += 1;
4642                }
4643            }
4644        }
4645        Ok(None)
4646    }
4647}
4648
4649struct OrderByOp {
4650    input: Box<dyn Operator>,
4651    sort_items: Vec<SortItem>,
4652    sorted: Option<Vec<Row>>,
4653    cursor: usize,
4654}
4655
4656impl OrderByOp {
4657    fn new(input: Box<dyn Operator>, sort_items: Vec<SortItem>) -> Self {
4658        Self {
4659            input,
4660            sort_items,
4661            sorted: None,
4662            cursor: 0,
4663        }
4664    }
4665}
4666
4667impl Operator for OrderByOp {
4668    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4669        if self.sorted.is_none() {
4670            let mut rows: Vec<Row> = Vec::new();
4671            while let Some(row) = self.input.next(ctx)? {
4672                rows.push(row);
4673            }
4674            let mut keyed: Vec<(Vec<Value>, Row)> = Vec::with_capacity(rows.len());
4675            for row in rows {
4676                let mut keys = Vec::with_capacity(self.sort_items.len());
4677                for item in &self.sort_items {
4678                    keys.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
4679                }
4680                keyed.push((keys, row));
4681            }
4682            let descs: Vec<bool> = self.sort_items.iter().map(|s| s.descending).collect();
4683            keyed.sort_by(|a, b| {
4684                for (i, (va, vb)) in a.0.iter().zip(b.0.iter()).enumerate() {
4685                    let ord = compare_values(va, vb);
4686                    let ord = if descs[i] { ord.reverse() } else { ord };
4687                    if ord != Ordering::Equal {
4688                        return ord;
4689                    }
4690                }
4691                Ordering::Equal
4692            });
4693            self.sorted = Some(keyed.into_iter().map(|(_, r)| r).collect());
4694        }
4695        let rows = self.sorted.as_ref().unwrap();
4696        if self.cursor < rows.len() {
4697            let row = rows[self.cursor].clone();
4698            self.cursor += 1;
4699            Ok(Some(row))
4700        } else {
4701            Ok(None)
4702        }
4703    }
4704}
4705
/// Grouped aggregation operator.
///
/// `compute` drains `input` and evaluates every `group_keys` expression
/// per row. It joins the results' `value_key`s into a grouping key and
/// keeps one aggregate state per entry of `aggregates` for each distinct
/// key, in first-seen order.
struct AggregateOp {
    input: Box<dyn Operator>,
    /// Expressions whose values identify a group.
    group_keys: Vec<ReturnItem>,
    /// Aggregate functions evaluated within each group.
    aggregates: Vec<AggregateSpec>,
    /// Finished output rows; presumably populated by `compute` on the
    /// first pull — TODO confirm against the rest of the impl.
    results: Option<Vec<Row>>,
    /// Presumably the index of the next row in `results` to emit — confirm.
    cursor: usize,
}
4713
4714impl AggregateOp {
4715    fn new(
4716        input: Box<dyn Operator>,
4717        group_keys: Vec<ReturnItem>,
4718        aggregates: Vec<AggregateSpec>,
4719    ) -> Self {
4720        Self {
4721            input,
4722            group_keys,
4723            aggregates,
4724            results: None,
4725            cursor: 0,
4726        }
4727    }
4728
4729    fn compute(&mut self, ctx: &ExecCtx) -> Result<()> {
4730        let mut groups: HashMap<String, GroupState> = HashMap::new();
4731        let mut order: Vec<String> = Vec::new();
4732
4733        // If there are no input rows AND no group keys, we still emit one row
4734        // (e.g. `MATCH (n:Missing) RETURN count(*)` must yield one row with 0).
4735        let mut saw_any = false;
4736
4737        while let Some(row) = self.input.next(ctx)? {
4738            saw_any = true;
4739            let mut key_values = Vec::with_capacity(self.group_keys.len());
4740            for item in &self.group_keys {
4741                key_values.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
4742            }
4743            let mut hash_key = String::new();
4744            for v in &key_values {
4745                hash_key.push_str(&value_key(v));
4746                hash_key.push('|');
4747            }
4748            let entry = groups.entry(hash_key.clone()).or_insert_with(|| {
4749                order.push(hash_key.clone());
4750                GroupState {
4751                    key_values: key_values.clone(),
4752                    agg_states: self
4753                        .aggregates
4754                        .iter()
4755                        .map(|a| AggState::initial(a.function))
4756                        .collect(),
4757                    distinct_seen: self.aggregates.iter().map(|_| None).collect(),
4758                }
4759            });
4760            for (i, spec) in self.aggregates.iter().enumerate() {
4761                if let AggregateArg::DistinctExpr(expr) = &spec.arg {
4762                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
4763                    if matches!(v, Value::Null) {
4764                        continue;
4765                    }
4766                    let key = value_key(&v);
4767                    let seen = entry.distinct_seen[i].get_or_insert_with(HashSet::new);
4768                    if !seen.insert(key) {
4769                        continue;
4770                    }
4771                }
4772                entry.agg_states[i].update(&spec.arg, &ctx.eval_ctx(&row))?;
4773                // The percentile is a constant expression — evaluate
4774                // it once against the first row we see and stash it
4775                // in the state so finalize() has a number to use.
4776                if let Some(extra_expr) = &spec.extra_arg {
4777                    let need_resolve = matches!(
4778                        &entry.agg_states[i],
4779                        AggState::PercentileDisc {
4780                            percentile: None,
4781                            ..
4782                        } | AggState::PercentileCont {
4783                            percentile: None,
4784                            ..
4785                        }
4786                    );
4787                    if need_resolve {
4788                        let pv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
4789                        let p = match pv {
4790                            Value::Property(Property::Float64(f)) => f,
4791                            Value::Property(Property::Int64(i)) => i as f64,
4792                            _ => 0.0,
4793                        };
4794                        // openCypher requires the percentile to
4795                        // lie in [0.0, 1.0]; anything else is an
4796                        // `ArgumentError: NumberOutOfRange`.
4797                        if !(0.0..=1.0).contains(&p) || p.is_nan() {
4798                            return Err(Error::Procedure(format!("percentile out of range: {p}")));
4799                        }
4800                        match &mut entry.agg_states[i] {
4801                            AggState::PercentileDisc { percentile, .. }
4802                            | AggState::PercentileCont { percentile, .. } => {
4803                                *percentile = Some(p);
4804                            }
4805                            _ => {}
4806                        }
4807                    }
4808                }
4809            }
4810        }
4811
4812        let mut out = Vec::new();
4813        if !saw_any && self.group_keys.is_empty() && !self.aggregates.is_empty() {
4814            // Empty group, single aggregate row
4815            let mut row = Row::new();
4816            for spec in &self.aggregates {
4817                row.insert(
4818                    spec.alias.clone(),
4819                    AggState::initial(spec.function).finalize(),
4820                );
4821            }
4822            out.push(row);
4823        } else {
4824            for key in order {
4825                let state = groups.remove(&key).unwrap();
4826                let mut row = Row::new();
4827                for (i, item) in self.group_keys.iter().enumerate() {
4828                    let name = item
4829                        .alias
4830                        .clone()
4831                        .unwrap_or_else(|| default_name(&item.expr, i));
4832                    row.insert(name, state.key_values[i].clone());
4833                }
4834                for (i, spec) in self.aggregates.iter().enumerate() {
4835                    row.insert(spec.alias.clone(), state.agg_states[i].finalize());
4836                }
4837                out.push(row);
4838            }
4839        }
4840        self.results = Some(out);
4841        Ok(())
4842    }
4843}
4844
4845impl Operator for AggregateOp {
4846    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4847        if self.results.is_none() {
4848            self.compute(ctx)?;
4849        }
4850        let rows = self.results.as_ref().unwrap();
4851        if self.cursor < rows.len() {
4852            let row = rows[self.cursor].clone();
4853            self.cursor += 1;
4854            Ok(Some(row))
4855        } else {
4856            Ok(None)
4857        }
4858    }
4859}
4860
4861struct GroupState {
4862    key_values: Vec<Value>,
4863    agg_states: Vec<AggState>,
4864    distinct_seen: Vec<Option<HashSet<String>>>,
4865}
4866
4867enum AggState {
4868    Count(i64),
4869    Sum {
4870        int_part: i64,
4871        float_part: f64,
4872        is_float: bool,
4873    },
4874    Avg {
4875        total: f64,
4876        count: i64,
4877    },
4878    Min(Option<Value>),
4879    Max(Option<Value>),
4880    Collect(Vec<Value>),
4881    StDev {
4882        sum: f64,
4883        sum_sq: f64,
4884        count: i64,
4885    },
4886    StDevP {
4887        sum: f64,
4888        sum_sq: f64,
4889        count: i64,
4890    },
4891    PercentileDisc {
4892        items: Vec<Value>,
4893        percentile: Option<f64>,
4894    },
4895    PercentileCont {
4896        items: Vec<Value>,
4897        percentile: Option<f64>,
4898    },
4899}
4900
4901impl AggState {
4902    fn initial(func: AggregateFn) -> Self {
4903        match func {
4904            AggregateFn::Count => AggState::Count(0),
4905            AggregateFn::Sum => AggState::Sum {
4906                int_part: 0,
4907                float_part: 0.0,
4908                is_float: false,
4909            },
4910            AggregateFn::Avg => AggState::Avg {
4911                total: 0.0,
4912                count: 0,
4913            },
4914            AggregateFn::Min => AggState::Min(None),
4915            AggregateFn::Max => AggState::Max(None),
4916            AggregateFn::Collect => AggState::Collect(Vec::new()),
4917            AggregateFn::StDev => AggState::StDev {
4918                sum: 0.0,
4919                sum_sq: 0.0,
4920                count: 0,
4921            },
4922            AggregateFn::StDevP => AggState::StDevP {
4923                sum: 0.0,
4924                sum_sq: 0.0,
4925                count: 0,
4926            },
4927            AggregateFn::PercentileDisc => AggState::PercentileDisc {
4928                items: Vec::new(),
4929                percentile: None,
4930            },
4931            AggregateFn::PercentileCont => AggState::PercentileCont {
4932                items: Vec::new(),
4933                percentile: None,
4934            },
4935        }
4936    }
4937
4938    fn update(&mut self, arg: &AggregateArg, ctx: &EvalCtx) -> Result<()> {
4939        match self {
4940            AggState::Count(c) => match arg {
4941                AggregateArg::Star => *c += 1,
4942                AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => {
4943                    if !matches!(eval_expr(e, ctx)?, Value::Null) {
4944                        *c += 1;
4945                    }
4946                }
4947            },
4948            AggState::Sum {
4949                int_part,
4950                float_part,
4951                is_float,
4952            } => {
4953                let v = expr_arg_value(arg, ctx)?;
4954                match v {
4955                    Value::Null => {}
4956                    Value::Property(Property::Int64(i)) => *int_part += i,
4957                    Value::Property(Property::Float64(f)) => {
4958                        *float_part += f;
4959                        *is_float = true;
4960                    }
4961                    _ => return Err(Error::AggregateTypeError),
4962                }
4963            }
4964            AggState::Avg { total, count } => {
4965                let v = expr_arg_value(arg, ctx)?;
4966                match v {
4967                    Value::Null => {}
4968                    Value::Property(Property::Int64(i)) => {
4969                        *total += i as f64;
4970                        *count += 1;
4971                    }
4972                    Value::Property(Property::Float64(f)) => {
4973                        *total += f;
4974                        *count += 1;
4975                    }
4976                    _ => return Err(Error::AggregateTypeError),
4977                }
4978            }
4979            AggState::Min(slot) => {
4980                // `min` / `max` ignore null inputs and operate
4981                // over any `Value` (including lists, nodes etc.),
4982                // not just scalar Properties. The ordering is
4983                // `compare_values`: within a single type the
4984                // natural order, across types the
4985                // `type_order_value` rank.
4986                let v = expr_arg_value(arg, ctx)?;
4987                if matches!(v, Value::Null | Value::Property(Property::Null)) {
4988                    // skip
4989                } else {
4990                    match slot {
4991                        None => *slot = Some(v),
4992                        Some(cur) => {
4993                            if compare_values(&v, cur) == Ordering::Less {
4994                                *cur = v;
4995                            }
4996                        }
4997                    }
4998                }
4999            }
5000            AggState::Max(slot) => {
5001                let v = expr_arg_value(arg, ctx)?;
5002                if matches!(v, Value::Null | Value::Property(Property::Null)) {
5003                    // skip
5004                } else {
5005                    match slot {
5006                        None => *slot = Some(v),
5007                        Some(cur) => {
5008                            if compare_values(&v, cur) == Ordering::Greater {
5009                                *cur = v;
5010                            }
5011                        }
5012                    }
5013                }
5014            }
5015            AggState::Collect(items) => {
5016                let v = expr_arg_value(arg, ctx)?;
5017                if !matches!(v, Value::Null) {
5018                    items.push(v);
5019                }
5020            }
5021            AggState::PercentileDisc { items, .. } | AggState::PercentileCont { items, .. } => {
5022                let v = expr_arg_value(arg, ctx)?;
5023                if !matches!(v, Value::Null) {
5024                    items.push(v);
5025                }
5026            }
5027            AggState::StDev { sum, sum_sq, count } | AggState::StDevP { sum, sum_sq, count } => {
5028                let v = expr_arg_value(arg, ctx)?;
5029                match v {
5030                    Value::Null => {}
5031                    Value::Property(Property::Int64(i)) => {
5032                        let f = i as f64;
5033                        *sum += f;
5034                        *sum_sq += f * f;
5035                        *count += 1;
5036                    }
5037                    Value::Property(Property::Float64(f)) => {
5038                        *sum += f;
5039                        *sum_sq += f * f;
5040                        *count += 1;
5041                    }
5042                    _ => return Err(Error::AggregateTypeError),
5043                }
5044            }
5045        }
5046        Ok(())
5047    }
5048
5049    fn finalize(&self) -> Value {
5050        match self {
5051            AggState::Count(c) => Value::Property(Property::Int64(*c)),
5052            AggState::Sum {
5053                int_part,
5054                float_part,
5055                is_float,
5056            } => {
5057                if *is_float {
5058                    Value::Property(Property::Float64(*float_part + *int_part as f64))
5059                } else {
5060                    Value::Property(Property::Int64(*int_part))
5061                }
5062            }
5063            AggState::Avg { total, count } => {
5064                if *count == 0 {
5065                    Value::Null
5066                } else {
5067                    Value::Property(Property::Float64(*total / *count as f64))
5068                }
5069            }
5070            AggState::Min(slot) | AggState::Max(slot) => match slot {
5071                Some(v) => v.clone(),
5072                None => Value::Null,
5073            },
5074            AggState::Collect(items) => Value::List(items.clone()),
5075            AggState::StDevP { sum, sum_sq, count } => {
5076                if *count == 0 {
5077                    Value::Property(Property::Float64(0.0))
5078                } else {
5079                    let n = *count as f64;
5080                    let variance = *sum_sq / n - (*sum / n).powi(2);
5081                    Value::Property(Property::Float64(variance.max(0.0).sqrt()))
5082                }
5083            }
5084            AggState::StDev { sum, sum_sq, count } => {
5085                if *count < 2 {
5086                    Value::Property(Property::Float64(0.0))
5087                } else {
5088                    let n = *count as f64;
5089                    let variance = (*sum_sq - *sum * *sum / n) / (n - 1.0);
5090                    Value::Property(Property::Float64(variance.max(0.0).sqrt()))
5091                }
5092            }
5093            AggState::PercentileDisc { items, percentile } => {
5094                percentile_disc(items, percentile.unwrap_or(0.0))
5095            }
5096            AggState::PercentileCont { items, percentile } => {
5097                percentile_cont(items, percentile.unwrap_or(0.0))
5098            }
5099        }
5100    }
5101}
5102
5103fn expr_arg_value(arg: &AggregateArg, ctx: &EvalCtx) -> Result<Value> {
5104    match arg {
5105        AggregateArg::Star => Err(Error::AggregateTypeError),
5106        AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => eval_expr(e, ctx),
5107    }
5108}
5109
5110/// Coerce a collected aggregate value into an f64 for percentile
5111/// math. Unhandled types fall back to NaN; the caller sorts NaN
5112/// out of the stream before computing the percentile.
5113fn value_to_f64(v: &Value) -> f64 {
5114    match v {
5115        Value::Property(Property::Int64(i)) => *i as f64,
5116        Value::Property(Property::Float64(f)) => *f,
5117        _ => f64::NAN,
5118    }
5119}
5120
5121/// `percentileDisc(expr, p)` — discrete percentile. Returns the
5122/// smallest value at or above the `p`-ranked position. Numbers only;
5123/// non-numeric values get sorted to the end and are effectively
5124/// ignored unless the percentile lands on one.
5125fn percentile_disc(items: &[Value], p: f64) -> Value {
5126    let mut nums: Vec<(f64, Value)> = items
5127        .iter()
5128        .map(|v| (value_to_f64(v), v.clone()))
5129        .filter(|(f, _)| !f.is_nan())
5130        .collect();
5131    if nums.is_empty() {
5132        return Value::Null;
5133    }
5134    nums.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
5135    let p = p.clamp(0.0, 1.0);
5136    let n = nums.len();
5137    // Neo4j spec: ceil(p * n) - 1, clamped at 0.
5138    let idx = ((p * n as f64).ceil() as isize - 1).max(0) as usize;
5139    nums[idx.min(n - 1)].1.clone()
5140}
5141
5142/// `percentileCont(expr, p)` — continuous percentile. Linearly
5143/// interpolates between the two ranks that bracket the fractional
5144/// position `p * (n - 1)`. Returns a Float64.
5145fn percentile_cont(items: &[Value], p: f64) -> Value {
5146    let mut nums: Vec<f64> = items
5147        .iter()
5148        .map(value_to_f64)
5149        .filter(|f| !f.is_nan())
5150        .collect();
5151    if nums.is_empty() {
5152        return Value::Null;
5153    }
5154    nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
5155    let p = p.clamp(0.0, 1.0);
5156    let n = nums.len();
5157    if n == 1 {
5158        return Value::Property(Property::Float64(nums[0]));
5159    }
5160    let pos = p * (n as f64 - 1.0);
5161    let lo = pos.floor() as usize;
5162    let hi = pos.ceil() as usize;
5163    let frac = pos - lo as f64;
5164    let v = nums[lo] + (nums[hi] - nums[lo]) * frac;
5165    Value::Property(Property::Float64(v))
5166}
5167
5168struct SkipOp {
5169    input: Box<dyn Operator>,
5170    count_expr: Expr,
5171    remaining: Option<i64>,
5172}
5173
5174impl SkipOp {
5175    fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5176        Self {
5177            input,
5178            count_expr,
5179            remaining: None,
5180        }
5181    }
5182}
5183
5184impl Operator for SkipOp {
5185    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5186        if self.remaining.is_none() {
5187            let empty = Row::new();
5188            let ectx = ctx.eval_ctx(&empty);
5189            let val = eval_expr(&self.count_expr, &ectx)?;
5190            self.remaining = Some(expr_to_count(val)?);
5191        }
5192        let rem = self.remaining.as_mut().unwrap();
5193        while *rem > 0 {
5194            if self.input.next(ctx)?.is_none() {
5195                return Ok(None);
5196            }
5197            *rem -= 1;
5198        }
5199        self.input.next(ctx)
5200    }
5201}
5202
5203struct LimitOp {
5204    input: Box<dyn Operator>,
5205    count_expr: Expr,
5206    remaining: Option<i64>,
5207}
5208
5209impl LimitOp {
5210    fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5211        Self {
5212            input,
5213            count_expr,
5214            remaining: None,
5215        }
5216    }
5217}
5218
5219impl Operator for LimitOp {
5220    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5221        if self.remaining.is_none() {
5222            let empty = Row::new();
5223            let ectx = ctx.eval_ctx(&empty);
5224            let val = eval_expr(&self.count_expr, &ectx)?;
5225            self.remaining = Some(expr_to_count(val)?);
5226        }
5227        let rem = self.remaining.as_mut().unwrap();
5228        if *rem <= 0 {
5229            return Ok(None);
5230        }
5231        match self.input.next(ctx)? {
5232            Some(row) => {
5233                *rem -= 1;
5234                Ok(Some(row))
5235            }
5236            None => Ok(None),
5237        }
5238    }
5239}
5240
5241fn expr_to_count(val: Value) -> Result<i64> {
5242    match val {
5243        Value::Null | Value::Property(Property::Null) => Ok(0),
5244        Value::Property(Property::Int64(n)) if n >= 0 => Ok(n),
5245        // openCypher: SKIP/LIMIT require an integer. Float values
5246        // (including integer-valued floats) are rejected as
5247        // InvalidArgumentType — they'd only be valid after an
5248        // explicit cast, which the user must write themselves.
5249        _ => Err(Error::TypeMismatch),
5250    }
5251}