Skip to main content

meshdb_executor/
ops.rs

1use crate::{
2    error::{Error, Result},
3    eval::{compare_values, eval_expr, row_key, to_bool, value_key, values_equal, EvalCtx},
4    procedures::ProcedureRegistry,
5    reader::GraphReader,
6    value::{ParamMap, Row, Value},
7    writer::GraphWriter,
8};
9use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
10use meshdb_cypher::{
11    AggregateArg, AggregateFn, AggregateSpec, BinaryOp, CallArgs, CompareOp, ConstraintKind,
12    ConstraintScope as CypherConstraintScope, CreateEdgeSpec, CreateNodeSpec, Direction, Expr,
13    Literal, LogicalPlan, PointSeekBounds, PropertyType as CypherPropertyType, RemoveSpec,
14    ReturnItem, SetAssignment, SortItem, UnaryOp, YieldSpec,
15};
16use meshdb_storage::{
17    ConstraintScope as StorageConstraintScope, PropertyConstraintKind,
18    PropertyType as StoragePropertyType, RocksDbStorageEngine,
19};
20use std::cell::RefCell;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23
24/// Shared tombstone set for nodes / edges that have been DELETEd
25/// earlier in the current query. Property / label / type / keys /
26/// id accesses consult this set and raise `DeletedEntityAccess`
27/// so `MATCH (n) DELETE n RETURN n.num` surfaces the error at
28/// runtime instead of reading off a stale clone.
29#[derive(Default)]
30pub struct Tombstones {
31    pub nodes: RefCell<HashSet<meshdb_core::NodeId>>,
32    pub edges: RefCell<HashSet<meshdb_core::EdgeId>>,
33}
34
35pub struct ExecCtx<'a> {
36    pub store: &'a dyn GraphReader,
37    pub writer: &'a dyn GraphWriter,
38    /// Per-query parameter bindings. Empty for the single-node and Raft
39    /// entry points that don't carry a Bolt RUN; populated with the
40    /// driver-supplied params on the Bolt path. Threaded into every
41    /// `eval_expr` call so `Expr::Parameter("name")` resolves.
42    pub params: &'a ParamMap,
43    /// Procedures known to this execution. Defaults to an empty
44    /// registry on call sites that don't care; the TCK harness and
45    /// any future server startup plug in a populated registry so
46    /// `CALL ns.name(...)` resolves.
47    pub procedures: &'a ProcedureRegistry,
48    /// Outer-scope rows contributed by enclosing operators. The
49    /// innermost slot (first entry) is the nearest outer. Set by
50    /// [`CartesianProductOp`] when running its right side so that
51    /// operators inside — typically `EdgeExpandOp` /
52    /// `VarLengthExpandOp` constraint lookups — can resolve
53    /// variables the right-side scan didn't bind directly. Empty
54    /// at the top level.
55    pub outer_rows: &'a [&'a Row],
56    /// Tombstones for entities deleted earlier in the same query.
57    /// Shared by reference so DeleteOp can insert and downstream
58    /// eval can check.
59    pub tombstones: &'a Tombstones,
60}
61
62pub(crate) struct NoOpWriter;
63impl GraphWriter for NoOpWriter {
64    fn put_node(&self, _: &Node) -> Result<()> {
65        Ok(())
66    }
67    fn put_edge(&self, _: &Edge) -> Result<()> {
68        Ok(())
69    }
70    fn delete_edge(&self, _: EdgeId) -> Result<()> {
71        Ok(())
72    }
73    fn detach_delete_node(&self, _: NodeId) -> Result<()> {
74        Ok(())
75    }
76}
77
78impl<'a> ExecCtx<'a> {
79    /// Build an `EvalCtx` wrapping the given row alongside this
80    /// exec context's params and reader. Used by operators to
81    /// bridge the per-operator context into the expression
82    /// evaluator without repeating the field list at every
83    /// `eval_expr` call site.
84    pub(crate) fn eval_ctx<'b>(&self, row: &'b Row) -> EvalCtx<'b>
85    where
86        'a: 'b,
87    {
88        EvalCtx {
89            row,
90            params: self.params,
91            reader: self.store,
92            procedures: self.procedures,
93            outer_rows: self.outer_rows,
94            tombstones: self.tombstones,
95        }
96    }
97
98    /// Look up a variable in `row` first, then in each outer-scope
99    /// row in order. Returns the nearest match. Used for constraint
100    /// lookups inside right-side operators of a `CartesianProduct`
101    /// so a fresh scan that references an outer binding
102    /// (`MATCH ()-[r]-() MATCH (n)-[r]-(m)`) can still resolve it.
103    pub(crate) fn lookup_binding<'r>(&'r self, row: &'r Row, name: &str) -> Option<&'r Value> {
104        if let Some(v) = row.get(name) {
105            return Some(v);
106        }
107        for outer in self.outer_rows {
108            if let Some(v) = outer.get(name) {
109                return Some(v);
110            }
111        }
112        None
113    }
114}
115
116pub trait Operator {
117    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>>;
118}
119
120/// Execute a plan using the given store for both reads and writes.
121/// Equivalent to [`execute_with_reader`] with the store acting as both
122/// and an empty parameter map.
123pub fn execute(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
124    let params = ParamMap::new();
125    execute_with_reader(
126        plan,
127        store as &dyn GraphReader,
128        store as &dyn GraphWriter,
129        &params,
130    )
131}
132
133/// Execute a plan against a local [`RocksDbStorageEngine`] for reads,
134/// sending mutations to a separate [`GraphWriter`]. Used by in-process
135/// setups where reads are always cheap-local (single node or
136/// full-replica Raft). No parameters.
137pub fn execute_with_writer(
138    plan: &LogicalPlan,
139    store: &RocksDbStorageEngine,
140    writer: &dyn GraphWriter,
141) -> Result<Vec<Row>> {
142    let params = ParamMap::new();
143    execute_with_reader(plan, store as &dyn GraphReader, writer, &params)
144}
145
146/// Execute a plan with an arbitrary [`GraphReader`] for reads and a
147/// parameter map for `$param` resolution. Routing mode uses a
148/// partitioned reader that fans out reads across peers; the Bolt
149/// listener supplies the driver-bound param map.
150pub fn explain(plan: &LogicalPlan) -> Vec<Row> {
151    let text = meshdb_cypher::format_plan(plan);
152    let mut row = Row::new();
153    row.insert("plan".to_string(), Value::Property(Property::String(text)));
154    vec![row]
155}
156
157pub fn profile(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
158    let result_rows = execute(plan, store)?;
159    let row_count = result_rows.len() as i64;
160    let plan_text = meshdb_cypher::format_plan(plan);
161    let summary = format!("{plan_text}\nRows: {row_count}");
162    let mut row = Row::new();
163    row.insert(
164        "profile".to_string(),
165        Value::Property(Property::String(summary)),
166    );
167    row.insert(
168        "rows".to_string(),
169        Value::Property(Property::Int64(row_count)),
170    );
171    Ok(vec![row])
172}
173
174pub fn execute_with_reader(
175    plan: &LogicalPlan,
176    reader: &dyn GraphReader,
177    writer: &dyn GraphWriter,
178    params: &ParamMap,
179) -> Result<Vec<Row>> {
180    let empty_procs = ProcedureRegistry::new();
181    execute_with_reader_and_procs(plan, reader, writer, params, &empty_procs)
182}
183
184/// Like [`execute_with_reader`] but with an explicit procedure
185/// registry in scope. Used by the TCK harness to mount mock
186/// procedures declared by `there exists a procedure ...` steps,
187/// and reserved for the server startup path once built-in
188/// procedures land.
189pub fn execute_with_reader_and_procs(
190    plan: &LogicalPlan,
191    reader: &dyn GraphReader,
192    writer: &dyn GraphWriter,
193    params: &ParamMap,
194    procedures: &ProcedureRegistry,
195) -> Result<Vec<Row>> {
196    // `datetime()`, `localtime()`, and friends cache "now" in a
197    // thread-local so multiple calls inside the same statement see
198    // the same instant. Clear it at the start of every execution so
199    // the next query gets a fresh reading.
200    crate::eval::reset_statement_time();
201    // Schema DDL never enters the operator pipeline — it's a
202    // side-effect on the backing store, not a row-producing query.
203    // Short-circuit here so `build_op` stays a pure plan-to-operator
204    // mapping and doesn't need DDL-specific cases.
205    if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
206        return Ok(rows);
207    }
208    let suppress_output = is_write_only_plan(plan);
209    let mut op = build_op(plan);
210    let tombstones = Tombstones::default();
211    let ctx = ExecCtx {
212        store: reader,
213        writer,
214        params,
215        procedures,
216        outer_rows: &[],
217        tombstones: &tombstones,
218    };
219    let mut rows = Vec::new();
220    while let Some(row) = op.next(&ctx)? {
221        rows.push(row);
222    }
223    if suppress_output {
224        Ok(Vec::new())
225    } else {
226        Ok(rows)
227    }
228}
229
230fn is_write_only_plan(plan: &LogicalPlan) -> bool {
231    // A plan is write-only when its top-level operator is a mutation
232    // with no projection (Project/Identity) wrapping it. Mutations
233    // with RETURN have a Project on top.
234    match plan {
235        LogicalPlan::CreatePath { .. }
236        | LogicalPlan::Delete { .. }
237        | LogicalPlan::SetProperty { .. }
238        | LogicalPlan::Remove { .. }
239        | LogicalPlan::Foreach { .. }
240        | LogicalPlan::MergeNode { .. }
241        | LogicalPlan::MergeEdge { .. } => true,
242        _ => false,
243    }
244}
245
246/// DDL dispatch. Returns `Ok(Some(rows))` when `plan` is a schema
247/// statement and was handled; `Ok(None)` when it's a regular graph
248/// operation that the operator pipeline should execute normally.
249///
250/// `CREATE INDEX` / `DROP INDEX` return one row carrying an `ok`
251/// value so Bolt clients see a non-empty RECORD stream rather than
252/// an empty SUCCESS — mirrors how Neo4j reports schema writes.
253/// `SHOW INDEXES` returns one row per registered index with
254/// `label`, `property`, and `state` columns.
255fn try_execute_ddl(
256    plan: &LogicalPlan,
257    reader: &dyn GraphReader,
258    writer: &dyn GraphWriter,
259) -> Result<Option<Vec<Row>>> {
260    match plan {
261        LogicalPlan::CreatePropertyIndex { label, properties } => {
262            writer.create_property_index(label, properties)?;
263            Ok(Some(vec![node_index_ddl_ack_row(
264                "created", label, properties,
265            )]))
266        }
267        LogicalPlan::DropPropertyIndex { label, properties } => {
268            writer.drop_property_index(label, properties)?;
269            Ok(Some(vec![node_index_ddl_ack_row(
270                "dropped", label, properties,
271            )]))
272        }
273        LogicalPlan::CreateEdgePropertyIndex {
274            edge_type,
275            properties,
276        } => {
277            writer.create_edge_property_index(edge_type, properties)?;
278            Ok(Some(vec![edge_index_ddl_ack_row(
279                "created", edge_type, properties,
280            )]))
281        }
282        LogicalPlan::DropEdgePropertyIndex {
283            edge_type,
284            properties,
285        } => {
286            writer.drop_edge_property_index(edge_type, properties)?;
287            Ok(Some(vec![edge_index_ddl_ack_row(
288                "dropped", edge_type, properties,
289            )]))
290        }
291        LogicalPlan::ShowPropertyIndexes => {
292            // SHOW is a pure read — source it from the reader so
293            // overlay/partitioned readers deliver the right view
294            // without round-tripping through a writer. Both
295            // node-scoped and edge-scoped indexes land in the same
296            // row stream, distinguished by the `scope` column.
297            let mut rows: Vec<Row> = Vec::new();
298            for (label, properties) in reader.list_property_indexes()? {
299                rows.push(show_index_row("NODE", label, properties));
300            }
301            for (edge_type, properties) in reader.list_edge_property_indexes()? {
302                rows.push(show_index_row("RELATIONSHIP", edge_type, properties));
303            }
304            Ok(Some(rows))
305        }
306        LogicalPlan::CreatePointIndex { label, property } => {
307            writer.create_point_index(label, property)?;
308            Ok(Some(vec![point_index_ddl_ack_row(
309                "created", "NODE", label, property,
310            )]))
311        }
312        LogicalPlan::DropPointIndex { label, property } => {
313            writer.drop_point_index(label, property)?;
314            Ok(Some(vec![point_index_ddl_ack_row(
315                "dropped", "NODE", label, property,
316            )]))
317        }
318        LogicalPlan::CreateEdgePointIndex {
319            edge_type,
320            property,
321        } => {
322            writer.create_edge_point_index(edge_type, property)?;
323            Ok(Some(vec![point_index_ddl_ack_row(
324                "created",
325                "RELATIONSHIP",
326                edge_type,
327                property,
328            )]))
329        }
330        LogicalPlan::DropEdgePointIndex {
331            edge_type,
332            property,
333        } => {
334            writer.drop_edge_point_index(edge_type, property)?;
335            Ok(Some(vec![point_index_ddl_ack_row(
336                "dropped",
337                "RELATIONSHIP",
338                edge_type,
339                property,
340            )]))
341        }
342        LogicalPlan::ShowPointIndexes => {
343            // Kept on its own row stream rather than folded into
344            // `SHOW INDEXES` so the point-specific `type` column
345            // stays a stable part of the shape and callers can
346            // filter without depending on string-matching `type`
347            // across mixed rows. Node- and edge-scope rows are
348            // merged here, disambiguated by the `scope` column.
349            let mut rows: Vec<Row> = Vec::new();
350            for (label, property) in reader.list_point_indexes()? {
351                rows.push(show_point_index_row("NODE", label, property));
352            }
353            for (edge_type, property) in reader.list_edge_point_indexes()? {
354                rows.push(show_point_index_row("RELATIONSHIP", edge_type, property));
355            }
356            Ok(Some(rows))
357        }
358        LogicalPlan::CreatePropertyConstraint {
359            name,
360            scope,
361            properties,
362            kind,
363            if_not_exists,
364        } => {
365            let storage_kind = match kind {
366                ConstraintKind::Unique => PropertyConstraintKind::Unique,
367                ConstraintKind::NotNull => PropertyConstraintKind::NotNull,
368                ConstraintKind::NodeKey => PropertyConstraintKind::NodeKey,
369                ConstraintKind::PropertyType(t) => {
370                    PropertyConstraintKind::PropertyType(cypher_to_storage_property_type(*t))
371                }
372            };
373            let storage_scope = cypher_to_storage_scope(scope);
374            let spec = writer.create_property_constraint(
375                name.as_deref(),
376                &storage_scope,
377                properties,
378                storage_kind,
379                *if_not_exists,
380            )?;
381            Ok(Some(vec![constraint_ack_row("created", &spec)]))
382        }
383        LogicalPlan::DropPropertyConstraint { name, if_exists } => {
384            writer.drop_property_constraint(name, *if_exists)?;
385            let mut row = Row::default();
386            row.insert(
387                "state".into(),
388                Value::Property(Property::String("dropped".into())),
389            );
390            row.insert(
391                "name".into(),
392                Value::Property(Property::String(name.clone())),
393            );
394            Ok(Some(vec![row]))
395        }
396        LogicalPlan::ShowPropertyConstraints => {
397            let specs = reader.list_property_constraints()?;
398            let rows = specs.into_iter().map(constraint_show_row).collect();
399            Ok(Some(rows))
400        }
401        _ => Ok(None),
402    }
403}
404
405/// One-row acknowledgement emitted after `CREATE CONSTRAINT`. Carries
406/// the resolved spec so callers see the final name when they omitted
407/// it. Columns match `SHOW CONSTRAINTS` (plus `state`) so Bolt clients
408/// can uniformly format DDL responses.
409fn constraint_ack_row(state: &str, spec: &meshdb_storage::PropertyConstraintSpec) -> Row {
410    let mut row = constraint_show_row(spec.clone());
411    row.insert(
412        "state".into(),
413        Value::Property(Property::String(state.into())),
414    );
415    row
416}
417
418/// Row shape for `SHOW CONSTRAINTS` and `db.constraints()`. Columns:
419/// `name`, `scope` ("NODE" / "RELATIONSHIP"), `label` (label or edge
420/// type, depending on scope), `properties` (list), and `type`.
421/// Single-property constraints still carry a one-element `properties`
422/// list; composite kinds (e.g. `NodeKey`) carry the full tuple.
423fn constraint_show_row(spec: meshdb_storage::PropertyConstraintSpec) -> Row {
424    let mut row = Row::default();
425    row.insert("name".into(), Value::Property(Property::String(spec.name)));
426    let (scope_tag, target) = match spec.scope {
427        meshdb_storage::ConstraintScope::Node(l) => ("NODE", l),
428        meshdb_storage::ConstraintScope::Relationship(t) => ("RELATIONSHIP", t),
429    };
430    row.insert(
431        "scope".into(),
432        Value::Property(Property::String(scope_tag.into())),
433    );
434    // `label` stays for backwards compatibility — it carries the
435    // scope's target regardless of node vs relationship, matching
436    // the index DDL row's column for historical consistency. A
437    // separate `scope` column tells you what kind of target it is.
438    row.insert("label".into(), Value::Property(Property::String(target)));
439    let props: Vec<Property> = spec.properties.into_iter().map(Property::String).collect();
440    row.insert("properties".into(), Value::Property(Property::List(props)));
441    row.insert(
442        "type".into(),
443        Value::Property(Property::String(spec.kind.as_string())),
444    );
445    row
446}
447
448/// Convert the Cypher AST's `ConstraintScope` into the storage-layer
449/// enum. Narrow bridge — same shape on both sides; kept in
450/// different crates to preserve the dependency direction.
451fn cypher_to_storage_scope(scope: &CypherConstraintScope) -> StorageConstraintScope {
452    match scope {
453        CypherConstraintScope::Node(l) => StorageConstraintScope::Node(l.clone()),
454        CypherConstraintScope::Relationship(t) => StorageConstraintScope::Relationship(t.clone()),
455    }
456}
457
458/// Convert the Cypher AST's `PropertyType` into the storage-layer
459/// enum. Narrow bridge — the two enums carry the same four variants
460/// but live in different crates to keep the dependency direction
461/// clean (cypher → executor → storage, never the reverse).
462fn cypher_to_storage_property_type(t: CypherPropertyType) -> StoragePropertyType {
463    match t {
464        CypherPropertyType::String => StoragePropertyType::String,
465        CypherPropertyType::Integer => StoragePropertyType::Integer,
466        CypherPropertyType::Float => StoragePropertyType::Float,
467        CypherPropertyType::Boolean => StoragePropertyType::Boolean,
468    }
469}
470
471/// One-row acknowledgement emitted after `CREATE INDEX` /
472/// `DROP INDEX` on a node scope. Columns mirror the `SHOW INDEXES`
473/// shape (`scope`, `label`, `property`) so Bolt clients can format
474/// DDL responses uniformly with schema-read responses. `label` is
475/// kept as a column name for backwards-compat with pre-edge-index
476/// clients; the `scope` column disambiguates node from edge.
477fn node_index_ddl_ack_row(state: &str, label: &str, properties: &[String]) -> Row {
478    let mut row = Row::default();
479    row.insert(
480        "state".into(),
481        Value::Property(Property::String(state.into())),
482    );
483    row.insert(
484        "scope".into(),
485        Value::Property(Property::String("NODE".into())),
486    );
487    row.insert(
488        "label".into(),
489        Value::Property(Property::String(label.into())),
490    );
491    // `property` stays a single string for backward compat with
492    // drivers that read it directly. Composite specs render as a
493    // comma-joined preview; the `properties` column carries the
494    // authoritative list.
495    row.insert(
496        "property".into(),
497        Value::Property(Property::String(properties.join(","))),
498    );
499    row.insert("properties".into(), properties_list_value(properties));
500    row
501}
502
503/// One-row acknowledgement emitted after `CREATE INDEX` /
504/// `DROP INDEX` on a relationship scope. Mirrors
505/// [`node_index_ddl_ack_row`] but the target lives under
506/// `edge_type` (and `label` carries the same string so existing
507/// driver code that read `label` still sees the target name).
508fn edge_index_ddl_ack_row(state: &str, edge_type: &str, properties: &[String]) -> Row {
509    let mut row = Row::default();
510    row.insert(
511        "state".into(),
512        Value::Property(Property::String(state.into())),
513    );
514    row.insert(
515        "scope".into(),
516        Value::Property(Property::String("RELATIONSHIP".into())),
517    );
518    row.insert(
519        "label".into(),
520        Value::Property(Property::String(edge_type.into())),
521    );
522    row.insert(
523        "edge_type".into(),
524        Value::Property(Property::String(edge_type.into())),
525    );
526    row.insert(
527        "property".into(),
528        Value::Property(Property::String(properties.join(","))),
529    );
530    row.insert("properties".into(), properties_list_value(properties));
531    row
532}
533
534fn properties_list_value(properties: &[String]) -> Value {
535    Value::List(
536        properties
537            .iter()
538            .map(|p| Value::Property(Property::String(p.clone())))
539            .collect(),
540    )
541}
542
543/// One-row ack emitted after `CREATE POINT INDEX` / `DROP POINT
544/// INDEX`. `scope` disambiguates node vs relationship; `label`
545/// carries the scope target string (label for node, edge type for
546/// relationship) and — when relationship — `edge_type` carries it
547/// too so drivers can read either column. Matches the two-column
548/// convention from `show_index_row`.
549fn point_index_ddl_ack_row(state: &str, scope: &str, target: &str, property: &str) -> Row {
550    let mut row = Row::default();
551    row.insert(
552        "state".into(),
553        Value::Property(Property::String(state.into())),
554    );
555    row.insert(
556        "scope".into(),
557        Value::Property(Property::String(scope.into())),
558    );
559    row.insert(
560        "type".into(),
561        Value::Property(Property::String("POINT".into())),
562    );
563    row.insert(
564        "label".into(),
565        Value::Property(Property::String(target.into())),
566    );
567    if scope == "RELATIONSHIP" {
568        row.insert(
569            "edge_type".into(),
570            Value::Property(Property::String(target.into())),
571        );
572    }
573    row.insert(
574        "property".into(),
575        Value::Property(Property::String(property.into())),
576    );
577    row
578}
579
580/// Row shape for `SHOW POINT INDEXES`. One row per registered point
581/// index under both scopes, disambiguated by the `scope` column.
582/// Same column shape as `point_index_ddl_ack_row` minus `state`'s
583/// DDL verb (it carries `"online"` here instead).
584fn show_point_index_row(scope: &str, target: String, property: String) -> Row {
585    let mut row = Row::default();
586    row.insert(
587        "scope".into(),
588        Value::Property(Property::String(scope.into())),
589    );
590    row.insert(
591        "type".into(),
592        Value::Property(Property::String("POINT".into())),
593    );
594    row.insert(
595        "label".into(),
596        Value::Property(Property::String(target.clone())),
597    );
598    if scope == "RELATIONSHIP" {
599        row.insert(
600            "edge_type".into(),
601            Value::Property(Property::String(target)),
602        );
603    }
604    row.insert(
605        "property".into(),
606        Value::Property(Property::String(property)),
607    );
608    row.insert(
609        "state".into(),
610        Value::Property(Property::String("online".into())),
611    );
612    row
613}
614
615/// Row shape for `SHOW INDEXES` output. Columns: `scope`
616/// (`"NODE"` / `"RELATIONSHIP"`), `label` (scope target — label for
617/// node, edge type for relationship; kept for backwards-compat),
618/// `property` (comma-joined preview of the properties for composite
619/// specs), `properties` (full list — authoritative), and `state`.
620/// `label` and `edge_type` carry the same string when
621/// `scope == "RELATIONSHIP"` so clients can read either column.
622fn show_index_row(scope: &str, target: String, properties: Vec<String>) -> Row {
623    let mut row = Row::default();
624    row.insert(
625        "scope".into(),
626        Value::Property(Property::String(scope.into())),
627    );
628    row.insert(
629        "label".into(),
630        Value::Property(Property::String(target.clone())),
631    );
632    if scope == "RELATIONSHIP" {
633        row.insert(
634            "edge_type".into(),
635            Value::Property(Property::String(target)),
636        );
637    }
638    row.insert(
639        "property".into(),
640        Value::Property(Property::String(properties.join(","))),
641    );
642    row.insert("properties".into(), properties_list_value(&properties));
643    row.insert(
644        "state".into(),
645        Value::Property(Property::String("online".into())),
646    );
647    row
648}
649
650fn build_op(plan: &LogicalPlan) -> Box<dyn Operator> {
651    build_op_inner(plan, None)
652}
653
654pub(crate) fn build_op_inner(plan: &LogicalPlan, seed: Option<&Row>) -> Box<dyn Operator> {
655    macro_rules! child {
656        ($p:expr) => {
657            build_op_inner($p, seed)
658        };
659    }
660    match plan {
661        LogicalPlan::CreatePath {
662            input,
663            nodes,
664            edges,
665        } => Box::new(CreatePathOp::new(
666            input.as_ref().map(|p| child!(p)),
667            nodes.clone(),
668            edges.clone(),
669        )),
670        LogicalPlan::CartesianProduct { left, right } => {
671            Box::new(CartesianProductOp::new(child!(left), (**right).clone()))
672        }
673        LogicalPlan::Delete {
674            input,
675            detach,
676            vars,
677            exprs,
678        } => Box::new(DeleteOp::new(
679            child!(input),
680            *detach,
681            vars.clone(),
682            exprs.clone(),
683        )),
684        LogicalPlan::SetProperty { input, assignments } => {
685            Box::new(SetPropertyOp::new(child!(input), assignments.clone()))
686        }
687        LogicalPlan::Remove { input, items } => {
688            Box::new(RemoveOp::new(child!(input), items.clone()))
689        }
690        LogicalPlan::LoadCsv {
691            input,
692            path_expr,
693            var,
694            with_headers,
695        } => Box::new(LoadCsvOp::new(
696            input.as_ref().map(|p| child!(p)),
697            path_expr.clone(),
698            var.clone(),
699            *with_headers,
700        )),
701        LogicalPlan::Foreach {
702            input,
703            var,
704            list_expr,
705            set_assignments,
706            remove_items,
707        } => Box::new(ForeachOp::new(
708            child!(input),
709            var.clone(),
710            list_expr.clone(),
711            set_assignments.clone(),
712            remove_items.clone(),
713        )),
714        LogicalPlan::CallSubquery { input, body } => {
715            Box::new(CallSubqueryOp::new(child!(input), (**body).clone()))
716        }
717        LogicalPlan::OptionalApply {
718            input,
719            body,
720            null_vars,
721        } => Box::new(OptionalApplyOp::new(
722            child!(input),
723            (**body).clone(),
724            null_vars.clone(),
725        )),
726        LogicalPlan::ProcedureCall {
727            input,
728            qualified_name,
729            args,
730            yield_spec,
731            standalone,
732        } => Box::new(ProcedureCallOp::new(
733            input.as_ref().map(|p| child!(p)),
734            qualified_name.clone(),
735            args.clone(),
736            yield_spec.clone(),
737            *standalone,
738        )),
739        LogicalPlan::SeedRow => match seed {
740            Some(r) => Box::new(SeededRowOp {
741                row: Some(r.clone()),
742            }),
743            None => Box::new(SeedRowOp { done: false }),
744        },
745        LogicalPlan::NodeScanAll { var } => Box::new(NodeScanAllOp::new(var.clone())),
746        LogicalPlan::NodeScanByLabels { var, labels } => {
747            Box::new(NodeScanByLabelsOp::new(var.clone(), labels.clone()))
748        }
749        LogicalPlan::EdgeExpand {
750            input,
751            src_var,
752            edge_var,
753            dst_var,
754            dst_labels,
755            edge_properties,
756            edge_types,
757            direction,
758            edge_constraint_var,
759        } => Box::new(EdgeExpandOp::new(
760            child!(input),
761            src_var.clone(),
762            edge_var.clone(),
763            dst_var.clone(),
764            dst_labels.clone(),
765            edge_properties.clone(),
766            edge_types.clone(),
767            *direction,
768            edge_constraint_var.clone(),
769        )),
770        LogicalPlan::OptionalEdgeExpand {
771            input,
772            src_var,
773            edge_var,
774            dst_var,
775            dst_labels,
776            dst_properties,
777            edge_types,
778            direction,
779            dst_constraint_var,
780            edge_constraint_var,
781        } => Box::new(OptionalEdgeExpandOp::new(
782            child!(input),
783            src_var.clone(),
784            edge_var.clone(),
785            dst_var.clone(),
786            dst_labels.clone(),
787            dst_properties.clone(),
788            edge_types.clone(),
789            *direction,
790            dst_constraint_var.clone(),
791            edge_constraint_var.clone(),
792        )),
793        LogicalPlan::VarLengthExpand {
794            input,
795            src_var,
796            edge_var,
797            dst_var,
798            dst_labels,
799            edge_types,
800            edge_properties,
801            direction,
802            min_hops,
803            max_hops,
804            path_var,
805            optional,
806            dst_constraint_var,
807            bound_edge_list_var,
808            excluded_edge_vars,
809        } => Box::new(VarLengthExpandOp::new(
810            child!(input),
811            src_var.clone(),
812            edge_var.clone(),
813            dst_var.clone(),
814            dst_labels.clone(),
815            edge_types.clone(),
816            edge_properties.clone(),
817            *direction,
818            *min_hops,
819            *max_hops,
820            path_var.clone(),
821            *optional,
822            dst_constraint_var.clone(),
823            bound_edge_list_var.clone(),
824            excluded_edge_vars.clone(),
825        )),
826        LogicalPlan::Filter { input, predicate } => {
827            Box::new(FilterOp::new(child!(input), predicate.clone()))
828        }
829        LogicalPlan::Project { input, items } => {
830            Box::new(ProjectOp::new(child!(input), items.clone()))
831        }
832        LogicalPlan::Aggregate {
833            input,
834            group_keys,
835            aggregates,
836        } => Box::new(AggregateOp::new(
837            child!(input),
838            group_keys.clone(),
839            aggregates.clone(),
840        )),
841        LogicalPlan::Identity { input } => Box::new(IdentityOp::new(child!(input))),
842        LogicalPlan::CoalesceNullRow { input, null_vars } => {
843            Box::new(CoalesceNullRowOp::new(child!(input), null_vars.clone()))
844        }
845        LogicalPlan::Distinct { input } => Box::new(DistinctOp::new(child!(input))),
846        LogicalPlan::OrderBy { input, sort_items } => {
847            Box::new(OrderByOp::new(child!(input), sort_items.clone()))
848        }
849        LogicalPlan::Skip { input, count } => Box::new(SkipOp::new(child!(input), count.clone())),
850        LogicalPlan::Limit { input, count } => Box::new(LimitOp::new(child!(input), count.clone())),
851        LogicalPlan::MergeNode {
852            input,
853            var,
854            labels,
855            properties,
856            on_create,
857            on_match,
858        } => Box::new(MergeNodeOp::new(
859            input.as_ref().map(|p| child!(p)),
860            var.clone(),
861            labels.clone(),
862            properties.clone(),
863            on_create.clone(),
864            on_match.clone(),
865        )),
866        LogicalPlan::MergeEdge {
867            input,
868            edge_var,
869            src_var,
870            dst_var,
871            edge_type,
872            undirected,
873            properties,
874            on_create,
875            on_match,
876        } => Box::new(MergeEdgeOp::new(
877            child!(input),
878            edge_var.clone(),
879            src_var.clone(),
880            dst_var.clone(),
881            edge_type.clone(),
882            *undirected,
883            properties.clone(),
884            on_create.clone(),
885            on_match.clone(),
886        )),
887        LogicalPlan::Unwind { var, expr } => Box::new(UnwindOp::new(var.clone(), expr.clone())),
888        LogicalPlan::UnwindChain { input, var, expr } => {
889            Box::new(UnwindChainOp::new(child!(input), var.clone(), expr.clone()))
890        }
891        LogicalPlan::IndexSeek {
892            var,
893            label,
894            properties,
895            values,
896        } => Box::new(IndexSeekOp::new(
897            var.clone(),
898            label.clone(),
899            properties.clone(),
900            values.clone(),
901        )),
902        LogicalPlan::PointIndexSeek {
903            var,
904            label,
905            property,
906            bounds,
907        } => Box::new(PointIndexSeekOp::new(
908            var.clone(),
909            label.clone(),
910            property.clone(),
911            bounds.clone(),
912        )),
913        LogicalPlan::EdgeSeek {
914            edge_var,
915            src_var,
916            dst_var,
917            edge_type,
918            property,
919            value,
920            direction,
921            residual_properties,
922        } => Box::new(EdgeSeekOp::new(
923            edge_var.clone(),
924            src_var.clone(),
925            dst_var.clone(),
926            edge_type.clone(),
927            property.clone(),
928            value.clone(),
929            *direction,
930            residual_properties.clone(),
931        )),
932        LogicalPlan::EdgePointIndexSeek {
933            edge_var,
934            src_var,
935            dst_var,
936            edge_type,
937            property,
938            direction,
939            bounds,
940        } => Box::new(EdgePointIndexSeekOp::new(
941            edge_var.clone(),
942            src_var.clone(),
943            dst_var.clone(),
944            edge_type.clone(),
945            property.clone(),
946            *direction,
947            bounds.clone(),
948        )),
949        // DDL plans are handled by `try_execute_ddl` before the
950        // operator pipeline is built, so they should never reach
951        // this point. An assertion here catches any future
952        // refactor that forgets to gate them.
953        LogicalPlan::Union { branches, all } => {
954            let branch_ops: Vec<Box<dyn Operator>> = branches.iter().map(|b| child!(b)).collect();
955            Box::new(UnionOp::new(branch_ops, *all))
956        }
957        LogicalPlan::BindPath {
958            input,
959            path_var,
960            node_vars,
961            edge_vars,
962        } => Box::new(BindPathOp::new(
963            child!(input),
964            path_var.clone(),
965            node_vars.clone(),
966            edge_vars.clone(),
967        )),
968        LogicalPlan::ShortestPath {
969            input,
970            src_var,
971            dst_var,
972            path_var,
973            edge_types,
974            direction,
975            max_hops,
976            kind,
977        } => Box::new(ShortestPathOp::new(
978            child!(input),
979            src_var.clone(),
980            dst_var.clone(),
981            path_var.clone(),
982            edge_types.clone(),
983            *direction,
984            *max_hops,
985            *kind,
986        )),
987        LogicalPlan::CreatePropertyIndex { .. }
988        | LogicalPlan::DropPropertyIndex { .. }
989        | LogicalPlan::CreateEdgePropertyIndex { .. }
990        | LogicalPlan::DropEdgePropertyIndex { .. }
991        | LogicalPlan::ShowPropertyIndexes
992        | LogicalPlan::CreatePointIndex { .. }
993        | LogicalPlan::DropPointIndex { .. }
994        | LogicalPlan::CreateEdgePointIndex { .. }
995        | LogicalPlan::DropEdgePointIndex { .. }
996        | LogicalPlan::ShowPointIndexes
997        | LogicalPlan::CreatePropertyConstraint { .. }
998        | LogicalPlan::DropPropertyConstraint { .. }
999        | LogicalPlan::ShowPropertyConstraints => {
1000            panic!("schema DDL must be dispatched via try_execute_ddl before build_op")
1001        }
1002    }
1003}
1004
1005struct UnwindOp {
1006    var: String,
1007    expr: Expr,
1008    items: Option<Vec<Value>>,
1009    cursor: usize,
1010}
1011
1012impl UnwindOp {
1013    fn new(var: String, expr: Expr) -> Self {
1014        Self {
1015            var,
1016            expr,
1017            items: None,
1018            cursor: 0,
1019        }
1020    }
1021}
1022
1023impl Operator for UnwindOp {
1024    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1025        if self.items.is_none() {
1026            let empty = Row::new();
1027            let ectx = EvalCtx {
1028                row: &empty,
1029                params: ctx.params,
1030                reader: ctx.store,
1031                procedures: ctx.procedures,
1032                outer_rows: ctx.outer_rows,
1033                tombstones: ctx.tombstones,
1034            };
1035            let val = eval_expr(&self.expr, &ectx)?;
1036            self.items = Some(coerce_unwind_list(val)?);
1037        }
1038        let items = self.items.as_ref().unwrap();
1039        if self.cursor < items.len() {
1040            let v = items[self.cursor].clone();
1041            self.cursor += 1;
1042            let mut row = Row::new();
1043            row.insert(self.var.clone(), v);
1044            Ok(Some(row))
1045        } else {
1046            Ok(None)
1047        }
1048    }
1049}
1050
1051/// Per-row UNWIND: pulls one input row, evaluates `expr` against it
1052/// to produce a list, and emits one output row per list element.
1053/// Each output row inherits every binding from the input row plus
1054/// a new `var` binding. Empty / null lists drop the input row and
1055/// the operator immediately pulls the next input row.
1056struct UnwindChainOp {
1057    input: Box<dyn Operator>,
1058    var: String,
1059    expr: Expr,
1060    current_row: Option<Row>,
1061    items: Vec<Value>,
1062    cursor: usize,
1063}
1064
1065impl UnwindChainOp {
1066    fn new(input: Box<dyn Operator>, var: String, expr: Expr) -> Self {
1067        Self {
1068            input,
1069            var,
1070            expr,
1071            current_row: None,
1072            items: Vec::new(),
1073            cursor: 0,
1074        }
1075    }
1076}
1077
1078impl Operator for UnwindChainOp {
1079    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1080        loop {
1081            if let Some(base) = &self.current_row {
1082                if self.cursor < self.items.len() {
1083                    let v = self.items[self.cursor].clone();
1084                    self.cursor += 1;
1085                    let mut row = base.clone();
1086                    row.insert(self.var.clone(), v);
1087                    return Ok(Some(row));
1088                }
1089                self.current_row = None;
1090                self.items.clear();
1091                self.cursor = 0;
1092            }
1093            let base = match self.input.next(ctx)? {
1094                Some(r) => r,
1095                None => return Ok(None),
1096            };
1097            let ectx = EvalCtx {
1098                row: &base,
1099                params: ctx.params,
1100                reader: ctx.store,
1101                procedures: ctx.procedures,
1102                outer_rows: ctx.outer_rows,
1103                tombstones: ctx.tombstones,
1104            };
1105            let val = eval_expr(&self.expr, &ectx)?;
1106            self.items = coerce_unwind_list(val)?;
1107            self.current_row = Some(base);
1108        }
1109    }
1110}
1111
1112/// Shared list-coercion used by both UNWIND operators. Accepts a
1113/// native executor `Value::List`, a property-list
1114/// `Property::List`, or `Null` (treated as an empty list). Any
1115/// other shape is a type error — UNWIND is defined over lists.
1116fn coerce_unwind_list(val: Value) -> Result<Vec<Value>> {
1117    match val {
1118        Value::List(items) => Ok(items),
1119        Value::Property(Property::List(props)) => {
1120            Ok(props.into_iter().map(Value::Property).collect())
1121        }
1122        Value::Null => Ok(Vec::new()),
1123        _ => Err(Error::TypeMismatch),
1124    }
1125}
1126
1127struct CreatePathOp {
1128    input: Option<Box<dyn Operator>>,
1129    nodes: Vec<CreateNodeSpec>,
1130    edges: Vec<CreateEdgeSpec>,
1131    done: bool,
1132    buffered: Option<Vec<Row>>,
1133    cursor: usize,
1134}
1135
1136impl CreatePathOp {
1137    fn new(
1138        input: Option<Box<dyn Operator>>,
1139        nodes: Vec<CreateNodeSpec>,
1140        edges: Vec<CreateEdgeSpec>,
1141    ) -> Self {
1142        Self {
1143            input,
1144            nodes,
1145            edges,
1146            done: false,
1147            buffered: None,
1148            cursor: 0,
1149        }
1150    }
1151
1152    fn apply(&self, ctx: &ExecCtx, row: &Row) -> Result<Row> {
1153        let mut out = row.clone();
1154        let mut node_ids: Vec<NodeId> = Vec::with_capacity(self.nodes.len());
1155        for spec in &self.nodes {
1156            match spec {
1157                CreateNodeSpec::New {
1158                    var,
1159                    labels,
1160                    properties,
1161                } => {
1162                    let mut node = Node::new();
1163                    for label in labels {
1164                        node.labels.push(label.clone());
1165                    }
1166                    // Property values are `Expr` (literal | parameter
1167                    // per the grammar). Evaluate against the current row
1168                    // + params and convert to a stored Property via the
1169                    // existing helper, which rejects Node/Edge values.
1170                    // Null-valued properties aren't stored — openCypher
1171                    // treats `{k: null}` as "no such property," so
1172                    // `keys(n)` and `n.k IS NOT NULL` both skip it.
1173                    for (k, expr) in properties {
1174                        let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1175                        let prop = value_to_property(value)?;
1176                        if matches!(prop, Property::Null) {
1177                            continue;
1178                        }
1179                        node.properties.insert(k.clone(), prop);
1180                    }
1181                    ctx.writer.put_node(&node)?;
1182                    node_ids.push(node.id);
1183                    if let Some(v) = var {
1184                        out.insert(v.clone(), Value::Node(node));
1185                    }
1186                }
1187                CreateNodeSpec::Reference(name) => {
1188                    let id = match out.get(name) {
1189                        Some(Value::Node(n)) => n.id,
1190                        _ => return Err(Error::UnboundVariable(name.clone())),
1191                    };
1192                    node_ids.push(id);
1193                }
1194            }
1195        }
1196        for spec in &self.edges {
1197            let src = node_ids[spec.src_idx];
1198            let dst = node_ids[spec.dst_idx];
1199            let mut edge = Edge::new(spec.edge_type.clone(), src, dst);
1200            for (k, expr) in &spec.properties {
1201                let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1202                let prop = value_to_property(value)?;
1203                if matches!(prop, Property::Null) {
1204                    continue;
1205                }
1206                edge.properties.insert(k.clone(), prop);
1207            }
1208            ctx.writer.put_edge(&edge)?;
1209            if let Some(v) = &spec.var {
1210                out.insert(v.clone(), Value::Edge(edge));
1211            }
1212        }
1213        Ok(out)
1214    }
1215}
1216
1217impl Operator for CreatePathOp {
1218    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1219        if self.input.is_some() {
1220            // Drain the whole input first, then replay sequentially. Draining up
1221            // front avoids aliasing the source scan's cursor while we're writing
1222            // to the store (node-label scans cache ids lazily on first call).
1223            if let Some(buffered) = self.buffered.as_mut() {
1224                if self.cursor < buffered.len() {
1225                    let row = buffered[self.cursor].clone();
1226                    self.cursor += 1;
1227                    return Ok(Some(self.apply(ctx, &row)?));
1228                }
1229                return Ok(None);
1230            }
1231            let mut rows: Vec<Row> = Vec::new();
1232            {
1233                let input = self.input.as_mut().unwrap();
1234                while let Some(row) = input.next(ctx)? {
1235                    rows.push(row);
1236                }
1237            }
1238            self.buffered = Some(rows);
1239            self.cursor = 0;
1240            // Fall through to next call via recursion.
1241            self.next(ctx)
1242        } else {
1243            if self.done {
1244                return Ok(None);
1245            }
1246            self.done = true;
1247            let empty = Row::new();
1248            Ok(Some(self.apply(ctx, &empty)?))
1249        }
1250    }
1251}
1252
1253struct CartesianProductOp {
1254    left: Box<dyn Operator>,
1255    right_plan: LogicalPlan,
1256    left_row: Option<Row>,
1257    right_op: Option<Box<dyn Operator>>,
1258}
1259
1260impl CartesianProductOp {
1261    fn new(left: Box<dyn Operator>, right_plan: LogicalPlan) -> Self {
1262        Self {
1263            left,
1264            right_plan,
1265            left_row: None,
1266            right_op: None,
1267        }
1268    }
1269}
1270
1271impl Operator for CartesianProductOp {
1272    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1273        loop {
1274            if self.left_row.is_none() {
1275                match self.left.next(ctx)? {
1276                    None => return Ok(None),
1277                    Some(row) => {
1278                        self.left_row = Some(row);
1279                        self.right_op = Some(build_op(&self.right_plan));
1280                    }
1281                }
1282            }
1283            let right_op = self.right_op.as_mut().expect("right_op set");
1284            // Expose the current left row to right-side operators
1285            // via a shadow context so constraint lookups (for
1286            // bound edges / bound edge lists) can find vars that
1287            // the right-side scan itself doesn't bind.
1288            let left_ref = self.left_row.as_ref().unwrap();
1289            let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1290            stacked.push(left_ref);
1291            stacked.extend_from_slice(ctx.outer_rows);
1292            let inner_ctx = ExecCtx {
1293                store: ctx.store,
1294                writer: ctx.writer,
1295                params: ctx.params,
1296                procedures: ctx.procedures,
1297                outer_rows: &stacked,
1298                tombstones: ctx.tombstones,
1299            };
1300            match right_op.next(&inner_ctx)? {
1301                Some(right_row) => {
1302                    let mut combined = left_ref.clone();
1303                    for (k, v) in right_row {
1304                        combined.insert(k, v);
1305                    }
1306                    return Ok(Some(combined));
1307                }
1308                None => {
1309                    self.left_row = None;
1310                    self.right_op = None;
1311                }
1312            }
1313        }
1314    }
1315}
1316
1317struct DeleteOp {
1318    input: Box<dyn Operator>,
1319    detach: bool,
1320    #[allow(dead_code)]
1321    vars: Vec<String>,
1322    exprs: Vec<Expr>,
1323    /// Rows drained from `input` before any deletes have run.
1324    /// Populated on first call to `next`. openCypher treats DELETE
1325    /// as a batch mutation: the whole preceding row set is
1326    /// materialised, then the mutations happen, then rows are
1327    /// forwarded. Without buffering, `MATCH (a)-[r]-(b) DELETE r,
1328    /// a, b RETURN count(*)` deletes the first row's nodes before
1329    /// the pipelined scan reaches the second row.
1330    buffered: Option<Vec<Row>>,
1331    cursor: usize,
1332}
1333
1334impl DeleteOp {
1335    fn new(input: Box<dyn Operator>, detach: bool, vars: Vec<String>, exprs: Vec<Expr>) -> Self {
1336        Self {
1337            input,
1338            detach,
1339            vars,
1340            exprs,
1341            buffered: None,
1342            cursor: 0,
1343        }
1344    }
1345
1346    /// Gather every edge and node id referenced by the row's
1347    /// DELETE expressions and run the delete in two phases —
1348    /// edges first, then nodes. The two-phase ordering is what
1349    /// lets `DELETE p1, p2` succeed when two paths share nodes
1350    /// connected by each other's edges: by the time the node
1351    /// phase runs, the in-batch edges are already gone. The
1352    /// non-detach check probes the graph then filters out
1353    /// any edge we just deleted, so the batch-level detachment
1354    /// is counted as "no longer attached" rather than failing.
1355    fn apply_deletes(
1356        &self,
1357        ctx: &ExecCtx,
1358        row: &Row,
1359        seen_edges: &mut HashSet<meshdb_core::EdgeId>,
1360        seen_nodes: &mut HashSet<meshdb_core::NodeId>,
1361    ) -> Result<()> {
1362        let mut edge_ids: Vec<meshdb_core::EdgeId> = Vec::new();
1363        let mut node_ids: Vec<meshdb_core::NodeId> = Vec::new();
1364        for expr in &self.exprs {
1365            let v = eval_expr(expr, &ctx.eval_ctx(row))?;
1366            match v {
1367                Value::Node(n) => node_ids.push(n.id),
1368                Value::Edge(e) => edge_ids.push(e.id),
1369                Value::Path { nodes, edges } => {
1370                    for e in edges {
1371                        edge_ids.push(e.id);
1372                    }
1373                    for n in nodes {
1374                        node_ids.push(n.id);
1375                    }
1376                }
1377                Value::Null | Value::Property(Property::Null) => continue,
1378                _ => return Err(Error::TypeMismatch),
1379            }
1380        }
1381        for eid in &edge_ids {
1382            if seen_edges.insert(*eid) {
1383                ctx.writer.delete_edge(*eid)?;
1384                ctx.tombstones.edges.borrow_mut().insert(*eid);
1385            }
1386        }
1387        for nid in &node_ids {
1388            if !seen_nodes.insert(*nid) {
1389                continue;
1390            }
1391            if self.detach {
1392                // DETACH DELETE also removes every attached edge;
1393                // tombstone them too so a later projection over a
1394                // formerly-attached edge clone also raises rather
1395                // than reading stale properties.
1396                for (eid, _) in ctx.store.outgoing(*nid)? {
1397                    ctx.tombstones.edges.borrow_mut().insert(eid);
1398                }
1399                for (eid, _) in ctx.store.incoming(*nid)? {
1400                    ctx.tombstones.edges.borrow_mut().insert(eid);
1401                }
1402                ctx.writer.detach_delete_node(*nid)?;
1403            } else {
1404                let out = ctx.store.outgoing(*nid)?;
1405                let inc = ctx.store.incoming(*nid)?;
1406                let still_attached = out
1407                    .iter()
1408                    .chain(inc.iter())
1409                    .any(|(eid, _)| !seen_edges.contains(eid));
1410                if still_attached {
1411                    return Err(Error::CannotDeleteAttachedNode);
1412                }
1413                ctx.writer.detach_delete_node(*nid)?;
1414            }
1415            ctx.tombstones.nodes.borrow_mut().insert(*nid);
1416        }
1417        Ok(())
1418    }
1419}
1420
1421impl Operator for DeleteOp {
1422    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1423        // First call: drain the input, run all deletes, then
1424        // start forwarding rows. Buffering protects against the
1425        // pipelined-scan-vs-mutation race: `MATCH (a)-[r]-(b)
1426        // DELETE r, a, b RETURN count(*)` must observe every
1427        // MATCH row before any node or edge disappears, otherwise
1428        // the undirected expansion hits an already-deleted source
1429        // on the second iteration.
1430        if self.buffered.is_none() {
1431            let mut rows: Vec<Row> = Vec::new();
1432            while let Some(row) = self.input.next(ctx)? {
1433                rows.push(row);
1434            }
1435            let mut seen_edges: HashSet<meshdb_core::EdgeId> = HashSet::new();
1436            let mut seen_nodes: HashSet<meshdb_core::NodeId> = HashSet::new();
1437            for row in &rows {
1438                self.apply_deletes(ctx, row, &mut seen_edges, &mut seen_nodes)?;
1439            }
1440            self.buffered = Some(rows);
1441            self.cursor = 0;
1442        }
1443        let rows = self.buffered.as_ref().unwrap();
1444        if self.cursor < rows.len() {
1445            let row = rows[self.cursor].clone();
1446            self.cursor += 1;
1447            return Ok(Some(row));
1448        }
1449        Ok(None)
1450    }
1451}
1452
1453struct SetPropertyOp {
1454    input: Box<dyn Operator>,
1455    assignments: Vec<SetAssignment>,
1456}
1457
1458impl SetPropertyOp {
1459    fn new(input: Box<dyn Operator>, assignments: Vec<SetAssignment>) -> Self {
1460        Self { input, assignments }
1461    }
1462}
1463
1464impl Operator for SetPropertyOp {
1465    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1466        match self.input.next(ctx)? {
1467            None => Ok(None),
1468            Some(mut row) => {
1469                // Phase 1: evaluate any RHSes against the original row bindings.
1470                enum Action {
1471                    SetKey {
1472                        var: String,
1473                        key: String,
1474                        prop: Property,
1475                    },
1476                    AddLabels {
1477                        var: String,
1478                        labels: Vec<String>,
1479                    },
1480                    Replace {
1481                        var: String,
1482                        props: Vec<(String, Property)>,
1483                    },
1484                    Merge {
1485                        var: String,
1486                        props: Vec<(String, Property)>,
1487                    },
1488                }
1489                let mut actions: Vec<Action> = Vec::with_capacity(self.assignments.len());
1490                for a in &self.assignments {
1491                    match a {
1492                        SetAssignment::Property { var, key, value } => {
1493                            let evaluated = eval_expr(value, &ctx.eval_ctx(&row))?;
1494                            let prop = value_to_property(evaluated)?;
1495                            actions.push(Action::SetKey {
1496                                var: var.clone(),
1497                                key: key.clone(),
1498                                prop,
1499                            });
1500                        }
1501                        SetAssignment::Labels { var, labels } => {
1502                            actions.push(Action::AddLabels {
1503                                var: var.clone(),
1504                                labels: labels.clone(),
1505                            });
1506                        }
1507                        SetAssignment::Replace { var, properties } => {
1508                            // Property values are now `Expr`. Evaluate
1509                            // each against the current row + params and
1510                            // convert via the shared helper, which
1511                            // surfaces InvalidSetValue for Node/Edge.
1512                            let props = properties
1513                                .iter()
1514                                .map(|(k, expr)| {
1515                                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1516                                    Ok((k.clone(), value_to_property(v)?))
1517                                })
1518                                .collect::<Result<Vec<(String, Property)>>>()?;
1519                            actions.push(Action::Replace {
1520                                var: var.clone(),
1521                                props,
1522                            });
1523                        }
1524                        SetAssignment::Merge { var, properties } => {
1525                            let props = properties
1526                                .iter()
1527                                .map(|(k, expr)| {
1528                                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1529                                    Ok((k.clone(), value_to_property(v)?))
1530                                })
1531                                .collect::<Result<Vec<(String, Property)>>>()?;
1532                            actions.push(Action::Merge {
1533                                var: var.clone(),
1534                                props,
1535                            });
1536                        }
1537                        SetAssignment::ReplaceFromExpr {
1538                            var,
1539                            source,
1540                            replace,
1541                        } => {
1542                            let v = eval_expr(source, &ctx.eval_ctx(&row))?;
1543                            let props = extract_property_map(&v)?;
1544                            if *replace {
1545                                actions.push(Action::Replace {
1546                                    var: var.clone(),
1547                                    props,
1548                                });
1549                            } else {
1550                                actions.push(Action::Merge {
1551                                    var: var.clone(),
1552                                    props,
1553                                });
1554                            }
1555                        }
1556                    }
1557                }
1558
1559                // Phase 2: apply updates in-place to the row bindings.
1560                let mut updated_nodes: HashSet<String> = HashSet::new();
1561                let mut updated_edges: HashSet<String> = HashSet::new();
1562                for action in actions {
1563                    match action {
1564                        Action::SetKey { var, key, prop } => match row.get_mut(&var) {
1565                            // openCypher: SET on a null target (from
1566                            // OPTIONAL MATCH that didn't bind) is a
1567                            // silent no-op rather than an error.
1568                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1569                                continue
1570                            }
1571                            // openCypher: SET a.key = null removes the
1572                            // property rather than storing a null value.
1573                            Some(Value::Node(n)) => {
1574                                if matches!(prop, Property::Null) {
1575                                    n.properties.remove(&key);
1576                                } else {
1577                                    n.properties.insert(key, prop);
1578                                }
1579                                updated_nodes.insert(var);
1580                            }
1581                            Some(Value::Edge(e)) => {
1582                                if matches!(prop, Property::Null) {
1583                                    e.properties.remove(&key);
1584                                } else {
1585                                    e.properties.insert(key, prop);
1586                                }
1587                                updated_edges.insert(var);
1588                            }
1589                            _ => return Err(Error::UnboundVariable(var)),
1590                        },
1591                        Action::AddLabels { var, labels } => match row.get_mut(&var) {
1592                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1593                                continue
1594                            }
1595                            Some(Value::Node(n)) => {
1596                                for label in labels {
1597                                    if !n.labels.contains(&label) {
1598                                        n.labels.push(label);
1599                                    }
1600                                }
1601                                updated_nodes.insert(var);
1602                            }
1603                            _ => return Err(Error::UnboundVariable(var)),
1604                        },
1605                        Action::Replace { var, props } => match row.get_mut(&var) {
1606                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1607                                continue
1608                            }
1609                            Some(Value::Node(n)) => {
1610                                n.properties.clear();
1611                                for (k, v) in props {
1612                                    if !matches!(v, Property::Null) {
1613                                        n.properties.insert(k, v);
1614                                    }
1615                                }
1616                                updated_nodes.insert(var);
1617                            }
1618                            Some(Value::Edge(e)) => {
1619                                e.properties.clear();
1620                                for (k, v) in props {
1621                                    if !matches!(v, Property::Null) {
1622                                        e.properties.insert(k, v);
1623                                    }
1624                                }
1625                                updated_edges.insert(var);
1626                            }
1627                            _ => return Err(Error::UnboundVariable(var)),
1628                        },
1629                        Action::Merge { var, props } => match row.get_mut(&var) {
1630                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1631                                continue
1632                            }
1633                            Some(Value::Node(n)) => {
1634                                for (k, v) in props {
1635                                    if matches!(v, Property::Null) {
1636                                        n.properties.remove(&k);
1637                                    } else {
1638                                        n.properties.insert(k, v);
1639                                    }
1640                                }
1641                                updated_nodes.insert(var);
1642                            }
1643                            Some(Value::Edge(e)) => {
1644                                for (k, v) in props {
1645                                    if matches!(v, Property::Null) {
1646                                        e.properties.remove(&k);
1647                                    } else {
1648                                        e.properties.insert(k, v);
1649                                    }
1650                                }
1651                                updated_edges.insert(var);
1652                            }
1653                            _ => return Err(Error::UnboundVariable(var)),
1654                        },
1655                    }
1656                }
1657
1658                // Phase 3: flush each mutated entity once to the writer.
1659                for var in &updated_nodes {
1660                    if let Some(Value::Node(n)) = row.get(var) {
1661                        ctx.writer.put_node(n)?;
1662                    }
1663                }
1664                for var in &updated_edges {
1665                    if let Some(Value::Edge(e)) = row.get(var) {
1666                        ctx.writer.put_edge(e)?;
1667                    }
1668                }
1669
1670                Ok(Some(row))
1671            }
1672        }
1673    }
1674}
1675
1676struct RemoveOp {
1677    input: Box<dyn Operator>,
1678    items: Vec<RemoveSpec>,
1679}
1680
1681impl RemoveOp {
1682    fn new(input: Box<dyn Operator>, items: Vec<RemoveSpec>) -> Self {
1683        Self { input, items }
1684    }
1685}
1686
1687impl Operator for RemoveOp {
1688    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1689        match self.input.next(ctx)? {
1690            None => Ok(None),
1691            Some(mut row) => {
1692                let mut updated_nodes: HashSet<String> = HashSet::new();
1693                let mut updated_edges: HashSet<String> = HashSet::new();
1694                for item in &self.items {
1695                    match item {
1696                        RemoveSpec::Property { var, key } => match row.get_mut(var) {
1697                            // Null target (from OPTIONAL MATCH) is a
1698                            // no-op, matching Neo4j's REMOVE semantics.
1699                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1700                                continue
1701                            }
1702                            Some(Value::Node(n)) => {
1703                                n.properties.remove(key);
1704                                updated_nodes.insert(var.clone());
1705                            }
1706                            Some(Value::Edge(e)) => {
1707                                e.properties.remove(key);
1708                                updated_edges.insert(var.clone());
1709                            }
1710                            _ => return Err(Error::UnboundVariable(var.clone())),
1711                        },
1712                        RemoveSpec::Labels { var, labels } => match row.get_mut(var) {
1713                            Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1714                                continue
1715                            }
1716                            Some(Value::Node(n)) => {
1717                                n.labels.retain(|l| !labels.contains(l));
1718                                updated_nodes.insert(var.clone());
1719                            }
1720                            _ => return Err(Error::UnboundVariable(var.clone())),
1721                        },
1722                    }
1723                }
1724                for var in &updated_nodes {
1725                    if let Some(Value::Node(n)) = row.get(var) {
1726                        ctx.writer.put_node(n)?;
1727                    }
1728                }
1729                for var in &updated_edges {
1730                    if let Some(Value::Edge(e)) = row.get(var) {
1731                        ctx.writer.put_edge(e)?;
1732                    }
1733                }
1734                Ok(Some(row))
1735            }
1736        }
1737    }
1738}
1739
1740struct LoadCsvOp {
1741    input: Option<Box<dyn Operator>>,
1742    path_expr: Expr,
1743    var: String,
1744    with_headers: bool,
1745    rows: Option<Vec<Value>>,
1746    cursor: usize,
1747}
1748
1749impl LoadCsvOp {
1750    fn new(
1751        input: Option<Box<dyn Operator>>,
1752        path_expr: Expr,
1753        var: String,
1754        with_headers: bool,
1755    ) -> Self {
1756        Self {
1757            input,
1758            path_expr,
1759            var,
1760            with_headers,
1761            rows: None,
1762            cursor: 0,
1763        }
1764    }
1765
1766    fn load(&mut self, ctx: &ExecCtx, base_row: &Row) -> Result<()> {
1767        let ectx = ctx.eval_ctx(base_row);
1768        let path_val = eval_expr(&self.path_expr, &ectx)?;
1769        let path = match path_val {
1770            Value::Property(Property::String(s)) => s,
1771            _ => return Err(Error::TypeMismatch),
1772        };
1773        let content = std::fs::read_to_string(&path).map_err(|e| {
1774            Error::Unsupported(format!("LOAD CSV: cannot read file '{}': {}", path, e))
1775        })?;
1776        let mut lines = content.lines();
1777        let headers: Option<Vec<String>> = if self.with_headers {
1778            lines
1779                .next()
1780                .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
1781        } else {
1782            None
1783        };
1784        let mut csv_rows = Vec::new();
1785        for line in lines {
1786            if line.trim().is_empty() {
1787                continue;
1788            }
1789            let fields: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
1790            if let Some(hdrs) = &headers {
1791                let mut map = std::collections::HashMap::new();
1792                for (i, h) in hdrs.iter().enumerate() {
1793                    let val = fields.get(i).cloned().unwrap_or_default();
1794                    map.insert(h.clone(), Property::String(val));
1795                }
1796                csv_rows.push(Value::Property(Property::Map(map)));
1797            } else {
1798                let list: Vec<Value> = fields
1799                    .into_iter()
1800                    .map(|f| Value::Property(Property::String(f)))
1801                    .collect();
1802                csv_rows.push(Value::List(list));
1803            }
1804        }
1805        self.rows = Some(csv_rows);
1806        self.cursor = 0;
1807        Ok(())
1808    }
1809}
1810
1811impl Operator for LoadCsvOp {
1812    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1813        if self.rows.is_none() {
1814            let base = if let Some(input) = &mut self.input {
1815                match input.next(ctx)? {
1816                    Some(r) => r,
1817                    None => return Ok(None),
1818                }
1819            } else {
1820                Row::new()
1821            };
1822            self.load(ctx, &base)?;
1823        }
1824        let rows = self.rows.as_ref().unwrap();
1825        if self.cursor < rows.len() {
1826            let val = rows[self.cursor].clone();
1827            self.cursor += 1;
1828            let mut row = Row::new();
1829            row.insert(self.var.clone(), val);
1830            Ok(Some(row))
1831        } else {
1832            Ok(None)
1833        }
1834    }
1835}
1836
1837struct ForeachOp {
1838    input: Box<dyn Operator>,
1839    var: String,
1840    list_expr: Expr,
1841    set_assignments: Vec<SetAssignment>,
1842    remove_items: Vec<RemoveSpec>,
1843}
1844
1845impl ForeachOp {
1846    fn new(
1847        input: Box<dyn Operator>,
1848        var: String,
1849        list_expr: Expr,
1850        set_assignments: Vec<SetAssignment>,
1851        remove_items: Vec<RemoveSpec>,
1852    ) -> Self {
1853        Self {
1854            input,
1855            var,
1856            list_expr,
1857            set_assignments,
1858            remove_items,
1859        }
1860    }
1861}
1862
1863impl Operator for ForeachOp {
1864    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1865        let Some(row) = self.input.next(ctx)? else {
1866            return Ok(None);
1867        };
1868        let ectx = ctx.eval_ctx(&row);
1869        let list_val = eval_expr(&self.list_expr, &ectx)?;
1870        let items = match list_val {
1871            Value::List(items) => items,
1872            Value::Property(Property::List(props)) => {
1873                props.into_iter().map(Value::Property).collect()
1874            }
1875            Value::Null | Value::Property(Property::Null) => Vec::new(),
1876            _ => return Err(Error::TypeMismatch),
1877        };
1878        for item in items {
1879            let mut scratch = row.clone();
1880            scratch.insert(self.var.clone(), item);
1881            for a in &self.set_assignments {
1882                match a {
1883                    SetAssignment::Property { var, key, value } => {
1884                        let evaluated = eval_expr(value, &ctx.eval_ctx(&scratch))?;
1885                        let prop = value_to_property(evaluated)?;
1886                        match scratch.get_mut(var) {
1887                            Some(Value::Node(n)) => {
1888                                n.properties.insert(key.clone(), prop);
1889                            }
1890                            Some(Value::Edge(e)) => {
1891                                e.properties.insert(key.clone(), prop);
1892                            }
1893                            _ => return Err(Error::UnboundVariable(var.clone())),
1894                        }
1895                    }
1896                    SetAssignment::Labels { var, labels } => {
1897                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
1898                            for l in labels {
1899                                if !n.labels.contains(l) {
1900                                    n.labels.push(l.clone());
1901                                }
1902                            }
1903                        }
1904                    }
1905                    _ => {}
1906                }
1907            }
1908            for ri in &self.remove_items {
1909                match ri {
1910                    RemoveSpec::Property { var, key } => {
1911                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
1912                            n.properties.remove(key);
1913                        } else if let Some(Value::Edge(e)) = scratch.get_mut(var) {
1914                            e.properties.remove(key);
1915                        }
1916                    }
1917                    RemoveSpec::Labels { var, labels } => {
1918                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
1919                            n.labels.retain(|l| !labels.contains(l));
1920                        }
1921                    }
1922                }
1923            }
1924            // Flush mutated entities for each iteration
1925            for (_, val) in scratch.iter() {
1926                match val {
1927                    Value::Node(n) => ctx.writer.put_node(n)?,
1928                    Value::Edge(e) => ctx.writer.put_edge(e)?,
1929                    _ => {}
1930                }
1931            }
1932        }
1933        Ok(Some(row))
1934    }
1935}
1936
1937struct SeedRowOp {
1938    done: bool,
1939}
1940
1941impl Operator for SeedRowOp {
1942    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1943        if self.done {
1944            return Ok(None);
1945        }
1946        self.done = true;
1947        Ok(Some(Row::new()))
1948    }
1949}
1950
1951struct SeededRowOp {
1952    row: Option<Row>,
1953}
1954
1955impl Operator for SeededRowOp {
1956    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1957        Ok(self.row.take())
1958    }
1959}
1960
1961struct CallSubqueryOp {
1962    input: Box<dyn Operator>,
1963    body_plan: LogicalPlan,
1964    pending: Vec<Row>,
1965    pending_idx: usize,
1966}
1967
1968impl CallSubqueryOp {
1969    fn new(input: Box<dyn Operator>, body_plan: LogicalPlan) -> Self {
1970        Self {
1971            input,
1972            body_plan,
1973            pending: Vec::new(),
1974            pending_idx: 0,
1975        }
1976    }
1977}
1978
1979impl Operator for CallSubqueryOp {
1980    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1981        loop {
1982            if self.pending_idx < self.pending.len() {
1983                let row = self.pending[self.pending_idx].clone();
1984                self.pending_idx += 1;
1985                return Ok(Some(row));
1986            }
1987            let outer_row = match self.input.next(ctx)? {
1988                Some(r) => r,
1989                None => return Ok(None),
1990            };
1991            let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1992            let mut results = Vec::new();
1993            while let Some(body_row) = body_op.next(ctx)? {
1994                let mut merged = outer_row.clone();
1995                for (k, v) in body_row {
1996                    merged.insert(k, v);
1997                }
1998                results.push(merged);
1999            }
2000            if results.is_empty() {
2001                continue;
2002            }
2003            self.pending = results;
2004            self.pending_idx = 0;
2005        }
2006    }
2007}
2008
2009/// Per-input-row left-join driver (see
2010/// `LogicalPlan::OptionalApply`). Replays `body_plan` once per
2011/// outer row with the row as its seed; forwards all emitted rows
2012/// merged with the outer bindings, and emits one null-fallback
2013/// row (outer bindings plus `null_vars` bound to Null) only when
2014/// the body produced zero rows for that input.
2015struct OptionalApplyOp {
2016    input: Box<dyn Operator>,
2017    body_plan: LogicalPlan,
2018    null_vars: Vec<String>,
2019    pending: Vec<Row>,
2020    pending_idx: usize,
2021}
2022
2023impl OptionalApplyOp {
2024    fn new(input: Box<dyn Operator>, body_plan: LogicalPlan, null_vars: Vec<String>) -> Self {
2025        Self {
2026            input,
2027            body_plan,
2028            null_vars,
2029            pending: Vec::new(),
2030            pending_idx: 0,
2031        }
2032    }
2033}
2034
2035impl Operator for OptionalApplyOp {
2036    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2037        loop {
2038            if self.pending_idx < self.pending.len() {
2039                let row = self.pending[self.pending_idx].clone();
2040                self.pending_idx += 1;
2041                return Ok(Some(row));
2042            }
2043            let outer_row = match self.input.next(ctx)? {
2044                Some(r) => r,
2045                None => return Ok(None),
2046            };
2047            let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
2048            // Push the outer row onto `outer_rows` so Filter /
2049            // eval_expr inside the body can resolve outer
2050            // bindings (`OPTIONAL MATCH ... WHERE outer_var = x`).
2051            let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
2052            stacked.push(&outer_row);
2053            stacked.extend_from_slice(ctx.outer_rows);
2054            let inner_ctx = ExecCtx {
2055                store: ctx.store,
2056                writer: ctx.writer,
2057                params: ctx.params,
2058                procedures: ctx.procedures,
2059                outer_rows: &stacked,
2060                tombstones: ctx.tombstones,
2061            };
2062            let mut results = Vec::new();
2063            while let Some(body_row) = body_op.next(&inner_ctx)? {
2064                let mut merged = outer_row.clone();
2065                for (k, v) in body_row {
2066                    merged.insert(k, v);
2067                }
2068                results.push(merged);
2069            }
2070            if results.is_empty() {
2071                let mut fallback = outer_row;
2072                for v in &self.null_vars {
2073                    fallback.insert(v.clone(), Value::Null);
2074                }
2075                return Ok(Some(fallback));
2076            }
2077            self.pending = results;
2078            self.pending_idx = 0;
2079        }
2080    }
2081}
2082
2083/// Runtime operator for `CALL ns.name[(args)] [YIELD ...]`.
2084///
2085/// Resolves the procedure against the [`ProcedureRegistry`] at
2086/// `next()` time, validates arity / types / form (implicit args,
2087/// `YIELD *`, in-query YIELD requirement), then scans the registered
2088/// data-table for rows whose input cells match the call arguments.
2089/// Each matching row is projected according to the YIELD spec and
2090/// merged into the upstream row.
2091///
2092/// Two shapes are supported at once:
2093/// * Standalone (`input = None`): the op is the full plan. It
2094///   runs the procedure exactly once (against a synthetic empty row)
2095///   and emits the matching rows directly.
2096/// * In-query (`input = Some(...)`): for each upstream row, runs
2097///   the procedure with args evaluated against that row and emits
2098///   one merged row per match. Procedures with zero declared output
2099///   columns act as a pass-through — the upstream row is forwarded
2100///   unchanged, matching Neo4j's side-effect-only semantics.
2101struct ProcedureCallOp {
2102    input: Option<Box<dyn Operator>>,
2103    qualified_name: Vec<String>,
2104    args: Option<Vec<Expr>>,
2105    yield_spec: Option<YieldSpec>,
2106    standalone: bool,
2107    buffered: Vec<Row>,
2108    buffered_idx: usize,
2109    // Only set for the standalone form, which drives itself off a
2110    // synthetic seed row exactly once.
2111    done: bool,
2112}
2113
2114impl ProcedureCallOp {
2115    fn new(
2116        input: Option<Box<dyn Operator>>,
2117        qualified_name: Vec<String>,
2118        args: Option<Vec<Expr>>,
2119        yield_spec: Option<YieldSpec>,
2120        standalone: bool,
2121    ) -> Self {
2122        Self {
2123            input,
2124            qualified_name,
2125            args,
2126            yield_spec,
2127            standalone,
2128            buffered: Vec::new(),
2129            buffered_idx: 0,
2130            done: false,
2131        }
2132    }
2133
2134    /// Resolve the projection list `(source_column, output_alias)`
2135    /// from the procedure signature and this op's yield spec. Also
2136    /// validates the spec — unknown YIELD columns, duplicate
2137    /// aliases, and `YIELD *` / no-YIELD in disallowed contexts all
2138    /// surface as `Error::Procedure`.
2139    fn resolve_projection(
2140        &self,
2141        proc: &crate::procedures::Procedure,
2142    ) -> Result<Vec<(String, String)>> {
2143        match &self.yield_spec {
2144            None => {
2145                if !self.standalone {
2146                    // In-query CALL with no YIELD: legal only for
2147                    // side-effect-only procedures (zero declared
2148                    // outputs). A procedure with outputs but no
2149                    // YIELD leaves those outputs unbound, so any
2150                    // downstream RETURN that references them would
2151                    // see `UndefinedVariable`; reject here so the
2152                    // error surfaces instead of silently emitting
2153                    // nulls.
2154                    if proc.outputs.is_empty() {
2155                        return Ok(Vec::new());
2156                    }
2157                    return Err(Error::Procedure(format!(
2158                        "procedure '{}' has outputs but no YIELD clause",
2159                        self.qualified_name.join(".")
2160                    )));
2161                }
2162                Ok(proc
2163                    .outputs
2164                    .iter()
2165                    .map(|o| (o.name.clone(), o.name.clone()))
2166                    .collect())
2167            }
2168            Some(YieldSpec::Star) => {
2169                if !self.standalone {
2170                    return Err(Error::Procedure(
2171                        "YIELD * is only allowed on standalone CALL".into(),
2172                    ));
2173                }
2174                Ok(proc
2175                    .outputs
2176                    .iter()
2177                    .map(|o| (o.name.clone(), o.name.clone()))
2178                    .collect())
2179            }
2180            Some(YieldSpec::Items(items)) => {
2181                let mut projection = Vec::with_capacity(items.len());
2182                let mut seen_aliases: std::collections::HashSet<String> =
2183                    std::collections::HashSet::new();
2184                for yi in items {
2185                    if !proc.outputs.iter().any(|o| o.name == yi.column) {
2186                        return Err(Error::Procedure(format!(
2187                            "procedure '{}' has no output column '{}'",
2188                            self.qualified_name.join("."),
2189                            yi.column
2190                        )));
2191                    }
2192                    let alias = yi.alias.clone().unwrap_or_else(|| yi.column.clone());
2193                    if !seen_aliases.insert(alias.clone()) {
2194                        return Err(Error::Procedure(format!(
2195                            "variable '{alias}' already bound by YIELD"
2196                        )));
2197                    }
2198                    projection.push((yi.column.clone(), alias));
2199                }
2200                Ok(projection)
2201            }
2202        }
2203    }
2204
2205    /// Evaluate the call's argument list against `row`. For the
2206    /// implicit-args form (`args = None`), each declared input
2207    /// column's value comes from the per-query parameter map
2208    /// (keyed by the input-column name). Returns the argument
2209    /// values in declaration order. Raises `ProcedureError` on
2210    /// arity mismatch, type mismatch, or missing parameter.
2211    fn evaluate_args(
2212        &self,
2213        ctx: &ExecCtx,
2214        row: &Row,
2215        proc: &crate::procedures::Procedure,
2216    ) -> Result<Vec<Value>> {
2217        match &self.args {
2218            Some(exprs) => {
2219                if exprs.len() != proc.inputs.len() {
2220                    return Err(Error::Procedure(format!(
2221                        "procedure '{}' expects {} argument(s), got {}",
2222                        self.qualified_name.join("."),
2223                        proc.inputs.len(),
2224                        exprs.len()
2225                    )));
2226                }
2227                let eval_ctx = ctx.eval_ctx(row);
2228                let mut values = Vec::with_capacity(exprs.len());
2229                for (expr, spec) in exprs.iter().zip(proc.inputs.iter()) {
2230                    let v = eval_expr(expr, &eval_ctx)?;
2231                    if !spec.ty.accepts(&v) {
2232                        return Err(Error::Procedure(format!(
2233                            "argument '{}' has wrong type for procedure '{}'",
2234                            spec.name,
2235                            self.qualified_name.join(".")
2236                        )));
2237                    }
2238                    values.push(coerce_arg(v, spec.ty));
2239                }
2240                Ok(values)
2241            }
2242            None => {
2243                // Implicit-arg form only valid standalone.
2244                if !self.standalone {
2245                    return Err(Error::Procedure(
2246                        "in-query CALL requires explicit argument list".into(),
2247                    ));
2248                }
2249                let mut values = Vec::with_capacity(proc.inputs.len());
2250                for spec in &proc.inputs {
2251                    let v = ctx.params.get(&spec.name).cloned().ok_or_else(|| {
2252                        Error::Procedure(format!(
2253                            "missing parameter ${} for procedure '{}'",
2254                            spec.name,
2255                            self.qualified_name.join(".")
2256                        ))
2257                    })?;
2258                    if !spec.ty.accepts(&v) {
2259                        return Err(Error::Procedure(format!(
2260                            "parameter '{}' has wrong type",
2261                            spec.name
2262                        )));
2263                    }
2264                    values.push(coerce_arg(v, spec.ty));
2265                }
2266                Ok(values)
2267            }
2268        }
2269    }
2270
2271    /// Invoke the procedure once for `input_row` and emit zero or
2272    /// more output rows into `out`. Handles the "zero-output-column
2273    /// pass-through" case that keeps `MATCH (n) CALL test.doNothing()
2274    /// RETURN n.name` from filtering the match rows.
2275    fn invoke_once(
2276        &self,
2277        ctx: &ExecCtx,
2278        input_row: &Row,
2279        proc: &crate::procedures::Procedure,
2280        projection: &[(String, String)],
2281        out: &mut Vec<Row>,
2282    ) -> Result<()> {
2283        // Zero-output-column procedures are side-effect-only in
2284        // the TCK; they either suppress rows entirely (standalone)
2285        // or pass the input row through unchanged (in-query).
2286        if proc.outputs.is_empty() {
2287            if !self.standalone {
2288                out.push(input_row.clone());
2289            }
2290            return Ok(());
2291        }
2292        let args = self.evaluate_args(ctx, input_row, proc)?;
2293        let rows = proc.resolve_rows(ctx.store)?;
2294        for proc_row in &rows {
2295            if !proc.row_matches(proc_row, &args) {
2296                continue;
2297            }
2298            let mut merged = if self.standalone {
2299                Row::new()
2300            } else {
2301                input_row.clone()
2302            };
2303            for (src, alias) in projection {
2304                let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
2305                merged.insert(alias.clone(), v);
2306            }
2307            out.push(merged);
2308        }
2309        Ok(())
2310    }
2311}
2312
2313/// Cast an int to a float when the declared type is `FLOAT`. Other
2314/// declared types leave the value as-is (accept() has already
2315/// gated on kind) so the comparison in `row_matches` sees a
2316/// consistent shape.
2317fn coerce_arg(v: Value, ty: crate::procedures::ProcType) -> Value {
2318    use crate::procedures::ProcType;
2319    if matches!(ty, ProcType::Float) {
2320        if let Value::Property(Property::Int64(n)) = v {
2321            return Value::Property(Property::Float64(n as f64));
2322        }
2323    }
2324    v
2325}
2326
2327impl Operator for ProcedureCallOp {
2328    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2329        loop {
2330            if self.buffered_idx < self.buffered.len() {
2331                let row = self.buffered[self.buffered_idx].clone();
2332                self.buffered_idx += 1;
2333                return Ok(Some(row));
2334            }
2335            self.buffered.clear();
2336            self.buffered_idx = 0;
2337
2338            let proc = match ctx.procedures.get(&self.qualified_name) {
2339                Some(p) => p,
2340                None => {
2341                    return Err(Error::Procedure(format!(
2342                        "procedure '{}' not found",
2343                        self.qualified_name.join(".")
2344                    )));
2345                }
2346            };
2347            let projection = self.resolve_projection(proc)?;
2348
2349            let input_row = match &mut self.input {
2350                Some(inp) => match inp.next(ctx)? {
2351                    Some(r) => r,
2352                    None => return Ok(None),
2353                },
2354                None => {
2355                    if self.done {
2356                        return Ok(None);
2357                    }
2358                    self.done = true;
2359                    Row::new()
2360                }
2361            };
2362
2363            let mut produced = Vec::new();
2364            self.invoke_once(ctx, &input_row, proc, &projection, &mut produced)?;
2365            if produced.is_empty() {
2366                if self.input.is_some() {
2367                    continue;
2368                }
2369                return Ok(None);
2370            }
2371            self.buffered = produced;
2372        }
2373    }
2374}
2375
2376/// Pull a property map out of a value. Supports node / edge
2377/// (uses their live property map) and map values (parameter or
2378/// map literal). Null propagates as an empty map — `SET x = null`
2379/// is spec'd as a property clear / no-op and SET = <null-binding>
2380/// (from an unmatched OPTIONAL MATCH) matches that shape.
2381fn extract_property_map(v: &Value) -> Result<Vec<(String, Property)>> {
2382    match v {
2383        Value::Node(n) => Ok(n.properties.clone().into_iter().collect()),
2384        Value::Edge(e) => Ok(e.properties.clone().into_iter().collect()),
2385        Value::Map(pairs) => pairs
2386            .iter()
2387            .map(|(k, vv)| Ok((k.clone(), value_to_property(vv.clone())?)))
2388            .collect(),
2389        Value::Property(Property::Map(entries)) => Ok(entries
2390            .iter()
2391            .map(|(k, p)| (k.clone(), p.clone()))
2392            .collect()),
2393        Value::Null | Value::Property(Property::Null) => Ok(Vec::new()),
2394        _ => Err(Error::InvalidSetValue),
2395    }
2396}
2397
2398fn value_to_property(v: Value) -> Result<Property> {
2399    match v {
2400        Value::Property(Property::Map(_)) => Err(Error::InvalidSetValue),
2401        Value::Property(p) => Ok(p),
2402        Value::Null => Ok(Property::Null),
2403        Value::List(items) => {
2404            let props: Vec<Property> = items
2405                .into_iter()
2406                .map(value_to_property)
2407                .collect::<Result<_>>()?;
2408            Ok(Property::List(props))
2409        }
2410        // Graph-aware `Value::Map` and graph elements can't be
2411        // stored as node / edge property values; SET will reject
2412        // them.
2413        Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
2414            Err(Error::InvalidSetValue)
2415        }
2416    }
2417}
2418
2419struct NodeScanAllOp {
2420    var: String,
2421    ids: Option<Vec<NodeId>>,
2422    cursor: usize,
2423}
2424
2425impl NodeScanAllOp {
2426    fn new(var: String) -> Self {
2427        Self {
2428            var,
2429            ids: None,
2430            cursor: 0,
2431        }
2432    }
2433}
2434
2435impl Operator for NodeScanAllOp {
2436    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2437        if self.ids.is_none() {
2438            self.ids = Some(ctx.store.all_node_ids()?);
2439        }
2440        let ids = self.ids.as_ref().unwrap();
2441        while self.cursor < ids.len() {
2442            let id = ids[self.cursor];
2443            self.cursor += 1;
2444            if let Some(node) = ctx.store.get_node(id)? {
2445                let mut row = Row::new();
2446                row.insert(self.var.clone(), Value::Node(node));
2447                return Ok(Some(row));
2448            }
2449        }
2450        Ok(None)
2451    }
2452}
2453
2454struct NodeScanByLabelsOp {
2455    var: String,
2456    labels: Vec<String>,
2457    ids: Option<Vec<NodeId>>,
2458    cursor: usize,
2459}
2460
2461impl NodeScanByLabelsOp {
2462    fn new(var: String, labels: Vec<String>) -> Self {
2463        Self {
2464            var,
2465            labels,
2466            ids: None,
2467            cursor: 0,
2468        }
2469    }
2470}
2471
2472impl Operator for NodeScanByLabelsOp {
2473    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2474        if self.ids.is_none() {
2475            // Use the first label for the index scan, filter the rest per-node.
2476            let primary = self
2477                .labels
2478                .first()
2479                .expect("NodeScanByLabels must have at least one label");
2480            self.ids = Some(ctx.store.nodes_by_label(primary)?);
2481        }
2482        let ids = self.ids.as_ref().unwrap();
2483        while self.cursor < ids.len() {
2484            let id = ids[self.cursor];
2485            self.cursor += 1;
2486            if let Some(node) = ctx.store.get_node(id)? {
2487                if has_all_labels(&node, &self.labels) {
2488                    let mut row = Row::new();
2489                    row.insert(self.var.clone(), Value::Node(node));
2490                    return Ok(Some(row));
2491                }
2492            }
2493        }
2494        Ok(None)
2495    }
2496}
2497
2498fn has_all_labels(node: &Node, labels: &[String]) -> bool {
2499    labels.iter().all(|l| node.labels.contains(l))
2500}
2501
2502/// Equality-lookup operator backed by a property index. Evaluates
2503/// the value expression lazily on the first `next()` — crucially
2504/// so parameters resolve against the per-query `ExecCtx::params`
2505/// map, not against a literal baked in at plan-construction time.
2506///
2507/// Unindexable value types (Float, List, Map, Null, or a Node/Edge
2508/// that slipped through) surface as `Error::InvalidSetValue`. The
2509/// planner only emits this op for indexes that do exist, so the
2510/// reader call should find a populated CF unless a concurrent DROP
2511/// raced us (in which case the result is just empty, which matches
2512/// how Neo4j's planner handles the race).
2513struct IndexSeekOp {
2514    var: String,
2515    label: String,
2516    properties: Vec<String>,
2517    value_exprs: Vec<Expr>,
2518    results: Option<Vec<NodeId>>,
2519    cursor: usize,
2520}
2521
2522impl IndexSeekOp {
2523    fn new(var: String, label: String, properties: Vec<String>, value_exprs: Vec<Expr>) -> Self {
2524        assert_eq!(
2525            properties.len(),
2526            value_exprs.len(),
2527            "IndexSeekOp: properties and values must have equal length"
2528        );
2529        Self {
2530            var,
2531            label,
2532            properties,
2533            value_exprs,
2534            results: None,
2535            cursor: 0,
2536        }
2537    }
2538}
2539
2540impl Operator for IndexSeekOp {
2541    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2542        if self.results.is_none() {
2543            let empty = Row::new();
2544            let mut values: Vec<Property> = Vec::with_capacity(self.value_exprs.len());
2545            for expr in &self.value_exprs {
2546                let value = eval_expr(expr, &ctx.eval_ctx(&empty))?;
2547                let property = match value {
2548                    Value::Property(p) => p,
2549                    Value::Null => Property::Null,
2550                    Value::Node(_)
2551                    | Value::Edge(_)
2552                    | Value::List(_)
2553                    | Value::Map(_)
2554                    | Value::Path { .. } => {
2555                        return Err(Error::InvalidSetValue);
2556                    }
2557                };
2558                values.push(property);
2559            }
2560            let ids = ctx
2561                .store
2562                .nodes_by_properties(&self.label, &self.properties, &values)?;
2563            self.results = Some(ids);
2564        }
2565        let ids = self.results.as_ref().unwrap();
2566        while self.cursor < ids.len() {
2567            let id = ids[self.cursor];
2568            self.cursor += 1;
2569            if let Some(node) = ctx.store.get_node(id)? {
2570                let mut row = Row::new();
2571                row.insert(self.var.clone(), Value::Node(node));
2572                return Ok(Some(row));
2573            }
2574        }
2575        Ok(None)
2576    }
2577}
2578
2579/// Physical operator for [`LogicalPlan::PointIndexSeek`]. Evaluates
2580/// its bounds once on first `next()` and drives
2581/// `reader.nodes_in_bbox(...)`.
2582///
2583/// For [`PointSeekBounds::Corners`] the operator passes the corners
2584/// through as-is and short-circuits on SRID mismatch / null / non-Point
2585/// corners — `point.withinbbox` returns null in those cases, which
2586/// excludes the row from a filter, so "no rows" is the same
2587/// observable outcome.
2588///
2589/// For [`PointSeekBounds::Radius`] the operator derives the enclosing
2590/// bbox from the center's SRID: Cartesian gets a `(cx ± r, cy ± r)`
2591/// square; WGS-84 converts `r` metres to lat/lon spans using the
2592/// `cos(lat)` factor for longitude. The enclosing bbox is always a
2593/// *superset* of the circle, so the planner keeps the original
2594/// distance predicate as a residual `Filter` above the seek — the
2595/// corners of the square that fall outside the circle are culled
2596/// there.
2597struct PointIndexSeekOp {
2598    var: String,
2599    label: String,
2600    property: String,
2601    bounds: PointSeekBounds,
2602    results: Option<Vec<NodeId>>,
2603    cursor: usize,
2604}
2605
2606impl PointIndexSeekOp {
2607    fn new(var: String, label: String, property: String, bounds: PointSeekBounds) -> Self {
2608        Self {
2609            var,
2610            label,
2611            property,
2612            bounds,
2613            results: None,
2614            cursor: 0,
2615        }
2616    }
2617}
2618
2619impl Operator for PointIndexSeekOp {
2620    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2621        if self.results.is_none() {
2622            let empty = Row::new();
2623            let ectx = ctx.eval_ctx(&empty);
2624            let ids = match &self.bounds {
2625                PointSeekBounds::Corners { lo, hi } => {
2626                    let lo_pt = extract_point(&eval_expr(lo, &ectx)?);
2627                    let hi_pt = extract_point(&eval_expr(hi, &ectx)?);
2628                    match (lo_pt, hi_pt) {
2629                        (Some(lo), Some(hi)) if lo.srid == hi.srid => ctx.store.nodes_in_bbox(
2630                            &self.label,
2631                            &self.property,
2632                            lo.srid,
2633                            lo.x,
2634                            lo.y,
2635                            hi.x,
2636                            hi.y,
2637                        )?,
2638                        _ => Vec::new(),
2639                    }
2640                }
2641                PointSeekBounds::Radius { center, radius } => {
2642                    let center_pt = extract_point(&eval_expr(center, &ectx)?);
2643                    let radius_val = extract_f64(&eval_expr(radius, &ectx)?);
2644                    match (center_pt, radius_val) {
2645                        (Some(c), Some(r)) if r.is_finite() && r >= 0.0 => {
2646                            let (xlo, ylo, xhi, yhi) = enclosing_bbox(&c, r);
2647                            ctx.store.nodes_in_bbox(
2648                                &self.label,
2649                                &self.property,
2650                                c.srid,
2651                                xlo,
2652                                ylo,
2653                                xhi,
2654                                yhi,
2655                            )?
2656                        }
2657                        // Null / non-point center or null / negative / NaN radius → no rows.
2658                        _ => Vec::new(),
2659                    }
2660                }
2661            };
2662            self.results = Some(ids);
2663        }
2664        let ids = self.results.as_ref().unwrap();
2665        while self.cursor < ids.len() {
2666            let id = ids[self.cursor];
2667            self.cursor += 1;
2668            if let Some(node) = ctx.store.get_node(id)? {
2669                let mut row = Row::new();
2670                row.insert(self.var.clone(), Value::Node(node));
2671                return Ok(Some(row));
2672            }
2673        }
2674        Ok(None)
2675    }
2676}
2677
2678fn extract_point(v: &Value) -> Option<meshdb_core::Point> {
2679    match v {
2680        Value::Property(Property::Point(p)) => Some(*p),
2681        _ => None,
2682    }
2683}
2684
2685fn extract_f64(v: &Value) -> Option<f64> {
2686    match v {
2687        Value::Property(Property::Float64(f)) => Some(*f),
2688        Value::Property(Property::Int64(i)) => Some(*i as f64),
2689        _ => None,
2690    }
2691}
2692
2693/// Enclosing axis-aligned bbox for a circle of radius `r` around
2694/// `center`. Always a superset of the circle, so callers must still
2695/// apply a precision filter on distance. Cartesian SRIDs use a
2696/// straight `center ± r` square in the CRS's own units. Geographic
2697/// SRIDs (WGS-84, SRID 4326 / 4979) convert `r` metres to lat/lon
2698/// spans via the standard flat-earth approximation with a `cos(lat)`
2699/// longitude-span correction; the factor is clamped away from zero
2700/// so near-pole circles don't collapse into a zero-width bbox (the
2701/// pathological case falls back to the full longitude range).
2702fn enclosing_bbox(center: &meshdb_core::Point, r: f64) -> (f64, f64, f64, f64) {
2703    if center.is_geographic() {
2704        // One degree of latitude ≈ 111_320 metres on a spherical
2705        // Earth. One degree of longitude shrinks by cos(latitude).
2706        const METRES_PER_DEG: f64 = 111_320.0;
2707        let dlat = r / METRES_PER_DEG;
2708        let cos_lat = center.y.to_radians().cos().abs();
2709        // `cos(lat)` goes to 0 at the poles; clamp so a polar
2710        // circle maps to a wide-but-finite longitude span rather
2711        // than a divide-by-zero.
2712        let cos_lat_floor = cos_lat.max(1.0e-6);
2713        let dlon = r / (METRES_PER_DEG * cos_lat_floor);
2714        (
2715            center.x - dlon,
2716            center.y - dlat,
2717            center.x + dlon,
2718            center.y + dlat,
2719        )
2720    } else {
2721        (center.x - r, center.y - r, center.x + r, center.y + r)
2722    }
2723}
2724
2725/// Relationship-scope analogue of [`IndexSeekOp`]. Seeks edges
2726/// through the `(edge_type, property)` index, hydrates each one,
2727/// and emits a row per matching edge binding `edge_var`, `src_var`,
2728/// `dst_var`. For `Direction::Both` each edge emits two rows — one
2729/// per orientation — so an undirected pattern surfaces both
2730/// endpoint assignments. `residual_properties` carries any
2731/// non-indexed pattern-property equalities that still need
2732/// per-edge filtering.
2733struct EdgeSeekOp {
2734    edge_var: String,
2735    src_var: String,
2736    dst_var: String,
2737    edge_type: String,
2738    property: String,
2739    value_expr: Expr,
2740    direction: Direction,
2741    residual_properties: Vec<(String, Expr)>,
2742    /// Pre-materialized output rows. Built lazily on the first
2743    /// `next()` call so the seek + endpoint fetches run once.
2744    results: Option<Vec<Row>>,
2745    cursor: usize,
2746}
2747
2748impl EdgeSeekOp {
2749    #[allow(clippy::too_many_arguments)]
2750    fn new(
2751        edge_var: String,
2752        src_var: String,
2753        dst_var: String,
2754        edge_type: String,
2755        property: String,
2756        value_expr: Expr,
2757        direction: Direction,
2758        residual_properties: Vec<(String, Expr)>,
2759    ) -> Self {
2760        Self {
2761            edge_var,
2762            src_var,
2763            dst_var,
2764            edge_type,
2765            property,
2766            value_expr,
2767            direction,
2768            residual_properties,
2769            results: None,
2770            cursor: 0,
2771        }
2772    }
2773}
2774
2775impl Operator for EdgeSeekOp {
2776    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2777        if self.results.is_none() {
2778            let empty = Row::new();
2779            let seek_value = eval_expr(&self.value_expr, &ctx.eval_ctx(&empty))?;
2780            let property = match seek_value {
2781                Value::Property(p) => p,
2782                Value::Null => Property::Null,
2783                Value::Node(_)
2784                | Value::Edge(_)
2785                | Value::List(_)
2786                | Value::Map(_)
2787                | Value::Path { .. } => {
2788                    return Err(Error::InvalidSetValue);
2789                }
2790            };
2791            let ids = ctx
2792                .store
2793                .edges_by_property(&self.edge_type, &self.property, &property)?;
2794            let mut rows: Vec<Row> = Vec::with_capacity(ids.len());
2795            for id in ids {
2796                let Some(edge) = ctx.store.get_edge(id)? else {
2797                    continue;
2798                };
2799                // Residual pattern-property equality filters — same
2800                // shape as `EdgeExpandOp`'s inline edge-properties
2801                // check. Uses an empty row for expr evaluation since
2802                // an EdgeSeek has no upstream input and the seek's
2803                // own bindings aren't visible to its own filters.
2804                let mut residuals_ok = true;
2805                for (key, expr) in &self.residual_properties {
2806                    let wanted = eval_expr(expr, &ctx.eval_ctx(&empty))?;
2807                    let Some(stored) = edge.properties.get(key) else {
2808                        residuals_ok = false;
2809                        break;
2810                    };
2811                    if !values_equal(&Value::Property(stored.clone()), &wanted) {
2812                        residuals_ok = false;
2813                        break;
2814                    }
2815                }
2816                if !residuals_ok {
2817                    continue;
2818                }
2819                // Hydrate both endpoints so downstream operators
2820                // can read labels / properties off the bound vars.
2821                // A deleted endpoint drops the whole row (same as
2822                // `IndexSeekOp` when `get_node` returns None).
2823                let Some(src_node) = ctx.store.get_node(edge.source)? else {
2824                    continue;
2825                };
2826                let Some(dst_node) = ctx.store.get_node(edge.target)? else {
2827                    continue;
2828                };
2829                // Emit output rows per direction. Outgoing / Incoming
2830                // bind a single orientation; Both yields two rows —
2831                // the `-[r]-` pattern in openCypher matches edges in
2832                // either orientation and binds endpoints accordingly.
2833                match self.direction {
2834                    Direction::Outgoing => {
2835                        rows.push(self.make_row(&edge, &src_node, &dst_node));
2836                    }
2837                    Direction::Incoming => {
2838                        rows.push(self.make_row(&edge, &dst_node, &src_node));
2839                    }
2840                    Direction::Both => {
2841                        rows.push(self.make_row(&edge, &src_node, &dst_node));
2842                        // Self-loops still only surface once — the
2843                        // second orientation is the same row.
2844                        if edge.source != edge.target {
2845                            rows.push(self.make_row(&edge, &dst_node, &src_node));
2846                        }
2847                    }
2848                }
2849            }
2850            self.results = Some(rows);
2851        }
2852        let rows = self.results.as_ref().unwrap();
2853        if self.cursor < rows.len() {
2854            let row = rows[self.cursor].clone();
2855            self.cursor += 1;
2856            return Ok(Some(row));
2857        }
2858        Ok(None)
2859    }
2860}
2861
2862impl EdgeSeekOp {
2863    fn make_row(&self, edge: &Edge, src: &Node, dst: &Node) -> Row {
2864        let mut row = Row::new();
2865        row.insert(self.edge_var.clone(), Value::Edge(edge.clone()));
2866        row.insert(self.src_var.clone(), Value::Node(src.clone()));
2867        row.insert(self.dst_var.clone(), Value::Node(dst.clone()));
2868        row
2869    }
2870}
2871
2872/// Relationship-scope analogue of [`PointIndexSeekOp`]. Drives
2873/// `reader.edges_in_bbox(...)` off the bounds, then hydrates each
2874/// matching edge + its endpoints and emits rows binding
2875/// `edge_var`, `src_var`, `dst_var` — same shape as [`EdgeSeekOp`].
2876/// For `Direction::Both`, each edge emits two rows (one per
2877/// orientation), matching the `-[r]-` pattern semantics.
2878struct EdgePointIndexSeekOp {
2879    edge_var: String,
2880    src_var: String,
2881    dst_var: String,
2882    edge_type: String,
2883    property: String,
2884    direction: Direction,
2885    bounds: PointSeekBounds,
2886    results: Option<Vec<Row>>,
2887    cursor: usize,
2888}
2889
2890impl EdgePointIndexSeekOp {
2891    #[allow(clippy::too_many_arguments)]
2892    fn new(
2893        edge_var: String,
2894        src_var: String,
2895        dst_var: String,
2896        edge_type: String,
2897        property: String,
2898        direction: Direction,
2899        bounds: PointSeekBounds,
2900    ) -> Self {
2901        Self {
2902            edge_var,
2903            src_var,
2904            dst_var,
2905            edge_type,
2906            property,
2907            direction,
2908            bounds,
2909            results: None,
2910            cursor: 0,
2911        }
2912    }
2913
2914    fn make_row(&self, edge: &Edge, src: &Node, dst: &Node) -> Row {
2915        let mut row = Row::new();
2916        row.insert(self.edge_var.clone(), Value::Edge(edge.clone()));
2917        row.insert(self.src_var.clone(), Value::Node(src.clone()));
2918        row.insert(self.dst_var.clone(), Value::Node(dst.clone()));
2919        row
2920    }
2921}
2922
2923impl Operator for EdgePointIndexSeekOp {
2924    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2925        if self.results.is_none() {
2926            let empty = Row::new();
2927            let ectx = ctx.eval_ctx(&empty);
2928            let ids = match &self.bounds {
2929                PointSeekBounds::Corners { lo, hi } => {
2930                    let lo_pt = extract_point(&eval_expr(lo, &ectx)?);
2931                    let hi_pt = extract_point(&eval_expr(hi, &ectx)?);
2932                    match (lo_pt, hi_pt) {
2933                        (Some(lo), Some(hi)) if lo.srid == hi.srid => ctx.store.edges_in_bbox(
2934                            &self.edge_type,
2935                            &self.property,
2936                            lo.srid,
2937                            lo.x,
2938                            lo.y,
2939                            hi.x,
2940                            hi.y,
2941                        )?,
2942                        _ => Vec::new(),
2943                    }
2944                }
2945                PointSeekBounds::Radius { center, radius } => {
2946                    let center_pt = extract_point(&eval_expr(center, &ectx)?);
2947                    let radius_val = extract_f64(&eval_expr(radius, &ectx)?);
2948                    match (center_pt, radius_val) {
2949                        (Some(c), Some(r)) if r.is_finite() && r >= 0.0 => {
2950                            let (xlo, ylo, xhi, yhi) = enclosing_bbox(&c, r);
2951                            ctx.store.edges_in_bbox(
2952                                &self.edge_type,
2953                                &self.property,
2954                                c.srid,
2955                                xlo,
2956                                ylo,
2957                                xhi,
2958                                yhi,
2959                            )?
2960                        }
2961                        _ => Vec::new(),
2962                    }
2963                }
2964            };
2965
2966            let mut rows: Vec<Row> = Vec::with_capacity(ids.len());
2967            for id in ids {
2968                let Some(edge) = ctx.store.get_edge(id)? else {
2969                    continue;
2970                };
2971                let Some(src_node) = ctx.store.get_node(edge.source)? else {
2972                    continue;
2973                };
2974                let Some(dst_node) = ctx.store.get_node(edge.target)? else {
2975                    continue;
2976                };
2977                match self.direction {
2978                    Direction::Outgoing => rows.push(self.make_row(&edge, &src_node, &dst_node)),
2979                    Direction::Incoming => rows.push(self.make_row(&edge, &dst_node, &src_node)),
2980                    Direction::Both => {
2981                        rows.push(self.make_row(&edge, &src_node, &dst_node));
2982                        if edge.source != edge.target {
2983                            rows.push(self.make_row(&edge, &dst_node, &src_node));
2984                        }
2985                    }
2986                }
2987            }
2988            self.results = Some(rows);
2989        }
2990        let rows = self.results.as_ref().unwrap();
2991        if self.cursor < rows.len() {
2992            let row = rows[self.cursor].clone();
2993            self.cursor += 1;
2994            return Ok(Some(row));
2995        }
2996        Ok(None)
2997    }
2998}
2999
3000fn matches_pattern_props(node: &Node, props: &[(String, Property)]) -> bool {
3001    props.iter().all(|(k, v)| {
3002        node.properties
3003            .get(k)
3004            .map(|stored| stored == v)
3005            .unwrap_or(false)
3006    })
3007}
3008
3009struct MergeNodeOp {
3010    var: String,
3011    labels: Vec<String>,
3012    /// Pattern property expressions as they came from the planner. These
3013    /// stay as `Expr` because evaluation needs `ExecCtx::params`, which
3014    /// isn't available until `next()` is called.
3015    properties: Vec<(String, Expr)>,
3016    /// `ON CREATE SET ...` assignments — applied only when the
3017    /// merge took the create branch. Evaluated against a row
3018    /// `{var → Node}` so the value expressions can reference the
3019    /// just-created node.
3020    on_create: Vec<SetAssignment>,
3021    /// `ON MATCH SET ...` assignments — applied to every matched
3022    /// node when the merge took the match branch. Same row shape
3023    /// as `on_create`.
3024    on_match: Vec<SetAssignment>,
3025    /// Optional upstream operator. `None` means this is a
3026    /// top-level producer (`MERGE (n) RETURN n`) and emits
3027    /// rows with a fresh empty base. `Some` means this is a
3028    /// mid-chain clause (`MATCH (a) MERGE (b) RETURN a, b`)
3029    /// and each emitted row is a cross-join between an input
3030    /// row and a merge-result node.
3031    input: Option<Box<dyn Operator>>,
3032    /// Cached merge result. Populated on the first `next()`
3033    /// call by running the scan + maybe-create logic *once*,
3034    /// then reused for every input row. Running the merge
3035    /// exactly once sidesteps the read-after-write issue in
3036    /// buffered-writer mode (a node created by the first input
3037    /// row wouldn't be visible to a re-scan on the second).
3038    merged_nodes: Vec<Node>,
3039    /// Whether `merged_nodes` has been populated. The merge
3040    /// logic runs lazily on the first `next()` so
3041    /// `ExecCtx::params` is available.
3042    merge_done: bool,
3043    /// Current upstream row — held between calls while we drain
3044    /// `merged_nodes` against it. `None` for the top-level
3045    /// case (no input).
3046    current_input_row: Option<Row>,
3047    cursor: usize,
3048}
3049
3050impl MergeNodeOp {
3051    fn new(
3052        input: Option<Box<dyn Operator>>,
3053        var: String,
3054        labels: Vec<String>,
3055        properties: Vec<(String, Expr)>,
3056        on_create: Vec<SetAssignment>,
3057        on_match: Vec<SetAssignment>,
3058    ) -> Self {
3059        Self {
3060            var,
3061            labels,
3062            properties,
3063            on_create,
3064            on_match,
3065            input,
3066            merged_nodes: Vec::new(),
3067            merge_done: false,
3068            current_input_row: None,
3069            cursor: 0,
3070        }
3071    }
3072
3073    /// Run the MERGE logic exactly once: scan the store for
3074    /// existing matches, apply ON MATCH SET to each; or create
3075    /// a fresh node and apply ON CREATE SET; persist everything
3076    /// via `ctx.writer`; stash the resulting nodes in
3077    /// `self.merged_nodes`. Idempotent — subsequent calls are
3078    /// no-ops once `self.merge_done` is set.
3079    /// Resolve the pattern properties against `base`, scan the
3080    /// store, and either match existing nodes or create a fresh
3081    /// one. Returns the resulting node set — can be called
3082    /// multiple times with different `base` rows for
3083    /// input-driven merges.
3084    fn run_merge_for(&mut self, ctx: &ExecCtx, base: &Row) -> Result<Vec<Node>> {
3085        let resolved_props: Vec<(String, Property)> = self
3086            .properties
3087            .iter()
3088            .map(|(k, expr)| {
3089                let v = eval_expr(expr, &ctx.eval_ctx(base))?;
3090                Ok((k.clone(), value_to_property(v)?))
3091            })
3092            .collect::<Result<Vec<_>>>()?;
3093
3094        let candidate_ids: Vec<NodeId> = if let Some(primary) = self.labels.first() {
3095            ctx.store.nodes_by_label(primary)?
3096        } else {
3097            ctx.store.all_node_ids()?
3098        };
3099        let mut merged_nodes: Vec<Node> = Vec::new();
3100        for id in candidate_ids {
3101            if let Some(node) = ctx.store.get_node(id)? {
3102                if has_all_labels(&node, &self.labels)
3103                    && matches_pattern_props(&node, &resolved_props)
3104                {
3105                    merged_nodes.push(node);
3106                }
3107            }
3108        }
3109
3110        if merged_nodes.is_empty() {
3111            let mut node = Node::new();
3112            for label in &self.labels {
3113                node.labels.push(label.clone());
3114            }
3115            for (k, prop) in resolved_props {
3116                node.properties.insert(k, prop);
3117            }
3118            apply_merge_actions(&mut node, &self.on_create, &self.var, ctx, base)?;
3119            ctx.writer.put_node(&node)?;
3120            merged_nodes.push(node);
3121        } else if !self.on_match.is_empty() {
3122            for node in merged_nodes.iter_mut() {
3123                apply_merge_actions(node, &self.on_match, &self.var, ctx, base)?;
3124                ctx.writer.put_node(node)?;
3125            }
3126        }
3127        Ok(merged_nodes)
3128    }
3129}
3130
3131impl Operator for MergeNodeOp {
3132    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3133        // Top-level producer: no upstream context, so the pattern
3134        // properties can only reference literals / parameters.
3135        // Run the merge once against an empty row, then emit
3136        // each result.
3137        if self.input.is_none() {
3138            if !self.merge_done {
3139                let empty = Row::new();
3140                let nodes = self.run_merge_for(ctx, &empty)?;
3141                self.merged_nodes = nodes;
3142                self.merge_done = true;
3143            }
3144            if self.cursor < self.merged_nodes.len() {
3145                let node = self.merged_nodes[self.cursor].clone();
3146                self.cursor += 1;
3147                let mut row = Row::new();
3148                row.insert(self.var.clone(), Value::Node(node));
3149                return Ok(Some(row));
3150            }
3151            return Ok(None);
3152        }
3153
3154        // Input-driven case: evaluate pattern properties *per
3155        // input row* so references like `MERGE (:City {name:
3156        // person.bornIn})` resolve against the bound `person`.
3157        // Each incoming row produces its own merged-node set,
3158        // which is then cross-joined with that row before
3159        // emission.
3160        loop {
3161            if let Some(base) = self.current_input_row.as_ref() {
3162                if self.cursor < self.merged_nodes.len() {
3163                    let node = self.merged_nodes[self.cursor].clone();
3164                    self.cursor += 1;
3165                    let mut row = base.clone();
3166                    row.insert(self.var.clone(), Value::Node(node));
3167                    return Ok(Some(row));
3168                }
3169            }
3170            match self.input.as_mut().unwrap().next(ctx)? {
3171                None => return Ok(None),
3172                Some(row) => {
3173                    let nodes = self.run_merge_for(ctx, &row)?;
3174                    self.merged_nodes = nodes;
3175                    self.cursor = 0;
3176                    self.current_input_row = Some(row);
3177                }
3178            }
3179        }
3180    }
3181}
3182
3183/// Apply MERGE-conditional SET assignments (`ON CREATE` or
3184/// Find-or-create executor for edge MERGE
3185/// (`MERGE (a)-[r:KNOWS]->(b)`).
3186///
3187/// For every row pulled from `input`, looks up the `src_var`
3188/// and `dst_var` bindings (which must be `Value::Node` — the
3189/// planner enforces that they came from a prior MATCH or
3190/// MERGE), scans `src`'s outgoing edges, and either:
3191///
3192/// - Picks the first edge of type `edge_type` whose target is
3193///   `dst` and applies `on_match` to it, or
3194/// - Creates a fresh `Edge::new(edge_type, src, dst)`, applies
3195///   `on_create`, and persists it via `ctx.writer.put_edge`.
3196///
3197/// Either way, the resulting edge is bound into `edge_var` in
3198/// the output row and the row is emitted. v1 restrictions:
3199/// single directed hop, both endpoints already bound,
3200/// explicit relationship type.
3201struct MergeEdgeOp {
3202    input: Box<dyn Operator>,
3203    edge_var: String,
3204    src_var: String,
3205    dst_var: String,
3206    edge_type: String,
3207    undirected: bool,
3208    /// Inline edge property filter from the MERGE pattern
3209    /// (`[r:T {k: v}]`). Matched edges must satisfy every entry;
3210    /// the create branch stamps them onto the new edge.
3211    properties: Vec<(String, Expr)>,
3212    on_create: Vec<SetAssignment>,
3213    on_match: Vec<SetAssignment>,
3214    /// Rows buffered from the current input row's MERGE result —
3215    /// one per existing matched edge (or a single synthesized edge
3216    /// if the create branch fired). Drained before the next
3217    /// `input.next()` call so multi-match semantics stay correct:
3218    /// `MATCH (a:A),(b:B) MERGE (a)-[r:T]->(b)` against two pre-
3219    /// existing edges has to yield two rows, not one.
3220    pending: std::collections::VecDeque<Row>,
3221}
3222
3223impl MergeEdgeOp {
3224    #[allow(clippy::too_many_arguments)]
3225    fn new(
3226        input: Box<dyn Operator>,
3227        edge_var: String,
3228        src_var: String,
3229        dst_var: String,
3230        edge_type: String,
3231        undirected: bool,
3232        properties: Vec<(String, Expr)>,
3233        on_create: Vec<SetAssignment>,
3234        on_match: Vec<SetAssignment>,
3235    ) -> Self {
3236        Self {
3237            input,
3238            edge_var,
3239            src_var,
3240            dst_var,
3241            edge_type,
3242            undirected,
3243            properties,
3244            on_create,
3245            on_match,
3246            pending: std::collections::VecDeque::new(),
3247        }
3248    }
3249}
3250
3251impl Operator for MergeEdgeOp {
3252    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3253        loop {
3254            if let Some(row) = self.pending.pop_front() {
3255                return Ok(Some(row));
3256            }
3257            let Some(row) = self.input.next(ctx)? else {
3258                return Ok(None);
3259            };
3260            // Resolve src/dst. Both must be Value::Node — the
3261            // planner enforces that the variables came from an
3262            // earlier producer, so anything else is a bug or a
3263            // later-added feature that didn't update the check.
3264            let src_node = match row.get(&self.src_var) {
3265                Some(Value::Node(n)) => n.clone(),
3266                _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3267            };
3268            let dst_node = match row.get(&self.dst_var) {
3269                Some(Value::Node(n)) => n.clone(),
3270                _ => return Err(Error::UnboundVariable(self.dst_var.clone())),
3271            };
3272
3273            // Evaluate the inline edge property filter once per
3274            // input row. These are AST expressions so they can
3275            // reference outer bindings (`MERGE (a)-[r:T {k: a.v}]->(b)`).
3276            let required_props: Vec<(String, Property)> = self
3277                .properties
3278                .iter()
3279                .map(|(k, expr)| {
3280                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
3281                    Ok((k.clone(), value_to_property(v)?))
3282                })
3283                .collect::<Result<Vec<_>>>()?;
3284            let edge_matches = |edge: &Edge| -> bool {
3285                required_props.iter().all(|(k, want)| {
3286                    edge.properties
3287                        .get(k)
3288                        .map(|have| have == want)
3289                        .unwrap_or(false)
3290                })
3291            };
3292
3293            // Collect every edge of type `edge_type` from src to
3294            // dst (and, for undirected patterns, dst to src) that
3295            // also satisfies the inline property filter. If any
3296            // exist we take the match branch and yield one row per
3297            // match. If none exist we synthesize one and yield a
3298            // single row from the create branch.
3299            let mut matched: Vec<Edge> = Vec::new();
3300            for (edge_id, neighbor_id) in ctx.store.outgoing(src_node.id)? {
3301                if neighbor_id != dst_node.id {
3302                    continue;
3303                }
3304                if let Some(edge) = ctx.store.get_edge(edge_id)? {
3305                    if edge.edge_type == self.edge_type && edge_matches(&edge) {
3306                        matched.push(edge);
3307                    }
3308                }
3309            }
3310            if self.undirected {
3311                for (edge_id, neighbor_id) in ctx.store.incoming(src_node.id)? {
3312                    if neighbor_id != dst_node.id {
3313                        continue;
3314                    }
3315                    if let Some(edge) = ctx.store.get_edge(edge_id)? {
3316                        if edge.edge_type == self.edge_type && edge_matches(&edge) {
3317                            matched.push(edge);
3318                        }
3319                    }
3320                }
3321            }
3322
3323            if matched.is_empty() {
3324                let mut new_edge = Edge::new(&self.edge_type, src_node.id, dst_node.id);
3325                for (k, p) in &required_props {
3326                    new_edge.properties.insert(k.clone(), p.clone());
3327                }
3328                let mut row_out = row.clone();
3329                apply_merge_edge_actions(
3330                    &mut new_edge,
3331                    &self.on_create,
3332                    &self.edge_var,
3333                    ctx,
3334                    &mut row_out,
3335                )?;
3336                ctx.writer.put_edge(&new_edge)?;
3337                row_out.insert(self.edge_var.clone(), Value::Edge(new_edge));
3338                self.pending.push_back(row_out);
3339            } else {
3340                for mut existing in matched {
3341                    let mut row_out = row.clone();
3342                    if !self.on_match.is_empty() {
3343                        apply_merge_edge_actions(
3344                            &mut existing,
3345                            &self.on_match,
3346                            &self.edge_var,
3347                            ctx,
3348                            &mut row_out,
3349                        )?;
3350                        ctx.writer.put_edge(&existing)?;
3351                    }
3352                    row_out.insert(self.edge_var.clone(), Value::Edge(existing));
3353                    self.pending.push_back(row_out);
3354                }
3355            }
3356        }
3357    }
3358}
3359
3360/// Edge-side counterpart of [`apply_merge_actions`]. Evaluates
3361/// each `SetAssignment` against the outer `row` (augmented with
3362/// the current edge binding) and mutates either the edge itself
3363/// or a non-edge target node from the outer row. MERGE's ON
3364/// CREATE / ON MATCH clauses are scoped against the whole input
3365/// row, so `MERGE (a)-[:T]->(b) ON CREATE SET b.k = 1` has to
3366/// reach outside the edge-local binding. Non-edge mutations
3367/// are persisted via `ctx.writer.put_node` so the change is
3368/// visible to later clauses.
3369fn apply_merge_edge_actions(
3370    edge: &mut Edge,
3371    actions: &[SetAssignment],
3372    var: &str,
3373    exec_ctx: &ExecCtx,
3374    outer: &mut Row,
3375) -> Result<()> {
3376    if actions.is_empty() {
3377        return Ok(());
3378    }
3379    // Edge binding is live in `outer` while we evaluate — RHS can
3380    // reference both the edge and any sibling node binding.
3381    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3382    for action in actions {
3383        match action {
3384            SetAssignment::Property {
3385                var: target,
3386                key,
3387                value,
3388            } => {
3389                let sub_ctx = exec_ctx.eval_ctx(outer);
3390                let evaluated = eval_expr(value, &sub_ctx)?;
3391                let prop = value_to_property(evaluated)?;
3392                if target == var {
3393                    if matches!(prop, Property::Null) {
3394                        edge.properties.remove(key);
3395                    } else {
3396                        edge.properties.insert(key.clone(), prop);
3397                    }
3398                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3399                } else {
3400                    apply_set_prop_to_outer(outer, exec_ctx, target, key, prop)?;
3401                }
3402            }
3403            SetAssignment::Merge {
3404                var: target,
3405                properties,
3406            } => {
3407                let sub_ctx = exec_ctx.eval_ctx(outer);
3408                let resolved: Vec<(String, Property)> = properties
3409                    .iter()
3410                    .map(|(k, expr)| {
3411                        let v = eval_expr(expr, &sub_ctx)?;
3412                        Ok((k.clone(), value_to_property(v)?))
3413                    })
3414                    .collect::<Result<Vec<_>>>()?;
3415                if target == var {
3416                    for (k, p) in resolved {
3417                        edge.properties.insert(k, p);
3418                    }
3419                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3420                } else {
3421                    apply_set_map_to_outer(outer, exec_ctx, target, resolved, false)?;
3422                }
3423            }
3424            SetAssignment::Replace {
3425                var: target,
3426                properties,
3427            } => {
3428                let sub_ctx = exec_ctx.eval_ctx(outer);
3429                let resolved: Vec<(String, Property)> = properties
3430                    .iter()
3431                    .map(|(k, expr)| {
3432                        let v = eval_expr(expr, &sub_ctx)?;
3433                        Ok((k.clone(), value_to_property(v)?))
3434                    })
3435                    .collect::<Result<Vec<_>>>()?;
3436                if target == var {
3437                    edge.properties.clear();
3438                    for (k, p) in resolved {
3439                        edge.properties.insert(k, p);
3440                    }
3441                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3442                } else {
3443                    apply_set_map_to_outer(outer, exec_ctx, target, resolved, true)?;
3444                }
3445            }
3446            SetAssignment::Labels {
3447                var: target,
3448                labels,
3449            } => {
3450                if target == var {
3451                    // Edges don't carry labels.
3452                    return Err(Error::UnboundVariable(target.clone()));
3453                }
3454                apply_set_labels_to_outer(outer, exec_ctx, target, labels)?;
3455            }
3456            SetAssignment::ReplaceFromExpr {
3457                var: target,
3458                source,
3459                replace,
3460            } => {
3461                let sub_ctx = exec_ctx.eval_ctx(outer);
3462                let v = eval_expr(source, &sub_ctx)?;
3463                let props = extract_property_map(&v)?;
3464                if target == var {
3465                    if *replace {
3466                        edge.properties.clear();
3467                    }
3468                    for (k, p) in props {
3469                        edge.properties.insert(k, p);
3470                    }
3471                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3472                } else {
3473                    apply_set_map_to_outer(outer, exec_ctx, target, props, *replace)?;
3474                }
3475            }
3476        }
3477    }
3478    Ok(())
3479}
3480
3481/// Apply a single `SET target.key = prop` to a node or edge bound
3482/// in the outer row. Used by MERGE's ON CREATE / ON MATCH when
3483/// the target isn't the merge edge itself (the common case being
3484/// `MERGE (a)-[:R]->(b) ON CREATE SET b.k = v`).
3485fn apply_set_prop_to_outer(
3486    outer: &mut Row,
3487    exec_ctx: &ExecCtx,
3488    target: &str,
3489    key: &str,
3490    prop: Property,
3491) -> Result<()> {
3492    match outer.get_mut(target) {
3493        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
3494            // SET on a null target (typically an unmatched OPTIONAL
3495            // MATCH binding) is a silent no-op in openCypher.
3496            return Ok(());
3497        }
3498        Some(Value::Node(n)) => {
3499            if matches!(prop, Property::Null) {
3500                n.properties.remove(key);
3501            } else {
3502                n.properties.insert(key.to_string(), prop);
3503            }
3504            exec_ctx.writer.put_node(n)?;
3505        }
3506        Some(Value::Edge(e)) => {
3507            if matches!(prop, Property::Null) {
3508                e.properties.remove(key);
3509            } else {
3510                e.properties.insert(key.to_string(), prop);
3511            }
3512            exec_ctx.writer.put_edge(e)?;
3513        }
3514        _ => return Err(Error::UnboundVariable(target.to_string())),
3515    }
3516    Ok(())
3517}
3518
3519/// Apply a property-map assignment (`SET target = {..}` when
3520/// `replace`, or `SET target += {..}` when not) to a node or
3521/// edge bound in the outer row.
3522fn apply_set_map_to_outer(
3523    outer: &mut Row,
3524    exec_ctx: &ExecCtx,
3525    target: &str,
3526    props: Vec<(String, Property)>,
3527    replace: bool,
3528) -> Result<()> {
3529    match outer.get_mut(target) {
3530        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
3531        Some(Value::Node(n)) => {
3532            if replace {
3533                n.properties.clear();
3534            }
3535            for (k, p) in props {
3536                if replace || !matches!(p, Property::Null) {
3537                    n.properties.insert(k, p);
3538                } else {
3539                    n.properties.remove(&k);
3540                }
3541            }
3542            exec_ctx.writer.put_node(n)?;
3543            Ok(())
3544        }
3545        Some(Value::Edge(e)) => {
3546            if replace {
3547                e.properties.clear();
3548            }
3549            for (k, p) in props {
3550                if replace || !matches!(p, Property::Null) {
3551                    e.properties.insert(k, p);
3552                } else {
3553                    e.properties.remove(&k);
3554                }
3555            }
3556            exec_ctx.writer.put_edge(e)?;
3557            Ok(())
3558        }
3559        _ => Err(Error::UnboundVariable(target.to_string())),
3560    }
3561}
3562
3563/// Apply a labels assignment to a node bound in the outer row.
3564fn apply_set_labels_to_outer(
3565    outer: &mut Row,
3566    exec_ctx: &ExecCtx,
3567    target: &str,
3568    labels: &[String],
3569) -> Result<()> {
3570    match outer.get_mut(target) {
3571        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
3572        Some(Value::Node(n)) => {
3573            for label in labels {
3574                if !n.labels.contains(label) {
3575                    n.labels.push(label.clone());
3576                }
3577            }
3578            exec_ctx.writer.put_node(n)?;
3579            Ok(())
3580        }
3581        _ => Err(Error::UnboundVariable(target.to_string())),
3582    }
3583}
3584
3585/// `ON MATCH`) to `node` in place. Mirrors the `SetPropertyOp`
3586/// dispatch but specialized to a single bound variable so we
3587/// don't have to materialize a full row dispatcher.
3588///
3589/// Value expressions are evaluated against a temporary row
3590/// `{var → Node(node.clone())}` so the RHS can reference the
3591/// node's existing properties — `MERGE (n) ON MATCH SET n.hits = n.hits + 1`
3592/// works the same as if the SET ran in a SetPropertyOp pipeline.
3593fn apply_merge_actions(
3594    node: &mut Node,
3595    actions: &[SetAssignment],
3596    var: &str,
3597    exec_ctx: &ExecCtx,
3598    base_row: &Row,
3599) -> Result<()> {
3600    if actions.is_empty() {
3601        return Ok(());
3602    }
3603    // Start from the upstream row so `ON CREATE SET n.prop = other.field`
3604    // can resolve `other` — then overlay the merged node under `var`.
3605    let mut row = base_row.clone();
3606    row.insert(var.to_string(), Value::Node(node.clone()));
3607    for action in actions {
3608        let sub_ctx = exec_ctx.eval_ctx(&row);
3609        match action {
3610            SetAssignment::Property {
3611                var: target,
3612                key,
3613                value,
3614            } => {
3615                if target != var {
3616                    return Err(Error::UnboundVariable(target.clone()));
3617                }
3618                let evaluated = eval_expr(value, &sub_ctx)?;
3619                let prop = value_to_property(evaluated)?;
3620                node.properties.insert(key.clone(), prop);
3621                row.insert(var.to_string(), Value::Node(node.clone()));
3622            }
3623            SetAssignment::Labels {
3624                var: target,
3625                labels,
3626            } => {
3627                if target != var {
3628                    return Err(Error::UnboundVariable(target.clone()));
3629                }
3630                for label in labels {
3631                    if !node.labels.contains(label) {
3632                        node.labels.push(label.clone());
3633                    }
3634                }
3635                row.insert(var.to_string(), Value::Node(node.clone()));
3636            }
3637            SetAssignment::Replace {
3638                var: target,
3639                properties,
3640            } => {
3641                if target != var {
3642                    return Err(Error::UnboundVariable(target.clone()));
3643                }
3644                let resolved: Vec<(String, Property)> = properties
3645                    .iter()
3646                    .map(|(k, expr)| {
3647                        let v = eval_expr(expr, &sub_ctx)?;
3648                        Ok((k.clone(), value_to_property(v)?))
3649                    })
3650                    .collect::<Result<Vec<_>>>()?;
3651                node.properties.clear();
3652                for (k, p) in resolved {
3653                    node.properties.insert(k, p);
3654                }
3655                row.insert(var.to_string(), Value::Node(node.clone()));
3656            }
3657            SetAssignment::Merge {
3658                var: target,
3659                properties,
3660            } => {
3661                if target != var {
3662                    return Err(Error::UnboundVariable(target.clone()));
3663                }
3664                let resolved: Vec<(String, Property)> = properties
3665                    .iter()
3666                    .map(|(k, expr)| {
3667                        let v = eval_expr(expr, &sub_ctx)?;
3668                        Ok((k.clone(), value_to_property(v)?))
3669                    })
3670                    .collect::<Result<Vec<_>>>()?;
3671                for (k, p) in resolved {
3672                    node.properties.insert(k, p);
3673                }
3674                row.insert(var.to_string(), Value::Node(node.clone()));
3675            }
3676            SetAssignment::ReplaceFromExpr {
3677                var: target,
3678                source,
3679                replace,
3680            } => {
3681                if target != var {
3682                    return Err(Error::UnboundVariable(target.clone()));
3683                }
3684                let v = eval_expr(source, &sub_ctx)?;
3685                let props = extract_property_map(&v)?;
3686                if *replace {
3687                    node.properties.clear();
3688                }
3689                for (k, p) in props {
3690                    node.properties.insert(k, p);
3691                }
3692                row.insert(var.to_string(), Value::Node(node.clone()));
3693            }
3694        }
3695    }
3696    Ok(())
3697}
3698
3699struct EdgeExpandOp {
3700    input: Box<dyn Operator>,
3701    src_var: String,
3702    edge_var: Option<String>,
3703    dst_var: String,
3704    dst_labels: Vec<String>,
3705    edge_properties: Vec<(String, Expr)>,
3706    edge_types: Vec<String>,
3707    direction: Direction,
3708    /// When set, only the specific edge whose id matches the
3709    /// row-bound value counts as a match. Used by fresh-scan
3710    /// MATCH patterns that reuse an edge variable from a prior
3711    /// clause so the new hop stays an existence check instead of
3712    /// rebinding and clobbering the outer edge.
3713    edge_constraint_var: Option<String>,
3714    current_row: Option<Row>,
3715    pending: Vec<(EdgeId, NodeId)>,
3716    pending_idx: usize,
3717}
3718
3719impl EdgeExpandOp {
3720    #[allow(clippy::too_many_arguments)]
3721    fn new(
3722        input: Box<dyn Operator>,
3723        src_var: String,
3724        edge_var: Option<String>,
3725        dst_var: String,
3726        dst_labels: Vec<String>,
3727        edge_properties: Vec<(String, Expr)>,
3728        edge_types: Vec<String>,
3729        direction: Direction,
3730        edge_constraint_var: Option<String>,
3731    ) -> Self {
3732        Self {
3733            input,
3734            src_var,
3735            edge_var,
3736            dst_var,
3737            dst_labels,
3738            edge_properties,
3739            edge_types,
3740            direction,
3741            edge_constraint_var,
3742            current_row: None,
3743            pending: Vec::new(),
3744            pending_idx: 0,
3745        }
3746    }
3747}
3748
3749impl Operator for EdgeExpandOp {
3750    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3751        loop {
3752            while self.pending_idx < self.pending.len() {
3753                let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3754                self.pending_idx += 1;
3755
3756                let edge = match ctx.store.get_edge(edge_id)? {
3757                    Some(e) => e,
3758                    None => continue,
3759                };
3760                if !self.edge_types.is_empty()
3761                    && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3762                {
3763                    continue;
3764                }
3765                // Pre-bound edge constraint: only accept the edge
3766                // whose id matches the row-bound value. Falls
3767                // through to outer scopes so a fresh right-side
3768                // scan of a CartesianProduct can still see the
3769                // bound edge from the left side. Non-edge / null
3770                // bindings trigger no matches at all — the
3771                // expansion yields nothing for that input row.
3772                if let Some(constraint_var) = &self.edge_constraint_var {
3773                    let base = self
3774                        .current_row
3775                        .as_ref()
3776                        .expect("pending edges without source row");
3777                    let expected = match ctx.lookup_binding(base, constraint_var) {
3778                        Some(Value::Edge(e)) => Some(e.id),
3779                        _ => None,
3780                    };
3781                    match expected {
3782                        Some(id) if id != edge.id => continue,
3783                        None => continue,
3784                        _ => {}
3785                    }
3786                }
3787                // Inline edge property filter: every (key, value)
3788                // must match the traversed edge's property of the
3789                // same name via `=` equality. Missing keys fail the
3790                // check (matching Neo4j).
3791                if !self.edge_properties.is_empty() {
3792                    let base = self
3793                        .current_row
3794                        .as_ref()
3795                        .expect("pending edges without source row");
3796                    let ectx = ctx.eval_ctx(base);
3797                    let mut ok = true;
3798                    for (key, expr) in &self.edge_properties {
3799                        let expected = eval_expr(expr, &ectx)?;
3800                        let actual = match edge.properties.get(key) {
3801                            Some(v) => Value::Property(v.clone()),
3802                            None => {
3803                                ok = false;
3804                                break;
3805                            }
3806                        };
3807                        if !values_equal(&actual, &expected) {
3808                            ok = false;
3809                            break;
3810                        }
3811                    }
3812                    if !ok {
3813                        continue;
3814                    }
3815                }
3816
3817                let neighbor = match ctx.store.get_node(neighbor_id)? {
3818                    Some(n) => n,
3819                    None => continue,
3820                };
3821                if !has_all_labels(&neighbor, &self.dst_labels) {
3822                    continue;
3823                }
3824
3825                let base = self
3826                    .current_row
3827                    .as_ref()
3828                    .expect("pending edges without source row");
3829                let mut out = base.clone();
3830                if let Some(ev) = &self.edge_var {
3831                    out.insert(ev.clone(), Value::Edge(edge));
3832                }
3833                out.insert(self.dst_var.clone(), Value::Node(neighbor));
3834                return Ok(Some(out));
3835            }
3836
3837            match self.input.next(ctx)? {
3838                None => return Ok(None),
3839                Some(row) => {
3840                    let src_id = match row.get(&self.src_var) {
3841                        Some(Value::Node(n)) => n.id,
3842                        // A null source (e.g. from OPTIONAL MATCH
3843                        // that matched nothing) drops the input
3844                        // row — `MATCH (a)-->(b)` against a null
3845                        // `a` is just empty, not an error.
3846                        Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
3847                            continue
3848                        }
3849                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3850                    };
3851                    self.pending = match self.direction {
3852                        Direction::Outgoing => ctx.store.outgoing(src_id)?,
3853                        Direction::Incoming => ctx.store.incoming(src_id)?,
3854                        Direction::Both => {
3855                            // For undirected traversal, a self-loop
3856                            // appears once as outgoing and once as
3857                            // incoming. The Cypher spec visits each
3858                            // physical edge once, so dedupe by
3859                            // edge_id before emitting.
3860                            let mut all = ctx.store.outgoing(src_id)?;
3861                            let mut seen: std::collections::HashSet<EdgeId> =
3862                                all.iter().map(|(e, _)| *e).collect();
3863                            for (e, n) in ctx.store.incoming(src_id)? {
3864                                if seen.insert(e) {
3865                                    all.push((e, n));
3866                                }
3867                            }
3868                            all
3869                        }
3870                    };
3871                    self.pending_idx = 0;
3872                    self.current_row = Some(row);
3873                }
3874            }
3875        }
3876    }
3877}
3878
3879/// Left-join variant of [`EdgeExpandOp`]. For each input row,
3880/// expands the adjacency in the configured direction and
3881/// filters by `edge_type` / `dst_labels` — if **any** neighbor
3882/// survives the filters, emits rows exactly like
3883/// `EdgeExpandOp`. If **zero** neighbors survive, emits one row
3884/// that carries the input row's bindings plus `edge_var` /
3885/// `dst_var` set to `Value::Null`, preserving the input row in
3886/// the output stream. This is the left-outer-join semantics
3887/// OPTIONAL MATCH needs.
3888///
3889/// Tracks per-input-row whether any output was produced so the
3890/// fallback Null row is only emitted after the pending buffer
3891/// drains without yielding anything. The `yielded_for_current`
3892/// flag is reset whenever a new input row is pulled.
3893struct OptionalEdgeExpandOp {
3894    input: Box<dyn Operator>,
3895    src_var: String,
3896    edge_var: Option<String>,
3897    dst_var: String,
3898    dst_labels: Vec<String>,
3899    dst_properties: Vec<(String, Expr)>,
3900    edge_types: Vec<String>,
3901    direction: Direction,
3902    /// When set, edges whose target id differs from the node
3903    /// bound at this variable in the current row are skipped
3904    /// inside the expansion loop. A row whose outgoing edges all
3905    /// fail the constraint then triggers the same null-fallback
3906    /// as having no edges at all.
3907    dst_constraint_var: Option<String>,
3908    /// When set, the expansion only considers the single edge
3909    /// whose id matches the edge already bound at this row
3910    /// variable — used when the OPTIONAL MATCH pattern reuses
3911    /// an edge variable from a prior clause.
3912    edge_constraint_var: Option<String>,
3913    current_row: Option<Row>,
3914    pending: Vec<(EdgeId, NodeId)>,
3915    pending_idx: usize,
3916    yielded_for_current: bool,
3917}
3918
3919impl OptionalEdgeExpandOp {
3920    #[allow(clippy::too_many_arguments)]
3921    fn new(
3922        input: Box<dyn Operator>,
3923        src_var: String,
3924        edge_var: Option<String>,
3925        dst_var: String,
3926        dst_labels: Vec<String>,
3927        dst_properties: Vec<(String, Expr)>,
3928        edge_types: Vec<String>,
3929        direction: Direction,
3930        dst_constraint_var: Option<String>,
3931        edge_constraint_var: Option<String>,
3932    ) -> Self {
3933        Self {
3934            input,
3935            src_var,
3936            edge_var,
3937            dst_var,
3938            dst_labels,
3939            dst_properties,
3940            edge_types,
3941            direction,
3942            dst_constraint_var,
3943            edge_constraint_var,
3944            current_row: None,
3945            pending: Vec::new(),
3946            pending_idx: 0,
3947            yielded_for_current: false,
3948        }
3949    }
3950}
3951
3952impl Operator for OptionalEdgeExpandOp {
3953    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3954        loop {
3955            while self.pending_idx < self.pending.len() {
3956                let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3957                self.pending_idx += 1;
3958
3959                let edge = match ctx.store.get_edge(edge_id)? {
3960                    Some(e) => e,
3961                    None => continue,
3962                };
3963                if !self.edge_types.is_empty()
3964                    && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3965                {
3966                    continue;
3967                }
3968                // Pre-bound edge constraint: only the specific
3969                // edge whose id matches the row-bound value
3970                // counts as a match. Falls through to outer
3971                // scopes so the constraint works for fresh
3972                // scans on the right side of a CartesianProduct.
3973                if let Some(constraint_var) = &self.edge_constraint_var {
3974                    let base = self
3975                        .current_row
3976                        .as_ref()
3977                        .expect("pending without source row");
3978                    let expected = match ctx.lookup_binding(base, constraint_var) {
3979                        Some(Value::Edge(e)) => Some(e.id),
3980                        _ => None,
3981                    };
3982                    match expected {
3983                        Some(id) if id != edge.id => continue,
3984                        None => continue,
3985                        _ => {}
3986                    }
3987                }
3988
3989                let neighbor = match ctx.store.get_node(neighbor_id)? {
3990                    Some(n) => n,
3991                    None => continue,
3992                };
3993                if !has_all_labels(&neighbor, &self.dst_labels) {
3994                    continue;
3995                }
3996                // Bound-endpoint constraint: when the declared
3997                // target is already bound in the row, only edges
3998                // that lead to that exact node count as a match.
3999                // Edges failing the constraint are silently
4000                // skipped — if every candidate fails, the
4001                // per-row left-join fallback below still fires.
4002                if let Some(constraint_var) = &self.dst_constraint_var {
4003                    let base = self
4004                        .current_row
4005                        .as_ref()
4006                        .expect("pending without source row");
4007                    let bound_id = match base.get(constraint_var) {
4008                        Some(Value::Node(n)) => Some(n.id),
4009                        Some(Value::Null)
4010                        | Some(Value::Property(meshdb_core::Property::Null))
4011                        | None => None,
4012                        _ => None,
4013                    };
4014                    match bound_id {
4015                        Some(id) if id != neighbor.id => continue,
4016                        None => continue,
4017                        _ => {}
4018                    }
4019                }
4020                if !self.dst_properties.is_empty() {
4021                    let base = self
4022                        .current_row
4023                        .as_ref()
4024                        .expect("pending without source row");
4025                    let ectx = ctx.eval_ctx(base);
4026                    let mut props_ok = true;
4027                    for (key, expr) in &self.dst_properties {
4028                        let expected = eval_expr(expr, &ectx)?;
4029                        let actual = neighbor
4030                            .properties
4031                            .get(key)
4032                            .cloned()
4033                            .map(Value::Property)
4034                            .unwrap_or(Value::Null);
4035                        if !values_equal(&expected, &actual) {
4036                            props_ok = false;
4037                            break;
4038                        }
4039                    }
4040                    if !props_ok {
4041                        continue;
4042                    }
4043                }
4044
4045                let base = self
4046                    .current_row
4047                    .as_ref()
4048                    .expect("pending edges without source row");
4049                let mut out = base.clone();
4050                if let Some(ev) = &self.edge_var {
4051                    out.insert(ev.clone(), Value::Edge(edge));
4052                }
4053                out.insert(self.dst_var.clone(), Value::Node(neighbor));
4054                self.yielded_for_current = true;
4055                return Ok(Some(out));
4056            }
4057
4058            // Pending drained for the current row. If nothing was
4059            // yielded, emit the left-join fallback: preserve the
4060            // input row with the optional variables set to Null.
4061            //
4062            // Exception: when an edge / dst constraint names a
4063            // variable that was pre-bound in the input row, the
4064            // fallback must NOT clobber that variable — the
4065            // constraint turns the expansion into an existence
4066            // check and the outer value should survive through.
4067            if let Some(base) = self.current_row.take() {
4068                if !self.yielded_for_current {
4069                    let mut out = base;
4070                    if let Some(ev) = &self.edge_var {
4071                        let preserve = self
4072                            .edge_constraint_var
4073                            .as_ref()
4074                            .map(|c| c == ev)
4075                            .unwrap_or(false);
4076                        if !preserve {
4077                            out.insert(ev.clone(), Value::Null);
4078                        }
4079                    }
4080                    let preserve_dst = self
4081                        .dst_constraint_var
4082                        .as_ref()
4083                        .map(|c| c == &self.dst_var)
4084                        .unwrap_or(false);
4085                    if !preserve_dst {
4086                        out.insert(self.dst_var.clone(), Value::Null);
4087                    }
4088                    self.yielded_for_current = true;
4089                    return Ok(Some(out));
4090                }
4091            }
4092
4093            match self.input.next(ctx)? {
4094                None => return Ok(None),
4095                Some(row) => {
4096                    let src_id = match row.get(&self.src_var) {
4097                        Some(Value::Node(n)) => n.id,
4098                        // src_var is Null (because a prior
4099                        // OPTIONAL MATCH chained before this one
4100                        // Null-bound it). Skip adjacency entirely
4101                        // and fall through to the fallback Null
4102                        // row so downstream clauses see the
4103                        // preserved input.
4104                        Some(Value::Null) => {
4105                            self.pending = Vec::new();
4106                            self.pending_idx = 0;
4107                            self.yielded_for_current = false;
4108                            self.current_row = Some(row);
4109                            continue;
4110                        }
4111                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4112                    };
4113                    self.pending = match self.direction {
4114                        Direction::Outgoing => ctx.store.outgoing(src_id)?,
4115                        Direction::Incoming => ctx.store.incoming(src_id)?,
4116                        Direction::Both => {
4117                            // For undirected traversal, a self-loop
4118                            // appears once as outgoing and once as
4119                            // incoming. The Cypher spec visits each
4120                            // physical edge once, so dedupe by
4121                            // edge_id before emitting.
4122                            let mut all = ctx.store.outgoing(src_id)?;
4123                            let mut seen: std::collections::HashSet<EdgeId> =
4124                                all.iter().map(|(e, _)| *e).collect();
4125                            for (e, n) in ctx.store.incoming(src_id)? {
4126                                if seen.insert(e) {
4127                                    all.push((e, n));
4128                                }
4129                            }
4130                            all
4131                        }
4132                    };
4133                    self.pending_idx = 0;
4134                    self.yielded_for_current = false;
4135                    self.current_row = Some(row);
4136                }
4137            }
4138        }
4139    }
4140}
4141
4142struct VarLengthExpandOp {
4143    input: Box<dyn Operator>,
4144    src_var: String,
4145    edge_var: Option<String>,
4146    dst_var: String,
4147    dst_labels: Vec<String>,
4148    edge_types: Vec<String>,
4149    /// Per-edge property filter — every edge along the walked
4150    /// path must have these `(key, value)` pairs. Mirrors the
4151    /// inline filter on `EdgeExpandOp`; applied during DFS so
4152    /// failing edges prune the branch instead of generating
4153    /// wrong-length results.
4154    edge_properties: Vec<(String, Expr)>,
4155    direction: Direction,
4156    min_hops: u64,
4157    max_hops: u64,
4158    path_var: Option<String>,
4159    /// Per-row left-join mode: when `true`, an input row that
4160    /// produces no matching paths still emits one row with the
4161    /// expansion's output vars bound to Null. Set for
4162    /// `OPTIONAL MATCH (a)-[*]->(b)` so the outer row survives
4163    /// even when the path search is empty.
4164    optional: bool,
4165    /// When set, paths whose terminal node id differs from the
4166    /// node bound at this variable in the current row are
4167    /// filtered out before counting as a match. Combined with
4168    /// `optional`, an input row whose candidate paths all miss
4169    /// the bound target triggers the null-fallback instead of
4170    /// silently dropping.
4171    dst_constraint_var: Option<String>,
4172    /// Replay mode: read the walked edge sequence from the list
4173    /// already bound at this row variable instead of doing DFS.
4174    /// Used by openCypher's "walk a pre-bound edge list" form.
4175    bound_edge_list_var: Option<String>,
4176    /// Row variables whose bound edges (or edge lists) must not
4177    /// appear in the walked path — enforces openCypher's
4178    /// relationship-uniqueness rule across hops within the same
4179    /// MATCH pattern. Each entry resolves against the current row
4180    /// (falling through to outer-scope rows) at run time; the
4181    /// union of every referenced edge id becomes the DFS
4182    /// exclusion set.
4183    excluded_edge_vars: Vec<String>,
4184    current_row: Option<Row>,
4185    pending_paths: Vec<Vec<Edge>>,
4186    pending_node_paths: Vec<Vec<NodeId>>,
4187    pending_targets: Vec<NodeId>,
4188    pending_idx: usize,
4189}
4190
4191impl VarLengthExpandOp {
4192    #[allow(clippy::too_many_arguments)]
4193    fn new(
4194        input: Box<dyn Operator>,
4195        src_var: String,
4196        edge_var: Option<String>,
4197        dst_var: String,
4198        dst_labels: Vec<String>,
4199        edge_types: Vec<String>,
4200        edge_properties: Vec<(String, Expr)>,
4201        direction: Direction,
4202        min_hops: u64,
4203        max_hops: u64,
4204        path_var: Option<String>,
4205        optional: bool,
4206        dst_constraint_var: Option<String>,
4207        bound_edge_list_var: Option<String>,
4208        excluded_edge_vars: Vec<String>,
4209    ) -> Self {
4210        Self {
4211            input,
4212            src_var,
4213            edge_var,
4214            dst_var,
4215            dst_labels,
4216            edge_types,
4217            edge_properties,
4218            direction,
4219            min_hops,
4220            max_hops,
4221            path_var,
4222            optional,
4223            dst_constraint_var,
4224            bound_edge_list_var,
4225            excluded_edge_vars,
4226            current_row: None,
4227            pending_paths: Vec::new(),
4228            pending_node_paths: Vec::new(),
4229            pending_targets: Vec::new(),
4230            pending_idx: 0,
4231        }
4232    }
4233
4234    fn enumerate(
4235        &self,
4236        ctx: &ExecCtx,
4237        start: NodeId,
4238        input_row: &Row,
4239    ) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
4240        let mut paths: Vec<Vec<Edge>> = Vec::new();
4241        let mut node_paths: Vec<Vec<NodeId>> = Vec::new();
4242        let mut targets: Vec<NodeId> = Vec::new();
4243        let mut edge_buf: Vec<Edge> = Vec::new();
4244        let mut node_buf: Vec<NodeId> = vec![start];
4245        // Seed `used` with edges from outer-scope bindings the
4246        // pattern flags as exclusions (e.g. the other hops'
4247        // edge / edge-list vars). A fresh walk can't revisit an
4248        // edge that's already bound as a relationship variable
4249        // elsewhere in the MATCH — that's openCypher's
4250        // relationship-uniqueness rule.
4251        let mut used: HashSet<EdgeId> = HashSet::new();
4252        for var in &self.excluded_edge_vars {
4253            match ctx.lookup_binding(input_row, var) {
4254                Some(Value::Edge(e)) => {
4255                    used.insert(e.id);
4256                }
4257                Some(Value::List(items)) => {
4258                    for item in items {
4259                        if let Value::Edge(e) = item {
4260                            used.insert(e.id);
4261                        }
4262                    }
4263                }
4264                _ => {}
4265            }
4266        }
4267        // Evaluate edge-property expected values once per input row
4268        // — they may reference row bindings or `$`-parameters, but
4269        // don't vary per walked edge.
4270        let expected_edge_props: Vec<(String, Value)> = if self.edge_properties.is_empty() {
4271            Vec::new()
4272        } else {
4273            let ectx = ctx.eval_ctx(input_row);
4274            self.edge_properties
4275                .iter()
4276                .map(|(k, expr)| eval_expr(expr, &ectx).map(|v| (k.clone(), v)))
4277                .collect::<Result<Vec<_>>>()?
4278        };
4279        self.dfs(
4280            ctx,
4281            start,
4282            &expected_edge_props,
4283            &mut edge_buf,
4284            &mut node_buf,
4285            &mut used,
4286            &mut paths,
4287            &mut node_paths,
4288            &mut targets,
4289        )?;
4290        Ok((paths, node_paths, targets))
4291    }
4292
4293    #[allow(clippy::too_many_arguments)]
4294    fn dfs(
4295        &self,
4296        ctx: &ExecCtx,
4297        current_node: NodeId,
4298        expected_edge_props: &[(String, Value)],
4299        edge_buf: &mut Vec<Edge>,
4300        node_buf: &mut Vec<NodeId>,
4301        used: &mut HashSet<EdgeId>,
4302        out_paths: &mut Vec<Vec<Edge>>,
4303        out_node_paths: &mut Vec<Vec<NodeId>>,
4304        out_targets: &mut Vec<NodeId>,
4305    ) -> Result<()> {
4306        let depth = edge_buf.len() as u64;
4307
4308        if depth >= self.min_hops && depth <= self.max_hops {
4309            let terminal_ok = match ctx.store.get_node(current_node)? {
4310                Some(node) => has_all_labels(&node, &self.dst_labels),
4311                None => false,
4312            };
4313            if terminal_ok {
4314                out_paths.push(edge_buf.clone());
4315                out_node_paths.push(node_buf.clone());
4316                out_targets.push(current_node);
4317            }
4318        }
4319
4320        if depth >= self.max_hops {
4321            return Ok(());
4322        }
4323
4324        let neighbors = match self.direction {
4325            Direction::Outgoing => ctx.store.outgoing(current_node)?,
4326            Direction::Incoming => ctx.store.incoming(current_node)?,
4327            Direction::Both => {
4328                let mut all = ctx.store.outgoing(current_node)?;
4329                all.extend(ctx.store.incoming(current_node)?);
4330                all
4331            }
4332        };
4333
4334        for (eid, neighbor_id) in neighbors {
4335            if used.contains(&eid) {
4336                continue;
4337            }
4338            let edge = match ctx.store.get_edge(eid)? {
4339                Some(e) => e,
4340                None => continue,
4341            };
4342            if !self.edge_types.is_empty() && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4343            {
4344                continue;
4345            }
4346            // Inline edge property filter: skip edges whose
4347            // properties don't equal the expected values computed
4348            // per input row. A missing property fails the check,
4349            // matching `EdgeExpandOp`.
4350            if !expected_edge_props.is_empty() {
4351                let mut ok = true;
4352                for (key, expected) in expected_edge_props {
4353                    let actual = match edge.properties.get(key) {
4354                        Some(v) => Value::Property(v.clone()),
4355                        None => {
4356                            ok = false;
4357                            break;
4358                        }
4359                    };
4360                    if !values_equal(&actual, expected) {
4361                        ok = false;
4362                        break;
4363                    }
4364                }
4365                if !ok {
4366                    continue;
4367                }
4368            }
4369            used.insert(eid);
4370            edge_buf.push(edge);
4371            node_buf.push(neighbor_id);
4372            self.dfs(
4373                ctx,
4374                neighbor_id,
4375                expected_edge_props,
4376                edge_buf,
4377                node_buf,
4378                used,
4379                out_paths,
4380                out_node_paths,
4381                out_targets,
4382            )?;
4383            edge_buf.pop();
4384            node_buf.pop();
4385            used.remove(&eid);
4386        }
4387
4388        Ok(())
4389    }
4390}
4391
4392/// Replay the edge sequence stored at `list_var` in `row` as a
4393/// var-length walk starting from `src_id`. Returns `(paths,
4394/// node_paths, targets)` in the same shape `enumerate` produces
4395/// — either one entry when the list reads as a valid connected
4396/// walk in `direction`, or empty when it doesn't.
4397///
4398/// Validation per step:
4399/// * the list element is a `Value::Edge`,
4400/// * the edge's optional type filter matches `edge_types`,
4401/// * the edge actually touches the current node and proceeds to
4402///   the other endpoint in the requested direction (undirected
4403///   accepts either).
4404///
4405/// A null / missing / non-list value, a null source, or any
4406/// failed step all produce an empty result — which, when the
4407/// caller is in optional mode, triggers the left-join null
4408/// fallback.
4409fn replay_edge_list(
4410    ctx: &ExecCtx,
4411    row: &Row,
4412    list_var: &str,
4413    src_id: Option<NodeId>,
4414    direction: Direction,
4415    edge_types: &[String],
4416) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
4417    let start = match src_id {
4418        Some(id) => id,
4419        None => return Ok((Vec::new(), Vec::new(), Vec::new())),
4420    };
4421    let list = match ctx.lookup_binding(row, list_var) {
4422        Some(Value::List(items)) => items.clone(),
4423        Some(Value::Property(meshdb_core::Property::List(items))) => items
4424            .iter()
4425            .cloned()
4426            .map(Value::Property)
4427            .collect::<Vec<_>>(),
4428        _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
4429    };
4430    let mut edge_buf: Vec<Edge> = Vec::with_capacity(list.len());
4431    let mut node_buf: Vec<NodeId> = Vec::with_capacity(list.len() + 1);
4432    node_buf.push(start);
4433    let mut current = start;
4434    for item in list {
4435        let edge = match item {
4436            Value::Edge(e) => e,
4437            _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
4438        };
4439        if !edge_types.is_empty() && !edge_types.iter().any(|t| t == &edge.edge_type) {
4440            return Ok((Vec::new(), Vec::new(), Vec::new()));
4441        }
4442        let next_node = match direction {
4443            Direction::Outgoing => {
4444                if edge.source != current {
4445                    return Ok((Vec::new(), Vec::new(), Vec::new()));
4446                }
4447                edge.target
4448            }
4449            Direction::Incoming => {
4450                if edge.target != current {
4451                    return Ok((Vec::new(), Vec::new(), Vec::new()));
4452                }
4453                edge.source
4454            }
4455            Direction::Both => {
4456                if edge.source == current {
4457                    edge.target
4458                } else if edge.target == current {
4459                    edge.source
4460                } else {
4461                    return Ok((Vec::new(), Vec::new(), Vec::new()));
4462                }
4463            }
4464        };
4465        // The stored edge shape should still exist in the graph;
4466        // a missing node mid-walk means the snapshot shifted and
4467        // the list is no longer valid.
4468        if ctx.store.get_node(next_node)?.is_none() {
4469            return Ok((Vec::new(), Vec::new(), Vec::new()));
4470        }
4471        edge_buf.push(edge);
4472        node_buf.push(next_node);
4473        current = next_node;
4474    }
4475    Ok((vec![edge_buf], vec![node_buf], vec![current]))
4476}
4477
4478impl Operator for VarLengthExpandOp {
4479    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4480        loop {
4481            while self.pending_idx < self.pending_paths.len() {
4482                let i = self.pending_idx;
4483                self.pending_idx += 1;
4484
4485                let target_id = self.pending_targets[i];
4486                let target = match ctx.store.get_node(target_id)? {
4487                    Some(n) => n,
4488                    None => continue,
4489                };
4490
4491                let base = self
4492                    .current_row
4493                    .as_ref()
4494                    .expect("pending without source row");
4495                let mut out = base.clone();
4496                out.insert(self.dst_var.clone(), Value::Node(target.clone()));
4497                if let Some(ev) = &self.edge_var {
4498                    let edges: Vec<Value> = self.pending_paths[i]
4499                        .iter()
4500                        .cloned()
4501                        .map(Value::Edge)
4502                        .collect();
4503                    out.insert(ev.clone(), Value::List(edges));
4504                }
4505                if let Some(pv) = &self.path_var {
4506                    let mut nodes = Vec::with_capacity(self.pending_node_paths[i].len());
4507                    for nid in &self.pending_node_paths[i] {
4508                        match ctx.store.get_node(*nid)? {
4509                            Some(n) => nodes.push(n),
4510                            None => continue,
4511                        }
4512                    }
4513                    let edges = self.pending_paths[i].clone();
4514                    out.insert(pv.clone(), Value::Path { nodes, edges });
4515                }
4516                return Ok(Some(out));
4517            }
4518
4519            match self.input.next(ctx)? {
4520                None => return Ok(None),
4521                Some(row) => {
4522                    let src_id = match row.get(&self.src_var) {
4523                        Some(Value::Node(n)) => Some(n.id),
4524                        // Null source → no paths. Same
4525                        // null-propagating semantics as
4526                        // `EdgeExpandOp`. In optional mode we
4527                        // still emit the left-join fallback;
4528                        // otherwise we just skip.
4529                        Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
4530                            None
4531                        }
4532                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4533                    };
4534                    // Replay path: when the hop names an already-
4535                    // bound edge list (`MATCH (a)-[rs*]->(b)` with
4536                    // `rs = [r1, r2]` from an earlier WITH), skip
4537                    // DFS entirely and verify the listed edges
4538                    // form a connected walk from `src_id` in the
4539                    // required direction. Produces at most one
4540                    // path — the exact list that was supplied.
4541                    let (mut paths, mut node_paths, mut targets) =
4542                        if let Some(list_var) = &self.bound_edge_list_var {
4543                            replay_edge_list(
4544                                ctx,
4545                                &row,
4546                                list_var,
4547                                src_id,
4548                                self.direction,
4549                                &self.edge_types,
4550                            )?
4551                        } else {
4552                            match src_id {
4553                                Some(id) => self.enumerate(ctx, id, &row)?,
4554                                None => (Vec::new(), Vec::new(), Vec::new()),
4555                            }
4556                        };
4557                    // Bound-endpoint constraint: drop paths whose
4558                    // terminal node doesn't match the node already
4559                    // bound at the constraint var. When combined
4560                    // with `optional`, an input whose paths are
4561                    // all filtered out still triggers the
4562                    // null-fallback below.
4563                    if let Some(constraint_var) = &self.dst_constraint_var {
4564                        let target_id = match row.get(constraint_var) {
4565                            Some(Value::Node(n)) => Some(n.id),
4566                            _ => None,
4567                        };
4568                        match target_id {
4569                            Some(id) => {
4570                                let mut kept_paths = Vec::new();
4571                                let mut kept_node_paths = Vec::new();
4572                                let mut kept_targets = Vec::new();
4573                                for ((p, np), t) in paths
4574                                    .drain(..)
4575                                    .zip(node_paths.drain(..))
4576                                    .zip(targets.drain(..))
4577                                {
4578                                    if t == id {
4579                                        kept_paths.push(p);
4580                                        kept_node_paths.push(np);
4581                                        kept_targets.push(t);
4582                                    }
4583                                }
4584                                paths = kept_paths;
4585                                node_paths = kept_node_paths;
4586                                targets = kept_targets;
4587                            }
4588                            None => {
4589                                paths.clear();
4590                                node_paths.clear();
4591                                targets.clear();
4592                            }
4593                        }
4594                    }
4595                    if paths.is_empty() && self.optional {
4596                        // Left-join fallback: emit one row with
4597                        // the expansion's output vars set to Null
4598                        // so the outer OPTIONAL MATCH preserves
4599                        // this input row.
4600                        let mut out = row;
4601                        if let Some(ev) = &self.edge_var {
4602                            out.insert(ev.clone(), Value::Null);
4603                        }
4604                        out.insert(self.dst_var.clone(), Value::Null);
4605                        if let Some(pv) = &self.path_var {
4606                            out.insert(pv.clone(), Value::Null);
4607                        }
4608                        return Ok(Some(out));
4609                    }
4610                    self.pending_paths = paths;
4611                    self.pending_node_paths = node_paths;
4612                    self.pending_targets = targets;
4613                    self.pending_idx = 0;
4614                    self.current_row = Some(row);
4615                }
4616            }
4617        }
4618    }
4619}
4620
4621struct FilterOp {
4622    input: Box<dyn Operator>,
4623    predicate: Expr,
4624}
4625
4626impl FilterOp {
4627    fn new(input: Box<dyn Operator>, predicate: Expr) -> Self {
4628        Self { input, predicate }
4629    }
4630}
4631
4632impl Operator for FilterOp {
4633    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4634        while let Some(row) = self.input.next(ctx)? {
4635            let v = match eval_expr(&self.predicate, &ctx.eval_ctx(&row)) {
4636                Ok(v) => v,
4637                // Type mismatches and non-boolean errors in filter predicates
4638                // are treated as false (row filtered out), not hard errors
4639                Err(Error::TypeMismatch) | Err(Error::NotBoolean) => Value::Null,
4640                Err(e) => return Err(e),
4641            };
4642            if to_bool(&v).unwrap_or(false) {
4643                return Ok(Some(row));
4644            }
4645        }
4646        Ok(None)
4647    }
4648}
4649
4650/// Pass-through operator for `RETURN *` / `WITH *`. Forwards every
4651/// row from the input unchanged.
4652struct IdentityOp {
4653    input: Box<dyn Operator>,
4654}
4655
4656impl IdentityOp {
4657    fn new(input: Box<dyn Operator>) -> Self {
4658        Self { input }
4659    }
4660}
4661
4662impl Operator for IdentityOp {
4663    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4664        self.input.next(ctx)
4665    }
4666}
4667
4668/// Passes input rows through; if the input produces zero rows,
4669/// emits exactly one row with `null_vars` bound to Value::Null.
4670/// Implements standalone OPTIONAL MATCH semantics (e.g.
4671/// `OPTIONAL MATCH (n) RETURN n` on an empty graph yields one
4672/// row with n=null rather than the empty result set).
4673struct CoalesceNullRowOp {
4674    input: Box<dyn Operator>,
4675    null_vars: Vec<String>,
4676    produced_any: bool,
4677    done: bool,
4678}
4679
4680impl CoalesceNullRowOp {
4681    fn new(input: Box<dyn Operator>, null_vars: Vec<String>) -> Self {
4682        Self {
4683            input,
4684            null_vars,
4685            produced_any: false,
4686            done: false,
4687        }
4688    }
4689}
4690
4691impl Operator for CoalesceNullRowOp {
4692    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4693        if self.done {
4694            return Ok(None);
4695        }
4696        match self.input.next(ctx)? {
4697            Some(row) => {
4698                self.produced_any = true;
4699                Ok(Some(row))
4700            }
4701            None => {
4702                self.done = true;
4703                if self.produced_any {
4704                    Ok(None)
4705                } else {
4706                    let mut row = Row::new();
4707                    for v in &self.null_vars {
4708                        row.insert(v.clone(), Value::Null);
4709                    }
4710                    Ok(Some(row))
4711                }
4712            }
4713        }
4714    }
4715}
4716
4717struct ProjectOp {
4718    input: Box<dyn Operator>,
4719    items: Vec<ReturnItem>,
4720}
4721
4722impl ProjectOp {
4723    fn new(input: Box<dyn Operator>, items: Vec<ReturnItem>) -> Self {
4724        Self { input, items }
4725    }
4726}
4727
4728impl Operator for ProjectOp {
4729    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4730        match self.input.next(ctx)? {
4731            Some(row) => {
4732                let mut out = Row::new();
4733                for (i, item) in self.items.iter().enumerate() {
4734                    let name = item.alias.clone().unwrap_or_else(|| {
4735                        item.raw_text
4736                            .clone()
4737                            .unwrap_or_else(|| default_name(&item.expr, i))
4738                    });
4739                    let value = eval_expr(&item.expr, &ctx.eval_ctx(&row))?;
4740                    out.insert(name, value);
4741                }
4742                Ok(Some(out))
4743            }
4744            None => Ok(None),
4745        }
4746    }
4747}
4748
4749fn default_name(expr: &Expr, idx: usize) -> String {
4750    render_expr_name(expr).unwrap_or_else(|| format!("col{}", idx))
4751}
4752
4753fn render_expr_name(expr: &Expr) -> Option<String> {
4754    Some(match expr {
4755        Expr::Identifier(s) => s.clone(),
4756        Expr::Property { var, key } => format!("{var}.{key}"),
4757        Expr::PropertyAccess { base, key } => {
4758            // Match the source syntax's parenthesisation of a
4759            // bracketed base: `(list[1]).k` round-trips, while a
4760            // plain identifier base stays bare (`a.b`).
4761            if matches!(
4762                base.as_ref(),
4763                Expr::IndexAccess { .. } | Expr::SliceAccess { .. }
4764            ) {
4765                format!("({}).{key}", render_expr_name(base)?)
4766            } else {
4767                format!("{}.{key}", render_expr_name(base)?)
4768            }
4769        }
4770        Expr::Parameter(name) => format!("${name}"),
4771        Expr::Literal(Literal::String(s)) => format!("'{s}'"),
4772        Expr::Literal(Literal::Integer(i)) => i.to_string(),
4773        Expr::Literal(Literal::Float(f)) => f.to_string(),
4774        Expr::Literal(Literal::Boolean(b)) => b.to_string(),
4775        Expr::Literal(Literal::Null) => "NULL".into(),
4776        Expr::Call { name, args } => {
4777            let arg_str = match args {
4778                CallArgs::Star => "*".into(),
4779                CallArgs::Exprs(es) | CallArgs::DistinctExprs(es) => {
4780                    let prefix = if matches!(args, CallArgs::DistinctExprs(_)) {
4781                        "DISTINCT "
4782                    } else {
4783                        ""
4784                    };
4785                    let inner: Vec<String> = es.iter().filter_map(render_expr_name).collect();
4786                    if inner.len() != es.len() {
4787                        return None;
4788                    }
4789                    format!("{prefix}{}", inner.join(", "))
4790                }
4791            };
4792            format!("{name}({arg_str})")
4793        }
4794        Expr::BinaryOp { op, left, right } => {
4795            let op_str = match op {
4796                BinaryOp::Add => " + ",
4797                BinaryOp::Sub => " - ",
4798                BinaryOp::Mul => " * ",
4799                BinaryOp::Div => " / ",
4800                BinaryOp::Mod => " % ",
4801                BinaryOp::Pow => " ^ ",
4802            };
4803            format!(
4804                "{}{op_str}{}",
4805                render_expr_name(left)?,
4806                render_expr_name(right)?
4807            )
4808        }
4809        Expr::UnaryOp { op, operand } => {
4810            let op_str = match op {
4811                UnaryOp::Neg => "-",
4812            };
4813            format!("{op_str}{}", render_expr_name(operand)?)
4814        }
4815        Expr::Not(inner) => format!("NOT {}", render_expr_name(inner)?),
4816        Expr::IsNull { negated, inner } => {
4817            if *negated {
4818                format!("{} IS NOT NULL", render_expr_name(inner)?)
4819            } else {
4820                format!("{} IS NULL", render_expr_name(inner)?)
4821            }
4822        }
4823        Expr::Compare { op, left, right } => {
4824            let op_str = match op {
4825                CompareOp::Eq => " = ",
4826                CompareOp::Ne => " <> ",
4827                CompareOp::Lt => " < ",
4828                CompareOp::Le => " <= ",
4829                CompareOp::Gt => " > ",
4830                CompareOp::Ge => " >= ",
4831                CompareOp::StartsWith => " STARTS WITH ",
4832                CompareOp::EndsWith => " ENDS WITH ",
4833                CompareOp::Contains => " CONTAINS ",
4834                CompareOp::RegexMatch => " =~ ",
4835            };
4836            format!(
4837                "{}{op_str}{}",
4838                render_expr_name(left)?,
4839                render_expr_name(right)?
4840            )
4841        }
4842        Expr::List(items) => {
4843            let inner: Vec<String> = items.iter().filter_map(render_expr_name).collect();
4844            if inner.len() != items.len() {
4845                return None;
4846            }
4847            format!("[{}]", inner.join(", "))
4848        }
4849        Expr::Map(entries) => {
4850            let inner: Vec<String> = entries
4851                .iter()
4852                .map(|(k, v)| render_expr_name(v).map(|vn| format!("{k}: {vn}")))
4853                .collect::<Option<Vec<_>>>()?;
4854            format!("{{{}}}", inner.join(", "))
4855        }
4856        Expr::IndexAccess { base, index } => {
4857            format!("{}[{}]", render_expr_name(base)?, render_expr_name(index)?)
4858        }
4859        Expr::InList { element, list } => {
4860            format!(
4861                "{} IN {}",
4862                render_expr_name(element)?,
4863                render_expr_name(list)?
4864            )
4865        }
4866        Expr::HasLabels { expr, labels } => {
4867            let mut s = format!("({}", render_expr_name(expr)?);
4868            for l in labels {
4869                s.push(':');
4870                s.push_str(l);
4871            }
4872            s.push(')');
4873            s
4874        }
4875        _ => return None,
4876    })
4877}
4878
4879struct DistinctOp {
4880    input: Box<dyn Operator>,
4881    seen: HashSet<String>,
4882}
4883
4884impl DistinctOp {
4885    fn new(input: Box<dyn Operator>) -> Self {
4886        Self {
4887            input,
4888            seen: HashSet::new(),
4889        }
4890    }
4891}
4892
4893impl Operator for DistinctOp {
4894    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4895        while let Some(row) = self.input.next(ctx)? {
4896            let key = row_key(&row);
4897            if self.seen.insert(key) {
4898                return Ok(Some(row));
4899            }
4900        }
4901        Ok(None)
4902    }
4903}
4904
4905/// Assemble a `Value::Path` from a row's already-bound node and
4906/// edge variables. Emitted by `plan_pattern` when the source
4907/// pattern carries `path_var`. The operator pulls `node_vars[i]`
4908/// and `edge_vars[i]` out of every row, walks them in order, and
4909/// inserts the result into the row under `path_var` before
4910/// forwarding downstream.
4911///
4912/// Rows where any referenced variable is missing or not the
4913/// expected Node/Edge shape get a `Value::Null` at `path_var` —
4914/// matching how `OPTIONAL MATCH` flows null bindings through the
4915/// downstream projection without hard-erroring. Missing bindings
4916/// shouldn't normally happen (the planner fills in synthetic
4917/// names via `ensure_path_bindings`), but the null fallback
4918/// means an unexpected upstream shape degrades gracefully
4919/// instead of crashing the whole query.
4920struct BindPathOp {
4921    input: Box<dyn Operator>,
4922    path_var: String,
4923    node_vars: Vec<String>,
4924    edge_vars: Vec<String>,
4925}
4926
4927impl BindPathOp {
4928    fn new(
4929        input: Box<dyn Operator>,
4930        path_var: String,
4931        node_vars: Vec<String>,
4932        edge_vars: Vec<String>,
4933    ) -> Self {
4934        Self {
4935            input,
4936            path_var,
4937            node_vars,
4938            edge_vars,
4939        }
4940    }
4941}
4942
4943impl Operator for BindPathOp {
4944    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4945        let Some(mut row) = self.input.next(ctx)? else {
4946            return Ok(None);
4947        };
4948        // Extract the ordered node + edge sequence from the row.
4949        // Any missing or wrong-shaped entry collapses the whole
4950        // path binding to null and falls through.
4951        let mut nodes: Vec<meshdb_core::Node> = Vec::new();
4952        let mut edges: Vec<meshdb_core::Edge> = Vec::new();
4953        let mut abort = false;
4954        // Interleave node/edge vars. For each hop i:
4955        //   node_vars[i] = start/intermediate node
4956        //   edge_vars[i] = edge (or sub-path for var-length)
4957        //   node_vars[i+1] = target node
4958        // For var-length hops, edge_vars[i] may contain a
4959        // Value::Path — splice its interior into the running path.
4960        if let Some(Value::Node(n)) = row.get(&self.node_vars[0]) {
4961            nodes.push(n.clone());
4962        } else {
4963            abort = true;
4964        }
4965        if !abort {
4966            for (i, ev) in self.edge_vars.iter().enumerate() {
4967                match row.get(ev) {
4968                    Some(Value::Edge(e)) => {
4969                        edges.push(e.clone());
4970                        match row.get(&self.node_vars[i + 1]) {
4971                            Some(Value::Node(n)) => nodes.push(n.clone()),
4972                            _ => {
4973                                abort = true;
4974                                break;
4975                            }
4976                        }
4977                    }
4978                    Some(Value::Path {
4979                        nodes: sub_nodes,
4980                        edges: sub_edges,
4981                    }) => {
4982                        // Splice the sub-path. The sub-path's first
4983                        // node is the same as nodes.last() (already
4984                        // pushed), so skip it. All sub-edges go in.
4985                        // Sub-path interior nodes go in. The sub-path's
4986                        // last node is the target for this hop.
4987                        edges.extend(sub_edges.iter().cloned());
4988                        if sub_nodes.len() > 1 {
4989                            nodes.extend(sub_nodes[1..].iter().cloned());
4990                        }
4991                    }
4992                    _ => {
4993                        abort = true;
4994                        break;
4995                    }
4996                }
4997            }
4998        }
4999        if abort {
5000            row.insert(self.path_var.clone(), Value::Null);
5001        } else {
5002            row.insert(self.path_var.clone(), Value::Path { nodes, edges });
5003        }
5004        Ok(Some(row))
5005    }
5006}
5007
5008/// `MATCH p = shortestPath((a)-[:R*..N]->(b))`. For each input
5009/// row (which must already bind both `src_var` and `dst_var`
5010/// to `Value::Node`), runs a breadth-first search from the
5011/// source node toward the target, filtering edges by
5012/// `edge_type` and walking up to `max_hops` steps. Emits one
5013/// row per successful search with `path_var` set to a
5014/// `Value::Path` carrying the traversed node/edge sequence;
5015/// rows where BFS finds no path are dropped entirely (matching
5016/// Cypher's `MATCH` semantics for an unsatisfiable pattern).
5017///
5018/// The BFS uses classic parent-pointer reconstruction: each
5019/// visited node's `(parent_node_id, edge_id)` pair is stored
5020/// in a hashmap, and once the target is reached we walk back
5021/// to the source, reversing the accumulated edges to produce
5022/// a forward-order path. Cycle detection is a side-effect of
5023/// the visited set — the first time BFS sees a node is
5024/// necessarily via a shortest path, so later visits are
5025/// ignored.
5026struct ShortestPathOp {
5027    input: Box<dyn Operator>,
5028    src_var: String,
5029    dst_var: String,
5030    path_var: String,
5031    edge_types: Vec<String>,
5032    direction: meshdb_cypher::Direction,
5033    max_hops: u64,
5034    kind: meshdb_cypher::ShortestKind,
5035    /// Buffered `(base_row, path)` pairs from the current
5036    /// input row, used only by `AllShortest` mode where a
5037    /// single input can expand into multiple shortest paths
5038    /// of the same length. Each call to `next` drains one
5039    /// entry before pulling a fresh input row; in `Shortest`
5040    /// mode the buffer is always empty or has one element.
5041    pending: std::collections::VecDeque<(Row, Value)>,
5042}
5043
5044impl ShortestPathOp {
5045    #[allow(clippy::too_many_arguments)]
5046    fn new(
5047        input: Box<dyn Operator>,
5048        src_var: String,
5049        dst_var: String,
5050        path_var: String,
5051        edge_types: Vec<String>,
5052        direction: meshdb_cypher::Direction,
5053        max_hops: u64,
5054        kind: meshdb_cypher::ShortestKind,
5055    ) -> Self {
5056        Self {
5057            input,
5058            src_var,
5059            dst_var,
5060            path_var,
5061            edge_types,
5062            direction,
5063            max_hops,
5064            kind,
5065            pending: std::collections::VecDeque::new(),
5066        }
5067    }
5068}
5069
5070impl Operator for ShortestPathOp {
5071    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5072        loop {
5073            // Drain buffered paths from a prior input row
5074            // before pulling the next one. Only reachable in
5075            // `AllShortest` mode; `Shortest` mode's buffer
5076            // never accumulates more than one entry.
5077            if let Some((mut row, path)) = self.pending.pop_front() {
5078                row.insert(self.path_var.clone(), path);
5079                return Ok(Some(row));
5080            }
5081            let Some(row) = self.input.next(ctx)? else {
5082                return Ok(None);
5083            };
5084            let src = match row.get(&self.src_var) {
5085                Some(Value::Node(n)) => n.clone(),
5086                _ => continue,
5087            };
5088            let dst = match row.get(&self.dst_var) {
5089                Some(Value::Node(n)) => n.clone(),
5090                _ => continue,
5091            };
5092            let paths = bfs_shortest_paths(
5093                &src,
5094                &dst,
5095                &self.edge_types,
5096                self.direction,
5097                self.max_hops,
5098                self.kind,
5099                ctx.store,
5100            )?;
5101            if paths.is_empty() {
5102                // No path of length ≤ max_hops — drop the row.
5103                continue;
5104            }
5105            for path in paths {
5106                self.pending.push_back((row.clone(), path));
5107            }
5108        }
5109    }
5110}
5111
5112/// Layered breadth-first search from `src` toward `dst`,
5113/// constrained by edge type, direction, and max hop count.
5114/// Returns every path of minimum length when `kind` is
5115/// `AllShortest`, or just the first discovered shortest path
5116/// when `kind` is `Shortest`. An empty vector means no path
5117/// of length ≤ `max_hops` exists.
5118///
5119/// `src == dst` is a zero-hop special case that always
5120/// returns a singleton path regardless of `max_hops`.
5121///
5122/// The algorithm builds a parent DAG as it expands each
5123/// level: every node that was first reached at depth `d+1`
5124/// records every `(parent, edge)` pair from frontier[d] that
5125/// reaches it, not just the first one. This lets the
5126/// reconstruction walk enumerate all source-to-target paths
5127/// of the discovered shortest length. For `Shortest` mode
5128/// the reconstruction short-circuits on the first complete
5129/// path.
5130fn bfs_shortest_paths(
5131    src: &Node,
5132    dst: &Node,
5133    edge_types: &[String],
5134    direction: meshdb_cypher::Direction,
5135    max_hops: u64,
5136    kind: meshdb_cypher::ShortestKind,
5137    reader: &dyn crate::reader::GraphReader,
5138) -> Result<Vec<Value>> {
5139    use meshdb_cypher::Direction;
5140
5141    if src.id == dst.id {
5142        return Ok(vec![Value::Path {
5143            nodes: vec![src.clone()],
5144            edges: vec![],
5145        }]);
5146    }
5147
5148    // `dist` tracks the shortest distance from `src` to each
5149    // reached node. `parents` maps each reached node id to
5150    // every `(parent_id, edge_id)` pair that reached it at
5151    // that shortest distance — a node may have multiple
5152    // parents in the parent DAG for `AllShortest`.
5153    let mut dist: HashMap<NodeId, u64> = HashMap::new();
5154    dist.insert(src.id, 0);
5155    let mut parents: HashMap<NodeId, Vec<(NodeId, EdgeId)>> = HashMap::new();
5156
5157    let mut frontier: Vec<NodeId> = vec![src.id];
5158    let mut depth: u64 = 0;
5159    let mut found = false;
5160
5161    while !frontier.is_empty() && depth < max_hops && !found {
5162        let mut next_frontier: Vec<NodeId> = Vec::new();
5163        for node_id in &frontier {
5164            let neighbors = match direction {
5165                Direction::Outgoing => reader.outgoing(*node_id)?,
5166                Direction::Incoming => reader.incoming(*node_id)?,
5167                Direction::Both => {
5168                    let mut out = reader.outgoing(*node_id)?;
5169                    out.extend(reader.incoming(*node_id)?);
5170                    out
5171                }
5172            };
5173            for (edge_id, neighbor_id) in neighbors {
5174                // Edge-type filter. Only fetch the edge record
5175                // when a type constraint is present.
5176                if !edge_types.is_empty() {
5177                    let edge = match reader.get_edge(edge_id)? {
5178                        Some(e) => e,
5179                        None => continue,
5180                    };
5181                    if !edge_types.iter().any(|t| t == &edge.edge_type) {
5182                        continue;
5183                    }
5184                }
5185                match dist.get(&neighbor_id) {
5186                    Some(&d) if d == depth + 1 => {
5187                        // Alternate parent at the same
5188                        // shortest-path level — record the
5189                        // additional edge so the reconstruction
5190                        // can enumerate it, but don't re-add to
5191                        // the next frontier.
5192                        parents
5193                            .entry(neighbor_id)
5194                            .or_default()
5195                            .push((*node_id, edge_id));
5196                    }
5197                    Some(_) => {
5198                        // Already reached at a strictly shorter
5199                        // depth; this edge isn't on a shortest
5200                        // path.
5201                    }
5202                    None => {
5203                        dist.insert(neighbor_id, depth + 1);
5204                        parents
5205                            .entry(neighbor_id)
5206                            .or_default()
5207                            .push((*node_id, edge_id));
5208                        if neighbor_id == dst.id {
5209                            found = true;
5210                        } else {
5211                            next_frontier.push(neighbor_id);
5212                        }
5213                    }
5214                }
5215            }
5216        }
5217        depth += 1;
5218        if !found {
5219            frontier = next_frontier;
5220        }
5221    }
5222
5223    if !found {
5224        return Ok(Vec::new());
5225    }
5226
5227    // Enumerate all shortest paths from the parent DAG.
5228    // `Shortest` mode short-circuits after the first complete
5229    // walk; `AllShortest` collects every distinct path.
5230    let mut out: Vec<Value> = Vec::new();
5231    let mut nodes_rev: Vec<Node> = Vec::new();
5232    let mut edges_rev: Vec<Edge> = Vec::new();
5233    let only_first = matches!(kind, meshdb_cypher::ShortestKind::Shortest);
5234    collect_shortest_paths(
5235        src,
5236        dst,
5237        &parents,
5238        reader,
5239        &mut nodes_rev,
5240        &mut edges_rev,
5241        &mut out,
5242        only_first,
5243    )?;
5244    Ok(out)
5245}
5246
5247/// Depth-first walk through the parent DAG built by
5248/// `bfs_shortest_paths`, collecting every source-to-target
5249/// path of the BFS-determined shortest length. The recursion
5250/// carries two scratch stacks (`nodes_rev`, `edges_rev`)
5251/// representing the partially-reconstructed path in reverse
5252/// order; on reaching `src` we copy the reversed accumulators
5253/// into a forward-order `Value::Path` and push it into `out`.
5254///
5255/// `only_first = true` short-circuits after the first
5256/// complete path, giving `Shortest` mode the optimal-case
5257/// early exit.
5258#[allow(clippy::too_many_arguments)]
5259fn collect_shortest_paths(
5260    src: &Node,
5261    current: &Node,
5262    parents: &HashMap<NodeId, Vec<(NodeId, EdgeId)>>,
5263    reader: &dyn crate::reader::GraphReader,
5264    nodes_rev: &mut Vec<Node>,
5265    edges_rev: &mut Vec<Edge>,
5266    out: &mut Vec<Value>,
5267    only_first: bool,
5268) -> Result<()> {
5269    if current.id == src.id {
5270        // Complete walk: `nodes_rev` holds dst, ..., (last
5271        // node before src) in reverse; prepend src and
5272        // reverse to produce the forward-order node list.
5273        // Edges are similarly reversed.
5274        let mut nodes: Vec<Node> = Vec::with_capacity(nodes_rev.len() + 1);
5275        nodes.push(src.clone());
5276        nodes.extend(nodes_rev.iter().rev().cloned());
5277        let edges: Vec<Edge> = edges_rev.iter().rev().cloned().collect();
5278        out.push(Value::Path { nodes, edges });
5279        return Ok(());
5280    }
5281    let Some(parent_edges) = parents.get(&current.id) else {
5282        // Unreachable in normal flow — BFS only inserts dst
5283        // into `parents` when it found at least one incoming
5284        // edge — but handled defensively.
5285        return Ok(());
5286    };
5287    for (parent_id, edge_id) in parent_edges {
5288        if only_first && !out.is_empty() {
5289            return Ok(());
5290        }
5291        let edge = reader
5292            .get_edge(*edge_id)?
5293            .expect("BFS inserted this edge id; it must still exist");
5294        let parent_node = reader
5295            .get_node(*parent_id)?
5296            .expect("BFS visited this node id; it must still exist");
5297        nodes_rev.push(current.clone());
5298        edges_rev.push(edge);
5299        collect_shortest_paths(
5300            src,
5301            &parent_node,
5302            parents,
5303            reader,
5304            nodes_rev,
5305            edges_rev,
5306            out,
5307            only_first,
5308        )?;
5309        nodes_rev.pop();
5310        edges_rev.pop();
5311    }
5312    Ok(())
5313}
5314
5315/// `UNION` / `UNION ALL`. Drains each branch in order, streaming
5316/// its rows through. For plain `UNION` (`all = false`) we
5317/// deduplicate across the combined stream using the same
5318/// `row_key` the `DistinctOp` uses, so the semantics match
5319/// Neo4j's "set union" shape regardless of whether duplicates
5320/// sit inside a single branch or straddle branches. For
5321/// `UNION ALL` the `seen` set stays `None` and every produced
5322/// row is forwarded as-is.
5323struct UnionOp {
5324    branches: Vec<Box<dyn Operator>>,
5325    current: usize,
5326    seen: Option<HashSet<String>>,
5327}
5328
5329impl UnionOp {
5330    fn new(branches: Vec<Box<dyn Operator>>, all: bool) -> Self {
5331        Self {
5332            branches,
5333            current: 0,
5334            seen: if all { None } else { Some(HashSet::new()) },
5335        }
5336    }
5337}
5338
5339impl Operator for UnionOp {
5340    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5341        while self.current < self.branches.len() {
5342            match self.branches[self.current].next(ctx)? {
5343                Some(row) => {
5344                    if let Some(seen) = self.seen.as_mut() {
5345                        let key = row_key(&row);
5346                        if !seen.insert(key) {
5347                            continue;
5348                        }
5349                    }
5350                    return Ok(Some(row));
5351                }
5352                None => {
5353                    self.current += 1;
5354                }
5355            }
5356        }
5357        Ok(None)
5358    }
5359}
5360
5361struct OrderByOp {
5362    input: Box<dyn Operator>,
5363    sort_items: Vec<SortItem>,
5364    sorted: Option<Vec<Row>>,
5365    cursor: usize,
5366}
5367
5368impl OrderByOp {
5369    fn new(input: Box<dyn Operator>, sort_items: Vec<SortItem>) -> Self {
5370        Self {
5371            input,
5372            sort_items,
5373            sorted: None,
5374            cursor: 0,
5375        }
5376    }
5377}
5378
5379impl Operator for OrderByOp {
5380    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5381        if self.sorted.is_none() {
5382            let mut rows: Vec<Row> = Vec::new();
5383            while let Some(row) = self.input.next(ctx)? {
5384                rows.push(row);
5385            }
5386            let mut keyed: Vec<(Vec<Value>, Row)> = Vec::with_capacity(rows.len());
5387            for row in rows {
5388                let mut keys = Vec::with_capacity(self.sort_items.len());
5389                for item in &self.sort_items {
5390                    keys.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
5391                }
5392                keyed.push((keys, row));
5393            }
5394            let descs: Vec<bool> = self.sort_items.iter().map(|s| s.descending).collect();
5395            keyed.sort_by(|a, b| {
5396                for (i, (va, vb)) in a.0.iter().zip(b.0.iter()).enumerate() {
5397                    let ord = compare_values(va, vb);
5398                    let ord = if descs[i] { ord.reverse() } else { ord };
5399                    if ord != Ordering::Equal {
5400                        return ord;
5401                    }
5402                }
5403                Ordering::Equal
5404            });
5405            self.sorted = Some(keyed.into_iter().map(|(_, r)| r).collect());
5406        }
5407        let rows = self.sorted.as_ref().unwrap();
5408        if self.cursor < rows.len() {
5409            let row = rows[self.cursor].clone();
5410            self.cursor += 1;
5411            Ok(Some(row))
5412        } else {
5413            Ok(None)
5414        }
5415    }
5416}
5417
/// State for the grouping / aggregation operator. Rows pulled from
/// `input` are bucketed by the evaluated `group_keys`, and every
/// entry in `aggregates` keeps per-group state that is updated as
/// rows arrive.
struct AggregateOp {
    /// Upstream operator supplying the rows to aggregate.
    input: Box<dyn Operator>,
    /// Grouping expressions; each is evaluated per input row and the
    /// resulting values together identify the row's group.
    group_keys: Vec<ReturnItem>,
    /// Aggregate specifications (function, argument, optional extra
    /// argument, output alias), one state slot per group each.
    aggregates: Vec<AggregateSpec>,
    /// Output rows; starts as `None`.
    /// NOTE(review): presumably filled once by `compute` and then
    /// handed out row by row — the consuming code is outside this
    /// view, confirm there.
    results: Option<Vec<Row>>,
    /// Starts at 0.
    /// NOTE(review): presumably the index of the next row to emit
    /// from `results` — confirm against the `Operator` impl.
    cursor: usize,
}
5425
5426impl AggregateOp {
5427    fn new(
5428        input: Box<dyn Operator>,
5429        group_keys: Vec<ReturnItem>,
5430        aggregates: Vec<AggregateSpec>,
5431    ) -> Self {
5432        Self {
5433            input,
5434            group_keys,
5435            aggregates,
5436            results: None,
5437            cursor: 0,
5438        }
5439    }
5440
5441    fn compute(&mut self, ctx: &ExecCtx) -> Result<()> {
5442        let mut groups: HashMap<String, GroupState> = HashMap::new();
5443        let mut order: Vec<String> = Vec::new();
5444
5445        // If there are no input rows AND no group keys, we still emit one row
5446        // (e.g. `MATCH (n:Missing) RETURN count(*)` must yield one row with 0).
5447        let mut saw_any = false;
5448
5449        while let Some(row) = self.input.next(ctx)? {
5450            saw_any = true;
5451            let mut key_values = Vec::with_capacity(self.group_keys.len());
5452            for item in &self.group_keys {
5453                key_values.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
5454            }
5455            let mut hash_key = String::new();
5456            for v in &key_values {
5457                hash_key.push_str(&value_key(v));
5458                hash_key.push('|');
5459            }
5460            let entry = groups.entry(hash_key.clone()).or_insert_with(|| {
5461                order.push(hash_key.clone());
5462                GroupState {
5463                    key_values: key_values.clone(),
5464                    agg_states: self
5465                        .aggregates
5466                        .iter()
5467                        .map(|a| AggState::initial(a.function))
5468                        .collect(),
5469                    distinct_seen: self.aggregates.iter().map(|_| None).collect(),
5470                }
5471            });
5472            for (i, spec) in self.aggregates.iter().enumerate() {
5473                if let AggregateArg::DistinctExpr(expr) = &spec.arg {
5474                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
5475                    if matches!(v, Value::Null) {
5476                        continue;
5477                    }
5478                    let key = value_key(&v);
5479                    let seen = entry.distinct_seen[i].get_or_insert_with(HashSet::new);
5480                    if !seen.insert(key) {
5481                        continue;
5482                    }
5483                }
5484                entry.agg_states[i].update(&spec.arg, &ctx.eval_ctx(&row))?;
5485                // The percentile is a constant expression — evaluate
5486                // it once against the first row we see and stash it
5487                // in the state so finalize() has a number to use.
5488                if let Some(extra_expr) = &spec.extra_arg {
5489                    let need_resolve = matches!(
5490                        &entry.agg_states[i],
5491                        AggState::PercentileDisc {
5492                            percentile: None,
5493                            ..
5494                        } | AggState::PercentileCont {
5495                            percentile: None,
5496                            ..
5497                        }
5498                    );
5499                    if need_resolve {
5500                        let pv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
5501                        let p = match pv {
5502                            Value::Property(Property::Float64(f)) => f,
5503                            Value::Property(Property::Int64(i)) => i as f64,
5504                            _ => 0.0,
5505                        };
5506                        // openCypher requires the percentile to
5507                        // lie in [0.0, 1.0]; anything else is an
5508                        // `ArgumentError: NumberOutOfRange`.
5509                        if !(0.0..=1.0).contains(&p) || p.is_nan() {
5510                            return Err(Error::Procedure(format!("percentile out of range: {p}")));
5511                        }
5512                        match &mut entry.agg_states[i] {
5513                            AggState::PercentileDisc { percentile, .. }
5514                            | AggState::PercentileCont { percentile, .. } => {
5515                                *percentile = Some(p);
5516                            }
5517                            _ => {}
5518                        }
5519                    }
5520                }
5521            }
5522        }
5523
5524        let mut out = Vec::new();
5525        if !saw_any && self.group_keys.is_empty() && !self.aggregates.is_empty() {
5526            // Empty group, single aggregate row
5527            let mut row = Row::new();
5528            for spec in &self.aggregates {
5529                row.insert(
5530                    spec.alias.clone(),
5531                    AggState::initial(spec.function).finalize(),
5532                );
5533            }
5534            out.push(row);
5535        } else {
5536            for key in order {
5537                let state = groups.remove(&key).unwrap();
5538                let mut row = Row::new();
5539                for (i, item) in self.group_keys.iter().enumerate() {
5540                    let name = item
5541                        .alias
5542                        .clone()
5543                        .unwrap_or_else(|| default_name(&item.expr, i));
5544                    row.insert(name, state.key_values[i].clone());
5545                }
5546                for (i, spec) in self.aggregates.iter().enumerate() {
5547                    row.insert(spec.alias.clone(), state.agg_states[i].finalize());
5548                }
5549                out.push(row);
5550            }
5551        }
5552        self.results = Some(out);
5553        Ok(())
5554    }
5555}
5556
5557impl Operator for AggregateOp {
5558    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5559        if self.results.is_none() {
5560            self.compute(ctx)?;
5561        }
5562        let rows = self.results.as_ref().unwrap();
5563        if self.cursor < rows.len() {
5564            let row = rows[self.cursor].clone();
5565            self.cursor += 1;
5566            Ok(Some(row))
5567        } else {
5568            Ok(None)
5569        }
5570    }
5571}
5572
/// Accumulated state for one group of input rows that share the same
/// group-key values.
struct GroupState {
    /// Evaluated group-key expressions for this group, one per entry of the
    /// operator's `group_keys`, in the same order.
    key_values: Vec<Value>,
    /// One accumulator per entry of the operator's `aggregates`, in the same
    /// order.
    agg_states: Vec<AggState>,
    /// Per-aggregate set of `value_key` strings already fed to a `DISTINCT`
    /// aggregate. A slot stays `None` until a distinct aggregate sees its
    /// first non-null value.
    distinct_seen: Vec<Option<HashSet<String>>>,
}
5578
5579enum AggState {
5580    Count(i64),
5581    Sum {
5582        int_part: i64,
5583        float_part: f64,
5584        is_float: bool,
5585    },
5586    Avg {
5587        total: f64,
5588        count: i64,
5589    },
5590    Min(Option<Value>),
5591    Max(Option<Value>),
5592    Collect(Vec<Value>),
5593    StDev {
5594        sum: f64,
5595        sum_sq: f64,
5596        count: i64,
5597    },
5598    StDevP {
5599        sum: f64,
5600        sum_sq: f64,
5601        count: i64,
5602    },
5603    PercentileDisc {
5604        items: Vec<Value>,
5605        percentile: Option<f64>,
5606    },
5607    PercentileCont {
5608        items: Vec<Value>,
5609        percentile: Option<f64>,
5610    },
5611}
5612
5613impl AggState {
5614    fn initial(func: AggregateFn) -> Self {
5615        match func {
5616            AggregateFn::Count => AggState::Count(0),
5617            AggregateFn::Sum => AggState::Sum {
5618                int_part: 0,
5619                float_part: 0.0,
5620                is_float: false,
5621            },
5622            AggregateFn::Avg => AggState::Avg {
5623                total: 0.0,
5624                count: 0,
5625            },
5626            AggregateFn::Min => AggState::Min(None),
5627            AggregateFn::Max => AggState::Max(None),
5628            AggregateFn::Collect => AggState::Collect(Vec::new()),
5629            AggregateFn::StDev => AggState::StDev {
5630                sum: 0.0,
5631                sum_sq: 0.0,
5632                count: 0,
5633            },
5634            AggregateFn::StDevP => AggState::StDevP {
5635                sum: 0.0,
5636                sum_sq: 0.0,
5637                count: 0,
5638            },
5639            AggregateFn::PercentileDisc => AggState::PercentileDisc {
5640                items: Vec::new(),
5641                percentile: None,
5642            },
5643            AggregateFn::PercentileCont => AggState::PercentileCont {
5644                items: Vec::new(),
5645                percentile: None,
5646            },
5647        }
5648    }
5649
5650    fn update(&mut self, arg: &AggregateArg, ctx: &EvalCtx) -> Result<()> {
5651        match self {
5652            AggState::Count(c) => match arg {
5653                AggregateArg::Star => *c += 1,
5654                AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => {
5655                    if !matches!(eval_expr(e, ctx)?, Value::Null) {
5656                        *c += 1;
5657                    }
5658                }
5659            },
5660            AggState::Sum {
5661                int_part,
5662                float_part,
5663                is_float,
5664            } => {
5665                let v = expr_arg_value(arg, ctx)?;
5666                match v {
5667                    Value::Null => {}
5668                    Value::Property(Property::Int64(i)) => *int_part += i,
5669                    Value::Property(Property::Float64(f)) => {
5670                        *float_part += f;
5671                        *is_float = true;
5672                    }
5673                    _ => return Err(Error::AggregateTypeError),
5674                }
5675            }
5676            AggState::Avg { total, count } => {
5677                let v = expr_arg_value(arg, ctx)?;
5678                match v {
5679                    Value::Null => {}
5680                    Value::Property(Property::Int64(i)) => {
5681                        *total += i as f64;
5682                        *count += 1;
5683                    }
5684                    Value::Property(Property::Float64(f)) => {
5685                        *total += f;
5686                        *count += 1;
5687                    }
5688                    _ => return Err(Error::AggregateTypeError),
5689                }
5690            }
5691            AggState::Min(slot) => {
5692                // `min` / `max` ignore null inputs and operate
5693                // over any `Value` (including lists, nodes etc.),
5694                // not just scalar Properties. The ordering is
5695                // `compare_values`: within a single type the
5696                // natural order, across types the
5697                // `type_order_value` rank.
5698                let v = expr_arg_value(arg, ctx)?;
5699                if matches!(v, Value::Null | Value::Property(Property::Null)) {
5700                    // skip
5701                } else {
5702                    match slot {
5703                        None => *slot = Some(v),
5704                        Some(cur) => {
5705                            if compare_values(&v, cur) == Ordering::Less {
5706                                *cur = v;
5707                            }
5708                        }
5709                    }
5710                }
5711            }
5712            AggState::Max(slot) => {
5713                let v = expr_arg_value(arg, ctx)?;
5714                if matches!(v, Value::Null | Value::Property(Property::Null)) {
5715                    // skip
5716                } else {
5717                    match slot {
5718                        None => *slot = Some(v),
5719                        Some(cur) => {
5720                            if compare_values(&v, cur) == Ordering::Greater {
5721                                *cur = v;
5722                            }
5723                        }
5724                    }
5725                }
5726            }
5727            AggState::Collect(items) => {
5728                let v = expr_arg_value(arg, ctx)?;
5729                if !matches!(v, Value::Null) {
5730                    items.push(v);
5731                }
5732            }
5733            AggState::PercentileDisc { items, .. } | AggState::PercentileCont { items, .. } => {
5734                let v = expr_arg_value(arg, ctx)?;
5735                if !matches!(v, Value::Null) {
5736                    items.push(v);
5737                }
5738            }
5739            AggState::StDev { sum, sum_sq, count } | AggState::StDevP { sum, sum_sq, count } => {
5740                let v = expr_arg_value(arg, ctx)?;
5741                match v {
5742                    Value::Null => {}
5743                    Value::Property(Property::Int64(i)) => {
5744                        let f = i as f64;
5745                        *sum += f;
5746                        *sum_sq += f * f;
5747                        *count += 1;
5748                    }
5749                    Value::Property(Property::Float64(f)) => {
5750                        *sum += f;
5751                        *sum_sq += f * f;
5752                        *count += 1;
5753                    }
5754                    _ => return Err(Error::AggregateTypeError),
5755                }
5756            }
5757        }
5758        Ok(())
5759    }
5760
5761    fn finalize(&self) -> Value {
5762        match self {
5763            AggState::Count(c) => Value::Property(Property::Int64(*c)),
5764            AggState::Sum {
5765                int_part,
5766                float_part,
5767                is_float,
5768            } => {
5769                if *is_float {
5770                    Value::Property(Property::Float64(*float_part + *int_part as f64))
5771                } else {
5772                    Value::Property(Property::Int64(*int_part))
5773                }
5774            }
5775            AggState::Avg { total, count } => {
5776                if *count == 0 {
5777                    Value::Null
5778                } else {
5779                    Value::Property(Property::Float64(*total / *count as f64))
5780                }
5781            }
5782            AggState::Min(slot) | AggState::Max(slot) => match slot {
5783                Some(v) => v.clone(),
5784                None => Value::Null,
5785            },
5786            AggState::Collect(items) => Value::List(items.clone()),
5787            AggState::StDevP { sum, sum_sq, count } => {
5788                if *count == 0 {
5789                    Value::Property(Property::Float64(0.0))
5790                } else {
5791                    let n = *count as f64;
5792                    let variance = *sum_sq / n - (*sum / n).powi(2);
5793                    Value::Property(Property::Float64(variance.max(0.0).sqrt()))
5794                }
5795            }
5796            AggState::StDev { sum, sum_sq, count } => {
5797                if *count < 2 {
5798                    Value::Property(Property::Float64(0.0))
5799                } else {
5800                    let n = *count as f64;
5801                    let variance = (*sum_sq - *sum * *sum / n) / (n - 1.0);
5802                    Value::Property(Property::Float64(variance.max(0.0).sqrt()))
5803                }
5804            }
5805            AggState::PercentileDisc { items, percentile } => {
5806                percentile_disc(items, percentile.unwrap_or(0.0))
5807            }
5808            AggState::PercentileCont { items, percentile } => {
5809                percentile_cont(items, percentile.unwrap_or(0.0))
5810            }
5811        }
5812    }
5813}
5814
5815fn expr_arg_value(arg: &AggregateArg, ctx: &EvalCtx) -> Result<Value> {
5816    match arg {
5817        AggregateArg::Star => Err(Error::AggregateTypeError),
5818        AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => eval_expr(e, ctx),
5819    }
5820}
5821
5822/// Coerce a collected aggregate value into an f64 for percentile
5823/// math. Unhandled types fall back to NaN; the caller sorts NaN
5824/// out of the stream before computing the percentile.
5825fn value_to_f64(v: &Value) -> f64 {
5826    match v {
5827        Value::Property(Property::Int64(i)) => *i as f64,
5828        Value::Property(Property::Float64(f)) => *f,
5829        _ => f64::NAN,
5830    }
5831}
5832
5833/// `percentileDisc(expr, p)` — discrete percentile. Returns the
5834/// smallest value at or above the `p`-ranked position. Numbers only;
5835/// non-numeric values get sorted to the end and are effectively
5836/// ignored unless the percentile lands on one.
5837fn percentile_disc(items: &[Value], p: f64) -> Value {
5838    let mut nums: Vec<(f64, Value)> = items
5839        .iter()
5840        .map(|v| (value_to_f64(v), v.clone()))
5841        .filter(|(f, _)| !f.is_nan())
5842        .collect();
5843    if nums.is_empty() {
5844        return Value::Null;
5845    }
5846    nums.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
5847    let p = p.clamp(0.0, 1.0);
5848    let n = nums.len();
5849    // Neo4j spec: ceil(p * n) - 1, clamped at 0.
5850    let idx = ((p * n as f64).ceil() as isize - 1).max(0) as usize;
5851    nums[idx.min(n - 1)].1.clone()
5852}
5853
5854/// `percentileCont(expr, p)` — continuous percentile. Linearly
5855/// interpolates between the two ranks that bracket the fractional
5856/// position `p * (n - 1)`. Returns a Float64.
5857fn percentile_cont(items: &[Value], p: f64) -> Value {
5858    let mut nums: Vec<f64> = items
5859        .iter()
5860        .map(value_to_f64)
5861        .filter(|f| !f.is_nan())
5862        .collect();
5863    if nums.is_empty() {
5864        return Value::Null;
5865    }
5866    nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
5867    let p = p.clamp(0.0, 1.0);
5868    let n = nums.len();
5869    if n == 1 {
5870        return Value::Property(Property::Float64(nums[0]));
5871    }
5872    let pos = p * (n as f64 - 1.0);
5873    let lo = pos.floor() as usize;
5874    let hi = pos.ceil() as usize;
5875    let frac = pos - lo as f64;
5876    let v = nums[lo] + (nums[hi] - nums[lo]) * frac;
5877    Value::Property(Property::Float64(v))
5878}
5879
5880struct SkipOp {
5881    input: Box<dyn Operator>,
5882    count_expr: Expr,
5883    remaining: Option<i64>,
5884}
5885
5886impl SkipOp {
5887    fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5888        Self {
5889            input,
5890            count_expr,
5891            remaining: None,
5892        }
5893    }
5894}
5895
5896impl Operator for SkipOp {
5897    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5898        if self.remaining.is_none() {
5899            let empty = Row::new();
5900            let ectx = ctx.eval_ctx(&empty);
5901            let val = eval_expr(&self.count_expr, &ectx)?;
5902            self.remaining = Some(expr_to_count(val)?);
5903        }
5904        let rem = self.remaining.as_mut().unwrap();
5905        while *rem > 0 {
5906            if self.input.next(ctx)?.is_none() {
5907                return Ok(None);
5908            }
5909            *rem -= 1;
5910        }
5911        self.input.next(ctx)
5912    }
5913}
5914
5915struct LimitOp {
5916    input: Box<dyn Operator>,
5917    count_expr: Expr,
5918    remaining: Option<i64>,
5919}
5920
5921impl LimitOp {
5922    fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5923        Self {
5924            input,
5925            count_expr,
5926            remaining: None,
5927        }
5928    }
5929}
5930
5931impl Operator for LimitOp {
5932    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5933        if self.remaining.is_none() {
5934            let empty = Row::new();
5935            let ectx = ctx.eval_ctx(&empty);
5936            let val = eval_expr(&self.count_expr, &ectx)?;
5937            self.remaining = Some(expr_to_count(val)?);
5938        }
5939        let rem = self.remaining.as_mut().unwrap();
5940        if *rem <= 0 {
5941            return Ok(None);
5942        }
5943        match self.input.next(ctx)? {
5944            Some(row) => {
5945                *rem -= 1;
5946                Ok(Some(row))
5947            }
5948            None => Ok(None),
5949        }
5950    }
5951}
5952
5953fn expr_to_count(val: Value) -> Result<i64> {
5954    match val {
5955        Value::Null | Value::Property(Property::Null) => Ok(0),
5956        Value::Property(Property::Int64(n)) if n >= 0 => Ok(n),
5957        // openCypher: SKIP/LIMIT require an integer. Float values
5958        // (including integer-valued floats) are rejected as
5959        // InvalidArgumentType — they'd only be valid after an
5960        // explicit cast, which the user must write themselves.
5961        _ => Err(Error::TypeMismatch),
5962    }
5963}