Skip to main content

meshdb_executor/
ops.rs

1use crate::{
2    error::{Error, Result},
3    eval::{compare_values, eval_expr, row_key, to_bool, value_key, values_equal, EvalCtx},
4    procedures::ProcedureRegistry,
5    reader::GraphReader,
6    value::{ParamMap, Row, Value},
7    writer::GraphWriter,
8};
9use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
10use meshdb_cypher::{
11    AggregateArg, AggregateFn, AggregateSpec, BinaryOp, CallArgs, CompareOp, ConstraintKind,
12    ConstraintScope as CypherConstraintScope, CreateEdgeSpec, CreateNodeSpec, Direction, Expr,
13    Literal, LogicalPlan, PropertyType as CypherPropertyType, RemoveSpec, ReturnItem,
14    SetAssignment, SortItem, UnaryOp, YieldSpec,
15};
16use meshdb_storage::{
17    ConstraintScope as StorageConstraintScope, PropertyConstraintKind,
18    PropertyType as StoragePropertyType, RocksDbStorageEngine,
19};
20use std::cell::RefCell;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23
24/// Shared tombstone set for nodes / edges that have been DELETEd
25/// earlier in the current query. Property / label / type / keys /
26/// id accesses consult this set and raise `DeletedEntityAccess`
27/// so `MATCH (n) DELETE n RETURN n.num` surfaces the error at
28/// runtime instead of reading off a stale clone.
29#[derive(Default)]
30pub struct Tombstones {
31    pub nodes: RefCell<HashSet<meshdb_core::NodeId>>,
32    pub edges: RefCell<HashSet<meshdb_core::EdgeId>>,
33}
34
/// Context passed to every [`Operator::next`] call: the read and
/// write handles plus the per-query state that operators share.
pub struct ExecCtx<'a> {
    /// Read side of the graph. Also handed to the expression
    /// evaluator as the `EvalCtx` reader.
    pub store: &'a dyn GraphReader,
    /// Write side of the graph: the [`GraphWriter`] that mutations
    /// are sent to.
    pub writer: &'a dyn GraphWriter,
    /// Per-query parameter bindings. Empty for the single-node and Raft
    /// entry points that don't carry a Bolt RUN; populated with the
    /// driver-supplied params on the Bolt path. Threaded into every
    /// `eval_expr` call so `Expr::Parameter("name")` resolves.
    pub params: &'a ParamMap,
    /// Procedures known to this execution. Defaults to an empty
    /// registry on call sites that don't care; the TCK harness and
    /// any future server startup plug in a populated registry so
    /// `CALL ns.name(...)` resolves.
    pub procedures: &'a ProcedureRegistry,
    /// Outer-scope rows contributed by enclosing operators. The
    /// innermost slot (first entry) is the nearest outer. Set by
    /// [`CartesianProductOp`] when running its right side so that
    /// operators inside — typically `EdgeExpandOp` /
    /// `VarLengthExpandOp` constraint lookups — can resolve
    /// variables the right-side scan didn't bind directly. Empty
    /// at the top level.
    pub outer_rows: &'a [&'a Row],
    /// Tombstones for entities deleted earlier in the same query.
    /// Shared by reference so DeleteOp can insert and downstream
    /// eval can check.
    pub tombstones: &'a Tombstones,
}
61
62pub(crate) struct NoOpWriter;
63impl GraphWriter for NoOpWriter {
64    fn put_node(&self, _: &Node) -> Result<()> {
65        Ok(())
66    }
67    fn put_edge(&self, _: &Edge) -> Result<()> {
68        Ok(())
69    }
70    fn delete_edge(&self, _: EdgeId) -> Result<()> {
71        Ok(())
72    }
73    fn detach_delete_node(&self, _: NodeId) -> Result<()> {
74        Ok(())
75    }
76}
77
78impl<'a> ExecCtx<'a> {
79    /// Build an `EvalCtx` wrapping the given row alongside this
80    /// exec context's params and reader. Used by operators to
81    /// bridge the per-operator context into the expression
82    /// evaluator without repeating the field list at every
83    /// `eval_expr` call site.
84    pub(crate) fn eval_ctx<'b>(&self, row: &'b Row) -> EvalCtx<'b>
85    where
86        'a: 'b,
87    {
88        EvalCtx {
89            row,
90            params: self.params,
91            reader: self.store,
92            procedures: self.procedures,
93            outer_rows: self.outer_rows,
94            tombstones: self.tombstones,
95        }
96    }
97
98    /// Look up a variable in `row` first, then in each outer-scope
99    /// row in order. Returns the nearest match. Used for constraint
100    /// lookups inside right-side operators of a `CartesianProduct`
101    /// so a fresh scan that references an outer binding
102    /// (`MATCH ()-[r]-() MATCH (n)-[r]-(m)`) can still resolve it.
103    pub(crate) fn lookup_binding<'r>(&'r self, row: &'r Row, name: &str) -> Option<&'r Value> {
104        if let Some(v) = row.get(name) {
105            return Some(v);
106        }
107        for outer in self.outer_rows {
108            if let Some(v) = outer.get(name) {
109                return Some(v);
110            }
111        }
112        None
113    }
114}
115
/// A pull-based row source. Callers invoke [`Operator::next`]
/// repeatedly to drain it.
pub trait Operator {
    /// Produce the next output row, or `Ok(None)` once the operator
    /// has no more rows. Errors are returned as `Err` and end the
    /// drain loop in `execute_with_reader_and_procs`.
    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>>;
}
119
120/// Execute a plan using the given store for both reads and writes.
121/// Equivalent to [`execute_with_reader`] with the store acting as both
122/// and an empty parameter map.
123pub fn execute(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
124    let params = ParamMap::new();
125    execute_with_reader(
126        plan,
127        store as &dyn GraphReader,
128        store as &dyn GraphWriter,
129        &params,
130    )
131}
132
133/// Execute a plan against a local [`RocksDbStorageEngine`] for reads,
134/// sending mutations to a separate [`GraphWriter`]. Used by in-process
135/// setups where reads are always cheap-local (single node or
136/// full-replica Raft). No parameters.
137pub fn execute_with_writer(
138    plan: &LogicalPlan,
139    store: &RocksDbStorageEngine,
140    writer: &dyn GraphWriter,
141) -> Result<Vec<Row>> {
142    let params = ParamMap::new();
143    execute_with_reader(plan, store as &dyn GraphReader, writer, &params)
144}
145
146/// Execute a plan with an arbitrary [`GraphReader`] for reads and a
147/// parameter map for `$param` resolution. Routing mode uses a
148/// partitioned reader that fans out reads across peers; the Bolt
149/// listener supplies the driver-bound param map.
150pub fn explain(plan: &LogicalPlan) -> Vec<Row> {
151    let text = meshdb_cypher::format_plan(plan);
152    let mut row = Row::new();
153    row.insert("plan".to_string(), Value::Property(Property::String(text)));
154    vec![row]
155}
156
157pub fn profile(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
158    let result_rows = execute(plan, store)?;
159    let row_count = result_rows.len() as i64;
160    let plan_text = meshdb_cypher::format_plan(plan);
161    let summary = format!("{plan_text}\nRows: {row_count}");
162    let mut row = Row::new();
163    row.insert(
164        "profile".to_string(),
165        Value::Property(Property::String(summary)),
166    );
167    row.insert(
168        "rows".to_string(),
169        Value::Property(Property::Int64(row_count)),
170    );
171    Ok(vec![row])
172}
173
174pub fn execute_with_reader(
175    plan: &LogicalPlan,
176    reader: &dyn GraphReader,
177    writer: &dyn GraphWriter,
178    params: &ParamMap,
179) -> Result<Vec<Row>> {
180    let empty_procs = ProcedureRegistry::new();
181    execute_with_reader_and_procs(plan, reader, writer, params, &empty_procs)
182}
183
184/// Like [`execute_with_reader`] but with an explicit procedure
185/// registry in scope. Used by the TCK harness to mount mock
186/// procedures declared by `there exists a procedure ...` steps,
187/// and reserved for the server startup path once built-in
188/// procedures land.
189pub fn execute_with_reader_and_procs(
190    plan: &LogicalPlan,
191    reader: &dyn GraphReader,
192    writer: &dyn GraphWriter,
193    params: &ParamMap,
194    procedures: &ProcedureRegistry,
195) -> Result<Vec<Row>> {
196    // `datetime()`, `localtime()`, and friends cache "now" in a
197    // thread-local so multiple calls inside the same statement see
198    // the same instant. Clear it at the start of every execution so
199    // the next query gets a fresh reading.
200    crate::eval::reset_statement_time();
201    // Schema DDL never enters the operator pipeline — it's a
202    // side-effect on the backing store, not a row-producing query.
203    // Short-circuit here so `build_op` stays a pure plan-to-operator
204    // mapping and doesn't need DDL-specific cases.
205    if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
206        return Ok(rows);
207    }
208    let suppress_output = is_write_only_plan(plan);
209    let mut op = build_op(plan);
210    let tombstones = Tombstones::default();
211    let ctx = ExecCtx {
212        store: reader,
213        writer,
214        params,
215        procedures,
216        outer_rows: &[],
217        tombstones: &tombstones,
218    };
219    let mut rows = Vec::new();
220    while let Some(row) = op.next(&ctx)? {
221        rows.push(row);
222    }
223    if suppress_output {
224        Ok(Vec::new())
225    } else {
226        Ok(rows)
227    }
228}
229
230fn is_write_only_plan(plan: &LogicalPlan) -> bool {
231    // A plan is write-only when its top-level operator is a mutation
232    // with no projection (Project/Identity) wrapping it. Mutations
233    // with RETURN have a Project on top.
234    match plan {
235        LogicalPlan::CreatePath { .. }
236        | LogicalPlan::Delete { .. }
237        | LogicalPlan::SetProperty { .. }
238        | LogicalPlan::Remove { .. }
239        | LogicalPlan::Foreach { .. }
240        | LogicalPlan::MergeNode { .. }
241        | LogicalPlan::MergeEdge { .. } => true,
242        _ => false,
243    }
244}
245
246/// DDL dispatch. Returns `Ok(Some(rows))` when `plan` is a schema
247/// statement and was handled; `Ok(None)` when it's a regular graph
248/// operation that the operator pipeline should execute normally.
249///
250/// `CREATE INDEX` / `DROP INDEX` return one row carrying an `ok`
251/// value so Bolt clients see a non-empty RECORD stream rather than
252/// an empty SUCCESS — mirrors how Neo4j reports schema writes.
253/// `SHOW INDEXES` returns one row per registered index with
254/// `label`, `property`, and `state` columns.
255fn try_execute_ddl(
256    plan: &LogicalPlan,
257    reader: &dyn GraphReader,
258    writer: &dyn GraphWriter,
259) -> Result<Option<Vec<Row>>> {
260    match plan {
261        LogicalPlan::CreatePropertyIndex { label, property } => {
262            writer.create_property_index(label, property)?;
263            Ok(Some(vec![node_index_ddl_ack_row(
264                "created", label, property,
265            )]))
266        }
267        LogicalPlan::DropPropertyIndex { label, property } => {
268            writer.drop_property_index(label, property)?;
269            Ok(Some(vec![node_index_ddl_ack_row(
270                "dropped", label, property,
271            )]))
272        }
273        LogicalPlan::CreateEdgePropertyIndex {
274            edge_type,
275            property,
276        } => {
277            writer.create_edge_property_index(edge_type, property)?;
278            Ok(Some(vec![edge_index_ddl_ack_row(
279                "created", edge_type, property,
280            )]))
281        }
282        LogicalPlan::DropEdgePropertyIndex {
283            edge_type,
284            property,
285        } => {
286            writer.drop_edge_property_index(edge_type, property)?;
287            Ok(Some(vec![edge_index_ddl_ack_row(
288                "dropped", edge_type, property,
289            )]))
290        }
291        LogicalPlan::ShowPropertyIndexes => {
292            // SHOW is a pure read — source it from the reader so
293            // overlay/partitioned readers deliver the right view
294            // without round-tripping through a writer. Both
295            // node-scoped and edge-scoped indexes land in the same
296            // row stream, distinguished by the `scope` column.
297            let mut rows: Vec<Row> = Vec::new();
298            for (label, property) in reader.list_property_indexes()? {
299                rows.push(show_index_row("NODE", label, property));
300            }
301            for (edge_type, property) in reader.list_edge_property_indexes()? {
302                rows.push(show_index_row("RELATIONSHIP", edge_type, property));
303            }
304            Ok(Some(rows))
305        }
306        LogicalPlan::CreatePropertyConstraint {
307            name,
308            scope,
309            properties,
310            kind,
311            if_not_exists,
312        } => {
313            let storage_kind = match kind {
314                ConstraintKind::Unique => PropertyConstraintKind::Unique,
315                ConstraintKind::NotNull => PropertyConstraintKind::NotNull,
316                ConstraintKind::NodeKey => PropertyConstraintKind::NodeKey,
317                ConstraintKind::PropertyType(t) => {
318                    PropertyConstraintKind::PropertyType(cypher_to_storage_property_type(*t))
319                }
320            };
321            let storage_scope = cypher_to_storage_scope(scope);
322            let spec = writer.create_property_constraint(
323                name.as_deref(),
324                &storage_scope,
325                properties,
326                storage_kind,
327                *if_not_exists,
328            )?;
329            Ok(Some(vec![constraint_ack_row("created", &spec)]))
330        }
331        LogicalPlan::DropPropertyConstraint { name, if_exists } => {
332            writer.drop_property_constraint(name, *if_exists)?;
333            let mut row = Row::default();
334            row.insert(
335                "state".into(),
336                Value::Property(Property::String("dropped".into())),
337            );
338            row.insert(
339                "name".into(),
340                Value::Property(Property::String(name.clone())),
341            );
342            Ok(Some(vec![row]))
343        }
344        LogicalPlan::ShowPropertyConstraints => {
345            let specs = reader.list_property_constraints()?;
346            let rows = specs.into_iter().map(constraint_show_row).collect();
347            Ok(Some(rows))
348        }
349        _ => Ok(None),
350    }
351}
352
353/// One-row acknowledgement emitted after `CREATE CONSTRAINT`. Carries
354/// the resolved spec so callers see the final name when they omitted
355/// it. Columns match `SHOW CONSTRAINTS` (plus `state`) so Bolt clients
356/// can uniformly format DDL responses.
357fn constraint_ack_row(state: &str, spec: &meshdb_storage::PropertyConstraintSpec) -> Row {
358    let mut row = constraint_show_row(spec.clone());
359    row.insert(
360        "state".into(),
361        Value::Property(Property::String(state.into())),
362    );
363    row
364}
365
366/// Row shape for `SHOW CONSTRAINTS` and `db.constraints()`. Columns:
367/// `name`, `scope` ("NODE" / "RELATIONSHIP"), `label` (label or edge
368/// type, depending on scope), `properties` (list), and `type`.
369/// Single-property constraints still carry a one-element `properties`
370/// list; composite kinds (e.g. `NodeKey`) carry the full tuple.
371fn constraint_show_row(spec: meshdb_storage::PropertyConstraintSpec) -> Row {
372    let mut row = Row::default();
373    row.insert("name".into(), Value::Property(Property::String(spec.name)));
374    let (scope_tag, target) = match spec.scope {
375        meshdb_storage::ConstraintScope::Node(l) => ("NODE", l),
376        meshdb_storage::ConstraintScope::Relationship(t) => ("RELATIONSHIP", t),
377    };
378    row.insert(
379        "scope".into(),
380        Value::Property(Property::String(scope_tag.into())),
381    );
382    // `label` stays for backwards compatibility — it carries the
383    // scope's target regardless of node vs relationship, matching
384    // the index DDL row's column for historical consistency. A
385    // separate `scope` column tells you what kind of target it is.
386    row.insert("label".into(), Value::Property(Property::String(target)));
387    let props: Vec<Property> = spec.properties.into_iter().map(Property::String).collect();
388    row.insert("properties".into(), Value::Property(Property::List(props)));
389    row.insert(
390        "type".into(),
391        Value::Property(Property::String(spec.kind.as_string())),
392    );
393    row
394}
395
396/// Convert the Cypher AST's `ConstraintScope` into the storage-layer
397/// enum. Narrow bridge — same shape on both sides; kept in
398/// different crates to preserve the dependency direction.
399fn cypher_to_storage_scope(scope: &CypherConstraintScope) -> StorageConstraintScope {
400    match scope {
401        CypherConstraintScope::Node(l) => StorageConstraintScope::Node(l.clone()),
402        CypherConstraintScope::Relationship(t) => StorageConstraintScope::Relationship(t.clone()),
403    }
404}
405
406/// Convert the Cypher AST's `PropertyType` into the storage-layer
407/// enum. Narrow bridge — the two enums carry the same four variants
408/// but live in different crates to keep the dependency direction
409/// clean (cypher → executor → storage, never the reverse).
410fn cypher_to_storage_property_type(t: CypherPropertyType) -> StoragePropertyType {
411    match t {
412        CypherPropertyType::String => StoragePropertyType::String,
413        CypherPropertyType::Integer => StoragePropertyType::Integer,
414        CypherPropertyType::Float => StoragePropertyType::Float,
415        CypherPropertyType::Boolean => StoragePropertyType::Boolean,
416    }
417}
418
419/// One-row acknowledgement emitted after `CREATE INDEX` /
420/// `DROP INDEX` on a node scope. Columns mirror the `SHOW INDEXES`
421/// shape (`scope`, `label`, `property`) so Bolt clients can format
422/// DDL responses uniformly with schema-read responses. `label` is
423/// kept as a column name for backwards-compat with pre-edge-index
424/// clients; the `scope` column disambiguates node from edge.
425fn node_index_ddl_ack_row(state: &str, label: &str, property: &str) -> Row {
426    let mut row = Row::default();
427    row.insert(
428        "state".into(),
429        Value::Property(Property::String(state.into())),
430    );
431    row.insert(
432        "scope".into(),
433        Value::Property(Property::String("NODE".into())),
434    );
435    row.insert(
436        "label".into(),
437        Value::Property(Property::String(label.into())),
438    );
439    row.insert(
440        "property".into(),
441        Value::Property(Property::String(property.into())),
442    );
443    row
444}
445
446/// One-row acknowledgement emitted after `CREATE INDEX` /
447/// `DROP INDEX` on a relationship scope. Mirrors
448/// [`node_index_ddl_ack_row`] but the target lives under
449/// `edge_type` (and `label` carries the same string so existing
450/// driver code that read `label` still sees the target name).
451fn edge_index_ddl_ack_row(state: &str, edge_type: &str, property: &str) -> Row {
452    let mut row = Row::default();
453    row.insert(
454        "state".into(),
455        Value::Property(Property::String(state.into())),
456    );
457    row.insert(
458        "scope".into(),
459        Value::Property(Property::String("RELATIONSHIP".into())),
460    );
461    row.insert(
462        "label".into(),
463        Value::Property(Property::String(edge_type.into())),
464    );
465    row.insert(
466        "edge_type".into(),
467        Value::Property(Property::String(edge_type.into())),
468    );
469    row.insert(
470        "property".into(),
471        Value::Property(Property::String(property.into())),
472    );
473    row
474}
475
476/// Row shape for `SHOW INDEXES` output. Columns:
477/// `scope` ("NODE" / "RELATIONSHIP"), `label` (the scope target —
478/// label for node, edge type for relationship; kept for
479/// backwards-compat), `property`, and `state`. `label` and
480/// `edge_type` carry the same string when `scope == "RELATIONSHIP"`
481/// so clients can read either column.
482fn show_index_row(scope: &str, target: String, property: String) -> Row {
483    let mut row = Row::default();
484    row.insert(
485        "scope".into(),
486        Value::Property(Property::String(scope.into())),
487    );
488    row.insert(
489        "label".into(),
490        Value::Property(Property::String(target.clone())),
491    );
492    if scope == "RELATIONSHIP" {
493        row.insert(
494            "edge_type".into(),
495            Value::Property(Property::String(target)),
496        );
497    }
498    row.insert(
499        "property".into(),
500        Value::Property(Property::String(property)),
501    );
502    row.insert(
503        "state".into(),
504        Value::Property(Property::String("online".into())),
505    );
506    row
507}
508
509fn build_op(plan: &LogicalPlan) -> Box<dyn Operator> {
510    build_op_inner(plan, None)
511}
512
/// Recursively translate `plan` into a tree of boxed [`Operator`]s.
///
/// `seed` is forwarded unchanged to every child built through the
/// local `child!` macro. A `LogicalPlan::SeedRow` leaf becomes a
/// `SeededRowOp` holding a clone of the seed when one is supplied,
/// and a plain `SeedRowOp` otherwise.
///
/// # Panics
///
/// Panics when handed a schema DDL plan; `try_execute_ddl` is
/// expected to have dealt with those before any operator is built.
pub(crate) fn build_op_inner(plan: &LogicalPlan, seed: Option<&Row>) -> Box<dyn Operator> {
    // Shorthand for "build this sub-plan with the same seed".
    macro_rules! child {
        ($p:expr) => {
            build_op_inner($p, seed)
        };
    }
    match plan {
        LogicalPlan::CreatePath {
            input,
            nodes,
            edges,
        } => Box::new(CreatePathOp::new(
            input.as_ref().map(|p| child!(p)),
            nodes.clone(),
            edges.clone(),
        )),
        // The right side is handed over as a cloned plan rather than
        // a built operator.
        LogicalPlan::CartesianProduct { left, right } => {
            Box::new(CartesianProductOp::new(child!(left), (**right).clone()))
        }
        LogicalPlan::Delete {
            input,
            detach,
            vars,
            exprs,
        } => Box::new(DeleteOp::new(
            child!(input),
            *detach,
            vars.clone(),
            exprs.clone(),
        )),
        LogicalPlan::SetProperty { input, assignments } => {
            Box::new(SetPropertyOp::new(child!(input), assignments.clone()))
        }
        LogicalPlan::Remove { input, items } => {
            Box::new(RemoveOp::new(child!(input), items.clone()))
        }
        LogicalPlan::LoadCsv {
            input,
            path_expr,
            var,
            with_headers,
        } => Box::new(LoadCsvOp::new(
            input.as_ref().map(|p| child!(p)),
            path_expr.clone(),
            var.clone(),
            *with_headers,
        )),
        LogicalPlan::Foreach {
            input,
            var,
            list_expr,
            set_assignments,
            remove_items,
        } => Box::new(ForeachOp::new(
            child!(input),
            var.clone(),
            list_expr.clone(),
            set_assignments.clone(),
            remove_items.clone(),
        )),
        // As with `CartesianProduct`, the subquery body is passed as
        // a cloned plan, not a built operator.
        LogicalPlan::CallSubquery { input, body } => {
            Box::new(CallSubqueryOp::new(child!(input), (**body).clone()))
        }
        LogicalPlan::OptionalApply {
            input,
            body,
            null_vars,
        } => Box::new(OptionalApplyOp::new(
            child!(input),
            (**body).clone(),
            null_vars.clone(),
        )),
        LogicalPlan::ProcedureCall {
            input,
            qualified_name,
            args,
            yield_spec,
            standalone,
        } => Box::new(ProcedureCallOp::new(
            input.as_ref().map(|p| child!(p)),
            qualified_name.clone(),
            args.clone(),
            yield_spec.clone(),
            *standalone,
        )),
        LogicalPlan::SeedRow => match seed {
            Some(r) => Box::new(SeededRowOp {
                row: Some(r.clone()),
            }),
            None => Box::new(SeedRowOp { done: false }),
        },
        LogicalPlan::NodeScanAll { var } => Box::new(NodeScanAllOp::new(var.clone())),
        LogicalPlan::NodeScanByLabels { var, labels } => {
            Box::new(NodeScanByLabelsOp::new(var.clone(), labels.clone()))
        }
        LogicalPlan::EdgeExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            edge_properties,
            edge_types,
            direction,
            edge_constraint_var,
        } => Box::new(EdgeExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            edge_properties.clone(),
            edge_types.clone(),
            *direction,
            edge_constraint_var.clone(),
        )),
        LogicalPlan::OptionalEdgeExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            dst_properties,
            edge_types,
            direction,
            dst_constraint_var,
            edge_constraint_var,
        } => Box::new(OptionalEdgeExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            dst_properties.clone(),
            edge_types.clone(),
            *direction,
            dst_constraint_var.clone(),
            edge_constraint_var.clone(),
        )),
        LogicalPlan::VarLengthExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            edge_types,
            edge_properties,
            direction,
            min_hops,
            max_hops,
            path_var,
            optional,
            dst_constraint_var,
            bound_edge_list_var,
            excluded_edge_vars,
        } => Box::new(VarLengthExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            edge_types.clone(),
            edge_properties.clone(),
            *direction,
            *min_hops,
            *max_hops,
            path_var.clone(),
            *optional,
            dst_constraint_var.clone(),
            bound_edge_list_var.clone(),
            excluded_edge_vars.clone(),
        )),
        LogicalPlan::Filter { input, predicate } => {
            Box::new(FilterOp::new(child!(input), predicate.clone()))
        }
        LogicalPlan::Project { input, items } => {
            Box::new(ProjectOp::new(child!(input), items.clone()))
        }
        LogicalPlan::Aggregate {
            input,
            group_keys,
            aggregates,
        } => Box::new(AggregateOp::new(
            child!(input),
            group_keys.clone(),
            aggregates.clone(),
        )),
        LogicalPlan::Identity { input } => Box::new(IdentityOp::new(child!(input))),
        LogicalPlan::CoalesceNullRow { input, null_vars } => {
            Box::new(CoalesceNullRowOp::new(child!(input), null_vars.clone()))
        }
        LogicalPlan::Distinct { input } => Box::new(DistinctOp::new(child!(input))),
        LogicalPlan::OrderBy { input, sort_items } => {
            Box::new(OrderByOp::new(child!(input), sort_items.clone()))
        }
        LogicalPlan::Skip { input, count } => Box::new(SkipOp::new(child!(input), count.clone())),
        LogicalPlan::Limit { input, count } => Box::new(LimitOp::new(child!(input), count.clone())),
        LogicalPlan::MergeNode {
            input,
            var,
            labels,
            properties,
            on_create,
            on_match,
        } => Box::new(MergeNodeOp::new(
            input.as_ref().map(|p| child!(p)),
            var.clone(),
            labels.clone(),
            properties.clone(),
            on_create.clone(),
            on_match.clone(),
        )),
        LogicalPlan::MergeEdge {
            input,
            edge_var,
            src_var,
            dst_var,
            edge_type,
            undirected,
            properties,
            on_create,
            on_match,
        } => Box::new(MergeEdgeOp::new(
            child!(input),
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            *undirected,
            properties.clone(),
            on_create.clone(),
            on_match.clone(),
        )),
        LogicalPlan::Unwind { var, expr } => Box::new(UnwindOp::new(var.clone(), expr.clone())),
        LogicalPlan::UnwindChain { input, var, expr } => {
            Box::new(UnwindChainOp::new(child!(input), var.clone(), expr.clone()))
        }
        LogicalPlan::IndexSeek {
            var,
            label,
            property,
            value,
        } => Box::new(IndexSeekOp::new(
            var.clone(),
            label.clone(),
            property.clone(),
            value.clone(),
        )),
        LogicalPlan::Union { branches, all } => {
            let branch_ops: Vec<Box<dyn Operator>> = branches.iter().map(|b| child!(b)).collect();
            Box::new(UnionOp::new(branch_ops, *all))
        }
        LogicalPlan::BindPath {
            input,
            path_var,
            node_vars,
            edge_vars,
        } => Box::new(BindPathOp::new(
            child!(input),
            path_var.clone(),
            node_vars.clone(),
            edge_vars.clone(),
        )),
        LogicalPlan::ShortestPath {
            input,
            src_var,
            dst_var,
            path_var,
            edge_types,
            direction,
            max_hops,
            kind,
        } => Box::new(ShortestPathOp::new(
            child!(input),
            src_var.clone(),
            dst_var.clone(),
            path_var.clone(),
            edge_types.clone(),
            *direction,
            *max_hops,
            *kind,
        )),
        // DDL plans are handled by `try_execute_ddl` before the
        // operator pipeline is built, so they should never reach
        // this point. The panic here catches any future refactor
        // that forgets to gate them.
        LogicalPlan::CreatePropertyIndex { .. }
        | LogicalPlan::DropPropertyIndex { .. }
        | LogicalPlan::CreateEdgePropertyIndex { .. }
        | LogicalPlan::DropEdgePropertyIndex { .. }
        | LogicalPlan::ShowPropertyIndexes
        | LogicalPlan::CreatePropertyConstraint { .. }
        | LogicalPlan::DropPropertyConstraint { .. }
        | LogicalPlan::ShowPropertyConstraints => {
            panic!("schema DDL must be dispatched via try_execute_ddl before build_op")
        }
    }
}
811
812struct UnwindOp {
813    var: String,
814    expr: Expr,
815    items: Option<Vec<Value>>,
816    cursor: usize,
817}
818
819impl UnwindOp {
820    fn new(var: String, expr: Expr) -> Self {
821        Self {
822            var,
823            expr,
824            items: None,
825            cursor: 0,
826        }
827    }
828}
829
830impl Operator for UnwindOp {
831    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
832        if self.items.is_none() {
833            let empty = Row::new();
834            let ectx = EvalCtx {
835                row: &empty,
836                params: ctx.params,
837                reader: ctx.store,
838                procedures: ctx.procedures,
839                outer_rows: ctx.outer_rows,
840                tombstones: ctx.tombstones,
841            };
842            let val = eval_expr(&self.expr, &ectx)?;
843            self.items = Some(coerce_unwind_list(val)?);
844        }
845        let items = self.items.as_ref().unwrap();
846        if self.cursor < items.len() {
847            let v = items[self.cursor].clone();
848            self.cursor += 1;
849            let mut row = Row::new();
850            row.insert(self.var.clone(), v);
851            Ok(Some(row))
852        } else {
853            Ok(None)
854        }
855    }
856}
857
858/// Per-row UNWIND: pulls one input row, evaluates `expr` against it
859/// to produce a list, and emits one output row per list element.
860/// Each output row inherits every binding from the input row plus
861/// a new `var` binding. Empty / null lists drop the input row and
862/// the operator immediately pulls the next input row.
863struct UnwindChainOp {
864    input: Box<dyn Operator>,
865    var: String,
866    expr: Expr,
867    current_row: Option<Row>,
868    items: Vec<Value>,
869    cursor: usize,
870}
871
872impl UnwindChainOp {
873    fn new(input: Box<dyn Operator>, var: String, expr: Expr) -> Self {
874        Self {
875            input,
876            var,
877            expr,
878            current_row: None,
879            items: Vec::new(),
880            cursor: 0,
881        }
882    }
883}
884
885impl Operator for UnwindChainOp {
886    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
887        loop {
888            if let Some(base) = &self.current_row {
889                if self.cursor < self.items.len() {
890                    let v = self.items[self.cursor].clone();
891                    self.cursor += 1;
892                    let mut row = base.clone();
893                    row.insert(self.var.clone(), v);
894                    return Ok(Some(row));
895                }
896                self.current_row = None;
897                self.items.clear();
898                self.cursor = 0;
899            }
900            let base = match self.input.next(ctx)? {
901                Some(r) => r,
902                None => return Ok(None),
903            };
904            let ectx = EvalCtx {
905                row: &base,
906                params: ctx.params,
907                reader: ctx.store,
908                procedures: ctx.procedures,
909                outer_rows: ctx.outer_rows,
910                tombstones: ctx.tombstones,
911            };
912            let val = eval_expr(&self.expr, &ectx)?;
913            self.items = coerce_unwind_list(val)?;
914            self.current_row = Some(base);
915        }
916    }
917}
918
919/// Shared list-coercion used by both UNWIND operators. Accepts a
920/// native executor `Value::List`, a property-list
921/// `Property::List`, or `Null` (treated as an empty list). Any
922/// other shape is a type error — UNWIND is defined over lists.
923fn coerce_unwind_list(val: Value) -> Result<Vec<Value>> {
924    match val {
925        Value::List(items) => Ok(items),
926        Value::Property(Property::List(props)) => {
927            Ok(props.into_iter().map(Value::Property).collect())
928        }
929        Value::Null => Ok(Vec::new()),
930        _ => Err(Error::TypeMismatch),
931    }
932}
933
934struct CreatePathOp {
935    input: Option<Box<dyn Operator>>,
936    nodes: Vec<CreateNodeSpec>,
937    edges: Vec<CreateEdgeSpec>,
938    done: bool,
939    buffered: Option<Vec<Row>>,
940    cursor: usize,
941}
942
943impl CreatePathOp {
944    fn new(
945        input: Option<Box<dyn Operator>>,
946        nodes: Vec<CreateNodeSpec>,
947        edges: Vec<CreateEdgeSpec>,
948    ) -> Self {
949        Self {
950            input,
951            nodes,
952            edges,
953            done: false,
954            buffered: None,
955            cursor: 0,
956        }
957    }
958
959    fn apply(&self, ctx: &ExecCtx, row: &Row) -> Result<Row> {
960        let mut out = row.clone();
961        let mut node_ids: Vec<NodeId> = Vec::with_capacity(self.nodes.len());
962        for spec in &self.nodes {
963            match spec {
964                CreateNodeSpec::New {
965                    var,
966                    labels,
967                    properties,
968                } => {
969                    let mut node = Node::new();
970                    for label in labels {
971                        node.labels.push(label.clone());
972                    }
973                    // Property values are `Expr` (literal | parameter
974                    // per the grammar). Evaluate against the current row
975                    // + params and convert to a stored Property via the
976                    // existing helper, which rejects Node/Edge values.
977                    // Null-valued properties aren't stored — openCypher
978                    // treats `{k: null}` as "no such property," so
979                    // `keys(n)` and `n.k IS NOT NULL` both skip it.
980                    for (k, expr) in properties {
981                        let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
982                        let prop = value_to_property(value)?;
983                        if matches!(prop, Property::Null) {
984                            continue;
985                        }
986                        node.properties.insert(k.clone(), prop);
987                    }
988                    ctx.writer.put_node(&node)?;
989                    node_ids.push(node.id);
990                    if let Some(v) = var {
991                        out.insert(v.clone(), Value::Node(node));
992                    }
993                }
994                CreateNodeSpec::Reference(name) => {
995                    let id = match out.get(name) {
996                        Some(Value::Node(n)) => n.id,
997                        _ => return Err(Error::UnboundVariable(name.clone())),
998                    };
999                    node_ids.push(id);
1000                }
1001            }
1002        }
1003        for spec in &self.edges {
1004            let src = node_ids[spec.src_idx];
1005            let dst = node_ids[spec.dst_idx];
1006            let mut edge = Edge::new(spec.edge_type.clone(), src, dst);
1007            for (k, expr) in &spec.properties {
1008                let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1009                let prop = value_to_property(value)?;
1010                if matches!(prop, Property::Null) {
1011                    continue;
1012                }
1013                edge.properties.insert(k.clone(), prop);
1014            }
1015            ctx.writer.put_edge(&edge)?;
1016            if let Some(v) = &spec.var {
1017                out.insert(v.clone(), Value::Edge(edge));
1018            }
1019        }
1020        Ok(out)
1021    }
1022}
1023
1024impl Operator for CreatePathOp {
1025    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1026        if self.input.is_some() {
1027            // Drain the whole input first, then replay sequentially. Draining up
1028            // front avoids aliasing the source scan's cursor while we're writing
1029            // to the store (node-label scans cache ids lazily on first call).
1030            if let Some(buffered) = self.buffered.as_mut() {
1031                if self.cursor < buffered.len() {
1032                    let row = buffered[self.cursor].clone();
1033                    self.cursor += 1;
1034                    return Ok(Some(self.apply(ctx, &row)?));
1035                }
1036                return Ok(None);
1037            }
1038            let mut rows: Vec<Row> = Vec::new();
1039            {
1040                let input = self.input.as_mut().unwrap();
1041                while let Some(row) = input.next(ctx)? {
1042                    rows.push(row);
1043                }
1044            }
1045            self.buffered = Some(rows);
1046            self.cursor = 0;
1047            // Fall through to next call via recursion.
1048            self.next(ctx)
1049        } else {
1050            if self.done {
1051                return Ok(None);
1052            }
1053            self.done = true;
1054            let empty = Row::new();
1055            Ok(Some(self.apply(ctx, &empty)?))
1056        }
1057    }
1058}
1059
1060struct CartesianProductOp {
1061    left: Box<dyn Operator>,
1062    right_plan: LogicalPlan,
1063    left_row: Option<Row>,
1064    right_op: Option<Box<dyn Operator>>,
1065}
1066
1067impl CartesianProductOp {
1068    fn new(left: Box<dyn Operator>, right_plan: LogicalPlan) -> Self {
1069        Self {
1070            left,
1071            right_plan,
1072            left_row: None,
1073            right_op: None,
1074        }
1075    }
1076}
1077
1078impl Operator for CartesianProductOp {
1079    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1080        loop {
1081            if self.left_row.is_none() {
1082                match self.left.next(ctx)? {
1083                    None => return Ok(None),
1084                    Some(row) => {
1085                        self.left_row = Some(row);
1086                        self.right_op = Some(build_op(&self.right_plan));
1087                    }
1088                }
1089            }
1090            let right_op = self.right_op.as_mut().expect("right_op set");
1091            // Expose the current left row to right-side operators
1092            // via a shadow context so constraint lookups (for
1093            // bound edges / bound edge lists) can find vars that
1094            // the right-side scan itself doesn't bind.
1095            let left_ref = self.left_row.as_ref().unwrap();
1096            let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1097            stacked.push(left_ref);
1098            stacked.extend_from_slice(ctx.outer_rows);
1099            let inner_ctx = ExecCtx {
1100                store: ctx.store,
1101                writer: ctx.writer,
1102                params: ctx.params,
1103                procedures: ctx.procedures,
1104                outer_rows: &stacked,
1105                tombstones: ctx.tombstones,
1106            };
1107            match right_op.next(&inner_ctx)? {
1108                Some(right_row) => {
1109                    let mut combined = left_ref.clone();
1110                    for (k, v) in right_row {
1111                        combined.insert(k, v);
1112                    }
1113                    return Ok(Some(combined));
1114                }
1115                None => {
1116                    self.left_row = None;
1117                    self.right_op = None;
1118                }
1119            }
1120        }
1121    }
1122}
1123
1124struct DeleteOp {
1125    input: Box<dyn Operator>,
1126    detach: bool,
1127    #[allow(dead_code)]
1128    vars: Vec<String>,
1129    exprs: Vec<Expr>,
1130    /// Rows drained from `input` before any deletes have run.
1131    /// Populated on first call to `next`. openCypher treats DELETE
1132    /// as a batch mutation: the whole preceding row set is
1133    /// materialised, then the mutations happen, then rows are
1134    /// forwarded. Without buffering, `MATCH (a)-[r]-(b) DELETE r,
1135    /// a, b RETURN count(*)` deletes the first row's nodes before
1136    /// the pipelined scan reaches the second row.
1137    buffered: Option<Vec<Row>>,
1138    cursor: usize,
1139}
1140
1141impl DeleteOp {
1142    fn new(input: Box<dyn Operator>, detach: bool, vars: Vec<String>, exprs: Vec<Expr>) -> Self {
1143        Self {
1144            input,
1145            detach,
1146            vars,
1147            exprs,
1148            buffered: None,
1149            cursor: 0,
1150        }
1151    }
1152
1153    /// Gather every edge and node id referenced by the row's
1154    /// DELETE expressions and run the delete in two phases —
1155    /// edges first, then nodes. The two-phase ordering is what
1156    /// lets `DELETE p1, p2` succeed when two paths share nodes
1157    /// connected by each other's edges: by the time the node
1158    /// phase runs, the in-batch edges are already gone. The
1159    /// non-detach check probes the graph then filters out
1160    /// any edge we just deleted, so the batch-level detachment
1161    /// is counted as "no longer attached" rather than failing.
1162    fn apply_deletes(
1163        &self,
1164        ctx: &ExecCtx,
1165        row: &Row,
1166        seen_edges: &mut HashSet<meshdb_core::EdgeId>,
1167        seen_nodes: &mut HashSet<meshdb_core::NodeId>,
1168    ) -> Result<()> {
1169        let mut edge_ids: Vec<meshdb_core::EdgeId> = Vec::new();
1170        let mut node_ids: Vec<meshdb_core::NodeId> = Vec::new();
1171        for expr in &self.exprs {
1172            let v = eval_expr(expr, &ctx.eval_ctx(row))?;
1173            match v {
1174                Value::Node(n) => node_ids.push(n.id),
1175                Value::Edge(e) => edge_ids.push(e.id),
1176                Value::Path { nodes, edges } => {
1177                    for e in edges {
1178                        edge_ids.push(e.id);
1179                    }
1180                    for n in nodes {
1181                        node_ids.push(n.id);
1182                    }
1183                }
1184                Value::Null | Value::Property(Property::Null) => continue,
1185                _ => return Err(Error::TypeMismatch),
1186            }
1187        }
1188        for eid in &edge_ids {
1189            if seen_edges.insert(*eid) {
1190                ctx.writer.delete_edge(*eid)?;
1191                ctx.tombstones.edges.borrow_mut().insert(*eid);
1192            }
1193        }
1194        for nid in &node_ids {
1195            if !seen_nodes.insert(*nid) {
1196                continue;
1197            }
1198            if self.detach {
1199                // DETACH DELETE also removes every attached edge;
1200                // tombstone them too so a later projection over a
1201                // formerly-attached edge clone also raises rather
1202                // than reading stale properties.
1203                for (eid, _) in ctx.store.outgoing(*nid)? {
1204                    ctx.tombstones.edges.borrow_mut().insert(eid);
1205                }
1206                for (eid, _) in ctx.store.incoming(*nid)? {
1207                    ctx.tombstones.edges.borrow_mut().insert(eid);
1208                }
1209                ctx.writer.detach_delete_node(*nid)?;
1210            } else {
1211                let out = ctx.store.outgoing(*nid)?;
1212                let inc = ctx.store.incoming(*nid)?;
1213                let still_attached = out
1214                    .iter()
1215                    .chain(inc.iter())
1216                    .any(|(eid, _)| !seen_edges.contains(eid));
1217                if still_attached {
1218                    return Err(Error::CannotDeleteAttachedNode);
1219                }
1220                ctx.writer.detach_delete_node(*nid)?;
1221            }
1222            ctx.tombstones.nodes.borrow_mut().insert(*nid);
1223        }
1224        Ok(())
1225    }
1226}
1227
1228impl Operator for DeleteOp {
1229    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1230        // First call: drain the input, run all deletes, then
1231        // start forwarding rows. Buffering protects against the
1232        // pipelined-scan-vs-mutation race: `MATCH (a)-[r]-(b)
1233        // DELETE r, a, b RETURN count(*)` must observe every
1234        // MATCH row before any node or edge disappears, otherwise
1235        // the undirected expansion hits an already-deleted source
1236        // on the second iteration.
1237        if self.buffered.is_none() {
1238            let mut rows: Vec<Row> = Vec::new();
1239            while let Some(row) = self.input.next(ctx)? {
1240                rows.push(row);
1241            }
1242            let mut seen_edges: HashSet<meshdb_core::EdgeId> = HashSet::new();
1243            let mut seen_nodes: HashSet<meshdb_core::NodeId> = HashSet::new();
1244            for row in &rows {
1245                self.apply_deletes(ctx, row, &mut seen_edges, &mut seen_nodes)?;
1246            }
1247            self.buffered = Some(rows);
1248            self.cursor = 0;
1249        }
1250        let rows = self.buffered.as_ref().unwrap();
1251        if self.cursor < rows.len() {
1252            let row = rows[self.cursor].clone();
1253            self.cursor += 1;
1254            return Ok(Some(row));
1255        }
1256        Ok(None)
1257    }
1258}
1259
1260struct SetPropertyOp {
1261    input: Box<dyn Operator>,
1262    assignments: Vec<SetAssignment>,
1263}
1264
1265impl SetPropertyOp {
1266    fn new(input: Box<dyn Operator>, assignments: Vec<SetAssignment>) -> Self {
1267        Self { input, assignments }
1268    }
1269}
1270
1271impl Operator for SetPropertyOp {
1272    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1273        match self.input.next(ctx)? {
1274            None => Ok(None),
1275            Some(mut row) => {
1276                // Phase 1: evaluate any RHSes against the original row bindings.
1277                enum Action {
1278                    SetKey {
1279                        var: String,
1280                        key: String,
1281                        prop: Property,
1282                    },
1283                    AddLabels {
1284                        var: String,
1285                        labels: Vec<String>,
1286                    },
1287                    Replace {
1288                        var: String,
1289                        props: Vec<(String, Property)>,
1290                    },
1291                    Merge {
1292                        var: String,
1293                        props: Vec<(String, Property)>,
1294                    },
1295                }
1296                let mut actions: Vec<Action> = Vec::with_capacity(self.assignments.len());
1297                for a in &self.assignments {
1298                    match a {
1299                        SetAssignment::Property { var, key, value } => {
1300                            let evaluated = eval_expr(value, &ctx.eval_ctx(&row))?;
1301                            let prop = value_to_property(evaluated)?;
1302                            actions.push(Action::SetKey {
1303                                var: var.clone(),
1304                                key: key.clone(),
1305                                prop,
1306                            });
1307                        }
1308                        SetAssignment::Labels { var, labels } => {
1309                            actions.push(Action::AddLabels {
1310                                var: var.clone(),
1311                                labels: labels.clone(),
1312                            });
1313                        }
1314                        SetAssignment::Replace { var, properties } => {
1315                            // Property values are now `Expr`. Evaluate
1316                            // each against the current row + params and
1317                            // convert via the shared helper, which
1318                            // surfaces InvalidSetValue for Node/Edge.
1319                            let props = properties
1320                                .iter()
1321                                .map(|(k, expr)| {
1322                                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1323                                    Ok((k.clone(), value_to_property(v)?))
1324                                })
1325                                .collect::<Result<Vec<(String, Property)>>>()?;
1326                            actions.push(Action::Replace {
1327                                var: var.clone(),
1328                                props,
1329                            });
1330                        }
1331                        SetAssignment::Merge { var, properties } => {
1332                            let props = properties
1333                                .iter()
1334                                .map(|(k, expr)| {
1335                                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1336                                    Ok((k.clone(), value_to_property(v)?))
1337                                })
1338                                .collect::<Result<Vec<(String, Property)>>>()?;
1339                            actions.push(Action::Merge {
1340                                var: var.clone(),
1341                                props,
1342                            });
1343                        }
1344                        SetAssignment::ReplaceFromExpr {
1345                            var,
1346                            source,
1347                            replace,
1348                        } => {
1349                            let v = eval_expr(source, &ctx.eval_ctx(&row))?;
1350                            let props = extract_property_map(&v)?;
1351                            if *replace {
1352                                actions.push(Action::Replace {
1353                                    var: var.clone(),
1354                                    props,
1355                                });
1356                            } else {
1357                                actions.push(Action::Merge {
1358                                    var: var.clone(),
1359                                    props,
1360                                });
1361                            }
1362                        }
1363                    }
1364                }
1365
1366                // Phase 2: apply updates in-place to the row bindings.
1367                let mut updated_nodes: HashSet<String> = HashSet::new();
1368                let mut updated_edges: HashSet<String> = HashSet::new();
1369                for action in actions {
1370                    match action {
1371                        Action::SetKey { var, key, prop } => match row.get_mut(&var) {
1372                            // openCypher: SET on a null target (from
1373                            // OPTIONAL MATCH that didn't bind) is a
1374                            // silent no-op rather than an error.
1375                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1376                                continue
1377                            }
1378                            // openCypher: SET a.key = null removes the
1379                            // property rather than storing a null value.
1380                            Some(Value::Node(n)) => {
1381                                if matches!(prop, Property::Null) {
1382                                    n.properties.remove(&key);
1383                                } else {
1384                                    n.properties.insert(key, prop);
1385                                }
1386                                updated_nodes.insert(var);
1387                            }
1388                            Some(Value::Edge(e)) => {
1389                                if matches!(prop, Property::Null) {
1390                                    e.properties.remove(&key);
1391                                } else {
1392                                    e.properties.insert(key, prop);
1393                                }
1394                                updated_edges.insert(var);
1395                            }
1396                            _ => return Err(Error::UnboundVariable(var)),
1397                        },
1398                        Action::AddLabels { var, labels } => match row.get_mut(&var) {
1399                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1400                                continue
1401                            }
1402                            Some(Value::Node(n)) => {
1403                                for label in labels {
1404                                    if !n.labels.contains(&label) {
1405                                        n.labels.push(label);
1406                                    }
1407                                }
1408                                updated_nodes.insert(var);
1409                            }
1410                            _ => return Err(Error::UnboundVariable(var)),
1411                        },
1412                        Action::Replace { var, props } => match row.get_mut(&var) {
1413                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1414                                continue
1415                            }
1416                            Some(Value::Node(n)) => {
1417                                n.properties.clear();
1418                                for (k, v) in props {
1419                                    if !matches!(v, Property::Null) {
1420                                        n.properties.insert(k, v);
1421                                    }
1422                                }
1423                                updated_nodes.insert(var);
1424                            }
1425                            Some(Value::Edge(e)) => {
1426                                e.properties.clear();
1427                                for (k, v) in props {
1428                                    if !matches!(v, Property::Null) {
1429                                        e.properties.insert(k, v);
1430                                    }
1431                                }
1432                                updated_edges.insert(var);
1433                            }
1434                            _ => return Err(Error::UnboundVariable(var)),
1435                        },
1436                        Action::Merge { var, props } => match row.get_mut(&var) {
1437                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1438                                continue
1439                            }
1440                            Some(Value::Node(n)) => {
1441                                for (k, v) in props {
1442                                    if matches!(v, Property::Null) {
1443                                        n.properties.remove(&k);
1444                                    } else {
1445                                        n.properties.insert(k, v);
1446                                    }
1447                                }
1448                                updated_nodes.insert(var);
1449                            }
1450                            Some(Value::Edge(e)) => {
1451                                for (k, v) in props {
1452                                    if matches!(v, Property::Null) {
1453                                        e.properties.remove(&k);
1454                                    } else {
1455                                        e.properties.insert(k, v);
1456                                    }
1457                                }
1458                                updated_edges.insert(var);
1459                            }
1460                            _ => return Err(Error::UnboundVariable(var)),
1461                        },
1462                    }
1463                }
1464
1465                // Phase 3: flush each mutated entity once to the writer.
1466                for var in &updated_nodes {
1467                    if let Some(Value::Node(n)) = row.get(var) {
1468                        ctx.writer.put_node(n)?;
1469                    }
1470                }
1471                for var in &updated_edges {
1472                    if let Some(Value::Edge(e)) = row.get(var) {
1473                        ctx.writer.put_edge(e)?;
1474                    }
1475                }
1476
1477                Ok(Some(row))
1478            }
1479        }
1480    }
1481}
1482
1483struct RemoveOp {
1484    input: Box<dyn Operator>,
1485    items: Vec<RemoveSpec>,
1486}
1487
1488impl RemoveOp {
1489    fn new(input: Box<dyn Operator>, items: Vec<RemoveSpec>) -> Self {
1490        Self { input, items }
1491    }
1492}
1493
1494impl Operator for RemoveOp {
1495    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1496        match self.input.next(ctx)? {
1497            None => Ok(None),
1498            Some(mut row) => {
1499                let mut updated_nodes: HashSet<String> = HashSet::new();
1500                let mut updated_edges: HashSet<String> = HashSet::new();
1501                for item in &self.items {
1502                    match item {
1503                        RemoveSpec::Property { var, key } => match row.get_mut(var) {
1504                            // Null target (from OPTIONAL MATCH) is a
1505                            // no-op, matching Neo4j's REMOVE semantics.
1506                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1507                                continue
1508                            }
1509                            Some(Value::Node(n)) => {
1510                                n.properties.remove(key);
1511                                updated_nodes.insert(var.clone());
1512                            }
1513                            Some(Value::Edge(e)) => {
1514                                e.properties.remove(key);
1515                                updated_edges.insert(var.clone());
1516                            }
1517                            _ => return Err(Error::UnboundVariable(var.clone())),
1518                        },
1519                        RemoveSpec::Labels { var, labels } => match row.get_mut(var) {
1520                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1521                                continue
1522                            }
1523                            Some(Value::Node(n)) => {
1524                                n.labels.retain(|l| !labels.contains(l));
1525                                updated_nodes.insert(var.clone());
1526                            }
1527                            _ => return Err(Error::UnboundVariable(var.clone())),
1528                        },
1529                    }
1530                }
1531                for var in &updated_nodes {
1532                    if let Some(Value::Node(n)) = row.get(var) {
1533                        ctx.writer.put_node(n)?;
1534                    }
1535                }
1536                for var in &updated_edges {
1537                    if let Some(Value::Edge(e)) = row.get(var) {
1538                        ctx.writer.put_edge(e)?;
1539                    }
1540                }
1541                Ok(Some(row))
1542            }
1543        }
1544    }
1545}
1546
1547struct LoadCsvOp {
1548    input: Option<Box<dyn Operator>>,
1549    path_expr: Expr,
1550    var: String,
1551    with_headers: bool,
1552    rows: Option<Vec<Value>>,
1553    cursor: usize,
1554}
1555
1556impl LoadCsvOp {
1557    fn new(
1558        input: Option<Box<dyn Operator>>,
1559        path_expr: Expr,
1560        var: String,
1561        with_headers: bool,
1562    ) -> Self {
1563        Self {
1564            input,
1565            path_expr,
1566            var,
1567            with_headers,
1568            rows: None,
1569            cursor: 0,
1570        }
1571    }
1572
1573    fn load(&mut self, ctx: &ExecCtx, base_row: &Row) -> Result<()> {
1574        let ectx = ctx.eval_ctx(base_row);
1575        let path_val = eval_expr(&self.path_expr, &ectx)?;
1576        let path = match path_val {
1577            Value::Property(Property::String(s)) => s,
1578            _ => return Err(Error::TypeMismatch),
1579        };
1580        let content = std::fs::read_to_string(&path).map_err(|e| {
1581            Error::Unsupported(format!("LOAD CSV: cannot read file '{}': {}", path, e))
1582        })?;
1583        let mut lines = content.lines();
1584        let headers: Option<Vec<String>> = if self.with_headers {
1585            lines
1586                .next()
1587                .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
1588        } else {
1589            None
1590        };
1591        let mut csv_rows = Vec::new();
1592        for line in lines {
1593            if line.trim().is_empty() {
1594                continue;
1595            }
1596            let fields: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
1597            if let Some(hdrs) = &headers {
1598                let mut map = std::collections::HashMap::new();
1599                for (i, h) in hdrs.iter().enumerate() {
1600                    let val = fields.get(i).cloned().unwrap_or_default();
1601                    map.insert(h.clone(), Property::String(val));
1602                }
1603                csv_rows.push(Value::Property(Property::Map(map)));
1604            } else {
1605                let list: Vec<Value> = fields
1606                    .into_iter()
1607                    .map(|f| Value::Property(Property::String(f)))
1608                    .collect();
1609                csv_rows.push(Value::List(list));
1610            }
1611        }
1612        self.rows = Some(csv_rows);
1613        self.cursor = 0;
1614        Ok(())
1615    }
1616}
1617
1618impl Operator for LoadCsvOp {
1619    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1620        if self.rows.is_none() {
1621            let base = if let Some(input) = &mut self.input {
1622                match input.next(ctx)? {
1623                    Some(r) => r,
1624                    None => return Ok(None),
1625                }
1626            } else {
1627                Row::new()
1628            };
1629            self.load(ctx, &base)?;
1630        }
1631        let rows = self.rows.as_ref().unwrap();
1632        if self.cursor < rows.len() {
1633            let val = rows[self.cursor].clone();
1634            self.cursor += 1;
1635            let mut row = Row::new();
1636            row.insert(self.var.clone(), val);
1637            Ok(Some(row))
1638        } else {
1639            Ok(None)
1640        }
1641    }
1642}
1643
1644struct ForeachOp {
1645    input: Box<dyn Operator>,
1646    var: String,
1647    list_expr: Expr,
1648    set_assignments: Vec<SetAssignment>,
1649    remove_items: Vec<RemoveSpec>,
1650}
1651
1652impl ForeachOp {
1653    fn new(
1654        input: Box<dyn Operator>,
1655        var: String,
1656        list_expr: Expr,
1657        set_assignments: Vec<SetAssignment>,
1658        remove_items: Vec<RemoveSpec>,
1659    ) -> Self {
1660        Self {
1661            input,
1662            var,
1663            list_expr,
1664            set_assignments,
1665            remove_items,
1666        }
1667    }
1668}
1669
1670impl Operator for ForeachOp {
1671    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1672        let Some(row) = self.input.next(ctx)? else {
1673            return Ok(None);
1674        };
1675        let ectx = ctx.eval_ctx(&row);
1676        let list_val = eval_expr(&self.list_expr, &ectx)?;
1677        let items = match list_val {
1678            Value::List(items) => items,
1679            Value::Property(Property::List(props)) => {
1680                props.into_iter().map(Value::Property).collect()
1681            }
1682            Value::Null | Value::Property(Property::Null) => Vec::new(),
1683            _ => return Err(Error::TypeMismatch),
1684        };
1685        for item in items {
1686            let mut scratch = row.clone();
1687            scratch.insert(self.var.clone(), item);
1688            for a in &self.set_assignments {
1689                match a {
1690                    SetAssignment::Property { var, key, value } => {
1691                        let evaluated = eval_expr(value, &ctx.eval_ctx(&scratch))?;
1692                        let prop = value_to_property(evaluated)?;
1693                        match scratch.get_mut(var) {
1694                            Some(Value::Node(n)) => {
1695                                n.properties.insert(key.clone(), prop);
1696                            }
1697                            Some(Value::Edge(e)) => {
1698                                e.properties.insert(key.clone(), prop);
1699                            }
1700                            _ => return Err(Error::UnboundVariable(var.clone())),
1701                        }
1702                    }
1703                    SetAssignment::Labels { var, labels } => {
1704                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
1705                            for l in labels {
1706                                if !n.labels.contains(l) {
1707                                    n.labels.push(l.clone());
1708                                }
1709                            }
1710                        }
1711                    }
1712                    _ => {}
1713                }
1714            }
1715            for ri in &self.remove_items {
1716                match ri {
1717                    RemoveSpec::Property { var, key } => {
1718                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
1719                            n.properties.remove(key);
1720                        } else if let Some(Value::Edge(e)) = scratch.get_mut(var) {
1721                            e.properties.remove(key);
1722                        }
1723                    }
1724                    RemoveSpec::Labels { var, labels } => {
1725                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
1726                            n.labels.retain(|l| !labels.contains(l));
1727                        }
1728                    }
1729                }
1730            }
1731            // Flush mutated entities for each iteration
1732            for (_, val) in scratch.iter() {
1733                match val {
1734                    Value::Node(n) => ctx.writer.put_node(n)?,
1735                    Value::Edge(e) => ctx.writer.put_edge(e)?,
1736                    _ => {}
1737                }
1738            }
1739        }
1740        Ok(Some(row))
1741    }
1742}
1743
1744struct SeedRowOp {
1745    done: bool,
1746}
1747
1748impl Operator for SeedRowOp {
1749    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1750        if self.done {
1751            return Ok(None);
1752        }
1753        self.done = true;
1754        Ok(Some(Row::new()))
1755    }
1756}
1757
1758struct SeededRowOp {
1759    row: Option<Row>,
1760}
1761
1762impl Operator for SeededRowOp {
1763    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1764        Ok(self.row.take())
1765    }
1766}
1767
1768struct CallSubqueryOp {
1769    input: Box<dyn Operator>,
1770    body_plan: LogicalPlan,
1771    pending: Vec<Row>,
1772    pending_idx: usize,
1773}
1774
1775impl CallSubqueryOp {
1776    fn new(input: Box<dyn Operator>, body_plan: LogicalPlan) -> Self {
1777        Self {
1778            input,
1779            body_plan,
1780            pending: Vec::new(),
1781            pending_idx: 0,
1782        }
1783    }
1784}
1785
1786impl Operator for CallSubqueryOp {
1787    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1788        loop {
1789            if self.pending_idx < self.pending.len() {
1790                let row = self.pending[self.pending_idx].clone();
1791                self.pending_idx += 1;
1792                return Ok(Some(row));
1793            }
1794            let outer_row = match self.input.next(ctx)? {
1795                Some(r) => r,
1796                None => return Ok(None),
1797            };
1798            let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1799            let mut results = Vec::new();
1800            while let Some(body_row) = body_op.next(ctx)? {
1801                let mut merged = outer_row.clone();
1802                for (k, v) in body_row {
1803                    merged.insert(k, v);
1804                }
1805                results.push(merged);
1806            }
1807            if results.is_empty() {
1808                continue;
1809            }
1810            self.pending = results;
1811            self.pending_idx = 0;
1812        }
1813    }
1814}
1815
1816/// Per-input-row left-join driver (see
1817/// `LogicalPlan::OptionalApply`). Replays `body_plan` once per
1818/// outer row with the row as its seed; forwards all emitted rows
1819/// merged with the outer bindings, and emits one null-fallback
1820/// row (outer bindings plus `null_vars` bound to Null) only when
1821/// the body produced zero rows for that input.
1822struct OptionalApplyOp {
1823    input: Box<dyn Operator>,
1824    body_plan: LogicalPlan,
1825    null_vars: Vec<String>,
1826    pending: Vec<Row>,
1827    pending_idx: usize,
1828}
1829
1830impl OptionalApplyOp {
1831    fn new(input: Box<dyn Operator>, body_plan: LogicalPlan, null_vars: Vec<String>) -> Self {
1832        Self {
1833            input,
1834            body_plan,
1835            null_vars,
1836            pending: Vec::new(),
1837            pending_idx: 0,
1838        }
1839    }
1840}
1841
1842impl Operator for OptionalApplyOp {
1843    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1844        loop {
1845            if self.pending_idx < self.pending.len() {
1846                let row = self.pending[self.pending_idx].clone();
1847                self.pending_idx += 1;
1848                return Ok(Some(row));
1849            }
1850            let outer_row = match self.input.next(ctx)? {
1851                Some(r) => r,
1852                None => return Ok(None),
1853            };
1854            let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1855            // Push the outer row onto `outer_rows` so Filter /
1856            // eval_expr inside the body can resolve outer
1857            // bindings (`OPTIONAL MATCH ... WHERE outer_var = x`).
1858            let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1859            stacked.push(&outer_row);
1860            stacked.extend_from_slice(ctx.outer_rows);
1861            let inner_ctx = ExecCtx {
1862                store: ctx.store,
1863                writer: ctx.writer,
1864                params: ctx.params,
1865                procedures: ctx.procedures,
1866                outer_rows: &stacked,
1867                tombstones: ctx.tombstones,
1868            };
1869            let mut results = Vec::new();
1870            while let Some(body_row) = body_op.next(&inner_ctx)? {
1871                let mut merged = outer_row.clone();
1872                for (k, v) in body_row {
1873                    merged.insert(k, v);
1874                }
1875                results.push(merged);
1876            }
1877            if results.is_empty() {
1878                let mut fallback = outer_row;
1879                for v in &self.null_vars {
1880                    fallback.insert(v.clone(), Value::Null);
1881                }
1882                return Ok(Some(fallback));
1883            }
1884            self.pending = results;
1885            self.pending_idx = 0;
1886        }
1887    }
1888}
1889
1890/// Runtime operator for `CALL ns.name[(args)] [YIELD ...]`.
1891///
1892/// Resolves the procedure against the [`ProcedureRegistry`] at
1893/// `next()` time, validates arity / types / form (implicit args,
1894/// `YIELD *`, in-query YIELD requirement), then scans the registered
1895/// data-table for rows whose input cells match the call arguments.
1896/// Each matching row is projected according to the YIELD spec and
1897/// merged into the upstream row.
1898///
1899/// Two shapes are supported at once:
1900/// * Standalone (`input = None`): the op is the full plan. It
1901///   runs the procedure exactly once (against a synthetic empty row)
1902///   and emits the matching rows directly.
1903/// * In-query (`input = Some(...)`): for each upstream row, runs
1904///   the procedure with args evaluated against that row and emits
1905///   one merged row per match. Procedures with zero declared output
1906///   columns act as a pass-through — the upstream row is forwarded
1907///   unchanged, matching Neo4j's side-effect-only semantics.
1908struct ProcedureCallOp {
1909    input: Option<Box<dyn Operator>>,
1910    qualified_name: Vec<String>,
1911    args: Option<Vec<Expr>>,
1912    yield_spec: Option<YieldSpec>,
1913    standalone: bool,
1914    buffered: Vec<Row>,
1915    buffered_idx: usize,
1916    // Only set for the standalone form, which drives itself off a
1917    // synthetic seed row exactly once.
1918    done: bool,
1919}
1920
1921impl ProcedureCallOp {
1922    fn new(
1923        input: Option<Box<dyn Operator>>,
1924        qualified_name: Vec<String>,
1925        args: Option<Vec<Expr>>,
1926        yield_spec: Option<YieldSpec>,
1927        standalone: bool,
1928    ) -> Self {
1929        Self {
1930            input,
1931            qualified_name,
1932            args,
1933            yield_spec,
1934            standalone,
1935            buffered: Vec::new(),
1936            buffered_idx: 0,
1937            done: false,
1938        }
1939    }
1940
1941    /// Resolve the projection list `(source_column, output_alias)`
1942    /// from the procedure signature and this op's yield spec. Also
1943    /// validates the spec — unknown YIELD columns, duplicate
1944    /// aliases, and `YIELD *` / no-YIELD in disallowed contexts all
1945    /// surface as `Error::Procedure`.
1946    fn resolve_projection(
1947        &self,
1948        proc: &crate::procedures::Procedure,
1949    ) -> Result<Vec<(String, String)>> {
1950        match &self.yield_spec {
1951            None => {
1952                if !self.standalone {
1953                    // In-query CALL with no YIELD: legal only for
1954                    // side-effect-only procedures (zero declared
1955                    // outputs). A procedure with outputs but no
1956                    // YIELD leaves those outputs unbound, so any
1957                    // downstream RETURN that references them would
1958                    // see `UndefinedVariable`; reject here so the
1959                    // error surfaces instead of silently emitting
1960                    // nulls.
1961                    if proc.outputs.is_empty() {
1962                        return Ok(Vec::new());
1963                    }
1964                    return Err(Error::Procedure(format!(
1965                        "procedure '{}' has outputs but no YIELD clause",
1966                        self.qualified_name.join(".")
1967                    )));
1968                }
1969                Ok(proc
1970                    .outputs
1971                    .iter()
1972                    .map(|o| (o.name.clone(), o.name.clone()))
1973                    .collect())
1974            }
1975            Some(YieldSpec::Star) => {
1976                if !self.standalone {
1977                    return Err(Error::Procedure(
1978                        "YIELD * is only allowed on standalone CALL".into(),
1979                    ));
1980                }
1981                Ok(proc
1982                    .outputs
1983                    .iter()
1984                    .map(|o| (o.name.clone(), o.name.clone()))
1985                    .collect())
1986            }
1987            Some(YieldSpec::Items(items)) => {
1988                let mut projection = Vec::with_capacity(items.len());
1989                let mut seen_aliases: std::collections::HashSet<String> =
1990                    std::collections::HashSet::new();
1991                for yi in items {
1992                    if !proc.outputs.iter().any(|o| o.name == yi.column) {
1993                        return Err(Error::Procedure(format!(
1994                            "procedure '{}' has no output column '{}'",
1995                            self.qualified_name.join("."),
1996                            yi.column
1997                        )));
1998                    }
1999                    let alias = yi.alias.clone().unwrap_or_else(|| yi.column.clone());
2000                    if !seen_aliases.insert(alias.clone()) {
2001                        return Err(Error::Procedure(format!(
2002                            "variable '{alias}' already bound by YIELD"
2003                        )));
2004                    }
2005                    projection.push((yi.column.clone(), alias));
2006                }
2007                Ok(projection)
2008            }
2009        }
2010    }
2011
2012    /// Evaluate the call's argument list against `row`. For the
2013    /// implicit-args form (`args = None`), each declared input
2014    /// column's value comes from the per-query parameter map
2015    /// (keyed by the input-column name). Returns the argument
2016    /// values in declaration order. Raises `ProcedureError` on
2017    /// arity mismatch, type mismatch, or missing parameter.
2018    fn evaluate_args(
2019        &self,
2020        ctx: &ExecCtx,
2021        row: &Row,
2022        proc: &crate::procedures::Procedure,
2023    ) -> Result<Vec<Value>> {
2024        match &self.args {
2025            Some(exprs) => {
2026                if exprs.len() != proc.inputs.len() {
2027                    return Err(Error::Procedure(format!(
2028                        "procedure '{}' expects {} argument(s), got {}",
2029                        self.qualified_name.join("."),
2030                        proc.inputs.len(),
2031                        exprs.len()
2032                    )));
2033                }
2034                let eval_ctx = ctx.eval_ctx(row);
2035                let mut values = Vec::with_capacity(exprs.len());
2036                for (expr, spec) in exprs.iter().zip(proc.inputs.iter()) {
2037                    let v = eval_expr(expr, &eval_ctx)?;
2038                    if !spec.ty.accepts(&v) {
2039                        return Err(Error::Procedure(format!(
2040                            "argument '{}' has wrong type for procedure '{}'",
2041                            spec.name,
2042                            self.qualified_name.join(".")
2043                        )));
2044                    }
2045                    values.push(coerce_arg(v, spec.ty));
2046                }
2047                Ok(values)
2048            }
2049            None => {
2050                // Implicit-arg form only valid standalone.
2051                if !self.standalone {
2052                    return Err(Error::Procedure(
2053                        "in-query CALL requires explicit argument list".into(),
2054                    ));
2055                }
2056                let mut values = Vec::with_capacity(proc.inputs.len());
2057                for spec in &proc.inputs {
2058                    let v = ctx.params.get(&spec.name).cloned().ok_or_else(|| {
2059                        Error::Procedure(format!(
2060                            "missing parameter ${} for procedure '{}'",
2061                            spec.name,
2062                            self.qualified_name.join(".")
2063                        ))
2064                    })?;
2065                    if !spec.ty.accepts(&v) {
2066                        return Err(Error::Procedure(format!(
2067                            "parameter '{}' has wrong type",
2068                            spec.name
2069                        )));
2070                    }
2071                    values.push(coerce_arg(v, spec.ty));
2072                }
2073                Ok(values)
2074            }
2075        }
2076    }
2077
2078    /// Invoke the procedure once for `input_row` and emit zero or
2079    /// more output rows into `out`. Handles the "zero-output-column
2080    /// pass-through" case that keeps `MATCH (n) CALL test.doNothing()
2081    /// RETURN n.name` from filtering the match rows.
2082    fn invoke_once(
2083        &self,
2084        ctx: &ExecCtx,
2085        input_row: &Row,
2086        proc: &crate::procedures::Procedure,
2087        projection: &[(String, String)],
2088        out: &mut Vec<Row>,
2089    ) -> Result<()> {
2090        // Zero-output-column procedures are side-effect-only in
2091        // the TCK; they either suppress rows entirely (standalone)
2092        // or pass the input row through unchanged (in-query).
2093        if proc.outputs.is_empty() {
2094            if !self.standalone {
2095                out.push(input_row.clone());
2096            }
2097            return Ok(());
2098        }
2099        let args = self.evaluate_args(ctx, input_row, proc)?;
2100        let rows = proc.resolve_rows(ctx.store)?;
2101        for proc_row in &rows {
2102            if !proc.row_matches(proc_row, &args) {
2103                continue;
2104            }
2105            let mut merged = if self.standalone {
2106                Row::new()
2107            } else {
2108                input_row.clone()
2109            };
2110            for (src, alias) in projection {
2111                let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
2112                merged.insert(alias.clone(), v);
2113            }
2114            out.push(merged);
2115        }
2116        Ok(())
2117    }
2118}
2119
2120/// Cast an int to a float when the declared type is `FLOAT`. Other
2121/// declared types leave the value as-is (accept() has already
2122/// gated on kind) so the comparison in `row_matches` sees a
2123/// consistent shape.
2124fn coerce_arg(v: Value, ty: crate::procedures::ProcType) -> Value {
2125    use crate::procedures::ProcType;
2126    if matches!(ty, ProcType::Float) {
2127        if let Value::Property(Property::Int64(n)) = v {
2128            return Value::Property(Property::Float64(n as f64));
2129        }
2130    }
2131    v
2132}
2133
2134impl Operator for ProcedureCallOp {
2135    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2136        loop {
2137            if self.buffered_idx < self.buffered.len() {
2138                let row = self.buffered[self.buffered_idx].clone();
2139                self.buffered_idx += 1;
2140                return Ok(Some(row));
2141            }
2142            self.buffered.clear();
2143            self.buffered_idx = 0;
2144
2145            let proc = match ctx.procedures.get(&self.qualified_name) {
2146                Some(p) => p,
2147                None => {
2148                    return Err(Error::Procedure(format!(
2149                        "procedure '{}' not found",
2150                        self.qualified_name.join(".")
2151                    )));
2152                }
2153            };
2154            let projection = self.resolve_projection(proc)?;
2155
2156            let input_row = match &mut self.input {
2157                Some(inp) => match inp.next(ctx)? {
2158                    Some(r) => r,
2159                    None => return Ok(None),
2160                },
2161                None => {
2162                    if self.done {
2163                        return Ok(None);
2164                    }
2165                    self.done = true;
2166                    Row::new()
2167                }
2168            };
2169
2170            let mut produced = Vec::new();
2171            self.invoke_once(ctx, &input_row, proc, &projection, &mut produced)?;
2172            if produced.is_empty() {
2173                if self.input.is_some() {
2174                    continue;
2175                }
2176                return Ok(None);
2177            }
2178            self.buffered = produced;
2179        }
2180    }
2181}
2182
2183/// Pull a property map out of a value. Supports node / edge
2184/// (uses their live property map) and map values (parameter or
2185/// map literal). Null propagates as an empty map — `SET x = null`
2186/// is spec'd as a property clear / no-op and SET = <null-binding>
2187/// (from an unmatched OPTIONAL MATCH) matches that shape.
2188fn extract_property_map(v: &Value) -> Result<Vec<(String, Property)>> {
2189    match v {
2190        Value::Node(n) => Ok(n.properties.clone().into_iter().collect()),
2191        Value::Edge(e) => Ok(e.properties.clone().into_iter().collect()),
2192        Value::Map(pairs) => pairs
2193            .iter()
2194            .map(|(k, vv)| Ok((k.clone(), value_to_property(vv.clone())?)))
2195            .collect(),
2196        Value::Property(Property::Map(entries)) => Ok(entries
2197            .iter()
2198            .map(|(k, p)| (k.clone(), p.clone()))
2199            .collect()),
2200        Value::Null | Value::Property(Property::Null) => Ok(Vec::new()),
2201        _ => Err(Error::InvalidSetValue),
2202    }
2203}
2204
2205fn value_to_property(v: Value) -> Result<Property> {
2206    match v {
2207        Value::Property(Property::Map(_)) => Err(Error::InvalidSetValue),
2208        Value::Property(p) => Ok(p),
2209        Value::Null => Ok(Property::Null),
2210        Value::List(items) => {
2211            let props: Vec<Property> = items
2212                .into_iter()
2213                .map(value_to_property)
2214                .collect::<Result<_>>()?;
2215            Ok(Property::List(props))
2216        }
2217        // Graph-aware `Value::Map` and graph elements can't be
2218        // stored as node / edge property values; SET will reject
2219        // them.
2220        Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
2221            Err(Error::InvalidSetValue)
2222        }
2223    }
2224}
2225
2226struct NodeScanAllOp {
2227    var: String,
2228    ids: Option<Vec<NodeId>>,
2229    cursor: usize,
2230}
2231
2232impl NodeScanAllOp {
2233    fn new(var: String) -> Self {
2234        Self {
2235            var,
2236            ids: None,
2237            cursor: 0,
2238        }
2239    }
2240}
2241
2242impl Operator for NodeScanAllOp {
2243    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2244        if self.ids.is_none() {
2245            self.ids = Some(ctx.store.all_node_ids()?);
2246        }
2247        let ids = self.ids.as_ref().unwrap();
2248        while self.cursor < ids.len() {
2249            let id = ids[self.cursor];
2250            self.cursor += 1;
2251            if let Some(node) = ctx.store.get_node(id)? {
2252                let mut row = Row::new();
2253                row.insert(self.var.clone(), Value::Node(node));
2254                return Ok(Some(row));
2255            }
2256        }
2257        Ok(None)
2258    }
2259}
2260
2261struct NodeScanByLabelsOp {
2262    var: String,
2263    labels: Vec<String>,
2264    ids: Option<Vec<NodeId>>,
2265    cursor: usize,
2266}
2267
2268impl NodeScanByLabelsOp {
2269    fn new(var: String, labels: Vec<String>) -> Self {
2270        Self {
2271            var,
2272            labels,
2273            ids: None,
2274            cursor: 0,
2275        }
2276    }
2277}
2278
2279impl Operator for NodeScanByLabelsOp {
2280    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2281        if self.ids.is_none() {
2282            // Use the first label for the index scan, filter the rest per-node.
2283            let primary = self
2284                .labels
2285                .first()
2286                .expect("NodeScanByLabels must have at least one label");
2287            self.ids = Some(ctx.store.nodes_by_label(primary)?);
2288        }
2289        let ids = self.ids.as_ref().unwrap();
2290        while self.cursor < ids.len() {
2291            let id = ids[self.cursor];
2292            self.cursor += 1;
2293            if let Some(node) = ctx.store.get_node(id)? {
2294                if has_all_labels(&node, &self.labels) {
2295                    let mut row = Row::new();
2296                    row.insert(self.var.clone(), Value::Node(node));
2297                    return Ok(Some(row));
2298                }
2299            }
2300        }
2301        Ok(None)
2302    }
2303}
2304
2305fn has_all_labels(node: &Node, labels: &[String]) -> bool {
2306    labels.iter().all(|l| node.labels.contains(l))
2307}
2308
2309/// Equality-lookup operator backed by a property index. Evaluates
2310/// the value expression lazily on the first `next()` — crucially
2311/// so parameters resolve against the per-query `ExecCtx::params`
2312/// map, not against a literal baked in at plan-construction time.
2313///
2314/// Unindexable value types (Float, List, Map, Null, or a Node/Edge
2315/// that slipped through) surface as `Error::InvalidSetValue`. The
2316/// planner only emits this op for indexes that do exist, so the
2317/// reader call should find a populated CF unless a concurrent DROP
2318/// raced us (in which case the result is just empty, which matches
2319/// how Neo4j's planner handles the race).
2320struct IndexSeekOp {
2321    var: String,
2322    label: String,
2323    property: String,
2324    value_expr: Expr,
2325    results: Option<Vec<NodeId>>,
2326    cursor: usize,
2327}
2328
2329impl IndexSeekOp {
2330    fn new(var: String, label: String, property: String, value_expr: Expr) -> Self {
2331        Self {
2332            var,
2333            label,
2334            property,
2335            value_expr,
2336            results: None,
2337            cursor: 0,
2338        }
2339    }
2340}
2341
2342impl Operator for IndexSeekOp {
2343    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2344        if self.results.is_none() {
2345            let empty = Row::new();
2346            let value = eval_expr(&self.value_expr, &ctx.eval_ctx(&empty))?;
2347            let property = match value {
2348                Value::Property(p) => p,
2349                Value::Null => Property::Null,
2350                Value::Node(_)
2351                | Value::Edge(_)
2352                | Value::List(_)
2353                | Value::Map(_)
2354                | Value::Path { .. } => {
2355                    return Err(Error::InvalidSetValue);
2356                }
2357            };
2358            let ids = ctx
2359                .store
2360                .nodes_by_property(&self.label, &self.property, &property)?;
2361            self.results = Some(ids);
2362        }
2363        let ids = self.results.as_ref().unwrap();
2364        while self.cursor < ids.len() {
2365            let id = ids[self.cursor];
2366            self.cursor += 1;
2367            if let Some(node) = ctx.store.get_node(id)? {
2368                let mut row = Row::new();
2369                row.insert(self.var.clone(), Value::Node(node));
2370                return Ok(Some(row));
2371            }
2372        }
2373        Ok(None)
2374    }
2375}
2376
2377fn matches_pattern_props(node: &Node, props: &[(String, Property)]) -> bool {
2378    props.iter().all(|(k, v)| {
2379        node.properties
2380            .get(k)
2381            .map(|stored| stored == v)
2382            .unwrap_or(false)
2383    })
2384}
2385
/// Find-or-create executor for node MERGE (`MERGE (n:Label {k: v})`).
///
/// Looks for stored nodes that carry every label in `labels` and
/// every pattern property; when none exist, one is created. Runs
/// either as a top-level producer (no `input`) or mid-chain, in
/// which case the merge is evaluated once per upstream row.
struct MergeNodeOp {
    /// Variable the merged node is bound to in every emitted row.
    var: String,
    /// Labels from the MERGE pattern. The first label (if any)
    /// selects the label scan that produces candidates; a created
    /// node is stamped with all of them.
    labels: Vec<String>,
    /// Pattern property expressions as they came from the planner. These
    /// stay as `Expr` because evaluation needs `ExecCtx::params`, which
    /// isn't available until `next()` is called.
    properties: Vec<(String, Expr)>,
    /// `ON CREATE SET ...` assignments — applied only when the
    /// merge took the create branch. Evaluated against the
    /// upstream row (empty for a top-level MERGE) with the node
    /// bound under `var`, so the value expressions can reference
    /// the just-created node.
    on_create: Vec<SetAssignment>,
    /// `ON MATCH SET ...` assignments — applied to every matched
    /// node when the merge took the match branch. Same row shape
    /// as `on_create`.
    on_match: Vec<SetAssignment>,
    /// Optional upstream operator. `None` means this is a
    /// top-level producer (`MERGE (n) RETURN n`) and emits
    /// rows with a fresh empty base. `Some` means this is a
    /// mid-chain clause (`MATCH (a) MERGE (b) RETURN a, b`)
    /// and each emitted row is a cross-join between an input
    /// row and a merge-result node.
    input: Option<Box<dyn Operator>>,
    /// Merge result currently being drained. In the top-level
    /// case it is computed once, against an empty row. In the
    /// input-driven case it is recomputed for every upstream row
    /// so pattern properties can reference that row's bindings.
    ///
    /// NOTE(review): an earlier revision ran the merge exactly
    /// once to sidestep a read-after-write issue in
    /// buffered-writer mode (a node created for the first input
    /// row wouldn't be visible to a re-scan on the second). The
    /// merge now runs per input row — confirm that case is
    /// handled elsewhere.
    merged_nodes: Vec<Node>,
    /// Whether the top-level merge has already run. Only
    /// consulted when `input` is `None`; the merge runs lazily
    /// on the first `next()` so `ExecCtx::params` is available.
    merge_done: bool,
    /// Current upstream row — held between calls while we drain
    /// `merged_nodes` against it. `None` for the top-level
    /// case (no input).
    current_input_row: Option<Row>,
    /// Index of the next entry of `merged_nodes` to emit. Reset
    /// to zero whenever a new upstream row is merged.
    cursor: usize,
}
2426
2427impl MergeNodeOp {
2428    fn new(
2429        input: Option<Box<dyn Operator>>,
2430        var: String,
2431        labels: Vec<String>,
2432        properties: Vec<(String, Expr)>,
2433        on_create: Vec<SetAssignment>,
2434        on_match: Vec<SetAssignment>,
2435    ) -> Self {
2436        Self {
2437            var,
2438            labels,
2439            properties,
2440            on_create,
2441            on_match,
2442            input,
2443            merged_nodes: Vec::new(),
2444            merge_done: false,
2445            current_input_row: None,
2446            cursor: 0,
2447        }
2448    }
2449
2450    /// Run the MERGE logic exactly once: scan the store for
2451    /// existing matches, apply ON MATCH SET to each; or create
2452    /// a fresh node and apply ON CREATE SET; persist everything
2453    /// via `ctx.writer`; stash the resulting nodes in
2454    /// `self.merged_nodes`. Idempotent — subsequent calls are
2455    /// no-ops once `self.merge_done` is set.
2456    /// Resolve the pattern properties against `base`, scan the
2457    /// store, and either match existing nodes or create a fresh
2458    /// one. Returns the resulting node set — can be called
2459    /// multiple times with different `base` rows for
2460    /// input-driven merges.
2461    fn run_merge_for(&mut self, ctx: &ExecCtx, base: &Row) -> Result<Vec<Node>> {
2462        let resolved_props: Vec<(String, Property)> = self
2463            .properties
2464            .iter()
2465            .map(|(k, expr)| {
2466                let v = eval_expr(expr, &ctx.eval_ctx(base))?;
2467                Ok((k.clone(), value_to_property(v)?))
2468            })
2469            .collect::<Result<Vec<_>>>()?;
2470
2471        let candidate_ids: Vec<NodeId> = if let Some(primary) = self.labels.first() {
2472            ctx.store.nodes_by_label(primary)?
2473        } else {
2474            ctx.store.all_node_ids()?
2475        };
2476        let mut merged_nodes: Vec<Node> = Vec::new();
2477        for id in candidate_ids {
2478            if let Some(node) = ctx.store.get_node(id)? {
2479                if has_all_labels(&node, &self.labels)
2480                    && matches_pattern_props(&node, &resolved_props)
2481                {
2482                    merged_nodes.push(node);
2483                }
2484            }
2485        }
2486
2487        if merged_nodes.is_empty() {
2488            let mut node = Node::new();
2489            for label in &self.labels {
2490                node.labels.push(label.clone());
2491            }
2492            for (k, prop) in resolved_props {
2493                node.properties.insert(k, prop);
2494            }
2495            apply_merge_actions(&mut node, &self.on_create, &self.var, ctx, base)?;
2496            ctx.writer.put_node(&node)?;
2497            merged_nodes.push(node);
2498        } else if !self.on_match.is_empty() {
2499            for node in merged_nodes.iter_mut() {
2500                apply_merge_actions(node, &self.on_match, &self.var, ctx, base)?;
2501                ctx.writer.put_node(node)?;
2502            }
2503        }
2504        Ok(merged_nodes)
2505    }
2506}
2507
2508impl Operator for MergeNodeOp {
2509    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2510        // Top-level producer: no upstream context, so the pattern
2511        // properties can only reference literals / parameters.
2512        // Run the merge once against an empty row, then emit
2513        // each result.
2514        if self.input.is_none() {
2515            if !self.merge_done {
2516                let empty = Row::new();
2517                let nodes = self.run_merge_for(ctx, &empty)?;
2518                self.merged_nodes = nodes;
2519                self.merge_done = true;
2520            }
2521            if self.cursor < self.merged_nodes.len() {
2522                let node = self.merged_nodes[self.cursor].clone();
2523                self.cursor += 1;
2524                let mut row = Row::new();
2525                row.insert(self.var.clone(), Value::Node(node));
2526                return Ok(Some(row));
2527            }
2528            return Ok(None);
2529        }
2530
2531        // Input-driven case: evaluate pattern properties *per
2532        // input row* so references like `MERGE (:City {name:
2533        // person.bornIn})` resolve against the bound `person`.
2534        // Each incoming row produces its own merged-node set,
2535        // which is then cross-joined with that row before
2536        // emission.
2537        loop {
2538            if let Some(base) = self.current_input_row.as_ref() {
2539                if self.cursor < self.merged_nodes.len() {
2540                    let node = self.merged_nodes[self.cursor].clone();
2541                    self.cursor += 1;
2542                    let mut row = base.clone();
2543                    row.insert(self.var.clone(), Value::Node(node));
2544                    return Ok(Some(row));
2545                }
2546            }
2547            match self.input.as_mut().unwrap().next(ctx)? {
2548                None => return Ok(None),
2549                Some(row) => {
2550                    let nodes = self.run_merge_for(ctx, &row)?;
2551                    self.merged_nodes = nodes;
2552                    self.cursor = 0;
2553                    self.current_input_row = Some(row);
2554                }
2555            }
2556        }
2557    }
2558}
2559
/// Find-or-create executor for edge MERGE
/// (`MERGE (a)-[r:KNOWS]->(b)`).
///
/// For every row pulled from `input`, looks up the `src_var`
/// and `dst_var` bindings (which must be `Value::Node` — the
/// planner enforces that they came from a prior MATCH or
/// MERGE), scans `src`'s outgoing edges (and, when `undirected`
/// is set, its incoming edges too), and either:
///
/// - Takes every edge of type `edge_type` between `src` and
///   `dst` that satisfies `properties`, applies `on_match` to
///   each, and emits one row per matched edge, or
/// - Creates a fresh `Edge::new(edge_type, src, dst)`, stamps
///   `properties` onto it, applies `on_create`, persists it via
///   `ctx.writer.put_edge`, and emits a single row.
///
/// Either way, the resulting edge is bound into `edge_var` in
/// the output row and the row is emitted. v1 restrictions:
/// single hop, both endpoints already bound, explicit
/// relationship type.
struct MergeEdgeOp {
    /// Upstream operator supplying rows with both endpoints bound.
    input: Box<dyn Operator>,
    /// Variable the merged edge is bound to in emitted rows.
    edge_var: String,
    /// Row variable holding the source node.
    src_var: String,
    /// Row variable holding the destination node.
    dst_var: String,
    /// Relationship type an existing edge must have, and the type
    /// given to a newly created edge.
    edge_type: String,
    /// When true, edges running from `dst` to `src` also count as
    /// matches. A created edge always runs from `src` to `dst`.
    undirected: bool,
    /// Inline edge property filter from the MERGE pattern
    /// (`[r:T {k: v}]`). Matched edges must satisfy every entry;
    /// the create branch stamps them onto the new edge.
    properties: Vec<(String, Expr)>,
    /// `ON CREATE SET ...` assignments, applied in the create branch.
    on_create: Vec<SetAssignment>,
    /// `ON MATCH SET ...` assignments, applied to each matched edge.
    on_match: Vec<SetAssignment>,
    /// Rows buffered from the current input row's MERGE result —
    /// one per existing matched edge (or a single synthesized edge
    /// if the create branch fired). Drained before the next
    /// `input.next()` call so multi-match semantics stay correct:
    /// `MATCH (a:A),(b:B) MERGE (a)-[r:T]->(b)` against two pre-
    /// existing edges has to yield two rows, not one.
    pending: std::collections::VecDeque<Row>,
}
2599
2600impl MergeEdgeOp {
2601    #[allow(clippy::too_many_arguments)]
2602    fn new(
2603        input: Box<dyn Operator>,
2604        edge_var: String,
2605        src_var: String,
2606        dst_var: String,
2607        edge_type: String,
2608        undirected: bool,
2609        properties: Vec<(String, Expr)>,
2610        on_create: Vec<SetAssignment>,
2611        on_match: Vec<SetAssignment>,
2612    ) -> Self {
2613        Self {
2614            input,
2615            edge_var,
2616            src_var,
2617            dst_var,
2618            edge_type,
2619            undirected,
2620            properties,
2621            on_create,
2622            on_match,
2623            pending: std::collections::VecDeque::new(),
2624        }
2625    }
2626}
2627
2628impl Operator for MergeEdgeOp {
2629    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2630        loop {
2631            if let Some(row) = self.pending.pop_front() {
2632                return Ok(Some(row));
2633            }
2634            let Some(row) = self.input.next(ctx)? else {
2635                return Ok(None);
2636            };
2637            // Resolve src/dst. Both must be Value::Node — the
2638            // planner enforces that the variables came from an
2639            // earlier producer, so anything else is a bug or a
2640            // later-added feature that didn't update the check.
2641            let src_node = match row.get(&self.src_var) {
2642                Some(Value::Node(n)) => n.clone(),
2643                _ => return Err(Error::UnboundVariable(self.src_var.clone())),
2644            };
2645            let dst_node = match row.get(&self.dst_var) {
2646                Some(Value::Node(n)) => n.clone(),
2647                _ => return Err(Error::UnboundVariable(self.dst_var.clone())),
2648            };
2649
2650            // Evaluate the inline edge property filter once per
2651            // input row. These are AST expressions so they can
2652            // reference outer bindings (`MERGE (a)-[r:T {k: a.v}]->(b)`).
2653            let required_props: Vec<(String, Property)> = self
2654                .properties
2655                .iter()
2656                .map(|(k, expr)| {
2657                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
2658                    Ok((k.clone(), value_to_property(v)?))
2659                })
2660                .collect::<Result<Vec<_>>>()?;
2661            let edge_matches = |edge: &Edge| -> bool {
2662                required_props.iter().all(|(k, want)| {
2663                    edge.properties
2664                        .get(k)
2665                        .map(|have| have == want)
2666                        .unwrap_or(false)
2667                })
2668            };
2669
2670            // Collect every edge of type `edge_type` from src to
2671            // dst (and, for undirected patterns, dst to src) that
2672            // also satisfies the inline property filter. If any
2673            // exist we take the match branch and yield one row per
2674            // match. If none exist we synthesize one and yield a
2675            // single row from the create branch.
2676            let mut matched: Vec<Edge> = Vec::new();
2677            for (edge_id, neighbor_id) in ctx.store.outgoing(src_node.id)? {
2678                if neighbor_id != dst_node.id {
2679                    continue;
2680                }
2681                if let Some(edge) = ctx.store.get_edge(edge_id)? {
2682                    if edge.edge_type == self.edge_type && edge_matches(&edge) {
2683                        matched.push(edge);
2684                    }
2685                }
2686            }
2687            if self.undirected {
2688                for (edge_id, neighbor_id) in ctx.store.incoming(src_node.id)? {
2689                    if neighbor_id != dst_node.id {
2690                        continue;
2691                    }
2692                    if let Some(edge) = ctx.store.get_edge(edge_id)? {
2693                        if edge.edge_type == self.edge_type && edge_matches(&edge) {
2694                            matched.push(edge);
2695                        }
2696                    }
2697                }
2698            }
2699
2700            if matched.is_empty() {
2701                let mut new_edge = Edge::new(&self.edge_type, src_node.id, dst_node.id);
2702                for (k, p) in &required_props {
2703                    new_edge.properties.insert(k.clone(), p.clone());
2704                }
2705                let mut row_out = row.clone();
2706                apply_merge_edge_actions(
2707                    &mut new_edge,
2708                    &self.on_create,
2709                    &self.edge_var,
2710                    ctx,
2711                    &mut row_out,
2712                )?;
2713                ctx.writer.put_edge(&new_edge)?;
2714                row_out.insert(self.edge_var.clone(), Value::Edge(new_edge));
2715                self.pending.push_back(row_out);
2716            } else {
2717                for mut existing in matched {
2718                    let mut row_out = row.clone();
2719                    if !self.on_match.is_empty() {
2720                        apply_merge_edge_actions(
2721                            &mut existing,
2722                            &self.on_match,
2723                            &self.edge_var,
2724                            ctx,
2725                            &mut row_out,
2726                        )?;
2727                        ctx.writer.put_edge(&existing)?;
2728                    }
2729                    row_out.insert(self.edge_var.clone(), Value::Edge(existing));
2730                    self.pending.push_back(row_out);
2731                }
2732            }
2733        }
2734    }
2735}
2736
2737/// Edge-side counterpart of [`apply_merge_actions`]. Evaluates
2738/// each `SetAssignment` against the outer `row` (augmented with
2739/// the current edge binding) and mutates either the edge itself
2740/// or a non-edge target node from the outer row. MERGE's ON
2741/// CREATE / ON MATCH clauses are scoped against the whole input
2742/// row, so `MERGE (a)-[:T]->(b) ON CREATE SET b.k = 1` has to
2743/// reach outside the edge-local binding. Non-edge mutations
2744/// are persisted via `ctx.writer.put_node` so the change is
2745/// visible to later clauses.
2746fn apply_merge_edge_actions(
2747    edge: &mut Edge,
2748    actions: &[SetAssignment],
2749    var: &str,
2750    exec_ctx: &ExecCtx,
2751    outer: &mut Row,
2752) -> Result<()> {
2753    if actions.is_empty() {
2754        return Ok(());
2755    }
2756    // Edge binding is live in `outer` while we evaluate — RHS can
2757    // reference both the edge and any sibling node binding.
2758    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2759    for action in actions {
2760        match action {
2761            SetAssignment::Property {
2762                var: target,
2763                key,
2764                value,
2765            } => {
2766                let sub_ctx = exec_ctx.eval_ctx(outer);
2767                let evaluated = eval_expr(value, &sub_ctx)?;
2768                let prop = value_to_property(evaluated)?;
2769                if target == var {
2770                    if matches!(prop, Property::Null) {
2771                        edge.properties.remove(key);
2772                    } else {
2773                        edge.properties.insert(key.clone(), prop);
2774                    }
2775                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2776                } else {
2777                    apply_set_prop_to_outer(outer, exec_ctx, target, key, prop)?;
2778                }
2779            }
2780            SetAssignment::Merge {
2781                var: target,
2782                properties,
2783            } => {
2784                let sub_ctx = exec_ctx.eval_ctx(outer);
2785                let resolved: Vec<(String, Property)> = properties
2786                    .iter()
2787                    .map(|(k, expr)| {
2788                        let v = eval_expr(expr, &sub_ctx)?;
2789                        Ok((k.clone(), value_to_property(v)?))
2790                    })
2791                    .collect::<Result<Vec<_>>>()?;
2792                if target == var {
2793                    for (k, p) in resolved {
2794                        edge.properties.insert(k, p);
2795                    }
2796                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2797                } else {
2798                    apply_set_map_to_outer(outer, exec_ctx, target, resolved, false)?;
2799                }
2800            }
2801            SetAssignment::Replace {
2802                var: target,
2803                properties,
2804            } => {
2805                let sub_ctx = exec_ctx.eval_ctx(outer);
2806                let resolved: Vec<(String, Property)> = properties
2807                    .iter()
2808                    .map(|(k, expr)| {
2809                        let v = eval_expr(expr, &sub_ctx)?;
2810                        Ok((k.clone(), value_to_property(v)?))
2811                    })
2812                    .collect::<Result<Vec<_>>>()?;
2813                if target == var {
2814                    edge.properties.clear();
2815                    for (k, p) in resolved {
2816                        edge.properties.insert(k, p);
2817                    }
2818                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2819                } else {
2820                    apply_set_map_to_outer(outer, exec_ctx, target, resolved, true)?;
2821                }
2822            }
2823            SetAssignment::Labels {
2824                var: target,
2825                labels,
2826            } => {
2827                if target == var {
2828                    // Edges don't carry labels.
2829                    return Err(Error::UnboundVariable(target.clone()));
2830                }
2831                apply_set_labels_to_outer(outer, exec_ctx, target, labels)?;
2832            }
2833            SetAssignment::ReplaceFromExpr {
2834                var: target,
2835                source,
2836                replace,
2837            } => {
2838                let sub_ctx = exec_ctx.eval_ctx(outer);
2839                let v = eval_expr(source, &sub_ctx)?;
2840                let props = extract_property_map(&v)?;
2841                if target == var {
2842                    if *replace {
2843                        edge.properties.clear();
2844                    }
2845                    for (k, p) in props {
2846                        edge.properties.insert(k, p);
2847                    }
2848                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
2849                } else {
2850                    apply_set_map_to_outer(outer, exec_ctx, target, props, *replace)?;
2851                }
2852            }
2853        }
2854    }
2855    Ok(())
2856}
2857
2858/// Apply a single `SET target.key = prop` to a node or edge bound
2859/// in the outer row. Used by MERGE's ON CREATE / ON MATCH when
2860/// the target isn't the merge edge itself (the common case being
2861/// `MERGE (a)-[:R]->(b) ON CREATE SET b.k = v`).
2862fn apply_set_prop_to_outer(
2863    outer: &mut Row,
2864    exec_ctx: &ExecCtx,
2865    target: &str,
2866    key: &str,
2867    prop: Property,
2868) -> Result<()> {
2869    match outer.get_mut(target) {
2870        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
2871            // SET on a null target (typically an unmatched OPTIONAL
2872            // MATCH binding) is a silent no-op in openCypher.
2873            return Ok(());
2874        }
2875        Some(Value::Node(n)) => {
2876            if matches!(prop, Property::Null) {
2877                n.properties.remove(key);
2878            } else {
2879                n.properties.insert(key.to_string(), prop);
2880            }
2881            exec_ctx.writer.put_node(n)?;
2882        }
2883        Some(Value::Edge(e)) => {
2884            if matches!(prop, Property::Null) {
2885                e.properties.remove(key);
2886            } else {
2887                e.properties.insert(key.to_string(), prop);
2888            }
2889            exec_ctx.writer.put_edge(e)?;
2890        }
2891        _ => return Err(Error::UnboundVariable(target.to_string())),
2892    }
2893    Ok(())
2894}
2895
2896/// Apply a property-map assignment (`SET target = {..}` when
2897/// `replace`, or `SET target += {..}` when not) to a node or
2898/// edge bound in the outer row.
2899fn apply_set_map_to_outer(
2900    outer: &mut Row,
2901    exec_ctx: &ExecCtx,
2902    target: &str,
2903    props: Vec<(String, Property)>,
2904    replace: bool,
2905) -> Result<()> {
2906    match outer.get_mut(target) {
2907        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
2908        Some(Value::Node(n)) => {
2909            if replace {
2910                n.properties.clear();
2911            }
2912            for (k, p) in props {
2913                if replace || !matches!(p, Property::Null) {
2914                    n.properties.insert(k, p);
2915                } else {
2916                    n.properties.remove(&k);
2917                }
2918            }
2919            exec_ctx.writer.put_node(n)?;
2920            Ok(())
2921        }
2922        Some(Value::Edge(e)) => {
2923            if replace {
2924                e.properties.clear();
2925            }
2926            for (k, p) in props {
2927                if replace || !matches!(p, Property::Null) {
2928                    e.properties.insert(k, p);
2929                } else {
2930                    e.properties.remove(&k);
2931                }
2932            }
2933            exec_ctx.writer.put_edge(e)?;
2934            Ok(())
2935        }
2936        _ => Err(Error::UnboundVariable(target.to_string())),
2937    }
2938}
2939
2940/// Apply a labels assignment to a node bound in the outer row.
2941fn apply_set_labels_to_outer(
2942    outer: &mut Row,
2943    exec_ctx: &ExecCtx,
2944    target: &str,
2945    labels: &[String],
2946) -> Result<()> {
2947    match outer.get_mut(target) {
2948        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
2949        Some(Value::Node(n)) => {
2950            for label in labels {
2951                if !n.labels.contains(label) {
2952                    n.labels.push(label.clone());
2953                }
2954            }
2955            exec_ctx.writer.put_node(n)?;
2956            Ok(())
2957        }
2958        _ => Err(Error::UnboundVariable(target.to_string())),
2959    }
2960}
2961
2962/// `ON MATCH`) to `node` in place. Mirrors the `SetPropertyOp`
2963/// dispatch but specialized to a single bound variable so we
2964/// don't have to materialize a full row dispatcher.
2965///
2966/// Value expressions are evaluated against a temporary row
2967/// `{var → Node(node.clone())}` so the RHS can reference the
2968/// node's existing properties — `MERGE (n) ON MATCH SET n.hits = n.hits + 1`
2969/// works the same as if the SET ran in a SetPropertyOp pipeline.
2970fn apply_merge_actions(
2971    node: &mut Node,
2972    actions: &[SetAssignment],
2973    var: &str,
2974    exec_ctx: &ExecCtx,
2975    base_row: &Row,
2976) -> Result<()> {
2977    if actions.is_empty() {
2978        return Ok(());
2979    }
2980    // Start from the upstream row so `ON CREATE SET n.prop = other.field`
2981    // can resolve `other` — then overlay the merged node under `var`.
2982    let mut row = base_row.clone();
2983    row.insert(var.to_string(), Value::Node(node.clone()));
2984    for action in actions {
2985        let sub_ctx = exec_ctx.eval_ctx(&row);
2986        match action {
2987            SetAssignment::Property {
2988                var: target,
2989                key,
2990                value,
2991            } => {
2992                if target != var {
2993                    return Err(Error::UnboundVariable(target.clone()));
2994                }
2995                let evaluated = eval_expr(value, &sub_ctx)?;
2996                let prop = value_to_property(evaluated)?;
2997                node.properties.insert(key.clone(), prop);
2998                row.insert(var.to_string(), Value::Node(node.clone()));
2999            }
3000            SetAssignment::Labels {
3001                var: target,
3002                labels,
3003            } => {
3004                if target != var {
3005                    return Err(Error::UnboundVariable(target.clone()));
3006                }
3007                for label in labels {
3008                    if !node.labels.contains(label) {
3009                        node.labels.push(label.clone());
3010                    }
3011                }
3012                row.insert(var.to_string(), Value::Node(node.clone()));
3013            }
3014            SetAssignment::Replace {
3015                var: target,
3016                properties,
3017            } => {
3018                if target != var {
3019                    return Err(Error::UnboundVariable(target.clone()));
3020                }
3021                let resolved: Vec<(String, Property)> = properties
3022                    .iter()
3023                    .map(|(k, expr)| {
3024                        let v = eval_expr(expr, &sub_ctx)?;
3025                        Ok((k.clone(), value_to_property(v)?))
3026                    })
3027                    .collect::<Result<Vec<_>>>()?;
3028                node.properties.clear();
3029                for (k, p) in resolved {
3030                    node.properties.insert(k, p);
3031                }
3032                row.insert(var.to_string(), Value::Node(node.clone()));
3033            }
3034            SetAssignment::Merge {
3035                var: target,
3036                properties,
3037            } => {
3038                if target != var {
3039                    return Err(Error::UnboundVariable(target.clone()));
3040                }
3041                let resolved: Vec<(String, Property)> = properties
3042                    .iter()
3043                    .map(|(k, expr)| {
3044                        let v = eval_expr(expr, &sub_ctx)?;
3045                        Ok((k.clone(), value_to_property(v)?))
3046                    })
3047                    .collect::<Result<Vec<_>>>()?;
3048                for (k, p) in resolved {
3049                    node.properties.insert(k, p);
3050                }
3051                row.insert(var.to_string(), Value::Node(node.clone()));
3052            }
3053            SetAssignment::ReplaceFromExpr {
3054                var: target,
3055                source,
3056                replace,
3057            } => {
3058                if target != var {
3059                    return Err(Error::UnboundVariable(target.clone()));
3060                }
3061                let v = eval_expr(source, &sub_ctx)?;
3062                let props = extract_property_map(&v)?;
3063                if *replace {
3064                    node.properties.clear();
3065                }
3066                for (k, p) in props {
3067                    node.properties.insert(k, p);
3068                }
3069                row.insert(var.to_string(), Value::Node(node.clone()));
3070            }
3071        }
3072    }
3073    Ok(())
3074}
3075
/// Single-hop expansion operator: for each input row, walks the
/// edges of the node bound to `src_var` and emits one row per
/// edge/neighbor pair that passes the type, constraint, property
/// and label filters.
struct EdgeExpandOp {
    /// Upstream operator supplying rows to expand.
    input: Box<dyn Operator>,
    /// Row variable holding the node to expand from.
    src_var: String,
    /// Variable the traversed edge is bound to in the output row;
    /// `None` leaves the edge unbound.
    edge_var: Option<String>,
    /// Variable the neighbor node is bound to in the output row.
    dst_var: String,
    /// Labels the neighbor node has to carry (checked with
    /// `has_all_labels`).
    dst_labels: Vec<String>,
    /// Inline edge property filter: every `(key, expr)` must equal
    /// the traversed edge's property of the same name; an edge
    /// missing a key is rejected.
    edge_properties: Vec<(String, Expr)>,
    /// Accepted relationship types. An empty list accepts any type.
    edge_types: Vec<String>,
    /// Traversal direction.
    /// NOTE(review): consumed where `pending` is filled, outside
    /// the portion of the file reviewed here — presumably selects
    /// outgoing / incoming / both; confirm.
    direction: Direction,
    /// When set, only the specific edge whose id matches the
    /// row-bound value counts as a match. Used by fresh-scan
    /// MATCH patterns that reuse an edge variable from a prior
    /// clause so the new hop stays an existence check instead of
    /// rebinding and clobbering the outer edge.
    edge_constraint_var: Option<String>,
    /// Input row currently being expanded; every emitted row is a
    /// copy of it extended with the edge / neighbor bindings.
    current_row: Option<Row>,
    /// `(edge id, neighbor id)` pairs still to be examined for
    /// `current_row`.
    pending: Vec<(EdgeId, NodeId)>,
    /// Index of the next entry of `pending` to examine.
    pending_idx: usize,
}
3095
3096impl EdgeExpandOp {
3097    #[allow(clippy::too_many_arguments)]
3098    fn new(
3099        input: Box<dyn Operator>,
3100        src_var: String,
3101        edge_var: Option<String>,
3102        dst_var: String,
3103        dst_labels: Vec<String>,
3104        edge_properties: Vec<(String, Expr)>,
3105        edge_types: Vec<String>,
3106        direction: Direction,
3107        edge_constraint_var: Option<String>,
3108    ) -> Self {
3109        Self {
3110            input,
3111            src_var,
3112            edge_var,
3113            dst_var,
3114            dst_labels,
3115            edge_properties,
3116            edge_types,
3117            direction,
3118            edge_constraint_var,
3119            current_row: None,
3120            pending: Vec::new(),
3121            pending_idx: 0,
3122        }
3123    }
3124}
3125
3126impl Operator for EdgeExpandOp {
3127    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3128        loop {
3129            while self.pending_idx < self.pending.len() {
3130                let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3131                self.pending_idx += 1;
3132
3133                let edge = match ctx.store.get_edge(edge_id)? {
3134                    Some(e) => e,
3135                    None => continue,
3136                };
3137                if !self.edge_types.is_empty()
3138                    && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3139                {
3140                    continue;
3141                }
3142                // Pre-bound edge constraint: only accept the edge
3143                // whose id matches the row-bound value. Falls
3144                // through to outer scopes so a fresh right-side
3145                // scan of a CartesianProduct can still see the
3146                // bound edge from the left side. Non-edge / null
3147                // bindings trigger no matches at all — the
3148                // expansion yields nothing for that input row.
3149                if let Some(constraint_var) = &self.edge_constraint_var {
3150                    let base = self
3151                        .current_row
3152                        .as_ref()
3153                        .expect("pending edges without source row");
3154                    let expected = match ctx.lookup_binding(base, constraint_var) {
3155                        Some(Value::Edge(e)) => Some(e.id),
3156                        _ => None,
3157                    };
3158                    match expected {
3159                        Some(id) if id != edge.id => continue,
3160                        None => continue,
3161                        _ => {}
3162                    }
3163                }
3164                // Inline edge property filter: every (key, value)
3165                // must match the traversed edge's property of the
3166                // same name via `=` equality. Missing keys fail the
3167                // check (matching Neo4j).
3168                if !self.edge_properties.is_empty() {
3169                    let base = self
3170                        .current_row
3171                        .as_ref()
3172                        .expect("pending edges without source row");
3173                    let ectx = ctx.eval_ctx(base);
3174                    let mut ok = true;
3175                    for (key, expr) in &self.edge_properties {
3176                        let expected = eval_expr(expr, &ectx)?;
3177                        let actual = match edge.properties.get(key) {
3178                            Some(v) => Value::Property(v.clone()),
3179                            None => {
3180                                ok = false;
3181                                break;
3182                            }
3183                        };
3184                        if !values_equal(&actual, &expected) {
3185                            ok = false;
3186                            break;
3187                        }
3188                    }
3189                    if !ok {
3190                        continue;
3191                    }
3192                }
3193
3194                let neighbor = match ctx.store.get_node(neighbor_id)? {
3195                    Some(n) => n,
3196                    None => continue,
3197                };
3198                if !has_all_labels(&neighbor, &self.dst_labels) {
3199                    continue;
3200                }
3201
3202                let base = self
3203                    .current_row
3204                    .as_ref()
3205                    .expect("pending edges without source row");
3206                let mut out = base.clone();
3207                if let Some(ev) = &self.edge_var {
3208                    out.insert(ev.clone(), Value::Edge(edge));
3209                }
3210                out.insert(self.dst_var.clone(), Value::Node(neighbor));
3211                return Ok(Some(out));
3212            }
3213
3214            match self.input.next(ctx)? {
3215                None => return Ok(None),
3216                Some(row) => {
3217                    let src_id = match row.get(&self.src_var) {
3218                        Some(Value::Node(n)) => n.id,
3219                        // A null source (e.g. from OPTIONAL MATCH
3220                        // that matched nothing) drops the input
3221                        // row — `MATCH (a)-->(b)` against a null
3222                        // `a` is just empty, not an error.
3223                        Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
3224                            continue
3225                        }
3226                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3227                    };
3228                    self.pending = match self.direction {
3229                        Direction::Outgoing => ctx.store.outgoing(src_id)?,
3230                        Direction::Incoming => ctx.store.incoming(src_id)?,
3231                        Direction::Both => {
3232                            // For undirected traversal, a self-loop
3233                            // appears once as outgoing and once as
3234                            // incoming. The Cypher spec visits each
3235                            // physical edge once, so dedupe by
3236                            // edge_id before emitting.
3237                            let mut all = ctx.store.outgoing(src_id)?;
3238                            let mut seen: std::collections::HashSet<EdgeId> =
3239                                all.iter().map(|(e, _)| *e).collect();
3240                            for (e, n) in ctx.store.incoming(src_id)? {
3241                                if seen.insert(e) {
3242                                    all.push((e, n));
3243                                }
3244                            }
3245                            all
3246                        }
3247                    };
3248                    self.pending_idx = 0;
3249                    self.current_row = Some(row);
3250                }
3251            }
3252        }
3253    }
3254}
3255
/// Left-join variant of [`EdgeExpandOp`]. For each input row,
/// expands the adjacency in the configured direction and
/// filters by `edge_type` / `dst_labels` — if **any** neighbor
/// survives the filters, emits rows exactly like
/// `EdgeExpandOp`. If **zero** neighbors survive, emits one row
/// that carries the input row's bindings plus `edge_var` /
/// `dst_var` set to `Value::Null`, preserving the input row in
/// the output stream. This is the left-outer-join semantics
/// OPTIONAL MATCH needs.
///
/// Tracks per-input-row whether any output was produced so the
/// fallback Null row is only emitted after the pending buffer
/// drains without yielding anything. The `yielded_for_current`
/// flag is reset whenever a new input row is pulled.
struct OptionalEdgeExpandOp {
    /// Upstream operator; one row is pulled from it each time
    /// the pending buffer for the previous row is exhausted.
    input: Box<dyn Operator>,
    /// Row variable holding the node to expand from. A null
    /// binding skips the adjacency lookup and goes straight to
    /// the fallback row; any other non-node binding is reported
    /// as `Error::UnboundVariable`.
    src_var: String,
    /// Row variable that receives the traversed edge, if the
    /// pattern names one. Set to `Value::Null` in the fallback
    /// row unless it is the same variable as
    /// `edge_constraint_var`.
    edge_var: Option<String>,
    /// Row variable that receives the neighbor node. Set to
    /// `Value::Null` in the fallback row unless it is the same
    /// variable as `dst_constraint_var`.
    dst_var: String,
    /// Labels the neighbor is checked against via
    /// `has_all_labels`; neighbors failing the check are skipped.
    dst_labels: Vec<String>,
    /// Inline `(key, expr)` property filter on the neighbor.
    /// Each expression is evaluated against the current input
    /// row and compared with the neighbor's property through
    /// `values_equal`; a property the neighbor lacks is compared
    /// as `Value::Null`.
    dst_properties: Vec<(String, Expr)>,
    /// Accepted edge types. Empty means no type filter;
    /// otherwise the edge's type must equal one of the entries.
    edge_types: Vec<String>,
    /// Which adjacency list(s) to read for the source node.
    /// `Direction::Both` reads outgoing then incoming and keeps
    /// each edge id only once.
    direction: Direction,
    /// When set, edges whose target id differs from the node
    /// bound at this variable in the current row are skipped
    /// inside the expansion loop. A row whose outgoing edges all
    /// fail the constraint then triggers the same null-fallback
    /// as having no edges at all.
    dst_constraint_var: Option<String>,
    /// When set, the expansion only considers the single edge
    /// whose id matches the edge already bound at this row
    /// variable — used when the OPTIONAL MATCH pattern reuses
    /// an edge variable from a prior clause.
    edge_constraint_var: Option<String>,
    /// Input row currently being expanded. Taken (left as
    /// `None`) once its pending buffer has drained.
    current_row: Option<Row>,
    /// `(edge id, neighbor id)` candidates read from the store
    /// for `current_row`, not yet filtered.
    pending: Vec<(EdgeId, NodeId)>,
    /// Index of the next entry of `pending` to examine.
    pending_idx: usize,
    /// Whether at least one row (match or fallback) has already
    /// been emitted for `current_row`.
    yielded_for_current: bool,
}
3295
3296impl OptionalEdgeExpandOp {
3297    #[allow(clippy::too_many_arguments)]
3298    fn new(
3299        input: Box<dyn Operator>,
3300        src_var: String,
3301        edge_var: Option<String>,
3302        dst_var: String,
3303        dst_labels: Vec<String>,
3304        dst_properties: Vec<(String, Expr)>,
3305        edge_types: Vec<String>,
3306        direction: Direction,
3307        dst_constraint_var: Option<String>,
3308        edge_constraint_var: Option<String>,
3309    ) -> Self {
3310        Self {
3311            input,
3312            src_var,
3313            edge_var,
3314            dst_var,
3315            dst_labels,
3316            dst_properties,
3317            edge_types,
3318            direction,
3319            dst_constraint_var,
3320            edge_constraint_var,
3321            current_row: None,
3322            pending: Vec::new(),
3323            pending_idx: 0,
3324            yielded_for_current: false,
3325        }
3326    }
3327}
3328
3329impl Operator for OptionalEdgeExpandOp {
3330    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3331        loop {
3332            while self.pending_idx < self.pending.len() {
3333                let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3334                self.pending_idx += 1;
3335
3336                let edge = match ctx.store.get_edge(edge_id)? {
3337                    Some(e) => e,
3338                    None => continue,
3339                };
3340                if !self.edge_types.is_empty()
3341                    && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3342                {
3343                    continue;
3344                }
3345                // Pre-bound edge constraint: only the specific
3346                // edge whose id matches the row-bound value
3347                // counts as a match. Falls through to outer
3348                // scopes so the constraint works for fresh
3349                // scans on the right side of a CartesianProduct.
3350                if let Some(constraint_var) = &self.edge_constraint_var {
3351                    let base = self
3352                        .current_row
3353                        .as_ref()
3354                        .expect("pending without source row");
3355                    let expected = match ctx.lookup_binding(base, constraint_var) {
3356                        Some(Value::Edge(e)) => Some(e.id),
3357                        _ => None,
3358                    };
3359                    match expected {
3360                        Some(id) if id != edge.id => continue,
3361                        None => continue,
3362                        _ => {}
3363                    }
3364                }
3365
3366                let neighbor = match ctx.store.get_node(neighbor_id)? {
3367                    Some(n) => n,
3368                    None => continue,
3369                };
3370                if !has_all_labels(&neighbor, &self.dst_labels) {
3371                    continue;
3372                }
3373                // Bound-endpoint constraint: when the declared
3374                // target is already bound in the row, only edges
3375                // that lead to that exact node count as a match.
3376                // Edges failing the constraint are silently
3377                // skipped — if every candidate fails, the
3378                // per-row left-join fallback below still fires.
3379                if let Some(constraint_var) = &self.dst_constraint_var {
3380                    let base = self
3381                        .current_row
3382                        .as_ref()
3383                        .expect("pending without source row");
3384                    let bound_id = match base.get(constraint_var) {
3385                        Some(Value::Node(n)) => Some(n.id),
3386                        Some(Value::Null)
3387                        | Some(Value::Property(meshdb_core::Property::Null))
3388                        | None => None,
3389                        _ => None,
3390                    };
3391                    match bound_id {
3392                        Some(id) if id != neighbor.id => continue,
3393                        None => continue,
3394                        _ => {}
3395                    }
3396                }
3397                if !self.dst_properties.is_empty() {
3398                    let base = self
3399                        .current_row
3400                        .as_ref()
3401                        .expect("pending without source row");
3402                    let ectx = ctx.eval_ctx(base);
3403                    let mut props_ok = true;
3404                    for (key, expr) in &self.dst_properties {
3405                        let expected = eval_expr(expr, &ectx)?;
3406                        let actual = neighbor
3407                            .properties
3408                            .get(key)
3409                            .cloned()
3410                            .map(Value::Property)
3411                            .unwrap_or(Value::Null);
3412                        if !values_equal(&expected, &actual) {
3413                            props_ok = false;
3414                            break;
3415                        }
3416                    }
3417                    if !props_ok {
3418                        continue;
3419                    }
3420                }
3421
3422                let base = self
3423                    .current_row
3424                    .as_ref()
3425                    .expect("pending edges without source row");
3426                let mut out = base.clone();
3427                if let Some(ev) = &self.edge_var {
3428                    out.insert(ev.clone(), Value::Edge(edge));
3429                }
3430                out.insert(self.dst_var.clone(), Value::Node(neighbor));
3431                self.yielded_for_current = true;
3432                return Ok(Some(out));
3433            }
3434
3435            // Pending drained for the current row. If nothing was
3436            // yielded, emit the left-join fallback: preserve the
3437            // input row with the optional variables set to Null.
3438            //
3439            // Exception: when an edge / dst constraint names a
3440            // variable that was pre-bound in the input row, the
3441            // fallback must NOT clobber that variable — the
3442            // constraint turns the expansion into an existence
3443            // check and the outer value should survive through.
3444            if let Some(base) = self.current_row.take() {
3445                if !self.yielded_for_current {
3446                    let mut out = base;
3447                    if let Some(ev) = &self.edge_var {
3448                        let preserve = self
3449                            .edge_constraint_var
3450                            .as_ref()
3451                            .map(|c| c == ev)
3452                            .unwrap_or(false);
3453                        if !preserve {
3454                            out.insert(ev.clone(), Value::Null);
3455                        }
3456                    }
3457                    let preserve_dst = self
3458                        .dst_constraint_var
3459                        .as_ref()
3460                        .map(|c| c == &self.dst_var)
3461                        .unwrap_or(false);
3462                    if !preserve_dst {
3463                        out.insert(self.dst_var.clone(), Value::Null);
3464                    }
3465                    self.yielded_for_current = true;
3466                    return Ok(Some(out));
3467                }
3468            }
3469
3470            match self.input.next(ctx)? {
3471                None => return Ok(None),
3472                Some(row) => {
3473                    let src_id = match row.get(&self.src_var) {
3474                        Some(Value::Node(n)) => n.id,
3475                        // src_var is Null (because a prior
3476                        // OPTIONAL MATCH chained before this one
3477                        // Null-bound it). Skip adjacency entirely
3478                        // and fall through to the fallback Null
3479                        // row so downstream clauses see the
3480                        // preserved input.
3481                        Some(Value::Null) => {
3482                            self.pending = Vec::new();
3483                            self.pending_idx = 0;
3484                            self.yielded_for_current = false;
3485                            self.current_row = Some(row);
3486                            continue;
3487                        }
3488                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3489                    };
3490                    self.pending = match self.direction {
3491                        Direction::Outgoing => ctx.store.outgoing(src_id)?,
3492                        Direction::Incoming => ctx.store.incoming(src_id)?,
3493                        Direction::Both => {
3494                            // For undirected traversal, a self-loop
3495                            // appears once as outgoing and once as
3496                            // incoming. The Cypher spec visits each
3497                            // physical edge once, so dedupe by
3498                            // edge_id before emitting.
3499                            let mut all = ctx.store.outgoing(src_id)?;
3500                            let mut seen: std::collections::HashSet<EdgeId> =
3501                                all.iter().map(|(e, _)| *e).collect();
3502                            for (e, n) in ctx.store.incoming(src_id)? {
3503                                if seen.insert(e) {
3504                                    all.push((e, n));
3505                                }
3506                            }
3507                            all
3508                        }
3509                    };
3510                    self.pending_idx = 0;
3511                    self.yielded_for_current = false;
3512                    self.current_row = Some(row);
3513                }
3514            }
3515        }
3516    }
3517}
3518
/// Variable-length expansion. For each input row, collects the
/// edge walks that start at the node bound to `src_var`, are
/// between `min_hops` and `max_hops` edges long, and end on a
/// node carrying `dst_labels`, then emits one output row per
/// walk. Walks are either discovered by depth-first search or,
/// when `bound_edge_list_var` is set, replayed from an edge list
/// already bound in the row.
struct VarLengthExpandOp {
    /// Upstream operator supplying the input rows.
    input: Box<dyn Operator>,
    /// Row variable holding the start node. A null binding
    /// produces no paths for that row.
    src_var: String,
    /// Row variable that receives the walked edges as a
    /// `Value::List` of `Value::Edge`, if the pattern names one.
    edge_var: Option<String>,
    /// Row variable that receives the terminal node of the walk.
    dst_var: String,
    /// Labels the terminal node is checked against via
    /// `has_all_labels` before a walk is recorded.
    dst_labels: Vec<String>,
    /// Accepted edge types for every hop. Empty means no type
    /// filter.
    edge_types: Vec<String>,
    /// Per-edge property filter — every edge along the walked
    /// path must have these `(key, value)` pairs. Mirrors the
    /// inline filter on `EdgeExpandOp`; applied during DFS so
    /// failing edges prune the branch instead of generating
    /// wrong-length results.
    edge_properties: Vec<(String, Expr)>,
    /// Which adjacency list(s) each hop follows.
    direction: Direction,
    /// Smallest number of edges a recorded walk may have
    /// (inclusive).
    min_hops: u64,
    /// Largest number of edges a recorded walk may have
    /// (inclusive); the DFS does not descend past this depth.
    max_hops: u64,
    /// Row variable that receives the walk as a `Value::Path`
    /// (nodes plus edges), if the pattern names one.
    path_var: Option<String>,
    /// Per-row left-join mode: when `true`, an input row that
    /// produces no matching paths still emits one row with the
    /// expansion's output vars bound to Null. Set for
    /// `OPTIONAL MATCH (a)-[*]->(b)` so the outer row survives
    /// even when the path search is empty.
    optional: bool,
    /// When set, paths whose terminal node id differs from the
    /// node bound at this variable in the current row are
    /// filtered out before counting as a match. Combined with
    /// `optional`, an input row whose candidate paths all miss
    /// the bound target triggers the null-fallback instead of
    /// silently dropping.
    dst_constraint_var: Option<String>,
    /// Replay mode: read the walked edge sequence from the list
    /// already bound at this row variable instead of doing DFS.
    /// Used by openCypher's "walk a pre-bound edge list" form.
    bound_edge_list_var: Option<String>,
    /// Row variables whose bound edges (or edge lists) must not
    /// appear in the walked path — enforces openCypher's
    /// relationship-uniqueness rule across hops within the same
    /// MATCH pattern. Each entry resolves against the current row
    /// (falling through to outer-scope rows) at run time; the
    /// union of every referenced edge id becomes the DFS
    /// exclusion set.
    excluded_edge_vars: Vec<String>,
    /// Input row the pending walks were computed for; output
    /// rows are built by cloning it and adding the bindings.
    current_row: Option<Row>,
    /// Edge sequence of each pending walk. Indexed in parallel
    /// with `pending_node_paths` and `pending_targets`.
    pending_paths: Vec<Vec<Edge>>,
    /// Node-id sequence of each pending walk, start node first.
    pending_node_paths: Vec<Vec<NodeId>>,
    /// Terminal node id of each pending walk.
    pending_targets: Vec<NodeId>,
    /// Index of the next pending walk to emit.
    pending_idx: usize,
}
3567
3568impl VarLengthExpandOp {
3569    #[allow(clippy::too_many_arguments)]
3570    fn new(
3571        input: Box<dyn Operator>,
3572        src_var: String,
3573        edge_var: Option<String>,
3574        dst_var: String,
3575        dst_labels: Vec<String>,
3576        edge_types: Vec<String>,
3577        edge_properties: Vec<(String, Expr)>,
3578        direction: Direction,
3579        min_hops: u64,
3580        max_hops: u64,
3581        path_var: Option<String>,
3582        optional: bool,
3583        dst_constraint_var: Option<String>,
3584        bound_edge_list_var: Option<String>,
3585        excluded_edge_vars: Vec<String>,
3586    ) -> Self {
3587        Self {
3588            input,
3589            src_var,
3590            edge_var,
3591            dst_var,
3592            dst_labels,
3593            edge_types,
3594            edge_properties,
3595            direction,
3596            min_hops,
3597            max_hops,
3598            path_var,
3599            optional,
3600            dst_constraint_var,
3601            bound_edge_list_var,
3602            excluded_edge_vars,
3603            current_row: None,
3604            pending_paths: Vec::new(),
3605            pending_node_paths: Vec::new(),
3606            pending_targets: Vec::new(),
3607            pending_idx: 0,
3608        }
3609    }
3610
3611    fn enumerate(
3612        &self,
3613        ctx: &ExecCtx,
3614        start: NodeId,
3615        input_row: &Row,
3616    ) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
3617        let mut paths: Vec<Vec<Edge>> = Vec::new();
3618        let mut node_paths: Vec<Vec<NodeId>> = Vec::new();
3619        let mut targets: Vec<NodeId> = Vec::new();
3620        let mut edge_buf: Vec<Edge> = Vec::new();
3621        let mut node_buf: Vec<NodeId> = vec![start];
3622        // Seed `used` with edges from outer-scope bindings the
3623        // pattern flags as exclusions (e.g. the other hops'
3624        // edge / edge-list vars). A fresh walk can't revisit an
3625        // edge that's already bound as a relationship variable
3626        // elsewhere in the MATCH — that's openCypher's
3627        // relationship-uniqueness rule.
3628        let mut used: HashSet<EdgeId> = HashSet::new();
3629        for var in &self.excluded_edge_vars {
3630            match ctx.lookup_binding(input_row, var) {
3631                Some(Value::Edge(e)) => {
3632                    used.insert(e.id);
3633                }
3634                Some(Value::List(items)) => {
3635                    for item in items {
3636                        if let Value::Edge(e) = item {
3637                            used.insert(e.id);
3638                        }
3639                    }
3640                }
3641                _ => {}
3642            }
3643        }
3644        // Evaluate edge-property expected values once per input row
3645        // — they may reference row bindings or `$`-parameters, but
3646        // don't vary per walked edge.
3647        let expected_edge_props: Vec<(String, Value)> = if self.edge_properties.is_empty() {
3648            Vec::new()
3649        } else {
3650            let ectx = ctx.eval_ctx(input_row);
3651            self.edge_properties
3652                .iter()
3653                .map(|(k, expr)| eval_expr(expr, &ectx).map(|v| (k.clone(), v)))
3654                .collect::<Result<Vec<_>>>()?
3655        };
3656        self.dfs(
3657            ctx,
3658            start,
3659            &expected_edge_props,
3660            &mut edge_buf,
3661            &mut node_buf,
3662            &mut used,
3663            &mut paths,
3664            &mut node_paths,
3665            &mut targets,
3666        )?;
3667        Ok((paths, node_paths, targets))
3668    }
3669
3670    #[allow(clippy::too_many_arguments)]
3671    fn dfs(
3672        &self,
3673        ctx: &ExecCtx,
3674        current_node: NodeId,
3675        expected_edge_props: &[(String, Value)],
3676        edge_buf: &mut Vec<Edge>,
3677        node_buf: &mut Vec<NodeId>,
3678        used: &mut HashSet<EdgeId>,
3679        out_paths: &mut Vec<Vec<Edge>>,
3680        out_node_paths: &mut Vec<Vec<NodeId>>,
3681        out_targets: &mut Vec<NodeId>,
3682    ) -> Result<()> {
3683        let depth = edge_buf.len() as u64;
3684
3685        if depth >= self.min_hops && depth <= self.max_hops {
3686            let terminal_ok = match ctx.store.get_node(current_node)? {
3687                Some(node) => has_all_labels(&node, &self.dst_labels),
3688                None => false,
3689            };
3690            if terminal_ok {
3691                out_paths.push(edge_buf.clone());
3692                out_node_paths.push(node_buf.clone());
3693                out_targets.push(current_node);
3694            }
3695        }
3696
3697        if depth >= self.max_hops {
3698            return Ok(());
3699        }
3700
3701        let neighbors = match self.direction {
3702            Direction::Outgoing => ctx.store.outgoing(current_node)?,
3703            Direction::Incoming => ctx.store.incoming(current_node)?,
3704            Direction::Both => {
3705                let mut all = ctx.store.outgoing(current_node)?;
3706                all.extend(ctx.store.incoming(current_node)?);
3707                all
3708            }
3709        };
3710
3711        for (eid, neighbor_id) in neighbors {
3712            if used.contains(&eid) {
3713                continue;
3714            }
3715            let edge = match ctx.store.get_edge(eid)? {
3716                Some(e) => e,
3717                None => continue,
3718            };
3719            if !self.edge_types.is_empty() && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3720            {
3721                continue;
3722            }
3723            // Inline edge property filter: skip edges whose
3724            // properties don't equal the expected values computed
3725            // per input row. A missing property fails the check,
3726            // matching `EdgeExpandOp`.
3727            if !expected_edge_props.is_empty() {
3728                let mut ok = true;
3729                for (key, expected) in expected_edge_props {
3730                    let actual = match edge.properties.get(key) {
3731                        Some(v) => Value::Property(v.clone()),
3732                        None => {
3733                            ok = false;
3734                            break;
3735                        }
3736                    };
3737                    if !values_equal(&actual, expected) {
3738                        ok = false;
3739                        break;
3740                    }
3741                }
3742                if !ok {
3743                    continue;
3744                }
3745            }
3746            used.insert(eid);
3747            edge_buf.push(edge);
3748            node_buf.push(neighbor_id);
3749            self.dfs(
3750                ctx,
3751                neighbor_id,
3752                expected_edge_props,
3753                edge_buf,
3754                node_buf,
3755                used,
3756                out_paths,
3757                out_node_paths,
3758                out_targets,
3759            )?;
3760            edge_buf.pop();
3761            node_buf.pop();
3762            used.remove(&eid);
3763        }
3764
3765        Ok(())
3766    }
3767}
3768
3769/// Replay the edge sequence stored at `list_var` in `row` as a
3770/// var-length walk starting from `src_id`. Returns `(paths,
3771/// node_paths, targets)` in the same shape `enumerate` produces
3772/// — either one entry when the list reads as a valid connected
3773/// walk in `direction`, or empty when it doesn't.
3774///
3775/// Validation per step:
3776/// * the list element is a `Value::Edge`,
3777/// * the edge's optional type filter matches `edge_types`,
3778/// * the edge actually touches the current node and proceeds to
3779///   the other endpoint in the requested direction (undirected
3780///   accepts either).
3781///
3782/// A null / missing / non-list value, a null source, or any
3783/// failed step all produce an empty result — which, when the
3784/// caller is in optional mode, triggers the left-join null
3785/// fallback.
3786fn replay_edge_list(
3787    ctx: &ExecCtx,
3788    row: &Row,
3789    list_var: &str,
3790    src_id: Option<NodeId>,
3791    direction: Direction,
3792    edge_types: &[String],
3793) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
3794    let start = match src_id {
3795        Some(id) => id,
3796        None => return Ok((Vec::new(), Vec::new(), Vec::new())),
3797    };
3798    let list = match ctx.lookup_binding(row, list_var) {
3799        Some(Value::List(items)) => items.clone(),
3800        Some(Value::Property(meshdb_core::Property::List(items))) => items
3801            .iter()
3802            .cloned()
3803            .map(Value::Property)
3804            .collect::<Vec<_>>(),
3805        _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
3806    };
3807    let mut edge_buf: Vec<Edge> = Vec::with_capacity(list.len());
3808    let mut node_buf: Vec<NodeId> = Vec::with_capacity(list.len() + 1);
3809    node_buf.push(start);
3810    let mut current = start;
3811    for item in list {
3812        let edge = match item {
3813            Value::Edge(e) => e,
3814            _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
3815        };
3816        if !edge_types.is_empty() && !edge_types.iter().any(|t| t == &edge.edge_type) {
3817            return Ok((Vec::new(), Vec::new(), Vec::new()));
3818        }
3819        let next_node = match direction {
3820            Direction::Outgoing => {
3821                if edge.source != current {
3822                    return Ok((Vec::new(), Vec::new(), Vec::new()));
3823                }
3824                edge.target
3825            }
3826            Direction::Incoming => {
3827                if edge.target != current {
3828                    return Ok((Vec::new(), Vec::new(), Vec::new()));
3829                }
3830                edge.source
3831            }
3832            Direction::Both => {
3833                if edge.source == current {
3834                    edge.target
3835                } else if edge.target == current {
3836                    edge.source
3837                } else {
3838                    return Ok((Vec::new(), Vec::new(), Vec::new()));
3839                }
3840            }
3841        };
3842        // The stored edge shape should still exist in the graph;
3843        // a missing node mid-walk means the snapshot shifted and
3844        // the list is no longer valid.
3845        if ctx.store.get_node(next_node)?.is_none() {
3846            return Ok((Vec::new(), Vec::new(), Vec::new()));
3847        }
3848        edge_buf.push(edge);
3849        node_buf.push(next_node);
3850        current = next_node;
3851    }
3852    Ok((vec![edge_buf], vec![node_buf], vec![current]))
3853}
3854
3855impl Operator for VarLengthExpandOp {
3856    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3857        loop {
3858            while self.pending_idx < self.pending_paths.len() {
3859                let i = self.pending_idx;
3860                self.pending_idx += 1;
3861
3862                let target_id = self.pending_targets[i];
3863                let target = match ctx.store.get_node(target_id)? {
3864                    Some(n) => n,
3865                    None => continue,
3866                };
3867
3868                let base = self
3869                    .current_row
3870                    .as_ref()
3871                    .expect("pending without source row");
3872                let mut out = base.clone();
3873                out.insert(self.dst_var.clone(), Value::Node(target.clone()));
3874                if let Some(ev) = &self.edge_var {
3875                    let edges: Vec<Value> = self.pending_paths[i]
3876                        .iter()
3877                        .cloned()
3878                        .map(Value::Edge)
3879                        .collect();
3880                    out.insert(ev.clone(), Value::List(edges));
3881                }
3882                if let Some(pv) = &self.path_var {
3883                    let mut nodes = Vec::with_capacity(self.pending_node_paths[i].len());
3884                    for nid in &self.pending_node_paths[i] {
3885                        match ctx.store.get_node(*nid)? {
3886                            Some(n) => nodes.push(n),
3887                            None => continue,
3888                        }
3889                    }
3890                    let edges = self.pending_paths[i].clone();
3891                    out.insert(pv.clone(), Value::Path { nodes, edges });
3892                }
3893                return Ok(Some(out));
3894            }
3895
3896            match self.input.next(ctx)? {
3897                None => return Ok(None),
3898                Some(row) => {
3899                    let src_id = match row.get(&self.src_var) {
3900                        Some(Value::Node(n)) => Some(n.id),
3901                        // Null source → no paths. Same
3902                        // null-propagating semantics as
3903                        // `EdgeExpandOp`. In optional mode we
3904                        // still emit the left-join fallback;
3905                        // otherwise we just skip.
3906                        Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
3907                            None
3908                        }
3909                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3910                    };
3911                    // Replay path: when the hop names an already-
3912                    // bound edge list (`MATCH (a)-[rs*]->(b)` with
3913                    // `rs = [r1, r2]` from an earlier WITH), skip
3914                    // DFS entirely and verify the listed edges
3915                    // form a connected walk from `src_id` in the
3916                    // required direction. Produces at most one
3917                    // path — the exact list that was supplied.
3918                    let (mut paths, mut node_paths, mut targets) =
3919                        if let Some(list_var) = &self.bound_edge_list_var {
3920                            replay_edge_list(
3921                                ctx,
3922                                &row,
3923                                list_var,
3924                                src_id,
3925                                self.direction,
3926                                &self.edge_types,
3927                            )?
3928                        } else {
3929                            match src_id {
3930                                Some(id) => self.enumerate(ctx, id, &row)?,
3931                                None => (Vec::new(), Vec::new(), Vec::new()),
3932                            }
3933                        };
3934                    // Bound-endpoint constraint: drop paths whose
3935                    // terminal node doesn't match the node already
3936                    // bound at the constraint var. When combined
3937                    // with `optional`, an input whose paths are
3938                    // all filtered out still triggers the
3939                    // null-fallback below.
3940                    if let Some(constraint_var) = &self.dst_constraint_var {
3941                        let target_id = match row.get(constraint_var) {
3942                            Some(Value::Node(n)) => Some(n.id),
3943                            _ => None,
3944                        };
3945                        match target_id {
3946                            Some(id) => {
3947                                let mut kept_paths = Vec::new();
3948                                let mut kept_node_paths = Vec::new();
3949                                let mut kept_targets = Vec::new();
3950                                for ((p, np), t) in paths
3951                                    .drain(..)
3952                                    .zip(node_paths.drain(..))
3953                                    .zip(targets.drain(..))
3954                                {
3955                                    if t == id {
3956                                        kept_paths.push(p);
3957                                        kept_node_paths.push(np);
3958                                        kept_targets.push(t);
3959                                    }
3960                                }
3961                                paths = kept_paths;
3962                                node_paths = kept_node_paths;
3963                                targets = kept_targets;
3964                            }
3965                            None => {
3966                                paths.clear();
3967                                node_paths.clear();
3968                                targets.clear();
3969                            }
3970                        }
3971                    }
3972                    if paths.is_empty() && self.optional {
3973                        // Left-join fallback: emit one row with
3974                        // the expansion's output vars set to Null
3975                        // so the outer OPTIONAL MATCH preserves
3976                        // this input row.
3977                        let mut out = row;
3978                        if let Some(ev) = &self.edge_var {
3979                            out.insert(ev.clone(), Value::Null);
3980                        }
3981                        out.insert(self.dst_var.clone(), Value::Null);
3982                        if let Some(pv) = &self.path_var {
3983                            out.insert(pv.clone(), Value::Null);
3984                        }
3985                        return Ok(Some(out));
3986                    }
3987                    self.pending_paths = paths;
3988                    self.pending_node_paths = node_paths;
3989                    self.pending_targets = targets;
3990                    self.pending_idx = 0;
3991                    self.current_row = Some(row);
3992                }
3993            }
3994        }
3995    }
3996}
3997
3998struct FilterOp {
3999    input: Box<dyn Operator>,
4000    predicate: Expr,
4001}
4002
4003impl FilterOp {
4004    fn new(input: Box<dyn Operator>, predicate: Expr) -> Self {
4005        Self { input, predicate }
4006    }
4007}
4008
4009impl Operator for FilterOp {
4010    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4011        while let Some(row) = self.input.next(ctx)? {
4012            let v = match eval_expr(&self.predicate, &ctx.eval_ctx(&row)) {
4013                Ok(v) => v,
4014                // Type mismatches and non-boolean errors in filter predicates
4015                // are treated as false (row filtered out), not hard errors
4016                Err(Error::TypeMismatch) | Err(Error::NotBoolean) => Value::Null,
4017                Err(e) => return Err(e),
4018            };
4019            if to_bool(&v).unwrap_or(false) {
4020                return Ok(Some(row));
4021            }
4022        }
4023        Ok(None)
4024    }
4025}
4026
4027/// Pass-through operator for `RETURN *` / `WITH *`. Forwards every
4028/// row from the input unchanged.
4029struct IdentityOp {
4030    input: Box<dyn Operator>,
4031}
4032
4033impl IdentityOp {
4034    fn new(input: Box<dyn Operator>) -> Self {
4035        Self { input }
4036    }
4037}
4038
4039impl Operator for IdentityOp {
4040    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4041        self.input.next(ctx)
4042    }
4043}
4044
4045/// Passes input rows through; if the input produces zero rows,
4046/// emits exactly one row with `null_vars` bound to Value::Null.
4047/// Implements standalone OPTIONAL MATCH semantics (e.g.
4048/// `OPTIONAL MATCH (n) RETURN n` on an empty graph yields one
4049/// row with n=null rather than the empty result set).
4050struct CoalesceNullRowOp {
4051    input: Box<dyn Operator>,
4052    null_vars: Vec<String>,
4053    produced_any: bool,
4054    done: bool,
4055}
4056
4057impl CoalesceNullRowOp {
4058    fn new(input: Box<dyn Operator>, null_vars: Vec<String>) -> Self {
4059        Self {
4060            input,
4061            null_vars,
4062            produced_any: false,
4063            done: false,
4064        }
4065    }
4066}
4067
4068impl Operator for CoalesceNullRowOp {
4069    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4070        if self.done {
4071            return Ok(None);
4072        }
4073        match self.input.next(ctx)? {
4074            Some(row) => {
4075                self.produced_any = true;
4076                Ok(Some(row))
4077            }
4078            None => {
4079                self.done = true;
4080                if self.produced_any {
4081                    Ok(None)
4082                } else {
4083                    let mut row = Row::new();
4084                    for v in &self.null_vars {
4085                        row.insert(v.clone(), Value::Null);
4086                    }
4087                    Ok(Some(row))
4088                }
4089            }
4090        }
4091    }
4092}
4093
4094struct ProjectOp {
4095    input: Box<dyn Operator>,
4096    items: Vec<ReturnItem>,
4097}
4098
4099impl ProjectOp {
4100    fn new(input: Box<dyn Operator>, items: Vec<ReturnItem>) -> Self {
4101        Self { input, items }
4102    }
4103}
4104
4105impl Operator for ProjectOp {
4106    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4107        match self.input.next(ctx)? {
4108            Some(row) => {
4109                let mut out = Row::new();
4110                for (i, item) in self.items.iter().enumerate() {
4111                    let name = item.alias.clone().unwrap_or_else(|| {
4112                        item.raw_text
4113                            .clone()
4114                            .unwrap_or_else(|| default_name(&item.expr, i))
4115                    });
4116                    let value = eval_expr(&item.expr, &ctx.eval_ctx(&row))?;
4117                    out.insert(name, value);
4118                }
4119                Ok(Some(out))
4120            }
4121            None => Ok(None),
4122        }
4123    }
4124}
4125
4126fn default_name(expr: &Expr, idx: usize) -> String {
4127    render_expr_name(expr).unwrap_or_else(|| format!("col{}", idx))
4128}
4129
4130fn render_expr_name(expr: &Expr) -> Option<String> {
4131    Some(match expr {
4132        Expr::Identifier(s) => s.clone(),
4133        Expr::Property { var, key } => format!("{var}.{key}"),
4134        Expr::PropertyAccess { base, key } => {
4135            // Match the source syntax's parenthesisation of a
4136            // bracketed base: `(list[1]).k` round-trips, while a
4137            // plain identifier base stays bare (`a.b`).
4138            if matches!(
4139                base.as_ref(),
4140                Expr::IndexAccess { .. } | Expr::SliceAccess { .. }
4141            ) {
4142                format!("({}).{key}", render_expr_name(base)?)
4143            } else {
4144                format!("{}.{key}", render_expr_name(base)?)
4145            }
4146        }
4147        Expr::Parameter(name) => format!("${name}"),
4148        Expr::Literal(Literal::String(s)) => format!("'{s}'"),
4149        Expr::Literal(Literal::Integer(i)) => i.to_string(),
4150        Expr::Literal(Literal::Float(f)) => f.to_string(),
4151        Expr::Literal(Literal::Boolean(b)) => b.to_string(),
4152        Expr::Literal(Literal::Null) => "NULL".into(),
4153        Expr::Call { name, args } => {
4154            let arg_str = match args {
4155                CallArgs::Star => "*".into(),
4156                CallArgs::Exprs(es) | CallArgs::DistinctExprs(es) => {
4157                    let prefix = if matches!(args, CallArgs::DistinctExprs(_)) {
4158                        "DISTINCT "
4159                    } else {
4160                        ""
4161                    };
4162                    let inner: Vec<String> = es.iter().filter_map(render_expr_name).collect();
4163                    if inner.len() != es.len() {
4164                        return None;
4165                    }
4166                    format!("{prefix}{}", inner.join(", "))
4167                }
4168            };
4169            format!("{name}({arg_str})")
4170        }
4171        Expr::BinaryOp { op, left, right } => {
4172            let op_str = match op {
4173                BinaryOp::Add => " + ",
4174                BinaryOp::Sub => " - ",
4175                BinaryOp::Mul => " * ",
4176                BinaryOp::Div => " / ",
4177                BinaryOp::Mod => " % ",
4178                BinaryOp::Pow => " ^ ",
4179            };
4180            format!(
4181                "{}{op_str}{}",
4182                render_expr_name(left)?,
4183                render_expr_name(right)?
4184            )
4185        }
4186        Expr::UnaryOp { op, operand } => {
4187            let op_str = match op {
4188                UnaryOp::Neg => "-",
4189            };
4190            format!("{op_str}{}", render_expr_name(operand)?)
4191        }
4192        Expr::Not(inner) => format!("NOT {}", render_expr_name(inner)?),
4193        Expr::IsNull { negated, inner } => {
4194            if *negated {
4195                format!("{} IS NOT NULL", render_expr_name(inner)?)
4196            } else {
4197                format!("{} IS NULL", render_expr_name(inner)?)
4198            }
4199        }
4200        Expr::Compare { op, left, right } => {
4201            let op_str = match op {
4202                CompareOp::Eq => " = ",
4203                CompareOp::Ne => " <> ",
4204                CompareOp::Lt => " < ",
4205                CompareOp::Le => " <= ",
4206                CompareOp::Gt => " > ",
4207                CompareOp::Ge => " >= ",
4208                CompareOp::StartsWith => " STARTS WITH ",
4209                CompareOp::EndsWith => " ENDS WITH ",
4210                CompareOp::Contains => " CONTAINS ",
4211                CompareOp::RegexMatch => " =~ ",
4212            };
4213            format!(
4214                "{}{op_str}{}",
4215                render_expr_name(left)?,
4216                render_expr_name(right)?
4217            )
4218        }
4219        Expr::List(items) => {
4220            let inner: Vec<String> = items.iter().filter_map(render_expr_name).collect();
4221            if inner.len() != items.len() {
4222                return None;
4223            }
4224            format!("[{}]", inner.join(", "))
4225        }
4226        Expr::Map(entries) => {
4227            let inner: Vec<String> = entries
4228                .iter()
4229                .map(|(k, v)| render_expr_name(v).map(|vn| format!("{k}: {vn}")))
4230                .collect::<Option<Vec<_>>>()?;
4231            format!("{{{}}}", inner.join(", "))
4232        }
4233        Expr::IndexAccess { base, index } => {
4234            format!("{}[{}]", render_expr_name(base)?, render_expr_name(index)?)
4235        }
4236        Expr::InList { element, list } => {
4237            format!(
4238                "{} IN {}",
4239                render_expr_name(element)?,
4240                render_expr_name(list)?
4241            )
4242        }
4243        Expr::HasLabels { expr, labels } => {
4244            let mut s = format!("({}", render_expr_name(expr)?);
4245            for l in labels {
4246                s.push(':');
4247                s.push_str(l);
4248            }
4249            s.push(')');
4250            s
4251        }
4252        _ => return None,
4253    })
4254}
4255
4256struct DistinctOp {
4257    input: Box<dyn Operator>,
4258    seen: HashSet<String>,
4259}
4260
4261impl DistinctOp {
4262    fn new(input: Box<dyn Operator>) -> Self {
4263        Self {
4264            input,
4265            seen: HashSet::new(),
4266        }
4267    }
4268}
4269
4270impl Operator for DistinctOp {
4271    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4272        while let Some(row) = self.input.next(ctx)? {
4273            let key = row_key(&row);
4274            if self.seen.insert(key) {
4275                return Ok(Some(row));
4276            }
4277        }
4278        Ok(None)
4279    }
4280}
4281
4282/// Assemble a `Value::Path` from a row's already-bound node and
4283/// edge variables. Emitted by `plan_pattern` when the source
4284/// pattern carries `path_var`. The operator pulls `node_vars[i]`
4285/// and `edge_vars[i]` out of every row, walks them in order, and
4286/// inserts the result into the row under `path_var` before
4287/// forwarding downstream.
4288///
4289/// Rows where any referenced variable is missing or not the
4290/// expected Node/Edge shape get a `Value::Null` at `path_var` —
4291/// matching how `OPTIONAL MATCH` flows null bindings through the
4292/// downstream projection without hard-erroring. Missing bindings
4293/// shouldn't normally happen (the planner fills in synthetic
4294/// names via `ensure_path_bindings`), but the null fallback
4295/// means an unexpected upstream shape degrades gracefully
4296/// instead of crashing the whole query.
4297struct BindPathOp {
4298    input: Box<dyn Operator>,
4299    path_var: String,
4300    node_vars: Vec<String>,
4301    edge_vars: Vec<String>,
4302}
4303
4304impl BindPathOp {
4305    fn new(
4306        input: Box<dyn Operator>,
4307        path_var: String,
4308        node_vars: Vec<String>,
4309        edge_vars: Vec<String>,
4310    ) -> Self {
4311        Self {
4312            input,
4313            path_var,
4314            node_vars,
4315            edge_vars,
4316        }
4317    }
4318}
4319
4320impl Operator for BindPathOp {
4321    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4322        let Some(mut row) = self.input.next(ctx)? else {
4323            return Ok(None);
4324        };
4325        // Extract the ordered node + edge sequence from the row.
4326        // Any missing or wrong-shaped entry collapses the whole
4327        // path binding to null and falls through.
4328        let mut nodes: Vec<meshdb_core::Node> = Vec::new();
4329        let mut edges: Vec<meshdb_core::Edge> = Vec::new();
4330        let mut abort = false;
4331        // Interleave node/edge vars. For each hop i:
4332        //   node_vars[i] = start/intermediate node
4333        //   edge_vars[i] = edge (or sub-path for var-length)
4334        //   node_vars[i+1] = target node
4335        // For var-length hops, edge_vars[i] may contain a
4336        // Value::Path — splice its interior into the running path.
4337        if let Some(Value::Node(n)) = row.get(&self.node_vars[0]) {
4338            nodes.push(n.clone());
4339        } else {
4340            abort = true;
4341        }
4342        if !abort {
4343            for (i, ev) in self.edge_vars.iter().enumerate() {
4344                match row.get(ev) {
4345                    Some(Value::Edge(e)) => {
4346                        edges.push(e.clone());
4347                        match row.get(&self.node_vars[i + 1]) {
4348                            Some(Value::Node(n)) => nodes.push(n.clone()),
4349                            _ => {
4350                                abort = true;
4351                                break;
4352                            }
4353                        }
4354                    }
4355                    Some(Value::Path {
4356                        nodes: sub_nodes,
4357                        edges: sub_edges,
4358                    }) => {
4359                        // Splice the sub-path. The sub-path's first
4360                        // node is the same as nodes.last() (already
4361                        // pushed), so skip it. All sub-edges go in.
4362                        // Sub-path interior nodes go in. The sub-path's
4363                        // last node is the target for this hop.
4364                        edges.extend(sub_edges.iter().cloned());
4365                        if sub_nodes.len() > 1 {
4366                            nodes.extend(sub_nodes[1..].iter().cloned());
4367                        }
4368                    }
4369                    _ => {
4370                        abort = true;
4371                        break;
4372                    }
4373                }
4374            }
4375        }
4376        if abort {
4377            row.insert(self.path_var.clone(), Value::Null);
4378        } else {
4379            row.insert(self.path_var.clone(), Value::Path { nodes, edges });
4380        }
4381        Ok(Some(row))
4382    }
4383}
4384
4385/// `MATCH p = shortestPath((a)-[:R*..N]->(b))`. For each input
4386/// row (which must already bind both `src_var` and `dst_var`
4387/// to `Value::Node`), runs a breadth-first search from the
4388/// source node toward the target, filtering edges by
4389/// `edge_type` and walking up to `max_hops` steps. Emits one
4390/// row per successful search with `path_var` set to a
4391/// `Value::Path` carrying the traversed node/edge sequence;
4392/// rows where BFS finds no path are dropped entirely (matching
4393/// Cypher's `MATCH` semantics for an unsatisfiable pattern).
4394///
4395/// The BFS uses classic parent-pointer reconstruction: each
4396/// visited node's `(parent_node_id, edge_id)` pair is stored
4397/// in a hashmap, and once the target is reached we walk back
4398/// to the source, reversing the accumulated edges to produce
4399/// a forward-order path. Cycle detection is a side-effect of
4400/// the visited set — the first time BFS sees a node is
4401/// necessarily via a shortest path, so later visits are
4402/// ignored.
4403struct ShortestPathOp {
4404    input: Box<dyn Operator>,
4405    src_var: String,
4406    dst_var: String,
4407    path_var: String,
4408    edge_types: Vec<String>,
4409    direction: meshdb_cypher::Direction,
4410    max_hops: u64,
4411    kind: meshdb_cypher::ShortestKind,
4412    /// Buffered `(base_row, path)` pairs from the current
4413    /// input row, used only by `AllShortest` mode where a
4414    /// single input can expand into multiple shortest paths
4415    /// of the same length. Each call to `next` drains one
4416    /// entry before pulling a fresh input row; in `Shortest`
4417    /// mode the buffer is always empty or has one element.
4418    pending: std::collections::VecDeque<(Row, Value)>,
4419}
4420
4421impl ShortestPathOp {
4422    #[allow(clippy::too_many_arguments)]
4423    fn new(
4424        input: Box<dyn Operator>,
4425        src_var: String,
4426        dst_var: String,
4427        path_var: String,
4428        edge_types: Vec<String>,
4429        direction: meshdb_cypher::Direction,
4430        max_hops: u64,
4431        kind: meshdb_cypher::ShortestKind,
4432    ) -> Self {
4433        Self {
4434            input,
4435            src_var,
4436            dst_var,
4437            path_var,
4438            edge_types,
4439            direction,
4440            max_hops,
4441            kind,
4442            pending: std::collections::VecDeque::new(),
4443        }
4444    }
4445}
4446
4447impl Operator for ShortestPathOp {
4448    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4449        loop {
4450            // Drain buffered paths from a prior input row
4451            // before pulling the next one. Only reachable in
4452            // `AllShortest` mode; `Shortest` mode's buffer
4453            // never accumulates more than one entry.
4454            if let Some((mut row, path)) = self.pending.pop_front() {
4455                row.insert(self.path_var.clone(), path);
4456                return Ok(Some(row));
4457            }
4458            let Some(row) = self.input.next(ctx)? else {
4459                return Ok(None);
4460            };
4461            let src = match row.get(&self.src_var) {
4462                Some(Value::Node(n)) => n.clone(),
4463                _ => continue,
4464            };
4465            let dst = match row.get(&self.dst_var) {
4466                Some(Value::Node(n)) => n.clone(),
4467                _ => continue,
4468            };
4469            let paths = bfs_shortest_paths(
4470                &src,
4471                &dst,
4472                &self.edge_types,
4473                self.direction,
4474                self.max_hops,
4475                self.kind,
4476                ctx.store,
4477            )?;
4478            if paths.is_empty() {
4479                // No path of length ≤ max_hops — drop the row.
4480                continue;
4481            }
4482            for path in paths {
4483                self.pending.push_back((row.clone(), path));
4484            }
4485        }
4486    }
4487}
4488
4489/// Layered breadth-first search from `src` toward `dst`,
4490/// constrained by edge type, direction, and max hop count.
4491/// Returns every path of minimum length when `kind` is
4492/// `AllShortest`, or just the first discovered shortest path
4493/// when `kind` is `Shortest`. An empty vector means no path
4494/// of length ≤ `max_hops` exists.
4495///
4496/// `src == dst` is a zero-hop special case that always
4497/// returns a singleton path regardless of `max_hops`.
4498///
4499/// The algorithm builds a parent DAG as it expands each
4500/// level: every node that was first reached at depth `d+1`
4501/// records every `(parent, edge)` pair from frontier[d] that
4502/// reaches it, not just the first one. This lets the
4503/// reconstruction walk enumerate all source-to-target paths
4504/// of the discovered shortest length. For `Shortest` mode
4505/// the reconstruction short-circuits on the first complete
4506/// path.
4507fn bfs_shortest_paths(
4508    src: &Node,
4509    dst: &Node,
4510    edge_types: &[String],
4511    direction: meshdb_cypher::Direction,
4512    max_hops: u64,
4513    kind: meshdb_cypher::ShortestKind,
4514    reader: &dyn crate::reader::GraphReader,
4515) -> Result<Vec<Value>> {
4516    use meshdb_cypher::Direction;
4517
4518    if src.id == dst.id {
4519        return Ok(vec![Value::Path {
4520            nodes: vec![src.clone()],
4521            edges: vec![],
4522        }]);
4523    }
4524
4525    // `dist` tracks the shortest distance from `src` to each
4526    // reached node. `parents` maps each reached node id to
4527    // every `(parent_id, edge_id)` pair that reached it at
4528    // that shortest distance — a node may have multiple
4529    // parents in the parent DAG for `AllShortest`.
4530    let mut dist: HashMap<NodeId, u64> = HashMap::new();
4531    dist.insert(src.id, 0);
4532    let mut parents: HashMap<NodeId, Vec<(NodeId, EdgeId)>> = HashMap::new();
4533
4534    let mut frontier: Vec<NodeId> = vec![src.id];
4535    let mut depth: u64 = 0;
4536    let mut found = false;
4537
4538    while !frontier.is_empty() && depth < max_hops && !found {
4539        let mut next_frontier: Vec<NodeId> = Vec::new();
4540        for node_id in &frontier {
4541            let neighbors = match direction {
4542                Direction::Outgoing => reader.outgoing(*node_id)?,
4543                Direction::Incoming => reader.incoming(*node_id)?,
4544                Direction::Both => {
4545                    let mut out = reader.outgoing(*node_id)?;
4546                    out.extend(reader.incoming(*node_id)?);
4547                    out
4548                }
4549            };
4550            for (edge_id, neighbor_id) in neighbors {
4551                // Edge-type filter. Only fetch the edge record
4552                // when a type constraint is present.
4553                if !edge_types.is_empty() {
4554                    let edge = match reader.get_edge(edge_id)? {
4555                        Some(e) => e,
4556                        None => continue,
4557                    };
4558                    if !edge_types.iter().any(|t| t == &edge.edge_type) {
4559                        continue;
4560                    }
4561                }
4562                match dist.get(&neighbor_id) {
4563                    Some(&d) if d == depth + 1 => {
4564                        // Alternate parent at the same
4565                        // shortest-path level — record the
4566                        // additional edge so the reconstruction
4567                        // can enumerate it, but don't re-add to
4568                        // the next frontier.
4569                        parents
4570                            .entry(neighbor_id)
4571                            .or_default()
4572                            .push((*node_id, edge_id));
4573                    }
4574                    Some(_) => {
4575                        // Already reached at a strictly shorter
4576                        // depth; this edge isn't on a shortest
4577                        // path.
4578                    }
4579                    None => {
4580                        dist.insert(neighbor_id, depth + 1);
4581                        parents
4582                            .entry(neighbor_id)
4583                            .or_default()
4584                            .push((*node_id, edge_id));
4585                        if neighbor_id == dst.id {
4586                            found = true;
4587                        } else {
4588                            next_frontier.push(neighbor_id);
4589                        }
4590                    }
4591                }
4592            }
4593        }
4594        depth += 1;
4595        if !found {
4596            frontier = next_frontier;
4597        }
4598    }
4599
4600    if !found {
4601        return Ok(Vec::new());
4602    }
4603
4604    // Enumerate all shortest paths from the parent DAG.
4605    // `Shortest` mode short-circuits after the first complete
4606    // walk; `AllShortest` collects every distinct path.
4607    let mut out: Vec<Value> = Vec::new();
4608    let mut nodes_rev: Vec<Node> = Vec::new();
4609    let mut edges_rev: Vec<Edge> = Vec::new();
4610    let only_first = matches!(kind, meshdb_cypher::ShortestKind::Shortest);
4611    collect_shortest_paths(
4612        src,
4613        dst,
4614        &parents,
4615        reader,
4616        &mut nodes_rev,
4617        &mut edges_rev,
4618        &mut out,
4619        only_first,
4620    )?;
4621    Ok(out)
4622}
4623
4624/// Depth-first walk through the parent DAG built by
4625/// `bfs_shortest_paths`, collecting every source-to-target
4626/// path of the BFS-determined shortest length. The recursion
4627/// carries two scratch stacks (`nodes_rev`, `edges_rev`)
4628/// representing the partially-reconstructed path in reverse
4629/// order; on reaching `src` we copy the reversed accumulators
4630/// into a forward-order `Value::Path` and push it into `out`.
4631///
4632/// `only_first = true` short-circuits after the first
4633/// complete path, giving `Shortest` mode the optimal-case
4634/// early exit.
4635#[allow(clippy::too_many_arguments)]
4636fn collect_shortest_paths(
4637    src: &Node,
4638    current: &Node,
4639    parents: &HashMap<NodeId, Vec<(NodeId, EdgeId)>>,
4640    reader: &dyn crate::reader::GraphReader,
4641    nodes_rev: &mut Vec<Node>,
4642    edges_rev: &mut Vec<Edge>,
4643    out: &mut Vec<Value>,
4644    only_first: bool,
4645) -> Result<()> {
4646    if current.id == src.id {
4647        // Complete walk: `nodes_rev` holds dst, ..., (last
4648        // node before src) in reverse; prepend src and
4649        // reverse to produce the forward-order node list.
4650        // Edges are similarly reversed.
4651        let mut nodes: Vec<Node> = Vec::with_capacity(nodes_rev.len() + 1);
4652        nodes.push(src.clone());
4653        nodes.extend(nodes_rev.iter().rev().cloned());
4654        let edges: Vec<Edge> = edges_rev.iter().rev().cloned().collect();
4655        out.push(Value::Path { nodes, edges });
4656        return Ok(());
4657    }
4658    let Some(parent_edges) = parents.get(&current.id) else {
4659        // Unreachable in normal flow — BFS only inserts dst
4660        // into `parents` when it found at least one incoming
4661        // edge — but handled defensively.
4662        return Ok(());
4663    };
4664    for (parent_id, edge_id) in parent_edges {
4665        if only_first && !out.is_empty() {
4666            return Ok(());
4667        }
4668        let edge = reader
4669            .get_edge(*edge_id)?
4670            .expect("BFS inserted this edge id; it must still exist");
4671        let parent_node = reader
4672            .get_node(*parent_id)?
4673            .expect("BFS visited this node id; it must still exist");
4674        nodes_rev.push(current.clone());
4675        edges_rev.push(edge);
4676        collect_shortest_paths(
4677            src,
4678            &parent_node,
4679            parents,
4680            reader,
4681            nodes_rev,
4682            edges_rev,
4683            out,
4684            only_first,
4685        )?;
4686        nodes_rev.pop();
4687        edges_rev.pop();
4688    }
4689    Ok(())
4690}
4691
4692/// `UNION` / `UNION ALL`. Drains each branch in order, streaming
4693/// its rows through. For plain `UNION` (`all = false`) we
4694/// deduplicate across the combined stream using the same
4695/// `row_key` the `DistinctOp` uses, so the semantics match
4696/// Neo4j's "set union" shape regardless of whether duplicates
4697/// sit inside a single branch or straddle branches. For
4698/// `UNION ALL` the `seen` set stays `None` and every produced
4699/// row is forwarded as-is.
4700struct UnionOp {
4701    branches: Vec<Box<dyn Operator>>,
4702    current: usize,
4703    seen: Option<HashSet<String>>,
4704}
4705
4706impl UnionOp {
4707    fn new(branches: Vec<Box<dyn Operator>>, all: bool) -> Self {
4708        Self {
4709            branches,
4710            current: 0,
4711            seen: if all { None } else { Some(HashSet::new()) },
4712        }
4713    }
4714}
4715
4716impl Operator for UnionOp {
4717    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4718        while self.current < self.branches.len() {
4719            match self.branches[self.current].next(ctx)? {
4720                Some(row) => {
4721                    if let Some(seen) = self.seen.as_mut() {
4722                        let key = row_key(&row);
4723                        if !seen.insert(key) {
4724                            continue;
4725                        }
4726                    }
4727                    return Ok(Some(row));
4728                }
4729                None => {
4730                    self.current += 1;
4731                }
4732            }
4733        }
4734        Ok(None)
4735    }
4736}
4737
4738struct OrderByOp {
4739    input: Box<dyn Operator>,
4740    sort_items: Vec<SortItem>,
4741    sorted: Option<Vec<Row>>,
4742    cursor: usize,
4743}
4744
4745impl OrderByOp {
4746    fn new(input: Box<dyn Operator>, sort_items: Vec<SortItem>) -> Self {
4747        Self {
4748            input,
4749            sort_items,
4750            sorted: None,
4751            cursor: 0,
4752        }
4753    }
4754}
4755
4756impl Operator for OrderByOp {
4757    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4758        if self.sorted.is_none() {
4759            let mut rows: Vec<Row> = Vec::new();
4760            while let Some(row) = self.input.next(ctx)? {
4761                rows.push(row);
4762            }
4763            let mut keyed: Vec<(Vec<Value>, Row)> = Vec::with_capacity(rows.len());
4764            for row in rows {
4765                let mut keys = Vec::with_capacity(self.sort_items.len());
4766                for item in &self.sort_items {
4767                    keys.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
4768                }
4769                keyed.push((keys, row));
4770            }
4771            let descs: Vec<bool> = self.sort_items.iter().map(|s| s.descending).collect();
4772            keyed.sort_by(|a, b| {
4773                for (i, (va, vb)) in a.0.iter().zip(b.0.iter()).enumerate() {
4774                    let ord = compare_values(va, vb);
4775                    let ord = if descs[i] { ord.reverse() } else { ord };
4776                    if ord != Ordering::Equal {
4777                        return ord;
4778                    }
4779                }
4780                Ordering::Equal
4781            });
4782            self.sorted = Some(keyed.into_iter().map(|(_, r)| r).collect());
4783        }
4784        let rows = self.sorted.as_ref().unwrap();
4785        if self.cursor < rows.len() {
4786            let row = rows[self.cursor].clone();
4787            self.cursor += 1;
4788            Ok(Some(row))
4789        } else {
4790            Ok(None)
4791        }
4792    }
4793}
4794
4795struct AggregateOp {
4796    input: Box<dyn Operator>,
4797    group_keys: Vec<ReturnItem>,
4798    aggregates: Vec<AggregateSpec>,
4799    results: Option<Vec<Row>>,
4800    cursor: usize,
4801}
4802
4803impl AggregateOp {
4804    fn new(
4805        input: Box<dyn Operator>,
4806        group_keys: Vec<ReturnItem>,
4807        aggregates: Vec<AggregateSpec>,
4808    ) -> Self {
4809        Self {
4810            input,
4811            group_keys,
4812            aggregates,
4813            results: None,
4814            cursor: 0,
4815        }
4816    }
4817
4818    fn compute(&mut self, ctx: &ExecCtx) -> Result<()> {
4819        let mut groups: HashMap<String, GroupState> = HashMap::new();
4820        let mut order: Vec<String> = Vec::new();
4821
4822        // If there are no input rows AND no group keys, we still emit one row
4823        // (e.g. `MATCH (n:Missing) RETURN count(*)` must yield one row with 0).
4824        let mut saw_any = false;
4825
4826        while let Some(row) = self.input.next(ctx)? {
4827            saw_any = true;
4828            let mut key_values = Vec::with_capacity(self.group_keys.len());
4829            for item in &self.group_keys {
4830                key_values.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
4831            }
4832            let mut hash_key = String::new();
4833            for v in &key_values {
4834                hash_key.push_str(&value_key(v));
4835                hash_key.push('|');
4836            }
4837            let entry = groups.entry(hash_key.clone()).or_insert_with(|| {
4838                order.push(hash_key.clone());
4839                GroupState {
4840                    key_values: key_values.clone(),
4841                    agg_states: self
4842                        .aggregates
4843                        .iter()
4844                        .map(|a| AggState::initial(a.function))
4845                        .collect(),
4846                    distinct_seen: self.aggregates.iter().map(|_| None).collect(),
4847                }
4848            });
4849            for (i, spec) in self.aggregates.iter().enumerate() {
4850                if let AggregateArg::DistinctExpr(expr) = &spec.arg {
4851                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
4852                    if matches!(v, Value::Null) {
4853                        continue;
4854                    }
4855                    let key = value_key(&v);
4856                    let seen = entry.distinct_seen[i].get_or_insert_with(HashSet::new);
4857                    if !seen.insert(key) {
4858                        continue;
4859                    }
4860                }
4861                entry.agg_states[i].update(&spec.arg, &ctx.eval_ctx(&row))?;
4862                // The percentile is a constant expression — evaluate
4863                // it once against the first row we see and stash it
4864                // in the state so finalize() has a number to use.
4865                if let Some(extra_expr) = &spec.extra_arg {
4866                    let need_resolve = matches!(
4867                        &entry.agg_states[i],
4868                        AggState::PercentileDisc {
4869                            percentile: None,
4870                            ..
4871                        } | AggState::PercentileCont {
4872                            percentile: None,
4873                            ..
4874                        }
4875                    );
4876                    if need_resolve {
4877                        let pv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
4878                        let p = match pv {
4879                            Value::Property(Property::Float64(f)) => f,
4880                            Value::Property(Property::Int64(i)) => i as f64,
4881                            _ => 0.0,
4882                        };
4883                        // openCypher requires the percentile to
4884                        // lie in [0.0, 1.0]; anything else is an
4885                        // `ArgumentError: NumberOutOfRange`.
4886                        if !(0.0..=1.0).contains(&p) || p.is_nan() {
4887                            return Err(Error::Procedure(format!("percentile out of range: {p}")));
4888                        }
4889                        match &mut entry.agg_states[i] {
4890                            AggState::PercentileDisc { percentile, .. }
4891                            | AggState::PercentileCont { percentile, .. } => {
4892                                *percentile = Some(p);
4893                            }
4894                            _ => {}
4895                        }
4896                    }
4897                }
4898            }
4899        }
4900
4901        let mut out = Vec::new();
4902        if !saw_any && self.group_keys.is_empty() && !self.aggregates.is_empty() {
4903            // Empty group, single aggregate row
4904            let mut row = Row::new();
4905            for spec in &self.aggregates {
4906                row.insert(
4907                    spec.alias.clone(),
4908                    AggState::initial(spec.function).finalize(),
4909                );
4910            }
4911            out.push(row);
4912        } else {
4913            for key in order {
4914                let state = groups.remove(&key).unwrap();
4915                let mut row = Row::new();
4916                for (i, item) in self.group_keys.iter().enumerate() {
4917                    let name = item
4918                        .alias
4919                        .clone()
4920                        .unwrap_or_else(|| default_name(&item.expr, i));
4921                    row.insert(name, state.key_values[i].clone());
4922                }
4923                for (i, spec) in self.aggregates.iter().enumerate() {
4924                    row.insert(spec.alias.clone(), state.agg_states[i].finalize());
4925                }
4926                out.push(row);
4927            }
4928        }
4929        self.results = Some(out);
4930        Ok(())
4931    }
4932}
4933
4934impl Operator for AggregateOp {
4935    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4936        if self.results.is_none() {
4937            self.compute(ctx)?;
4938        }
4939        let rows = self.results.as_ref().unwrap();
4940        if self.cursor < rows.len() {
4941            let row = rows[self.cursor].clone();
4942            self.cursor += 1;
4943            Ok(Some(row))
4944        } else {
4945            Ok(None)
4946        }
4947    }
4948}
4949
4950struct GroupState {
4951    key_values: Vec<Value>,
4952    agg_states: Vec<AggState>,
4953    distinct_seen: Vec<Option<HashSet<String>>>,
4954}
4955
4956enum AggState {
4957    Count(i64),
4958    Sum {
4959        int_part: i64,
4960        float_part: f64,
4961        is_float: bool,
4962    },
4963    Avg {
4964        total: f64,
4965        count: i64,
4966    },
4967    Min(Option<Value>),
4968    Max(Option<Value>),
4969    Collect(Vec<Value>),
4970    StDev {
4971        sum: f64,
4972        sum_sq: f64,
4973        count: i64,
4974    },
4975    StDevP {
4976        sum: f64,
4977        sum_sq: f64,
4978        count: i64,
4979    },
4980    PercentileDisc {
4981        items: Vec<Value>,
4982        percentile: Option<f64>,
4983    },
4984    PercentileCont {
4985        items: Vec<Value>,
4986        percentile: Option<f64>,
4987    },
4988}
4989
4990impl AggState {
4991    fn initial(func: AggregateFn) -> Self {
4992        match func {
4993            AggregateFn::Count => AggState::Count(0),
4994            AggregateFn::Sum => AggState::Sum {
4995                int_part: 0,
4996                float_part: 0.0,
4997                is_float: false,
4998            },
4999            AggregateFn::Avg => AggState::Avg {
5000                total: 0.0,
5001                count: 0,
5002            },
5003            AggregateFn::Min => AggState::Min(None),
5004            AggregateFn::Max => AggState::Max(None),
5005            AggregateFn::Collect => AggState::Collect(Vec::new()),
5006            AggregateFn::StDev => AggState::StDev {
5007                sum: 0.0,
5008                sum_sq: 0.0,
5009                count: 0,
5010            },
5011            AggregateFn::StDevP => AggState::StDevP {
5012                sum: 0.0,
5013                sum_sq: 0.0,
5014                count: 0,
5015            },
5016            AggregateFn::PercentileDisc => AggState::PercentileDisc {
5017                items: Vec::new(),
5018                percentile: None,
5019            },
5020            AggregateFn::PercentileCont => AggState::PercentileCont {
5021                items: Vec::new(),
5022                percentile: None,
5023            },
5024        }
5025    }
5026
5027    fn update(&mut self, arg: &AggregateArg, ctx: &EvalCtx) -> Result<()> {
5028        match self {
5029            AggState::Count(c) => match arg {
5030                AggregateArg::Star => *c += 1,
5031                AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => {
5032                    if !matches!(eval_expr(e, ctx)?, Value::Null) {
5033                        *c += 1;
5034                    }
5035                }
5036            },
5037            AggState::Sum {
5038                int_part,
5039                float_part,
5040                is_float,
5041            } => {
5042                let v = expr_arg_value(arg, ctx)?;
5043                match v {
5044                    Value::Null => {}
5045                    Value::Property(Property::Int64(i)) => *int_part += i,
5046                    Value::Property(Property::Float64(f)) => {
5047                        *float_part += f;
5048                        *is_float = true;
5049                    }
5050                    _ => return Err(Error::AggregateTypeError),
5051                }
5052            }
5053            AggState::Avg { total, count } => {
5054                let v = expr_arg_value(arg, ctx)?;
5055                match v {
5056                    Value::Null => {}
5057                    Value::Property(Property::Int64(i)) => {
5058                        *total += i as f64;
5059                        *count += 1;
5060                    }
5061                    Value::Property(Property::Float64(f)) => {
5062                        *total += f;
5063                        *count += 1;
5064                    }
5065                    _ => return Err(Error::AggregateTypeError),
5066                }
5067            }
5068            AggState::Min(slot) => {
5069                // `min` / `max` ignore null inputs and operate
5070                // over any `Value` (including lists, nodes etc.),
5071                // not just scalar Properties. The ordering is
5072                // `compare_values`: within a single type the
5073                // natural order, across types the
5074                // `type_order_value` rank.
5075                let v = expr_arg_value(arg, ctx)?;
5076                if matches!(v, Value::Null | Value::Property(Property::Null)) {
5077                    // skip
5078                } else {
5079                    match slot {
5080                        None => *slot = Some(v),
5081                        Some(cur) => {
5082                            if compare_values(&v, cur) == Ordering::Less {
5083                                *cur = v;
5084                            }
5085                        }
5086                    }
5087                }
5088            }
5089            AggState::Max(slot) => {
5090                let v = expr_arg_value(arg, ctx)?;
5091                if matches!(v, Value::Null | Value::Property(Property::Null)) {
5092                    // skip
5093                } else {
5094                    match slot {
5095                        None => *slot = Some(v),
5096                        Some(cur) => {
5097                            if compare_values(&v, cur) == Ordering::Greater {
5098                                *cur = v;
5099                            }
5100                        }
5101                    }
5102                }
5103            }
5104            AggState::Collect(items) => {
5105                let v = expr_arg_value(arg, ctx)?;
5106                if !matches!(v, Value::Null) {
5107                    items.push(v);
5108                }
5109            }
5110            AggState::PercentileDisc { items, .. } | AggState::PercentileCont { items, .. } => {
5111                let v = expr_arg_value(arg, ctx)?;
5112                if !matches!(v, Value::Null) {
5113                    items.push(v);
5114                }
5115            }
5116            AggState::StDev { sum, sum_sq, count } | AggState::StDevP { sum, sum_sq, count } => {
5117                let v = expr_arg_value(arg, ctx)?;
5118                match v {
5119                    Value::Null => {}
5120                    Value::Property(Property::Int64(i)) => {
5121                        let f = i as f64;
5122                        *sum += f;
5123                        *sum_sq += f * f;
5124                        *count += 1;
5125                    }
5126                    Value::Property(Property::Float64(f)) => {
5127                        *sum += f;
5128                        *sum_sq += f * f;
5129                        *count += 1;
5130                    }
5131                    _ => return Err(Error::AggregateTypeError),
5132                }
5133            }
5134        }
5135        Ok(())
5136    }
5137
5138    fn finalize(&self) -> Value {
5139        match self {
5140            AggState::Count(c) => Value::Property(Property::Int64(*c)),
5141            AggState::Sum {
5142                int_part,
5143                float_part,
5144                is_float,
5145            } => {
5146                if *is_float {
5147                    Value::Property(Property::Float64(*float_part + *int_part as f64))
5148                } else {
5149                    Value::Property(Property::Int64(*int_part))
5150                }
5151            }
5152            AggState::Avg { total, count } => {
5153                if *count == 0 {
5154                    Value::Null
5155                } else {
5156                    Value::Property(Property::Float64(*total / *count as f64))
5157                }
5158            }
5159            AggState::Min(slot) | AggState::Max(slot) => match slot {
5160                Some(v) => v.clone(),
5161                None => Value::Null,
5162            },
5163            AggState::Collect(items) => Value::List(items.clone()),
5164            AggState::StDevP { sum, sum_sq, count } => {
5165                if *count == 0 {
5166                    Value::Property(Property::Float64(0.0))
5167                } else {
5168                    let n = *count as f64;
5169                    let variance = *sum_sq / n - (*sum / n).powi(2);
5170                    Value::Property(Property::Float64(variance.max(0.0).sqrt()))
5171                }
5172            }
5173            AggState::StDev { sum, sum_sq, count } => {
5174                if *count < 2 {
5175                    Value::Property(Property::Float64(0.0))
5176                } else {
5177                    let n = *count as f64;
5178                    let variance = (*sum_sq - *sum * *sum / n) / (n - 1.0);
5179                    Value::Property(Property::Float64(variance.max(0.0).sqrt()))
5180                }
5181            }
5182            AggState::PercentileDisc { items, percentile } => {
5183                percentile_disc(items, percentile.unwrap_or(0.0))
5184            }
5185            AggState::PercentileCont { items, percentile } => {
5186                percentile_cont(items, percentile.unwrap_or(0.0))
5187            }
5188        }
5189    }
5190}
5191
5192fn expr_arg_value(arg: &AggregateArg, ctx: &EvalCtx) -> Result<Value> {
5193    match arg {
5194        AggregateArg::Star => Err(Error::AggregateTypeError),
5195        AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => eval_expr(e, ctx),
5196    }
5197}
5198
5199/// Coerce a collected aggregate value into an f64 for percentile
5200/// math. Unhandled types fall back to NaN; the caller sorts NaN
5201/// out of the stream before computing the percentile.
5202fn value_to_f64(v: &Value) -> f64 {
5203    match v {
5204        Value::Property(Property::Int64(i)) => *i as f64,
5205        Value::Property(Property::Float64(f)) => *f,
5206        _ => f64::NAN,
5207    }
5208}
5209
5210/// `percentileDisc(expr, p)` — discrete percentile. Returns the
5211/// smallest value at or above the `p`-ranked position. Numbers only;
5212/// non-numeric values get sorted to the end and are effectively
5213/// ignored unless the percentile lands on one.
5214fn percentile_disc(items: &[Value], p: f64) -> Value {
5215    let mut nums: Vec<(f64, Value)> = items
5216        .iter()
5217        .map(|v| (value_to_f64(v), v.clone()))
5218        .filter(|(f, _)| !f.is_nan())
5219        .collect();
5220    if nums.is_empty() {
5221        return Value::Null;
5222    }
5223    nums.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
5224    let p = p.clamp(0.0, 1.0);
5225    let n = nums.len();
5226    // Neo4j spec: ceil(p * n) - 1, clamped at 0.
5227    let idx = ((p * n as f64).ceil() as isize - 1).max(0) as usize;
5228    nums[idx.min(n - 1)].1.clone()
5229}
5230
5231/// `percentileCont(expr, p)` — continuous percentile. Linearly
5232/// interpolates between the two ranks that bracket the fractional
5233/// position `p * (n - 1)`. Returns a Float64.
5234fn percentile_cont(items: &[Value], p: f64) -> Value {
5235    let mut nums: Vec<f64> = items
5236        .iter()
5237        .map(value_to_f64)
5238        .filter(|f| !f.is_nan())
5239        .collect();
5240    if nums.is_empty() {
5241        return Value::Null;
5242    }
5243    nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
5244    let p = p.clamp(0.0, 1.0);
5245    let n = nums.len();
5246    if n == 1 {
5247        return Value::Property(Property::Float64(nums[0]));
5248    }
5249    let pos = p * (n as f64 - 1.0);
5250    let lo = pos.floor() as usize;
5251    let hi = pos.ceil() as usize;
5252    let frac = pos - lo as f64;
5253    let v = nums[lo] + (nums[hi] - nums[lo]) * frac;
5254    Value::Property(Property::Float64(v))
5255}
5256
5257struct SkipOp {
5258    input: Box<dyn Operator>,
5259    count_expr: Expr,
5260    remaining: Option<i64>,
5261}
5262
5263impl SkipOp {
5264    fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5265        Self {
5266            input,
5267            count_expr,
5268            remaining: None,
5269        }
5270    }
5271}
5272
5273impl Operator for SkipOp {
5274    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5275        if self.remaining.is_none() {
5276            let empty = Row::new();
5277            let ectx = ctx.eval_ctx(&empty);
5278            let val = eval_expr(&self.count_expr, &ectx)?;
5279            self.remaining = Some(expr_to_count(val)?);
5280        }
5281        let rem = self.remaining.as_mut().unwrap();
5282        while *rem > 0 {
5283            if self.input.next(ctx)?.is_none() {
5284                return Ok(None);
5285            }
5286            *rem -= 1;
5287        }
5288        self.input.next(ctx)
5289    }
5290}
5291
5292struct LimitOp {
5293    input: Box<dyn Operator>,
5294    count_expr: Expr,
5295    remaining: Option<i64>,
5296}
5297
5298impl LimitOp {
5299    fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5300        Self {
5301            input,
5302            count_expr,
5303            remaining: None,
5304        }
5305    }
5306}
5307
5308impl Operator for LimitOp {
5309    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5310        if self.remaining.is_none() {
5311            let empty = Row::new();
5312            let ectx = ctx.eval_ctx(&empty);
5313            let val = eval_expr(&self.count_expr, &ectx)?;
5314            self.remaining = Some(expr_to_count(val)?);
5315        }
5316        let rem = self.remaining.as_mut().unwrap();
5317        if *rem <= 0 {
5318            return Ok(None);
5319        }
5320        match self.input.next(ctx)? {
5321            Some(row) => {
5322                *rem -= 1;
5323                Ok(Some(row))
5324            }
5325            None => Ok(None),
5326        }
5327    }
5328}
5329
5330fn expr_to_count(val: Value) -> Result<i64> {
5331    match val {
5332        Value::Null | Value::Property(Property::Null) => Ok(0),
5333        Value::Property(Property::Int64(n)) if n >= 0 => Ok(n),
5334        // openCypher: SKIP/LIMIT require an integer. Float values
5335        // (including integer-valued floats) are rejected as
5336        // InvalidArgumentType — they'd only be valid after an
5337        // explicit cast, which the user must write themselves.
5338        _ => Err(Error::TypeMismatch),
5339    }
5340}