Skip to main content

meshdb_executor/
ops.rs

1use crate::{
2    error::{Error, Result},
3    eval::{compare_values, eval_expr, row_key, to_bool, value_key, values_equal, EvalCtx},
4    procedures::ProcedureRegistry,
5    reader::GraphReader,
6    value::{ParamMap, Row, Value},
7    writer::GraphWriter,
8};
9use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
10use meshdb_cypher::{
11    AggregateArg, AggregateFn, AggregateSpec, BinaryOp, CallArgs, CompareOp, ConstraintKind,
12    ConstraintScope as CypherConstraintScope, CreateEdgeSpec, CreateNodeSpec, Direction, Expr,
13    Literal, LogicalPlan, PointSeekBounds, PropertyType as CypherPropertyType, RemoveSpec,
14    ReturnItem, SetAssignment, SortItem, UnaryOp, YieldSpec,
15};
16use meshdb_storage::{
17    ConstraintScope as StorageConstraintScope, PropertyConstraintKind,
18    PropertyType as StoragePropertyType, RocksDbStorageEngine,
19};
20use std::cell::RefCell;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23
24/// Shared tombstone set for nodes / edges that have been DELETEd
25/// earlier in the current query. Property / label / type / keys /
26/// id accesses consult this set and raise `DeletedEntityAccess`
27/// so `MATCH (n) DELETE n RETURN n.num` surfaces the error at
28/// runtime instead of reading off a stale clone.
29#[derive(Default)]
30pub struct Tombstones {
31    pub nodes: RefCell<HashSet<meshdb_core::NodeId>>,
32    pub edges: RefCell<HashSet<meshdb_core::EdgeId>>,
33}
34
/// Shared, read-only context handed to every [`Operator::next`] call
/// while a plan is being executed. All fields are borrowed from the
/// `execute_*` entry point that drives the pipeline.
pub struct ExecCtx<'a> {
    /// Read side of the graph. Also forwarded to the expression
    /// evaluator as its `reader` (see `eval_ctx`).
    pub store: &'a dyn GraphReader,
    /// Write side of the graph; mutations are sent here, which may
    /// be a different object from `store`.
    pub writer: &'a dyn GraphWriter,
    /// Per-query parameter bindings. Empty for the single-node and Raft
    /// entry points that don't carry a Bolt RUN; populated with the
    /// driver-supplied params on the Bolt path. Threaded into every
    /// `eval_expr` call so `Expr::Parameter("name")` resolves.
    pub params: &'a ParamMap,
    /// Procedures known to this execution. Defaults to an empty
    /// registry on call sites that don't care; the TCK harness and
    /// any future server startup plug in a populated registry so
    /// `CALL ns.name(...)` resolves.
    pub procedures: &'a ProcedureRegistry,
    /// Outer-scope rows contributed by enclosing operators. The
    /// innermost slot (first entry) is the nearest outer. Set by
    /// [`CartesianProductOp`] when running its right side so that
    /// operators inside — typically `EdgeExpandOp` /
    /// `VarLengthExpandOp` constraint lookups — can resolve
    /// variables the right-side scan didn't bind directly. Empty
    /// at the top level.
    pub outer_rows: &'a [&'a Row],
    /// Tombstones for entities deleted earlier in the same query.
    /// Shared by reference so DeleteOp can insert and downstream
    /// eval can check.
    pub tombstones: &'a Tombstones,
}
61
62pub(crate) struct NoOpWriter;
63impl GraphWriter for NoOpWriter {
64    fn put_node(&self, _: &Node) -> Result<()> {
65        Ok(())
66    }
67    fn put_edge(&self, _: &Edge) -> Result<()> {
68        Ok(())
69    }
70    fn delete_edge(&self, _: EdgeId) -> Result<()> {
71        Ok(())
72    }
73    fn detach_delete_node(&self, _: NodeId) -> Result<()> {
74        Ok(())
75    }
76}
77
78impl<'a> ExecCtx<'a> {
79    /// Build an `EvalCtx` wrapping the given row alongside this
80    /// exec context's params and reader. Used by operators to
81    /// bridge the per-operator context into the expression
82    /// evaluator without repeating the field list at every
83    /// `eval_expr` call site.
84    pub(crate) fn eval_ctx<'b>(&self, row: &'b Row) -> EvalCtx<'b>
85    where
86        'a: 'b,
87    {
88        EvalCtx {
89            row,
90            params: self.params,
91            reader: self.store,
92            procedures: self.procedures,
93            outer_rows: self.outer_rows,
94            tombstones: self.tombstones,
95        }
96    }
97
98    /// Look up a variable in `row` first, then in each outer-scope
99    /// row in order. Returns the nearest match. Used for constraint
100    /// lookups inside right-side operators of a `CartesianProduct`
101    /// so a fresh scan that references an outer binding
102    /// (`MATCH ()-[r]-() MATCH (n)-[r]-(m)`) can still resolve it.
103    pub(crate) fn lookup_binding<'r>(&'r self, row: &'r Row, name: &str) -> Option<&'r Value> {
104        if let Some(v) = row.get(name) {
105            return Some(v);
106        }
107        for outer in self.outer_rows {
108            if let Some(v) = outer.get(name) {
109                return Some(v);
110            }
111        }
112        None
113    }
114}
115
/// One step of the execution pipeline, pulled a row at a time.
///
/// The `execute_*` drivers in this file call [`Operator::next`]
/// repeatedly, collecting every `Some(row)` and stopping at the first
/// `Ok(None)`; an `Err` aborts the execution.
pub trait Operator {
    /// Produce the next row, or `Ok(None)` once no more rows are
    /// available. `ctx` carries the reader, writer, params and other
    /// per-query state.
    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>>;
}
119
120/// Execute a plan using the given store for both reads and writes.
121/// Equivalent to [`execute_with_reader`] with the store acting as both
122/// and an empty parameter map.
123pub fn execute(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
124    let params = ParamMap::new();
125    execute_with_reader(
126        plan,
127        store as &dyn GraphReader,
128        store as &dyn GraphWriter,
129        &params,
130    )
131}
132
133/// Execute a plan against a local [`RocksDbStorageEngine`] for reads,
134/// sending mutations to a separate [`GraphWriter`]. Used by in-process
135/// setups where reads are always cheap-local (single node or
136/// full-replica Raft). No parameters.
137pub fn execute_with_writer(
138    plan: &LogicalPlan,
139    store: &RocksDbStorageEngine,
140    writer: &dyn GraphWriter,
141) -> Result<Vec<Row>> {
142    let params = ParamMap::new();
143    execute_with_reader(plan, store as &dyn GraphReader, writer, &params)
144}
145
146/// Execute a plan with an arbitrary [`GraphReader`] for reads and a
147/// parameter map for `$param` resolution. Routing mode uses a
148/// partitioned reader that fans out reads across peers; the Bolt
149/// listener supplies the driver-bound param map.
150pub fn explain(plan: &LogicalPlan) -> Vec<Row> {
151    let text = meshdb_cypher::format_plan(plan);
152    let mut row = Row::new();
153    row.insert("plan".to_string(), Value::Property(Property::String(text)));
154    vec![row]
155}
156
157pub fn profile(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
158    let result_rows = execute(plan, store)?;
159    let row_count = result_rows.len() as i64;
160    let plan_text = meshdb_cypher::format_plan(plan);
161    let summary = format!("{plan_text}\nRows: {row_count}");
162    let mut row = Row::new();
163    row.insert(
164        "profile".to_string(),
165        Value::Property(Property::String(summary)),
166    );
167    row.insert(
168        "rows".to_string(),
169        Value::Property(Property::Int64(row_count)),
170    );
171    Ok(vec![row])
172}
173
174pub fn execute_with_reader(
175    plan: &LogicalPlan,
176    reader: &dyn GraphReader,
177    writer: &dyn GraphWriter,
178    params: &ParamMap,
179) -> Result<Vec<Row>> {
180    let empty_procs = ProcedureRegistry::new();
181    execute_with_reader_and_procs(plan, reader, writer, params, &empty_procs)
182}
183
184/// Like [`execute_with_reader`] but with an explicit procedure
185/// registry in scope. Used by the TCK harness to mount mock
186/// procedures declared by `there exists a procedure ...` steps,
187/// and reserved for the server startup path once built-in
188/// procedures land.
189/// Run `plan` once for a caller-supplied "outer" row that's
190/// folded into the operator pipeline as a seed. Used by
191/// `CALL { ... } IN TRANSACTIONS` to evaluate the body once per
192/// input row (with the row's bindings visible inside the body)
193/// while letting the caller batch writes across multiple
194/// invocations into a shared `BufferingGraphWriter`.
195///
196/// The `seed` row is threaded into every operator's
197/// `outer_rows` list, mirroring what `CallSubqueryOp` does
198/// internally for the non-batched form. Pass `None` to invoke
199/// without a seed (equivalent to
200/// [`execute_with_reader_and_procs`]).
201pub fn execute_with_seed(
202    plan: &LogicalPlan,
203    seed: Option<&Row>,
204    reader: &dyn GraphReader,
205    writer: &dyn GraphWriter,
206    params: &ParamMap,
207    procedures: &ProcedureRegistry,
208) -> Result<Vec<Row>> {
209    crate::eval::reset_statement_time();
210    if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
211        return Ok(rows);
212    }
213    let suppress_output = is_write_only_plan(plan);
214    let mut op = build_op_inner(plan, seed, &mut None);
215    let tombstones = Tombstones::default();
216    let outer_rows: Vec<&Row> = seed.into_iter().collect();
217    let ctx = ExecCtx {
218        store: reader,
219        writer,
220        params,
221        procedures,
222        outer_rows: &outer_rows,
223        tombstones: &tombstones,
224    };
225    let mut rows = Vec::new();
226    while let Some(row) = op.next(&ctx)? {
227        rows.push(row);
228    }
229    if suppress_output {
230        Ok(Vec::new())
231    } else {
232        Ok(rows)
233    }
234}
235
236/// Run `plan` with a pre-materialised set of rows substituted
237/// for the (single) `LogicalPlan::CallSubqueryInTransactions`
238/// node in the tree. Used by
239/// `MeshService::execute_call_in_transactions` to fold its
240/// already-batched-and-committed body output back into the
241/// regular pipeline so wrapping clauses (Project / OrderBy /
242/// Limit / Aggregate / Filter) can run untouched.
243///
244/// Panics at build time if the plan doesn't contain exactly one
245/// IN TRANSACTIONS node. The dispatcher detects the wrapping
246/// shape before calling this, so reaching that panic indicates a
247/// wiring bug.
248pub fn execute_with_in_tx_substitute(
249    plan: &LogicalPlan,
250    in_tx_rows: Vec<Row>,
251    reader: &dyn GraphReader,
252    writer: &dyn GraphWriter,
253    params: &ParamMap,
254    procedures: &ProcedureRegistry,
255) -> Result<Vec<Row>> {
256    crate::eval::reset_statement_time();
257    if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
258        return Ok(rows);
259    }
260    let suppress_output = is_write_only_plan(plan);
261    let mut substitute = Some(in_tx_rows);
262    let mut op = build_op_inner(plan, None, &mut substitute);
263    let tombstones = Tombstones::default();
264    let ctx = ExecCtx {
265        store: reader,
266        writer,
267        params,
268        procedures,
269        outer_rows: &[],
270        tombstones: &tombstones,
271    };
272    let mut rows = Vec::new();
273    while let Some(row) = op.next(&ctx)? {
274        rows.push(row);
275    }
276    if suppress_output {
277        Ok(Vec::new())
278    } else {
279        Ok(rows)
280    }
281}
282
283pub fn execute_with_reader_and_procs(
284    plan: &LogicalPlan,
285    reader: &dyn GraphReader,
286    writer: &dyn GraphWriter,
287    params: &ParamMap,
288    procedures: &ProcedureRegistry,
289) -> Result<Vec<Row>> {
290    // `datetime()`, `localtime()`, and friends cache "now" in a
291    // thread-local so multiple calls inside the same statement see
292    // the same instant. Clear it at the start of every execution so
293    // the next query gets a fresh reading.
294    crate::eval::reset_statement_time();
295    // Schema DDL never enters the operator pipeline — it's a
296    // side-effect on the backing store, not a row-producing query.
297    // Short-circuit here so `build_op` stays a pure plan-to-operator
298    // mapping and doesn't need DDL-specific cases.
299    if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
300        return Ok(rows);
301    }
302    let suppress_output = is_write_only_plan(plan);
303    let mut op = build_op(plan);
304    let tombstones = Tombstones::default();
305    let ctx = ExecCtx {
306        store: reader,
307        writer,
308        params,
309        procedures,
310        outer_rows: &[],
311        tombstones: &tombstones,
312    };
313    let mut rows = Vec::new();
314    while let Some(row) = op.next(&ctx)? {
315        rows.push(row);
316    }
317    if suppress_output {
318        Ok(Vec::new())
319    } else {
320        Ok(rows)
321    }
322}
323
324fn is_write_only_plan(plan: &LogicalPlan) -> bool {
325    // A plan is write-only when its top-level operator is a mutation
326    // with no projection (Project/Identity) wrapping it. Mutations
327    // with RETURN have a Project on top.
328    match plan {
329        LogicalPlan::CreatePath { .. }
330        | LogicalPlan::Delete { .. }
331        | LogicalPlan::SetProperty { .. }
332        | LogicalPlan::Remove { .. }
333        | LogicalPlan::Foreach { .. }
334        | LogicalPlan::MergeNode { .. }
335        | LogicalPlan::MergeEdge { .. } => true,
336        _ => false,
337    }
338}
339
340/// True when `plan` contains any write-bearing operator anywhere
341/// in its subtree. Used by [`build_op`] to flag `Limit` operators
342/// that need to drain their input to ensure side effects happen
343/// even when the limit suppresses all output rows — Cypher
344/// specifies write operators are eager regardless of how many
345/// rows downstream `LIMIT 0` / `LIMIT N` consumes.
346pub(crate) fn plan_contains_writes(plan: &LogicalPlan) -> bool {
347    use LogicalPlan::*;
348    match plan {
349        // Write-bearing variants — terminate true.
350        CreatePath { .. }
351        | Delete { .. }
352        | SetProperty { .. }
353        | Remove { .. }
354        | MergeNode { .. }
355        | MergeEdge { .. }
356        | Foreach { .. } => true,
357
358        // Container variants with a single required `input`.
359        Filter { input, .. }
360        | Project { input, .. }
361        | Aggregate { input, .. }
362        | Distinct { input, .. }
363        | OrderBy { input, .. }
364        | Skip { input, .. }
365        | Limit { input, .. }
366        | EdgeExpand { input, .. }
367        | OptionalEdgeExpand { input, .. }
368        | VarLengthExpand { input, .. }
369        | Identity { input, .. }
370        | CoalesceNullRow { input, .. }
371        | UnwindChain { input, .. }
372        | BindPath { input, .. }
373        | ShortestPath { input, .. } => plan_contains_writes(input),
374
375        // Two-input variant.
376        CartesianProduct { left, right } => {
377            plan_contains_writes(left) || plan_contains_writes(right)
378        }
379
380        // UNION carries N branches; any can be write-bearing.
381        Union { branches, .. } => branches.iter().any(plan_contains_writes),
382
383        // Subquery wrappers carry both an `input` row source and a
384        // `body` plan; either can be write-bearing.
385        CallSubquery { input, body } | OptionalApply { input, body, .. } => {
386            plan_contains_writes(input) || plan_contains_writes(body)
387        }
388        // CALL ... IN TRANSACTIONS commits each batch internally, so
389        // its writes don't leak into a surrounding LimitOp's drain
390        // logic — but the input subtree can still be write-bearing
391        // (e.g. a preceding CREATE before the CALL). Recurse only
392        // into the input.
393        CallSubqueryInTransactions { input, .. } => plan_contains_writes(input),
394        // apoc.periodic.iterate is fully self-contained: both
395        // iterate and action subtrees are run by the dispatcher,
396        // not by the surrounding operator pipeline. From a
397        // wrapping LimitOp's perspective, no writes pass
398        // through this node.
399        ApocPeriodicIterate { .. } => false,
400
401        // Variants whose `input` is optional (top-level producer
402        // when `None`).
403        ProcedureCall { input, .. } | LoadCsv { input, .. } => {
404            input.as_ref().map_or(false, |i| plan_contains_writes(i))
405        }
406
407        // Leaves and DDL — no writes flow through here.
408        _ => false,
409    }
410}
411
412/// DDL dispatch. Returns `Ok(Some(rows))` when `plan` is a schema
413/// statement and was handled; `Ok(None)` when it's a regular graph
414/// operation that the operator pipeline should execute normally.
415///
416/// `CREATE INDEX` / `DROP INDEX` return one row carrying an `ok`
417/// value so Bolt clients see a non-empty RECORD stream rather than
418/// an empty SUCCESS — mirrors how Neo4j reports schema writes.
419/// `SHOW INDEXES` returns one row per registered index with
420/// `label`, `property`, and `state` columns.
421fn try_execute_ddl(
422    plan: &LogicalPlan,
423    reader: &dyn GraphReader,
424    writer: &dyn GraphWriter,
425) -> Result<Option<Vec<Row>>> {
426    match plan {
427        LogicalPlan::CreatePropertyIndex { label, properties } => {
428            writer.create_property_index(label, properties)?;
429            Ok(Some(vec![node_index_ddl_ack_row(
430                "created", label, properties,
431            )]))
432        }
433        LogicalPlan::DropPropertyIndex { label, properties } => {
434            writer.drop_property_index(label, properties)?;
435            Ok(Some(vec![node_index_ddl_ack_row(
436                "dropped", label, properties,
437            )]))
438        }
439        LogicalPlan::CreateEdgePropertyIndex {
440            edge_type,
441            properties,
442        } => {
443            writer.create_edge_property_index(edge_type, properties)?;
444            Ok(Some(vec![edge_index_ddl_ack_row(
445                "created", edge_type, properties,
446            )]))
447        }
448        LogicalPlan::DropEdgePropertyIndex {
449            edge_type,
450            properties,
451        } => {
452            writer.drop_edge_property_index(edge_type, properties)?;
453            Ok(Some(vec![edge_index_ddl_ack_row(
454                "dropped", edge_type, properties,
455            )]))
456        }
457        LogicalPlan::ShowPropertyIndexes => {
458            // SHOW is a pure read — source it from the reader so
459            // overlay/partitioned readers deliver the right view
460            // without round-tripping through a writer. Both
461            // node-scoped and edge-scoped indexes land in the same
462            // row stream, distinguished by the `scope` column.
463            let mut rows: Vec<Row> = Vec::new();
464            for (label, properties) in reader.list_property_indexes()? {
465                rows.push(show_index_row("NODE", label, properties));
466            }
467            for (edge_type, properties) in reader.list_edge_property_indexes()? {
468                rows.push(show_index_row("RELATIONSHIP", edge_type, properties));
469            }
470            Ok(Some(rows))
471        }
472        LogicalPlan::CreatePointIndex { label, property } => {
473            writer.create_point_index(label, property)?;
474            Ok(Some(vec![point_index_ddl_ack_row(
475                "created", "NODE", label, property,
476            )]))
477        }
478        LogicalPlan::DropPointIndex { label, property } => {
479            writer.drop_point_index(label, property)?;
480            Ok(Some(vec![point_index_ddl_ack_row(
481                "dropped", "NODE", label, property,
482            )]))
483        }
484        LogicalPlan::CreateEdgePointIndex {
485            edge_type,
486            property,
487        } => {
488            writer.create_edge_point_index(edge_type, property)?;
489            Ok(Some(vec![point_index_ddl_ack_row(
490                "created",
491                "RELATIONSHIP",
492                edge_type,
493                property,
494            )]))
495        }
496        LogicalPlan::DropEdgePointIndex {
497            edge_type,
498            property,
499        } => {
500            writer.drop_edge_point_index(edge_type, property)?;
501            Ok(Some(vec![point_index_ddl_ack_row(
502                "dropped",
503                "RELATIONSHIP",
504                edge_type,
505                property,
506            )]))
507        }
508        LogicalPlan::ShowPointIndexes => {
509            // Kept on its own row stream rather than folded into
510            // `SHOW INDEXES` so the point-specific `type` column
511            // stays a stable part of the shape and callers can
512            // filter without depending on string-matching `type`
513            // across mixed rows. Node- and edge-scope rows are
514            // merged here, disambiguated by the `scope` column.
515            let mut rows: Vec<Row> = Vec::new();
516            for (label, property) in reader.list_point_indexes()? {
517                rows.push(show_point_index_row("NODE", label, property));
518            }
519            for (edge_type, property) in reader.list_edge_point_indexes()? {
520                rows.push(show_point_index_row("RELATIONSHIP", edge_type, property));
521            }
522            Ok(Some(rows))
523        }
524        LogicalPlan::CreatePropertyConstraint {
525            name,
526            scope,
527            properties,
528            kind,
529            if_not_exists,
530        } => {
531            let storage_kind = match kind {
532                ConstraintKind::Unique => PropertyConstraintKind::Unique,
533                ConstraintKind::NotNull => PropertyConstraintKind::NotNull,
534                ConstraintKind::NodeKey => PropertyConstraintKind::NodeKey,
535                ConstraintKind::PropertyType(t) => {
536                    PropertyConstraintKind::PropertyType(cypher_to_storage_property_type(*t))
537                }
538            };
539            let storage_scope = cypher_to_storage_scope(scope);
540            let spec = writer.create_property_constraint(
541                name.as_deref(),
542                &storage_scope,
543                properties,
544                storage_kind,
545                *if_not_exists,
546            )?;
547            Ok(Some(vec![constraint_ack_row("created", &spec)]))
548        }
549        LogicalPlan::DropPropertyConstraint { name, if_exists } => {
550            writer.drop_property_constraint(name, *if_exists)?;
551            let mut row = Row::default();
552            row.insert(
553                "state".into(),
554                Value::Property(Property::String("dropped".into())),
555            );
556            row.insert(
557                "name".into(),
558                Value::Property(Property::String(name.clone())),
559            );
560            Ok(Some(vec![row]))
561        }
562        LogicalPlan::ShowPropertyConstraints => {
563            let specs = reader.list_property_constraints()?;
564            let rows = specs.into_iter().map(constraint_show_row).collect();
565            Ok(Some(rows))
566        }
567        _ => Ok(None),
568    }
569}
570
571/// One-row acknowledgement emitted after `CREATE CONSTRAINT`. Carries
572/// the resolved spec so callers see the final name when they omitted
573/// it. Columns match `SHOW CONSTRAINTS` (plus `state`) so Bolt clients
574/// can uniformly format DDL responses.
575fn constraint_ack_row(state: &str, spec: &meshdb_storage::PropertyConstraintSpec) -> Row {
576    let mut row = constraint_show_row(spec.clone());
577    row.insert(
578        "state".into(),
579        Value::Property(Property::String(state.into())),
580    );
581    row
582}
583
584/// Row shape for `SHOW CONSTRAINTS` and `db.constraints()`. Columns:
585/// `name`, `scope` ("NODE" / "RELATIONSHIP"), `label` (label or edge
586/// type, depending on scope), `properties` (list), and `type`.
587/// Single-property constraints still carry a one-element `properties`
588/// list; composite kinds (e.g. `NodeKey`) carry the full tuple.
589fn constraint_show_row(spec: meshdb_storage::PropertyConstraintSpec) -> Row {
590    let mut row = Row::default();
591    row.insert("name".into(), Value::Property(Property::String(spec.name)));
592    let (scope_tag, target) = match spec.scope {
593        meshdb_storage::ConstraintScope::Node(l) => ("NODE", l),
594        meshdb_storage::ConstraintScope::Relationship(t) => ("RELATIONSHIP", t),
595    };
596    row.insert(
597        "scope".into(),
598        Value::Property(Property::String(scope_tag.into())),
599    );
600    // `label` stays for backwards compatibility — it carries the
601    // scope's target regardless of node vs relationship, matching
602    // the index DDL row's column for historical consistency. A
603    // separate `scope` column tells you what kind of target it is.
604    row.insert("label".into(), Value::Property(Property::String(target)));
605    let props: Vec<Property> = spec.properties.into_iter().map(Property::String).collect();
606    row.insert("properties".into(), Value::Property(Property::List(props)));
607    row.insert(
608        "type".into(),
609        Value::Property(Property::String(spec.kind.as_string())),
610    );
611    row
612}
613
614/// Convert the Cypher AST's `ConstraintScope` into the storage-layer
615/// enum. Narrow bridge — same shape on both sides; kept in
616/// different crates to preserve the dependency direction.
617fn cypher_to_storage_scope(scope: &CypherConstraintScope) -> StorageConstraintScope {
618    match scope {
619        CypherConstraintScope::Node(l) => StorageConstraintScope::Node(l.clone()),
620        CypherConstraintScope::Relationship(t) => StorageConstraintScope::Relationship(t.clone()),
621    }
622}
623
624/// Convert the Cypher AST's `PropertyType` into the storage-layer
625/// enum. Narrow bridge — the two enums carry the same four variants
626/// but live in different crates to keep the dependency direction
627/// clean (cypher → executor → storage, never the reverse).
628fn cypher_to_storage_property_type(t: CypherPropertyType) -> StoragePropertyType {
629    match t {
630        CypherPropertyType::String => StoragePropertyType::String,
631        CypherPropertyType::Integer => StoragePropertyType::Integer,
632        CypherPropertyType::Float => StoragePropertyType::Float,
633        CypherPropertyType::Boolean => StoragePropertyType::Boolean,
634    }
635}
636
637/// One-row acknowledgement emitted after `CREATE INDEX` /
638/// `DROP INDEX` on a node scope. Columns mirror the `SHOW INDEXES`
639/// shape (`scope`, `label`, `property`) so Bolt clients can format
640/// DDL responses uniformly with schema-read responses. `label` is
641/// kept as a column name for backwards-compat with pre-edge-index
642/// clients; the `scope` column disambiguates node from edge.
643fn node_index_ddl_ack_row(state: &str, label: &str, properties: &[String]) -> Row {
644    let mut row = Row::default();
645    row.insert(
646        "state".into(),
647        Value::Property(Property::String(state.into())),
648    );
649    row.insert(
650        "scope".into(),
651        Value::Property(Property::String("NODE".into())),
652    );
653    row.insert(
654        "label".into(),
655        Value::Property(Property::String(label.into())),
656    );
657    // `property` stays a single string for backward compat with
658    // drivers that read it directly. Composite specs render as a
659    // comma-joined preview; the `properties` column carries the
660    // authoritative list.
661    row.insert(
662        "property".into(),
663        Value::Property(Property::String(properties.join(","))),
664    );
665    row.insert("properties".into(), properties_list_value(properties));
666    row
667}
668
669/// One-row acknowledgement emitted after `CREATE INDEX` /
670/// `DROP INDEX` on a relationship scope. Mirrors
671/// [`node_index_ddl_ack_row`] but the target lives under
672/// `edge_type` (and `label` carries the same string so existing
673/// driver code that read `label` still sees the target name).
674fn edge_index_ddl_ack_row(state: &str, edge_type: &str, properties: &[String]) -> Row {
675    let mut row = Row::default();
676    row.insert(
677        "state".into(),
678        Value::Property(Property::String(state.into())),
679    );
680    row.insert(
681        "scope".into(),
682        Value::Property(Property::String("RELATIONSHIP".into())),
683    );
684    row.insert(
685        "label".into(),
686        Value::Property(Property::String(edge_type.into())),
687    );
688    row.insert(
689        "edge_type".into(),
690        Value::Property(Property::String(edge_type.into())),
691    );
692    row.insert(
693        "property".into(),
694        Value::Property(Property::String(properties.join(","))),
695    );
696    row.insert("properties".into(), properties_list_value(properties));
697    row
698}
699
700fn properties_list_value(properties: &[String]) -> Value {
701    Value::List(
702        properties
703            .iter()
704            .map(|p| Value::Property(Property::String(p.clone())))
705            .collect(),
706    )
707}
708
709/// One-row ack emitted after `CREATE POINT INDEX` / `DROP POINT
710/// INDEX`. `scope` disambiguates node vs relationship; `label`
711/// carries the scope target string (label for node, edge type for
712/// relationship) and — when relationship — `edge_type` carries it
713/// too so drivers can read either column. Matches the two-column
714/// convention from `show_index_row`.
715fn point_index_ddl_ack_row(state: &str, scope: &str, target: &str, property: &str) -> Row {
716    let mut row = Row::default();
717    row.insert(
718        "state".into(),
719        Value::Property(Property::String(state.into())),
720    );
721    row.insert(
722        "scope".into(),
723        Value::Property(Property::String(scope.into())),
724    );
725    row.insert(
726        "type".into(),
727        Value::Property(Property::String("POINT".into())),
728    );
729    row.insert(
730        "label".into(),
731        Value::Property(Property::String(target.into())),
732    );
733    if scope == "RELATIONSHIP" {
734        row.insert(
735            "edge_type".into(),
736            Value::Property(Property::String(target.into())),
737        );
738    }
739    row.insert(
740        "property".into(),
741        Value::Property(Property::String(property.into())),
742    );
743    row
744}
745
746/// Row shape for `SHOW POINT INDEXES`. One row per registered point
747/// index under both scopes, disambiguated by the `scope` column.
748/// Same column shape as `point_index_ddl_ack_row` minus `state`'s
749/// DDL verb (it carries `"online"` here instead).
750fn show_point_index_row(scope: &str, target: String, property: String) -> Row {
751    let mut row = Row::default();
752    row.insert(
753        "scope".into(),
754        Value::Property(Property::String(scope.into())),
755    );
756    row.insert(
757        "type".into(),
758        Value::Property(Property::String("POINT".into())),
759    );
760    row.insert(
761        "label".into(),
762        Value::Property(Property::String(target.clone())),
763    );
764    if scope == "RELATIONSHIP" {
765        row.insert(
766            "edge_type".into(),
767            Value::Property(Property::String(target)),
768        );
769    }
770    row.insert(
771        "property".into(),
772        Value::Property(Property::String(property)),
773    );
774    row.insert(
775        "state".into(),
776        Value::Property(Property::String("online".into())),
777    );
778    row
779}
780
781/// Row shape for `SHOW INDEXES` output. Columns: `scope`
782/// (`"NODE"` / `"RELATIONSHIP"`), `label` (scope target — label for
783/// node, edge type for relationship; kept for backwards-compat),
784/// `property` (comma-joined preview of the properties for composite
785/// specs), `properties` (full list — authoritative), and `state`.
786/// `label` and `edge_type` carry the same string when
787/// `scope == "RELATIONSHIP"` so clients can read either column.
788fn show_index_row(scope: &str, target: String, properties: Vec<String>) -> Row {
789    let mut row = Row::default();
790    row.insert(
791        "scope".into(),
792        Value::Property(Property::String(scope.into())),
793    );
794    row.insert(
795        "label".into(),
796        Value::Property(Property::String(target.clone())),
797    );
798    if scope == "RELATIONSHIP" {
799        row.insert(
800            "edge_type".into(),
801            Value::Property(Property::String(target)),
802        );
803    }
804    row.insert(
805        "property".into(),
806        Value::Property(Property::String(properties.join(","))),
807    );
808    row.insert("properties".into(), properties_list_value(&properties));
809    row.insert(
810        "state".into(),
811        Value::Property(Property::String("online".into())),
812    );
813    row
814}
815
816fn build_op(plan: &LogicalPlan) -> Box<dyn Operator> {
817    build_op_inner(plan, None, &mut None)
818}
819
/// Build the operator tree for `plan`. `seed` is the optional
/// outer row threaded into operators that can consume one (used
/// by `CallSubqueryOp` for per-row body evaluation).
///
/// `in_tx_substitute` lets the caller pre-materialize the rows
/// that a `CallSubqueryInTransactions` (or `ApocPeriodicIterate`)
/// node would normally produce and inject them as a
/// `RowsLiteralOp` instead. Used by
/// `MeshService::execute_call_in_transactions` to fold its
/// already-batched-and-committed outputs back into the regular
/// pipeline so wrapping clauses (Project / OrderBy / Limit /
/// Aggregate / Filter) can run untouched. The substitute is
/// consumed (`take()`n) by the first such node encountered.
///
/// # Panics
///
/// Panics when a batched-commit node is encountered and
/// `in_tx_substitute` is `None` — this surfaces missing dispatcher
/// wiring at build time instead of as a silent semantic
/// regression. Also panics on any schema DDL plan variant, which
/// must be dispatched via `try_execute_ddl` before this function
/// is called.
pub(crate) fn build_op_inner(
    plan: &LogicalPlan,
    seed: Option<&Row>,
    in_tx_substitute: &mut Option<Vec<Row>>,
) -> Box<dyn Operator> {
    // Recurse into a child plan, forwarding the same seed row and
    // substitute slot.
    macro_rules! child {
        ($p:expr) => {
            build_op_inner($p, seed, in_tx_substitute)
        };
    }
    match plan {
        LogicalPlan::CreatePath {
            input,
            nodes,
            edges,
        } => Box::new(CreatePathOp::new(
            input.as_ref().map(|p| child!(p)),
            nodes.clone(),
            edges.clone(),
        )),
        // The right side is kept as a plan (not an operator): the
        // operator rebuilds it for each left row.
        LogicalPlan::CartesianProduct { left, right } => {
            Box::new(CartesianProductOp::new(child!(left), (**right).clone()))
        }
        LogicalPlan::Delete {
            input,
            detach,
            vars,
            exprs,
        } => Box::new(DeleteOp::new(
            child!(input),
            *detach,
            vars.clone(),
            exprs.clone(),
        )),
        LogicalPlan::SetProperty { input, assignments } => {
            Box::new(SetPropertyOp::new(child!(input), assignments.clone()))
        }
        LogicalPlan::Remove { input, items } => {
            Box::new(RemoveOp::new(child!(input), items.clone()))
        }
        LogicalPlan::LoadCsv {
            input,
            path_expr,
            var,
            with_headers,
        } => Box::new(LoadCsvOp::new(
            input.as_ref().map(|p| child!(p)),
            path_expr.clone(),
            var.clone(),
            *with_headers,
        )),
        LogicalPlan::Foreach {
            input,
            var,
            list_expr,
            set_assignments,
            remove_items,
        } => Box::new(ForeachOp::new(
            child!(input),
            var.clone(),
            list_expr.clone(),
            set_assignments.clone(),
            remove_items.clone(),
        )),
        LogicalPlan::CallSubquery { input, body } => {
            Box::new(CallSubqueryOp::new(child!(input), (**body).clone()))
        }
        LogicalPlan::CallSubqueryInTransactions { .. }
        | LogicalPlan::ApocPeriodicIterate { .. } => {
            // CALL ... IN TRANSACTIONS and apoc.periodic.iterate
            // both commit each batch in its own transaction,
            // which the operator pipeline can't do (it has no
            // commit handle). The dispatcher above
            // (`MeshService::execute_cypher_in_tx`) does the
            // batched execution itself, then folds the
            // resulting rows back into the pipeline by passing
            // them via `in_tx_substitute`. We swap a
            // `RowsLiteralOp` in place of the batched node so
            // any wrapping Project / OrderBy / Limit / etc.
            // operators run untouched against those
            // pre-materialized rows.
            //
            // Reaching this arm with `in_tx_substitute = None`
            // means the dispatcher was bypassed — panic loudly
            // so the missing wiring surfaces at build time.
            match in_tx_substitute.take() {
                Some(rows) => Box::new(RowsLiteralOp::new(rows)),
                None => panic!(
                    "Batched-commit LogicalPlan variant reached `build_op` without a row \
                     substitute — the server-side dispatcher \
                     (execute_cypher_in_tx → execute_call_in_transactions / \
                     execute_apoc_periodic_iterate) must inject one before invoking the \
                     operator pipeline"
                ),
            }
        }
        LogicalPlan::OptionalApply {
            input,
            body,
            null_vars,
        } => Box::new(OptionalApplyOp::new(
            child!(input),
            (**body).clone(),
            null_vars.clone(),
        )),
        LogicalPlan::ProcedureCall {
            input,
            qualified_name,
            args,
            yield_spec,
            standalone,
        } => Box::new(ProcedureCallOp::new(
            input.as_ref().map(|p| child!(p)),
            qualified_name.clone(),
            args.clone(),
            yield_spec.clone(),
            *standalone,
        )),
        // With a seed, the leaf emits a clone of the caller's outer
        // row; without one it falls back to `SeedRowOp`.
        LogicalPlan::SeedRow => match seed {
            Some(r) => Box::new(SeededRowOp {
                row: Some(r.clone()),
            }),
            None => Box::new(SeedRowOp { done: false }),
        },
        LogicalPlan::NodeScanAll { var } => Box::new(NodeScanAllOp::new(var.clone())),
        LogicalPlan::NodeScanByLabels { var, labels } => {
            Box::new(NodeScanByLabelsOp::new(var.clone(), labels.clone()))
        }
        LogicalPlan::EdgeExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            edge_properties,
            edge_types,
            direction,
            edge_constraint_var,
        } => Box::new(EdgeExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            edge_properties.clone(),
            edge_types.clone(),
            *direction,
            edge_constraint_var.clone(),
        )),
        LogicalPlan::OptionalEdgeExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            dst_properties,
            edge_types,
            direction,
            dst_constraint_var,
            edge_constraint_var,
        } => Box::new(OptionalEdgeExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            dst_properties.clone(),
            edge_types.clone(),
            *direction,
            dst_constraint_var.clone(),
            edge_constraint_var.clone(),
        )),
        LogicalPlan::VarLengthExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            edge_types,
            edge_properties,
            direction,
            min_hops,
            max_hops,
            path_var,
            optional,
            dst_constraint_var,
            bound_edge_list_var,
            excluded_edge_vars,
        } => Box::new(VarLengthExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            edge_types.clone(),
            edge_properties.clone(),
            *direction,
            *min_hops,
            *max_hops,
            path_var.clone(),
            *optional,
            dst_constraint_var.clone(),
            bound_edge_list_var.clone(),
            excluded_edge_vars.clone(),
        )),
        LogicalPlan::Filter { input, predicate } => {
            Box::new(FilterOp::new(child!(input), predicate.clone()))
        }
        LogicalPlan::Project { input, items } => {
            Box::new(ProjectOp::new(child!(input), items.clone()))
        }
        LogicalPlan::Aggregate {
            input,
            group_keys,
            aggregates,
        } => Box::new(AggregateOp::new(
            child!(input),
            group_keys.clone(),
            aggregates.clone(),
        )),
        LogicalPlan::Identity { input } => Box::new(IdentityOp::new(child!(input))),
        LogicalPlan::CoalesceNullRow { input, null_vars } => {
            Box::new(CoalesceNullRowOp::new(child!(input), null_vars.clone()))
        }
        LogicalPlan::Distinct { input } => Box::new(DistinctOp::new(child!(input))),
        LogicalPlan::OrderBy { input, sort_items } => {
            Box::new(OrderByOp::new(child!(input), sort_items.clone()))
        }
        LogicalPlan::Skip { input, count } => Box::new(SkipOp::new(child!(input), count.clone())),
        LogicalPlan::Limit { input, count } => {
            // When the input subtree contains a write, we must
            // drain it after the limit is exhausted (or before
            // returning None on `LIMIT 0`) — Cypher specifies
            // write side effects are eager regardless of how
            // many output rows downstream consumes.
            let drain_on_complete = plan_contains_writes(input);
            Box::new(LimitOp::new(
                child!(input),
                count.clone(),
                drain_on_complete,
            ))
        }
        LogicalPlan::MergeNode {
            input,
            var,
            labels,
            properties,
            on_create,
            on_match,
        } => Box::new(MergeNodeOp::new(
            input.as_ref().map(|p| child!(p)),
            var.clone(),
            labels.clone(),
            properties.clone(),
            on_create.clone(),
            on_match.clone(),
        )),
        LogicalPlan::MergeEdge {
            input,
            edge_var,
            src_var,
            dst_var,
            edge_type,
            undirected,
            properties,
            on_create,
            on_match,
        } => Box::new(MergeEdgeOp::new(
            child!(input),
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            *undirected,
            properties.clone(),
            on_create.clone(),
            on_match.clone(),
        )),
        LogicalPlan::Unwind { var, expr } => Box::new(UnwindOp::new(var.clone(), expr.clone())),
        LogicalPlan::UnwindChain { input, var, expr } => {
            Box::new(UnwindChainOp::new(child!(input), var.clone(), expr.clone()))
        }
        LogicalPlan::IndexSeek {
            var,
            label,
            properties,
            values,
        } => Box::new(IndexSeekOp::new(
            var.clone(),
            label.clone(),
            properties.clone(),
            values.clone(),
        )),
        LogicalPlan::PointIndexSeek {
            var,
            label,
            property,
            bounds,
        } => Box::new(PointIndexSeekOp::new(
            var.clone(),
            label.clone(),
            property.clone(),
            bounds.clone(),
        )),
        LogicalPlan::EdgeSeek {
            edge_var,
            src_var,
            dst_var,
            edge_type,
            property,
            value,
            direction,
            residual_properties,
        } => Box::new(EdgeSeekOp::new(
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            property.clone(),
            value.clone(),
            *direction,
            residual_properties.clone(),
        )),
        LogicalPlan::EdgePointIndexSeek {
            edge_var,
            src_var,
            dst_var,
            edge_type,
            property,
            direction,
            bounds,
        } => Box::new(EdgePointIndexSeekOp::new(
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            property.clone(),
            *direction,
            bounds.clone(),
        )),
        // Each branch is built through `child!`, so all branches
        // share the same seed row and substitute slot.
        LogicalPlan::Union { branches, all } => {
            let branch_ops: Vec<Box<dyn Operator>> = branches.iter().map(|b| child!(b)).collect();
            Box::new(UnionOp::new(branch_ops, *all))
        }
        LogicalPlan::BindPath {
            input,
            path_var,
            node_vars,
            edge_vars,
        } => Box::new(BindPathOp::new(
            child!(input),
            path_var.clone(),
            node_vars.clone(),
            edge_vars.clone(),
        )),
        LogicalPlan::ShortestPath {
            input,
            src_var,
            dst_var,
            path_var,
            edge_types,
            direction,
            max_hops,
            kind,
        } => Box::new(ShortestPathOp::new(
            child!(input),
            src_var.clone(),
            dst_var.clone(),
            path_var.clone(),
            edge_types.clone(),
            *direction,
            *max_hops,
            *kind,
        )),
        // DDL plans are handled by `try_execute_ddl` before the
        // operator pipeline is built, so they should never reach
        // this point. The panic here catches any future
        // refactor that forgets to gate them.
        LogicalPlan::CreatePropertyIndex { .. }
        | LogicalPlan::DropPropertyIndex { .. }
        | LogicalPlan::CreateEdgePropertyIndex { .. }
        | LogicalPlan::DropEdgePropertyIndex { .. }
        | LogicalPlan::ShowPropertyIndexes
        | LogicalPlan::CreatePointIndex { .. }
        | LogicalPlan::DropPointIndex { .. }
        | LogicalPlan::CreateEdgePointIndex { .. }
        | LogicalPlan::DropEdgePointIndex { .. }
        | LogicalPlan::ShowPointIndexes
        | LogicalPlan::CreatePropertyConstraint { .. }
        | LogicalPlan::DropPropertyConstraint { .. }
        | LogicalPlan::ShowPropertyConstraints => {
            panic!("schema DDL must be dispatched via try_execute_ddl before build_op")
        }
    }
}
1229
1230struct UnwindOp {
1231    var: String,
1232    expr: Expr,
1233    items: Option<Vec<Value>>,
1234    cursor: usize,
1235}
1236
1237impl UnwindOp {
1238    fn new(var: String, expr: Expr) -> Self {
1239        Self {
1240            var,
1241            expr,
1242            items: None,
1243            cursor: 0,
1244        }
1245    }
1246}
1247
1248impl Operator for UnwindOp {
1249    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1250        if self.items.is_none() {
1251            let empty = Row::new();
1252            let ectx = EvalCtx {
1253                row: &empty,
1254                params: ctx.params,
1255                reader: ctx.store,
1256                procedures: ctx.procedures,
1257                outer_rows: ctx.outer_rows,
1258                tombstones: ctx.tombstones,
1259            };
1260            let val = eval_expr(&self.expr, &ectx)?;
1261            self.items = Some(coerce_unwind_list(val)?);
1262        }
1263        let items = self.items.as_ref().unwrap();
1264        if self.cursor < items.len() {
1265            let v = items[self.cursor].clone();
1266            self.cursor += 1;
1267            let mut row = Row::new();
1268            row.insert(self.var.clone(), v);
1269            Ok(Some(row))
1270        } else {
1271            Ok(None)
1272        }
1273    }
1274}
1275
1276/// Per-row UNWIND: pulls one input row, evaluates `expr` against it
1277/// to produce a list, and emits one output row per list element.
1278/// Each output row inherits every binding from the input row plus
1279/// a new `var` binding. Empty / null lists drop the input row and
1280/// the operator immediately pulls the next input row.
1281struct UnwindChainOp {
1282    input: Box<dyn Operator>,
1283    var: String,
1284    expr: Expr,
1285    current_row: Option<Row>,
1286    items: Vec<Value>,
1287    cursor: usize,
1288}
1289
1290impl UnwindChainOp {
1291    fn new(input: Box<dyn Operator>, var: String, expr: Expr) -> Self {
1292        Self {
1293            input,
1294            var,
1295            expr,
1296            current_row: None,
1297            items: Vec::new(),
1298            cursor: 0,
1299        }
1300    }
1301}
1302
1303impl Operator for UnwindChainOp {
1304    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1305        loop {
1306            if let Some(base) = &self.current_row {
1307                if self.cursor < self.items.len() {
1308                    let v = self.items[self.cursor].clone();
1309                    self.cursor += 1;
1310                    let mut row = base.clone();
1311                    row.insert(self.var.clone(), v);
1312                    return Ok(Some(row));
1313                }
1314                self.current_row = None;
1315                self.items.clear();
1316                self.cursor = 0;
1317            }
1318            let base = match self.input.next(ctx)? {
1319                Some(r) => r,
1320                None => return Ok(None),
1321            };
1322            let ectx = EvalCtx {
1323                row: &base,
1324                params: ctx.params,
1325                reader: ctx.store,
1326                procedures: ctx.procedures,
1327                outer_rows: ctx.outer_rows,
1328                tombstones: ctx.tombstones,
1329            };
1330            let val = eval_expr(&self.expr, &ectx)?;
1331            self.items = coerce_unwind_list(val)?;
1332            self.current_row = Some(base);
1333        }
1334    }
1335}
1336
1337/// Shared list-coercion used by both UNWIND operators. Accepts a
1338/// native executor `Value::List`, a property-list
1339/// `Property::List`, or `Null` (treated as an empty list). Any
1340/// other shape is a type error — UNWIND is defined over lists.
1341fn coerce_unwind_list(val: Value) -> Result<Vec<Value>> {
1342    match val {
1343        Value::List(items) => Ok(items),
1344        Value::Property(Property::List(props)) => {
1345            Ok(props.into_iter().map(Value::Property).collect())
1346        }
1347        Value::Null => Ok(Vec::new()),
1348        _ => Err(Error::TypeMismatch),
1349    }
1350}
1351
1352struct CreatePathOp {
1353    input: Option<Box<dyn Operator>>,
1354    nodes: Vec<CreateNodeSpec>,
1355    edges: Vec<CreateEdgeSpec>,
1356    done: bool,
1357    buffered: Option<Vec<Row>>,
1358    cursor: usize,
1359}
1360
1361impl CreatePathOp {
1362    fn new(
1363        input: Option<Box<dyn Operator>>,
1364        nodes: Vec<CreateNodeSpec>,
1365        edges: Vec<CreateEdgeSpec>,
1366    ) -> Self {
1367        Self {
1368            input,
1369            nodes,
1370            edges,
1371            done: false,
1372            buffered: None,
1373            cursor: 0,
1374        }
1375    }
1376
1377    fn apply(&self, ctx: &ExecCtx, row: &Row) -> Result<Row> {
1378        let mut out = row.clone();
1379        let mut node_ids: Vec<NodeId> = Vec::with_capacity(self.nodes.len());
1380        for spec in &self.nodes {
1381            match spec {
1382                CreateNodeSpec::New {
1383                    var,
1384                    labels,
1385                    properties,
1386                } => {
1387                    let mut node = Node::new();
1388                    for label in labels {
1389                        node.labels.push(label.clone());
1390                    }
1391                    // Property values are `Expr` (literal | parameter
1392                    // per the grammar). Evaluate against the current row
1393                    // + params and convert to a stored Property via the
1394                    // existing helper, which rejects Node/Edge values.
1395                    // Null-valued properties aren't stored — openCypher
1396                    // treats `{k: null}` as "no such property," so
1397                    // `keys(n)` and `n.k IS NOT NULL` both skip it.
1398                    for (k, expr) in properties {
1399                        let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1400                        let prop = value_to_property(value)?;
1401                        if matches!(prop, Property::Null) {
1402                            continue;
1403                        }
1404                        node.properties.insert(k.clone(), prop);
1405                    }
1406                    ctx.writer.put_node(&node)?;
1407                    node_ids.push(node.id);
1408                    if let Some(v) = var {
1409                        out.insert(v.clone(), Value::Node(node));
1410                    }
1411                }
1412                CreateNodeSpec::Reference(name) => {
1413                    let id = match out.get(name) {
1414                        Some(Value::Node(n)) => n.id,
1415                        _ => return Err(Error::UnboundVariable(name.clone())),
1416                    };
1417                    node_ids.push(id);
1418                }
1419            }
1420        }
1421        for spec in &self.edges {
1422            let src = node_ids[spec.src_idx];
1423            let dst = node_ids[spec.dst_idx];
1424            let mut edge = Edge::new(spec.edge_type.clone(), src, dst);
1425            for (k, expr) in &spec.properties {
1426                let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1427                let prop = value_to_property(value)?;
1428                if matches!(prop, Property::Null) {
1429                    continue;
1430                }
1431                edge.properties.insert(k.clone(), prop);
1432            }
1433            ctx.writer.put_edge(&edge)?;
1434            if let Some(v) = &spec.var {
1435                out.insert(v.clone(), Value::Edge(edge));
1436            }
1437        }
1438        Ok(out)
1439    }
1440}
1441
1442impl Operator for CreatePathOp {
1443    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1444        if self.input.is_some() {
1445            // Eager evaluation: drain the entire input *and* apply
1446            // all writes on the first call, then stream the
1447            // already-applied rows. This is required by Cypher's
1448            // "writes are eager" semantics — subsequent operators
1449            // (especially fresh scans inside a CartesianProduct
1450            // for `WITH * MATCH ...`) must see the post-write
1451            // graph state. The previous lazy per-`next()` apply
1452            // interleaved writes with downstream scan caching,
1453            // so a `MATCH () CREATE () WITH * MATCH () CREATE ()`
1454            // pattern undercounted: the right-side scan rebuilt
1455            // for each left row only saw writes done before that
1456            // particular iteration.
1457            if let Some(buffered) = self.buffered.as_mut() {
1458                if self.cursor < buffered.len() {
1459                    let row = buffered[self.cursor].clone();
1460                    self.cursor += 1;
1461                    return Ok(Some(row));
1462                }
1463                return Ok(None);
1464            }
1465            let input_rows: Vec<Row> = {
1466                let input = self.input.as_mut().unwrap();
1467                let mut acc = Vec::new();
1468                while let Some(row) = input.next(ctx)? {
1469                    acc.push(row);
1470                }
1471                acc
1472            };
1473            let mut applied: Vec<Row> = Vec::with_capacity(input_rows.len());
1474            for row in input_rows {
1475                applied.push(self.apply(ctx, &row)?);
1476            }
1477            self.buffered = Some(applied);
1478            self.cursor = 0;
1479            // Fall through to next call via recursion to yield
1480            // the first buffered row.
1481            self.next(ctx)
1482        } else {
1483            if self.done {
1484                return Ok(None);
1485            }
1486            self.done = true;
1487            let empty = Row::new();
1488            Ok(Some(self.apply(ctx, &empty)?))
1489        }
1490    }
1491}
1492
1493struct CartesianProductOp {
1494    left: Box<dyn Operator>,
1495    right_plan: LogicalPlan,
1496    left_row: Option<Row>,
1497    right_op: Option<Box<dyn Operator>>,
1498}
1499
1500impl CartesianProductOp {
1501    fn new(left: Box<dyn Operator>, right_plan: LogicalPlan) -> Self {
1502        Self {
1503            left,
1504            right_plan,
1505            left_row: None,
1506            right_op: None,
1507        }
1508    }
1509}
1510
1511impl Operator for CartesianProductOp {
1512    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1513        loop {
1514            if self.left_row.is_none() {
1515                match self.left.next(ctx)? {
1516                    None => return Ok(None),
1517                    Some(row) => {
1518                        self.left_row = Some(row);
1519                        self.right_op = Some(build_op(&self.right_plan));
1520                    }
1521                }
1522            }
1523            let right_op = self.right_op.as_mut().expect("right_op set");
1524            // Expose the current left row to right-side operators
1525            // via a shadow context so constraint lookups (for
1526            // bound edges / bound edge lists) can find vars that
1527            // the right-side scan itself doesn't bind.
1528            let left_ref = self.left_row.as_ref().unwrap();
1529            let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1530            stacked.push(left_ref);
1531            stacked.extend_from_slice(ctx.outer_rows);
1532            let inner_ctx = ExecCtx {
1533                store: ctx.store,
1534                writer: ctx.writer,
1535                params: ctx.params,
1536                procedures: ctx.procedures,
1537                outer_rows: &stacked,
1538                tombstones: ctx.tombstones,
1539            };
1540            match right_op.next(&inner_ctx)? {
1541                Some(right_row) => {
1542                    let mut combined = left_ref.clone();
1543                    for (k, v) in right_row {
1544                        combined.insert(k, v);
1545                    }
1546                    return Ok(Some(combined));
1547                }
1548                None => {
1549                    self.left_row = None;
1550                    self.right_op = None;
1551                }
1552            }
1553        }
1554    }
1555}
1556
1557struct DeleteOp {
1558    input: Box<dyn Operator>,
1559    detach: bool,
1560    #[allow(dead_code)]
1561    vars: Vec<String>,
1562    exprs: Vec<Expr>,
1563    /// Rows drained from `input` before any deletes have run.
1564    /// Populated on first call to `next`. openCypher treats DELETE
1565    /// as a batch mutation: the whole preceding row set is
1566    /// materialised, then the mutations happen, then rows are
1567    /// forwarded. Without buffering, `MATCH (a)-[r]-(b) DELETE r,
1568    /// a, b RETURN count(*)` deletes the first row's nodes before
1569    /// the pipelined scan reaches the second row.
1570    buffered: Option<Vec<Row>>,
1571    cursor: usize,
1572}
1573
1574impl DeleteOp {
1575    fn new(input: Box<dyn Operator>, detach: bool, vars: Vec<String>, exprs: Vec<Expr>) -> Self {
1576        Self {
1577            input,
1578            detach,
1579            vars,
1580            exprs,
1581            buffered: None,
1582            cursor: 0,
1583        }
1584    }
1585
1586    /// Gather every edge and node id referenced by the row's
1587    /// DELETE expressions and run the delete in two phases —
1588    /// edges first, then nodes. The two-phase ordering is what
1589    /// lets `DELETE p1, p2` succeed when two paths share nodes
1590    /// connected by each other's edges: by the time the node
1591    /// phase runs, the in-batch edges are already gone. The
1592    /// non-detach check probes the graph then filters out
1593    /// any edge we just deleted, so the batch-level detachment
1594    /// is counted as "no longer attached" rather than failing.
1595    fn apply_deletes(
1596        &self,
1597        ctx: &ExecCtx,
1598        row: &Row,
1599        seen_edges: &mut HashSet<meshdb_core::EdgeId>,
1600        seen_nodes: &mut HashSet<meshdb_core::NodeId>,
1601    ) -> Result<()> {
1602        let mut edge_ids: Vec<meshdb_core::EdgeId> = Vec::new();
1603        let mut node_ids: Vec<meshdb_core::NodeId> = Vec::new();
1604        for expr in &self.exprs {
1605            let v = eval_expr(expr, &ctx.eval_ctx(row))?;
1606            match v {
1607                Value::Node(n) => node_ids.push(n.id),
1608                Value::Edge(e) => edge_ids.push(e.id),
1609                Value::Path { nodes, edges } => {
1610                    for e in edges {
1611                        edge_ids.push(e.id);
1612                    }
1613                    for n in nodes {
1614                        node_ids.push(n.id);
1615                    }
1616                }
1617                Value::Null | Value::Property(Property::Null) => continue,
1618                _ => return Err(Error::TypeMismatch),
1619            }
1620        }
1621        for eid in &edge_ids {
1622            if seen_edges.insert(*eid) {
1623                ctx.writer.delete_edge(*eid)?;
1624                ctx.tombstones.edges.borrow_mut().insert(*eid);
1625            }
1626        }
1627        for nid in &node_ids {
1628            if !seen_nodes.insert(*nid) {
1629                continue;
1630            }
1631            if self.detach {
1632                // DETACH DELETE also removes every attached edge;
1633                // tombstone them too so a later projection over a
1634                // formerly-attached edge clone also raises rather
1635                // than reading stale properties.
1636                for (eid, _) in ctx.store.outgoing(*nid)? {
1637                    ctx.tombstones.edges.borrow_mut().insert(eid);
1638                }
1639                for (eid, _) in ctx.store.incoming(*nid)? {
1640                    ctx.tombstones.edges.borrow_mut().insert(eid);
1641                }
1642                ctx.writer.detach_delete_node(*nid)?;
1643            } else {
1644                let out = ctx.store.outgoing(*nid)?;
1645                let inc = ctx.store.incoming(*nid)?;
1646                let still_attached = out
1647                    .iter()
1648                    .chain(inc.iter())
1649                    .any(|(eid, _)| !seen_edges.contains(eid));
1650                if still_attached {
1651                    return Err(Error::CannotDeleteAttachedNode);
1652                }
1653                ctx.writer.detach_delete_node(*nid)?;
1654            }
1655            ctx.tombstones.nodes.borrow_mut().insert(*nid);
1656        }
1657        Ok(())
1658    }
1659}
1660
1661impl Operator for DeleteOp {
1662    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1663        // First call: drain the input, run all deletes, then
1664        // start forwarding rows. Buffering protects against the
1665        // pipelined-scan-vs-mutation race: `MATCH (a)-[r]-(b)
1666        // DELETE r, a, b RETURN count(*)` must observe every
1667        // MATCH row before any node or edge disappears, otherwise
1668        // the undirected expansion hits an already-deleted source
1669        // on the second iteration.
1670        if self.buffered.is_none() {
1671            let mut rows: Vec<Row> = Vec::new();
1672            while let Some(row) = self.input.next(ctx)? {
1673                rows.push(row);
1674            }
1675            let mut seen_edges: HashSet<meshdb_core::EdgeId> = HashSet::new();
1676            let mut seen_nodes: HashSet<meshdb_core::NodeId> = HashSet::new();
1677            for row in &rows {
1678                self.apply_deletes(ctx, row, &mut seen_edges, &mut seen_nodes)?;
1679            }
1680            self.buffered = Some(rows);
1681            self.cursor = 0;
1682        }
1683        let rows = self.buffered.as_ref().unwrap();
1684        if self.cursor < rows.len() {
1685            let row = rows[self.cursor].clone();
1686            self.cursor += 1;
1687            return Ok(Some(row));
1688        }
1689        Ok(None)
1690    }
1691}
1692
1693struct SetPropertyOp {
1694    input: Box<dyn Operator>,
1695    assignments: Vec<SetAssignment>,
1696    /// Eager-evaluated output: drained input with all SET
1697    /// assignments already applied to the store. Populated on
1698    /// the first `next()` call so the writes happen before any
1699    /// downstream operator (e.g. a fresh MATCH inside a
1700    /// CartesianProduct after a `WITH *` boundary) starts
1701    /// scanning. Without this, lazy per-row apply interleaved
1702    /// the writes with downstream scan caching and
1703    /// undercounted side effects in patterns like
1704    /// `MATCH (n) SET n.x = 1 WITH * MATCH (m) ...`.
1705    buffered: Option<Vec<Row>>,
1706    cursor: usize,
1707}
1708
1709impl SetPropertyOp {
1710    fn new(input: Box<dyn Operator>, assignments: Vec<SetAssignment>) -> Self {
1711        Self {
1712            input,
1713            assignments,
1714            buffered: None,
1715            cursor: 0,
1716        }
1717    }
1718
1719    fn apply_one(&self, ctx: &ExecCtx, mut row: Row) -> Result<Row> {
1720        // Phase 1: evaluate any RHSes against the original row bindings.
1721        enum Action {
1722            SetKey {
1723                var: String,
1724                key: String,
1725                prop: Property,
1726            },
1727            AddLabels {
1728                var: String,
1729                labels: Vec<String>,
1730            },
1731            Replace {
1732                var: String,
1733                props: Vec<(String, Property)>,
1734            },
1735            Merge {
1736                var: String,
1737                props: Vec<(String, Property)>,
1738            },
1739        }
1740        let mut actions: Vec<Action> = Vec::with_capacity(self.assignments.len());
1741        for a in &self.assignments {
1742            match a {
1743                SetAssignment::Property { var, key, value } => {
1744                    let evaluated = eval_expr(value, &ctx.eval_ctx(&row))?;
1745                    let prop = value_to_property(evaluated)?;
1746                    actions.push(Action::SetKey {
1747                        var: var.clone(),
1748                        key: key.clone(),
1749                        prop,
1750                    });
1751                }
1752                SetAssignment::Labels { var, labels } => {
1753                    actions.push(Action::AddLabels {
1754                        var: var.clone(),
1755                        labels: labels.clone(),
1756                    });
1757                }
1758                SetAssignment::Replace { var, properties } => {
1759                    // Property values are now `Expr`. Evaluate
1760                    // each against the current row + params and
1761                    // convert via the shared helper, which
1762                    // surfaces InvalidSetValue for Node/Edge.
1763                    let props = properties
1764                        .iter()
1765                        .map(|(k, expr)| {
1766                            let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1767                            Ok((k.clone(), value_to_property(v)?))
1768                        })
1769                        .collect::<Result<Vec<(String, Property)>>>()?;
1770                    actions.push(Action::Replace {
1771                        var: var.clone(),
1772                        props,
1773                    });
1774                }
1775                SetAssignment::Merge { var, properties } => {
1776                    let props = properties
1777                        .iter()
1778                        .map(|(k, expr)| {
1779                            let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1780                            Ok((k.clone(), value_to_property(v)?))
1781                        })
1782                        .collect::<Result<Vec<(String, Property)>>>()?;
1783                    actions.push(Action::Merge {
1784                        var: var.clone(),
1785                        props,
1786                    });
1787                }
1788                SetAssignment::ReplaceFromExpr {
1789                    var,
1790                    source,
1791                    replace,
1792                } => {
1793                    let v = eval_expr(source, &ctx.eval_ctx(&row))?;
1794                    let props = extract_property_map(&v)?;
1795                    if *replace {
1796                        actions.push(Action::Replace {
1797                            var: var.clone(),
1798                            props,
1799                        });
1800                    } else {
1801                        actions.push(Action::Merge {
1802                            var: var.clone(),
1803                            props,
1804                        });
1805                    }
1806                }
1807            }
1808        }
1809
1810        // Phase 2: apply updates in-place to the row bindings.
1811        let mut updated_nodes: HashSet<String> = HashSet::new();
1812        let mut updated_edges: HashSet<String> = HashSet::new();
1813        for action in actions {
1814            match action {
1815                Action::SetKey { var, key, prop } => match row.get_mut(&var) {
1816                    // openCypher: SET on a null target (from
1817                    // OPTIONAL MATCH that didn't bind) is a
1818                    // silent no-op rather than an error.
1819                    Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1820                    // openCypher: SET a.key = null removes the
1821                    // property rather than storing a null value.
1822                    Some(Value::Node(n)) => {
1823                        if matches!(prop, Property::Null) {
1824                            n.properties.remove(&key);
1825                        } else {
1826                            n.properties.insert(key, prop);
1827                        }
1828                        updated_nodes.insert(var);
1829                    }
1830                    Some(Value::Edge(e)) => {
1831                        if matches!(prop, Property::Null) {
1832                            e.properties.remove(&key);
1833                        } else {
1834                            e.properties.insert(key, prop);
1835                        }
1836                        updated_edges.insert(var);
1837                    }
1838                    _ => return Err(Error::UnboundVariable(var)),
1839                },
1840                Action::AddLabels { var, labels } => match row.get_mut(&var) {
1841                    Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1842                    Some(Value::Node(n)) => {
1843                        for label in labels {
1844                            if !n.labels.contains(&label) {
1845                                n.labels.push(label);
1846                            }
1847                        }
1848                        updated_nodes.insert(var);
1849                    }
1850                    _ => return Err(Error::UnboundVariable(var)),
1851                },
1852                Action::Replace { var, props } => match row.get_mut(&var) {
1853                    Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1854                    Some(Value::Node(n)) => {
1855                        n.properties.clear();
1856                        for (k, v) in props {
1857                            if !matches!(v, Property::Null) {
1858                                n.properties.insert(k, v);
1859                            }
1860                        }
1861                        updated_nodes.insert(var);
1862                    }
1863                    Some(Value::Edge(e)) => {
1864                        e.properties.clear();
1865                        for (k, v) in props {
1866                            if !matches!(v, Property::Null) {
1867                                e.properties.insert(k, v);
1868                            }
1869                        }
1870                        updated_edges.insert(var);
1871                    }
1872                    _ => return Err(Error::UnboundVariable(var)),
1873                },
1874                Action::Merge { var, props } => match row.get_mut(&var) {
1875                    Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1876                    Some(Value::Node(n)) => {
1877                        for (k, v) in props {
1878                            if matches!(v, Property::Null) {
1879                                n.properties.remove(&k);
1880                            } else {
1881                                n.properties.insert(k, v);
1882                            }
1883                        }
1884                        updated_nodes.insert(var);
1885                    }
1886                    Some(Value::Edge(e)) => {
1887                        for (k, v) in props {
1888                            if matches!(v, Property::Null) {
1889                                e.properties.remove(&k);
1890                            } else {
1891                                e.properties.insert(k, v);
1892                            }
1893                        }
1894                        updated_edges.insert(var);
1895                    }
1896                    _ => return Err(Error::UnboundVariable(var)),
1897                },
1898            }
1899        }
1900
1901        // Phase 3: flush each mutated entity once to the writer.
1902        for var in &updated_nodes {
1903            if let Some(Value::Node(n)) = row.get(var) {
1904                ctx.writer.put_node(n)?;
1905            }
1906        }
1907        for var in &updated_edges {
1908            if let Some(Value::Edge(e)) = row.get(var) {
1909                ctx.writer.put_edge(e)?;
1910            }
1911        }
1912
1913        Ok(row)
1914    }
1915}
1916
1917impl Operator for SetPropertyOp {
1918    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1919        // Eager: drain input + apply all SETs upfront, then
1920        // stream the mutated rows. See `buffered` doc comment
1921        // for why lazy per-row apply broke `WITH *`-then-MATCH.
1922        if self.buffered.is_none() {
1923            let mut input_rows: Vec<Row> = Vec::new();
1924            while let Some(row) = self.input.next(ctx)? {
1925                input_rows.push(row);
1926            }
1927            let mut applied: Vec<Row> = Vec::with_capacity(input_rows.len());
1928            for row in input_rows {
1929                applied.push(self.apply_one(ctx, row)?);
1930            }
1931            self.buffered = Some(applied);
1932            self.cursor = 0;
1933        }
1934        let rows = self.buffered.as_ref().unwrap();
1935        if self.cursor < rows.len() {
1936            let row = rows[self.cursor].clone();
1937            self.cursor += 1;
1938            return Ok(Some(row));
1939        }
1940        Ok(None)
1941    }
1942}
1943
1944struct RemoveOp {
1945    input: Box<dyn Operator>,
1946    items: Vec<RemoveSpec>,
1947    /// Eager-evaluated output, same rationale as
1948    /// [`SetPropertyOp::buffered`]: drain input and apply all
1949    /// removes to the store before any downstream operator
1950    /// starts. Lazy per-row apply would interleave writes with
1951    /// downstream scan caching.
1952    buffered: Option<Vec<Row>>,
1953    cursor: usize,
1954}
1955
1956impl RemoveOp {
1957    fn new(input: Box<dyn Operator>, items: Vec<RemoveSpec>) -> Self {
1958        Self {
1959            input,
1960            items,
1961            buffered: None,
1962            cursor: 0,
1963        }
1964    }
1965
1966    fn apply_one(&self, ctx: &ExecCtx, mut row: Row) -> Result<Row> {
1967        let mut updated_nodes: HashSet<String> = HashSet::new();
1968        let mut updated_edges: HashSet<String> = HashSet::new();
1969        for item in &self.items {
1970            match item {
1971                RemoveSpec::Property { var, key } => match row.get_mut(var) {
1972                    // Null target (from OPTIONAL MATCH) is a
1973                    // no-op, matching Neo4j's REMOVE semantics.
1974                    Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1975                    Some(Value::Node(n)) => {
1976                        n.properties.remove(key);
1977                        updated_nodes.insert(var.clone());
1978                    }
1979                    Some(Value::Edge(e)) => {
1980                        e.properties.remove(key);
1981                        updated_edges.insert(var.clone());
1982                    }
1983                    _ => return Err(Error::UnboundVariable(var.clone())),
1984                },
1985                RemoveSpec::Labels { var, labels } => match row.get_mut(var) {
1986                    Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1987                    Some(Value::Node(n)) => {
1988                        n.labels.retain(|l| !labels.contains(l));
1989                        updated_nodes.insert(var.clone());
1990                    }
1991                    _ => return Err(Error::UnboundVariable(var.clone())),
1992                },
1993            }
1994        }
1995        for var in &updated_nodes {
1996            if let Some(Value::Node(n)) = row.get(var) {
1997                ctx.writer.put_node(n)?;
1998            }
1999        }
2000        for var in &updated_edges {
2001            if let Some(Value::Edge(e)) = row.get(var) {
2002                ctx.writer.put_edge(e)?;
2003            }
2004        }
2005        Ok(row)
2006    }
2007}
2008
2009impl Operator for RemoveOp {
2010    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2011        if self.buffered.is_none() {
2012            let mut input_rows: Vec<Row> = Vec::new();
2013            while let Some(row) = self.input.next(ctx)? {
2014                input_rows.push(row);
2015            }
2016            let mut applied: Vec<Row> = Vec::with_capacity(input_rows.len());
2017            for row in input_rows {
2018                applied.push(self.apply_one(ctx, row)?);
2019            }
2020            self.buffered = Some(applied);
2021            self.cursor = 0;
2022        }
2023        let rows = self.buffered.as_ref().unwrap();
2024        if self.cursor < rows.len() {
2025            let row = rows[self.cursor].clone();
2026            self.cursor += 1;
2027            return Ok(Some(row));
2028        }
2029        Ok(None)
2030    }
2031}
2032
2033struct LoadCsvOp {
2034    input: Option<Box<dyn Operator>>,
2035    path_expr: Expr,
2036    var: String,
2037    with_headers: bool,
2038    rows: Option<Vec<Value>>,
2039    cursor: usize,
2040}
2041
2042impl LoadCsvOp {
2043    fn new(
2044        input: Option<Box<dyn Operator>>,
2045        path_expr: Expr,
2046        var: String,
2047        with_headers: bool,
2048    ) -> Self {
2049        Self {
2050            input,
2051            path_expr,
2052            var,
2053            with_headers,
2054            rows: None,
2055            cursor: 0,
2056        }
2057    }
2058
2059    fn load(&mut self, ctx: &ExecCtx, base_row: &Row) -> Result<()> {
2060        let ectx = ctx.eval_ctx(base_row);
2061        let path_val = eval_expr(&self.path_expr, &ectx)?;
2062        let path = match path_val {
2063            Value::Property(Property::String(s)) => s,
2064            _ => return Err(Error::TypeMismatch),
2065        };
2066        let content = std::fs::read_to_string(&path).map_err(|e| {
2067            Error::Unsupported(format!("LOAD CSV: cannot read file '{}': {}", path, e))
2068        })?;
2069        let mut lines = content.lines();
2070        let headers: Option<Vec<String>> = if self.with_headers {
2071            lines
2072                .next()
2073                .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
2074        } else {
2075            None
2076        };
2077        let mut csv_rows = Vec::new();
2078        for line in lines {
2079            if line.trim().is_empty() {
2080                continue;
2081            }
2082            let fields: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
2083            if let Some(hdrs) = &headers {
2084                let mut map = std::collections::HashMap::new();
2085                for (i, h) in hdrs.iter().enumerate() {
2086                    let val = fields.get(i).cloned().unwrap_or_default();
2087                    map.insert(h.clone(), Property::String(val));
2088                }
2089                csv_rows.push(Value::Property(Property::Map(map)));
2090            } else {
2091                let list: Vec<Value> = fields
2092                    .into_iter()
2093                    .map(|f| Value::Property(Property::String(f)))
2094                    .collect();
2095                csv_rows.push(Value::List(list));
2096            }
2097        }
2098        self.rows = Some(csv_rows);
2099        self.cursor = 0;
2100        Ok(())
2101    }
2102}
2103
2104impl Operator for LoadCsvOp {
2105    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2106        if self.rows.is_none() {
2107            let base = if let Some(input) = &mut self.input {
2108                match input.next(ctx)? {
2109                    Some(r) => r,
2110                    None => return Ok(None),
2111                }
2112            } else {
2113                Row::new()
2114            };
2115            self.load(ctx, &base)?;
2116        }
2117        let rows = self.rows.as_ref().unwrap();
2118        if self.cursor < rows.len() {
2119            let val = rows[self.cursor].clone();
2120            self.cursor += 1;
2121            let mut row = Row::new();
2122            row.insert(self.var.clone(), val);
2123            Ok(Some(row))
2124        } else {
2125            Ok(None)
2126        }
2127    }
2128}
2129
2130struct ForeachOp {
2131    input: Box<dyn Operator>,
2132    var: String,
2133    list_expr: Expr,
2134    set_assignments: Vec<SetAssignment>,
2135    remove_items: Vec<RemoveSpec>,
2136}
2137
2138impl ForeachOp {
2139    fn new(
2140        input: Box<dyn Operator>,
2141        var: String,
2142        list_expr: Expr,
2143        set_assignments: Vec<SetAssignment>,
2144        remove_items: Vec<RemoveSpec>,
2145    ) -> Self {
2146        Self {
2147            input,
2148            var,
2149            list_expr,
2150            set_assignments,
2151            remove_items,
2152        }
2153    }
2154}
2155
2156impl Operator for ForeachOp {
2157    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2158        let Some(row) = self.input.next(ctx)? else {
2159            return Ok(None);
2160        };
2161        let ectx = ctx.eval_ctx(&row);
2162        let list_val = eval_expr(&self.list_expr, &ectx)?;
2163        let items = match list_val {
2164            Value::List(items) => items,
2165            Value::Property(Property::List(props)) => {
2166                props.into_iter().map(Value::Property).collect()
2167            }
2168            Value::Null | Value::Property(Property::Null) => Vec::new(),
2169            _ => return Err(Error::TypeMismatch),
2170        };
2171        for item in items {
2172            let mut scratch = row.clone();
2173            scratch.insert(self.var.clone(), item);
2174            for a in &self.set_assignments {
2175                match a {
2176                    SetAssignment::Property { var, key, value } => {
2177                        let evaluated = eval_expr(value, &ctx.eval_ctx(&scratch))?;
2178                        let prop = value_to_property(evaluated)?;
2179                        match scratch.get_mut(var) {
2180                            Some(Value::Node(n)) => {
2181                                n.properties.insert(key.clone(), prop);
2182                            }
2183                            Some(Value::Edge(e)) => {
2184                                e.properties.insert(key.clone(), prop);
2185                            }
2186                            _ => return Err(Error::UnboundVariable(var.clone())),
2187                        }
2188                    }
2189                    SetAssignment::Labels { var, labels } => {
2190                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
2191                            for l in labels {
2192                                if !n.labels.contains(l) {
2193                                    n.labels.push(l.clone());
2194                                }
2195                            }
2196                        }
2197                    }
2198                    _ => {}
2199                }
2200            }
2201            for ri in &self.remove_items {
2202                match ri {
2203                    RemoveSpec::Property { var, key } => {
2204                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
2205                            n.properties.remove(key);
2206                        } else if let Some(Value::Edge(e)) = scratch.get_mut(var) {
2207                            e.properties.remove(key);
2208                        }
2209                    }
2210                    RemoveSpec::Labels { var, labels } => {
2211                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
2212                            n.labels.retain(|l| !labels.contains(l));
2213                        }
2214                    }
2215                }
2216            }
2217            // Flush mutated entities for each iteration
2218            for (_, val) in scratch.iter() {
2219                match val {
2220                    Value::Node(n) => ctx.writer.put_node(n)?,
2221                    Value::Edge(e) => ctx.writer.put_edge(e)?,
2222                    _ => {}
2223                }
2224            }
2225        }
2226        Ok(Some(row))
2227    }
2228}
2229
2230struct SeedRowOp {
2231    done: bool,
2232}
2233
2234impl Operator for SeedRowOp {
2235    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
2236        if self.done {
2237            return Ok(None);
2238        }
2239        self.done = true;
2240        Ok(Some(Row::new()))
2241    }
2242}
2243
2244struct SeededRowOp {
2245    row: Option<Row>,
2246}
2247
2248impl Operator for SeededRowOp {
2249    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
2250        Ok(self.row.take())
2251    }
2252}
2253
2254struct CallSubqueryOp {
2255    input: Box<dyn Operator>,
2256    body_plan: LogicalPlan,
2257    pending: Vec<Row>,
2258    pending_idx: usize,
2259}
2260
2261impl CallSubqueryOp {
2262    fn new(input: Box<dyn Operator>, body_plan: LogicalPlan) -> Self {
2263        Self {
2264            input,
2265            body_plan,
2266            pending: Vec::new(),
2267            pending_idx: 0,
2268        }
2269    }
2270}
2271
2272impl Operator for CallSubqueryOp {
2273    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2274        loop {
2275            if self.pending_idx < self.pending.len() {
2276                let row = self.pending[self.pending_idx].clone();
2277                self.pending_idx += 1;
2278                return Ok(Some(row));
2279            }
2280            let outer_row = match self.input.next(ctx)? {
2281                Some(r) => r,
2282                None => return Ok(None),
2283            };
2284            let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row), &mut None);
2285            let mut results = Vec::new();
2286            while let Some(body_row) = body_op.next(ctx)? {
2287                let mut merged = outer_row.clone();
2288                for (k, v) in body_row {
2289                    merged.insert(k, v);
2290                }
2291                results.push(merged);
2292            }
2293            if results.is_empty() {
2294                continue;
2295            }
2296            self.pending = results;
2297            self.pending_idx = 0;
2298        }
2299    }
2300}
2301
2302/// Per-input-row left-join driver (see
2303/// `LogicalPlan::OptionalApply`). Replays `body_plan` once per
2304/// outer row with the row as its seed; forwards all emitted rows
2305/// merged with the outer bindings, and emits one null-fallback
2306/// row (outer bindings plus `null_vars` bound to Null) only when
2307/// the body produced zero rows for that input.
2308struct OptionalApplyOp {
2309    input: Box<dyn Operator>,
2310    body_plan: LogicalPlan,
2311    null_vars: Vec<String>,
2312    pending: Vec<Row>,
2313    pending_idx: usize,
2314}
2315
2316impl OptionalApplyOp {
2317    fn new(input: Box<dyn Operator>, body_plan: LogicalPlan, null_vars: Vec<String>) -> Self {
2318        Self {
2319            input,
2320            body_plan,
2321            null_vars,
2322            pending: Vec::new(),
2323            pending_idx: 0,
2324        }
2325    }
2326}
2327
2328impl Operator for OptionalApplyOp {
2329    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2330        loop {
2331            if self.pending_idx < self.pending.len() {
2332                let row = self.pending[self.pending_idx].clone();
2333                self.pending_idx += 1;
2334                return Ok(Some(row));
2335            }
2336            let outer_row = match self.input.next(ctx)? {
2337                Some(r) => r,
2338                None => return Ok(None),
2339            };
2340            let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row), &mut None);
2341            // Push the outer row onto `outer_rows` so Filter /
2342            // eval_expr inside the body can resolve outer
2343            // bindings (`OPTIONAL MATCH ... WHERE outer_var = x`).
2344            let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
2345            stacked.push(&outer_row);
2346            stacked.extend_from_slice(ctx.outer_rows);
2347            let inner_ctx = ExecCtx {
2348                store: ctx.store,
2349                writer: ctx.writer,
2350                params: ctx.params,
2351                procedures: ctx.procedures,
2352                outer_rows: &stacked,
2353                tombstones: ctx.tombstones,
2354            };
2355            let mut results = Vec::new();
2356            while let Some(body_row) = body_op.next(&inner_ctx)? {
2357                let mut merged = outer_row.clone();
2358                for (k, v) in body_row {
2359                    merged.insert(k, v);
2360                }
2361                results.push(merged);
2362            }
2363            if results.is_empty() {
2364                let mut fallback = outer_row;
2365                for v in &self.null_vars {
2366                    fallback.insert(v.clone(), Value::Null);
2367                }
2368                return Ok(Some(fallback));
2369            }
2370            self.pending = results;
2371            self.pending_idx = 0;
2372        }
2373    }
2374}
2375
2376/// Runtime operator for `CALL ns.name[(args)] [YIELD ...]`.
2377///
2378/// Resolves the procedure against the [`ProcedureRegistry`] at
2379/// `next()` time, validates arity / types / form (implicit args,
2380/// `YIELD *`, in-query YIELD requirement), then scans the registered
2381/// data-table for rows whose input cells match the call arguments.
2382/// Each matching row is projected according to the YIELD spec and
2383/// merged into the upstream row.
2384///
2385/// Two shapes are supported at once:
2386/// * Standalone (`input = None`): the op is the full plan. It
2387///   runs the procedure exactly once (against a synthetic empty row)
2388///   and emits the matching rows directly.
2389/// * In-query (`input = Some(...)`): for each upstream row, runs
2390///   the procedure with args evaluated against that row and emits
2391///   one merged row per match. Procedures with zero declared output
2392///   columns act as a pass-through — the upstream row is forwarded
2393///   unchanged, matching Neo4j's side-effect-only semantics.
2394struct ProcedureCallOp {
2395    input: Option<Box<dyn Operator>>,
2396    qualified_name: Vec<String>,
2397    args: Option<Vec<Expr>>,
2398    yield_spec: Option<YieldSpec>,
2399    standalone: bool,
2400    /// Row source for the currently-active input row. Transitions
2401    /// `None → (Buffered|Streaming) → None` per input row. Streaming
2402    /// cursors can be pulled from lazily — successive `next()` calls
2403    /// resume the cursor without rebuilding per-input state.
2404    active: ProcActiveSource,
2405    // Only set for the standalone form, which drives itself off a
2406    // synthetic seed row exactly once.
2407    done: bool,
2408}
2409
2410/// Per-input-row row source. See [`ProcedureCallOp::active`].
2411enum ProcActiveSource {
2412    None,
2413    Buffered {
2414        rows: Vec<Row>,
2415        idx: usize,
2416    },
2417    Streaming {
2418        cursor: Box<dyn crate::procedures::ProcCursor>,
2419        input_row: Row,
2420        projection: Vec<(String, String)>,
2421    },
2422}
2423
2424impl ProcedureCallOp {
2425    fn new(
2426        input: Option<Box<dyn Operator>>,
2427        qualified_name: Vec<String>,
2428        args: Option<Vec<Expr>>,
2429        yield_spec: Option<YieldSpec>,
2430        standalone: bool,
2431    ) -> Self {
2432        Self {
2433            input,
2434            qualified_name,
2435            args,
2436            yield_spec,
2437            standalone,
2438            active: ProcActiveSource::None,
2439            done: false,
2440        }
2441    }
2442
2443    /// Resolve the projection list `(source_column, output_alias)`
2444    /// from the procedure signature and this op's yield spec. Also
2445    /// validates the spec — unknown YIELD columns, duplicate
2446    /// aliases, and `YIELD *` / no-YIELD in disallowed contexts all
2447    /// surface as `Error::Procedure`.
2448    fn resolve_projection(
2449        &self,
2450        proc: &crate::procedures::Procedure,
2451    ) -> Result<Vec<(String, String)>> {
2452        match &self.yield_spec {
2453            None => {
2454                if !self.standalone {
2455                    // In-query CALL with no YIELD: legal only for
2456                    // side-effect-only procedures (zero declared
2457                    // outputs). A procedure with outputs but no
2458                    // YIELD leaves those outputs unbound, so any
2459                    // downstream RETURN that references them would
2460                    // see `UndefinedVariable`; reject here so the
2461                    // error surfaces instead of silently emitting
2462                    // nulls.
2463                    if proc.outputs.is_empty() {
2464                        return Ok(Vec::new());
2465                    }
2466                    return Err(Error::Procedure(format!(
2467                        "procedure '{}' has outputs but no YIELD clause",
2468                        self.qualified_name.join(".")
2469                    )));
2470                }
2471                Ok(proc
2472                    .outputs
2473                    .iter()
2474                    .map(|o| (o.name.clone(), o.name.clone()))
2475                    .collect())
2476            }
2477            Some(YieldSpec::Star) => {
2478                if !self.standalone {
2479                    return Err(Error::Procedure(
2480                        "YIELD * is only allowed on standalone CALL".into(),
2481                    ));
2482                }
2483                Ok(proc
2484                    .outputs
2485                    .iter()
2486                    .map(|o| (o.name.clone(), o.name.clone()))
2487                    .collect())
2488            }
2489            Some(YieldSpec::Items(items)) => {
2490                let mut projection = Vec::with_capacity(items.len());
2491                let mut seen_aliases: std::collections::HashSet<String> =
2492                    std::collections::HashSet::new();
2493                for yi in items {
2494                    if !proc.outputs.iter().any(|o| o.name == yi.column) {
2495                        return Err(Error::Procedure(format!(
2496                            "procedure '{}' has no output column '{}'",
2497                            self.qualified_name.join("."),
2498                            yi.column
2499                        )));
2500                    }
2501                    let alias = yi.alias.clone().unwrap_or_else(|| yi.column.clone());
2502                    if !seen_aliases.insert(alias.clone()) {
2503                        return Err(Error::Procedure(format!(
2504                            "variable '{alias}' already bound by YIELD"
2505                        )));
2506                    }
2507                    projection.push((yi.column.clone(), alias));
2508                }
2509                Ok(projection)
2510            }
2511        }
2512    }
2513
2514    /// Evaluate the call's argument list against `row`. For the
2515    /// implicit-args form (`args = None`), each declared input
2516    /// column's value comes from the per-query parameter map
2517    /// (keyed by the input-column name). Returns the argument
2518    /// values in declaration order. Raises `ProcedureError` on
2519    /// arity mismatch, type mismatch, or missing parameter.
2520    fn evaluate_args(
2521        &self,
2522        ctx: &ExecCtx,
2523        row: &Row,
2524        proc: &crate::procedures::Procedure,
2525    ) -> Result<Vec<Value>> {
2526        match &self.args {
2527            Some(exprs) => {
2528                if exprs.len() != proc.inputs.len() {
2529                    return Err(Error::Procedure(format!(
2530                        "procedure '{}' expects {} argument(s), got {}",
2531                        self.qualified_name.join("."),
2532                        proc.inputs.len(),
2533                        exprs.len()
2534                    )));
2535                }
2536                let eval_ctx = ctx.eval_ctx(row);
2537                let mut values = Vec::with_capacity(exprs.len());
2538                for (expr, spec) in exprs.iter().zip(proc.inputs.iter()) {
2539                    let v = eval_expr(expr, &eval_ctx)?;
2540                    if !spec.ty.accepts(&v) {
2541                        return Err(Error::Procedure(format!(
2542                            "argument '{}' has wrong type for procedure '{}'",
2543                            spec.name,
2544                            self.qualified_name.join(".")
2545                        )));
2546                    }
2547                    values.push(coerce_arg(v, spec.ty));
2548                }
2549                Ok(values)
2550            }
2551            None => {
2552                // Implicit-arg form only valid standalone.
2553                if !self.standalone {
2554                    return Err(Error::Procedure(
2555                        "in-query CALL requires explicit argument list".into(),
2556                    ));
2557                }
2558                let mut values = Vec::with_capacity(proc.inputs.len());
2559                for spec in &proc.inputs {
2560                    let v = ctx.params.get(&spec.name).cloned().ok_or_else(|| {
2561                        Error::Procedure(format!(
2562                            "missing parameter ${} for procedure '{}'",
2563                            spec.name,
2564                            self.qualified_name.join(".")
2565                        ))
2566                    })?;
2567                    if !spec.ty.accepts(&v) {
2568                        return Err(Error::Procedure(format!(
2569                            "parameter '{}' has wrong type",
2570                            spec.name
2571                        )));
2572                    }
2573                    values.push(coerce_arg(v, spec.ty));
2574                }
2575                Ok(values)
2576            }
2577        }
2578    }
2579
2580    /// Merge one procedure-produced row with the input row (or an
2581    /// empty row for standalone calls) and apply the YIELD-driven
2582    /// projection. Shared by the eager and streaming paths.
2583    fn merge_proc_row(
2584        &self,
2585        proc_row: &crate::procedures::ProcRow,
2586        input_row: &Row,
2587        projection: &[(String, String)],
2588    ) -> Row {
2589        let mut merged = if self.standalone {
2590            Row::new()
2591        } else {
2592            input_row.clone()
2593        };
2594        for (src, alias) in projection {
2595            let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
2596            merged.insert(alias.clone(), v);
2597        }
2598        merged
2599    }
2600
2601    /// Invoke the procedure once for `input_row` and install the
2602    /// resulting row source into `self.active`. Handles the
2603    /// "zero-output-column pass-through" case that keeps
2604    /// `MATCH (n) CALL test.doNothing() RETURN n.name` from
2605    /// filtering the match rows. The produced source may be
2606    /// `Buffered` (eager: TCK data tables, read built-ins with
2607    /// bounded output, all write procedures) or `Streaming`
2608    /// (path-style built-ins with potentially-unbounded output).
2609    fn invoke_once(
2610        &mut self,
2611        ctx: &ExecCtx,
2612        input_row: Row,
2613        proc: &crate::procedures::Procedure,
2614        projection: Vec<(String, String)>,
2615    ) -> Result<()> {
2616        // Zero-output-column procedures are side-effect-only in
2617        // the TCK; they either suppress rows entirely (standalone)
2618        // or pass the input row through unchanged (in-query).
2619        if proc.outputs.is_empty() {
2620            if self.standalone {
2621                self.active = ProcActiveSource::None;
2622            } else {
2623                self.active = ProcActiveSource::Buffered {
2624                    rows: vec![input_row],
2625                    idx: 0,
2626                };
2627            }
2628            return Ok(());
2629        }
2630        let args = self.evaluate_args(ctx, &input_row, proc)?;
2631        let is_write = proc.is_write_builtin();
2632        // Write builtins (apoc.create.node, …) take the writer +
2633        // args directly and produce already-final rows, so they
2634        // skip the row_matches filter. Read built-ins similarly
2635        // generate their rows from args, not from a candidate set
2636        // — row_matches only applies to static TCK data tables
2637        // (where `builtin` is None).
2638        if is_write {
2639            // The cfg gate must include every feature that owns a
2640            // write builtin — `is_write_builtin` returns true only
2641            // when one of those features is on, so widen the gate
2642            // when a new write namespace lands.
2643            #[cfg(any(
2644                feature = "apoc-create",
2645                feature = "apoc-refactor",
2646                feature = "apoc-cypher"
2647            ))]
2648            {
2649                let rows = proc.resolve_write_rows(ctx.store, ctx.writer, &args, ctx.procedures)?;
2650                let merged: Vec<Row> = rows
2651                    .iter()
2652                    .map(|pr| self.merge_proc_row(pr, &input_row, &projection))
2653                    .collect();
2654                self.active = ProcActiveSource::Buffered {
2655                    rows: merged,
2656                    idx: 0,
2657                };
2658                return Ok(());
2659            }
2660            #[cfg(not(any(
2661                feature = "apoc-create",
2662                feature = "apoc-refactor",
2663                feature = "apoc-cypher"
2664            )))]
2665            {
2666                let _ = (ctx, &args);
2667                return Err(Error::Procedure(
2668                    "write procedure dispatched in a non-write-apoc build".into(),
2669                ));
2670            }
2671        }
2672        match proc.resolve_rows(ctx.store, &args, ctx.procedures)? {
2673            crate::procedures::ProcRows::Eager(rows) => {
2674                // Static TCK data tables (`builtin.is_none()`)
2675                // filter by input-column matching. Built-ins
2676                // always skip — their rows are produced directly
2677                // from args, not looked up from a candidate set.
2678                let is_static = proc.builtin.is_none();
2679                let merged: Vec<Row> = rows
2680                    .iter()
2681                    .filter(|pr| !is_static || proc.row_matches(pr, &args))
2682                    .map(|pr| self.merge_proc_row(pr, &input_row, &projection))
2683                    .collect();
2684                self.active = ProcActiveSource::Buffered {
2685                    rows: merged,
2686                    idx: 0,
2687                };
2688            }
2689            crate::procedures::ProcRows::Streaming(cursor) => {
2690                self.active = ProcActiveSource::Streaming {
2691                    cursor,
2692                    input_row,
2693                    projection,
2694                };
2695            }
2696        }
2697        Ok(())
2698    }
2699}
2700
2701/// Cast an int to a float when the declared type is `FLOAT`. Other
2702/// declared types leave the value as-is (accept() has already
2703/// gated on kind) so the comparison in `row_matches` sees a
2704/// consistent shape.
2705fn coerce_arg(v: Value, ty: crate::procedures::ProcType) -> Value {
2706    use crate::procedures::ProcType;
2707    if matches!(ty, ProcType::Float) {
2708        if let Value::Property(Property::Int64(n)) = v {
2709            return Value::Property(Property::Float64(n as f64));
2710        }
2711    }
2712    v
2713}
2714
2715impl Operator for ProcedureCallOp {
2716    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2717        loop {
2718            // 1. Drain an active row source first. Streaming cursors
2719            //    get a fresh reader reference on each pull so an
2720            //    overlay reader swap mid-query is picked up.
2721            match &mut self.active {
2722                ProcActiveSource::Buffered { rows, idx } => {
2723                    if *idx < rows.len() {
2724                        let row = rows[*idx].clone();
2725                        *idx += 1;
2726                        return Ok(Some(row));
2727                    }
2728                    self.active = ProcActiveSource::None;
2729                }
2730                ProcActiveSource::Streaming {
2731                    cursor,
2732                    input_row,
2733                    projection,
2734                } => match cursor.advance(ctx.store)? {
2735                    Some(proc_row) => {
2736                        // Inline the merge here — we're holding a
2737                        // `&mut self.active` borrow through this arm
2738                        // so we can't call `&self` helper methods.
2739                        let mut merged = if self.standalone {
2740                            Row::new()
2741                        } else {
2742                            input_row.clone()
2743                        };
2744                        for (src, alias) in projection.iter() {
2745                            let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
2746                            merged.insert(alias.clone(), v);
2747                        }
2748                        return Ok(Some(merged));
2749                    }
2750                    None => {
2751                        self.active = ProcActiveSource::None;
2752                    }
2753                },
2754                ProcActiveSource::None => {}
2755            }
2756
2757            // 2. Active source exhausted — pull the next input row
2758            //    and build a fresh source. The procedure reference
2759            //    lives in `ctx.procedures` (immutable for the query)
2760            //    so we can borrow across the input pull + invoke
2761            //    without cloning.
2762            let proc = match ctx.procedures.get(&self.qualified_name) {
2763                Some(p) => p,
2764                None => {
2765                    return Err(Error::Procedure(format!(
2766                        "procedure '{}' not found",
2767                        self.qualified_name.join(".")
2768                    )));
2769                }
2770            };
2771            let projection = self.resolve_projection(proc)?;
2772            let input_row = match &mut self.input {
2773                Some(inp) => match inp.next(ctx)? {
2774                    Some(r) => r,
2775                    None => return Ok(None),
2776                },
2777                None => {
2778                    if self.done {
2779                        return Ok(None);
2780                    }
2781                    self.done = true;
2782                    Row::new()
2783                }
2784            };
2785            self.invoke_once(ctx, input_row, proc, projection)?;
2786            // Loop re-enters step 1 to drain the freshly-installed
2787            // source. If invoke_once installed an empty Buffered
2788            // source, we'll loop around and pull the next input row
2789            // (for input-driven calls) or return None (standalone).
2790        }
2791    }
2792}
2793
2794/// Pull a property map out of a value. Supports node / edge
2795/// (uses their live property map) and map values (parameter or
2796/// map literal). Null propagates as an empty map — `SET x = null`
2797/// is spec'd as a property clear / no-op and SET = <null-binding>
2798/// (from an unmatched OPTIONAL MATCH) matches that shape.
2799fn extract_property_map(v: &Value) -> Result<Vec<(String, Property)>> {
2800    match v {
2801        Value::Node(n) => Ok(n.properties.clone().into_iter().collect()),
2802        Value::Edge(e) => Ok(e.properties.clone().into_iter().collect()),
2803        Value::Map(pairs) => pairs
2804            .iter()
2805            .map(|(k, vv)| Ok((k.clone(), value_to_property(vv.clone())?)))
2806            .collect(),
2807        Value::Property(Property::Map(entries)) => Ok(entries
2808            .iter()
2809            .map(|(k, p)| (k.clone(), p.clone()))
2810            .collect()),
2811        Value::Null | Value::Property(Property::Null) => Ok(Vec::new()),
2812        _ => Err(Error::InvalidSetValue),
2813    }
2814}
2815
2816fn value_to_property(v: Value) -> Result<Property> {
2817    match v {
2818        Value::Property(Property::Map(_)) => Err(Error::InvalidSetValue),
2819        Value::Property(p) => Ok(p),
2820        Value::Null => Ok(Property::Null),
2821        Value::List(items) => {
2822            let props: Vec<Property> = items
2823                .into_iter()
2824                .map(value_to_property)
2825                .collect::<Result<_>>()?;
2826            Ok(Property::List(props))
2827        }
2828        // Graph-aware `Value::Map` and graph elements can't be
2829        // stored as node / edge property values; SET will reject
2830        // them.
2831        Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
2832            Err(Error::InvalidSetValue)
2833        }
2834    }
2835}
2836
2837struct NodeScanAllOp {
2838    var: String,
2839    ids: Option<Vec<NodeId>>,
2840    cursor: usize,
2841}
2842
2843impl NodeScanAllOp {
2844    fn new(var: String) -> Self {
2845        Self {
2846            var,
2847            ids: None,
2848            cursor: 0,
2849        }
2850    }
2851}
2852
2853impl Operator for NodeScanAllOp {
2854    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2855        if self.ids.is_none() {
2856            self.ids = Some(ctx.store.all_node_ids()?);
2857        }
2858        let ids = self.ids.as_ref().unwrap();
2859        while self.cursor < ids.len() {
2860            let id = ids[self.cursor];
2861            self.cursor += 1;
2862            if let Some(node) = ctx.store.get_node(id)? {
2863                let mut row = Row::new();
2864                row.insert(self.var.clone(), Value::Node(node));
2865                return Ok(Some(row));
2866            }
2867        }
2868        Ok(None)
2869    }
2870}
2871
2872struct NodeScanByLabelsOp {
2873    var: String,
2874    labels: Vec<String>,
2875    ids: Option<Vec<NodeId>>,
2876    cursor: usize,
2877}
2878
2879impl NodeScanByLabelsOp {
2880    fn new(var: String, labels: Vec<String>) -> Self {
2881        Self {
2882            var,
2883            labels,
2884            ids: None,
2885            cursor: 0,
2886        }
2887    }
2888}
2889
2890impl Operator for NodeScanByLabelsOp {
2891    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2892        if self.ids.is_none() {
2893            // Use the first label for the index scan, filter the rest per-node.
2894            let primary = self
2895                .labels
2896                .first()
2897                .expect("NodeScanByLabels must have at least one label");
2898            self.ids = Some(ctx.store.nodes_by_label(primary)?);
2899        }
2900        let ids = self.ids.as_ref().unwrap();
2901        while self.cursor < ids.len() {
2902            let id = ids[self.cursor];
2903            self.cursor += 1;
2904            if let Some(node) = ctx.store.get_node(id)? {
2905                if has_all_labels(&node, &self.labels) {
2906                    let mut row = Row::new();
2907                    row.insert(self.var.clone(), Value::Node(node));
2908                    return Ok(Some(row));
2909                }
2910            }
2911        }
2912        Ok(None)
2913    }
2914}
2915
2916fn has_all_labels(node: &Node, labels: &[String]) -> bool {
2917    labels.iter().all(|l| node.labels.contains(l))
2918}
2919
2920/// Equality-lookup operator backed by a property index. Evaluates
2921/// the value expression lazily on the first `next()` — crucially
2922/// so parameters resolve against the per-query `ExecCtx::params`
2923/// map, not against a literal baked in at plan-construction time.
2924///
2925/// Unindexable value types (Float, List, Map, Null, or a Node/Edge
2926/// that slipped through) surface as `Error::InvalidSetValue`. The
2927/// planner only emits this op for indexes that do exist, so the
2928/// reader call should find a populated CF unless a concurrent DROP
2929/// raced us (in which case the result is just empty, which matches
2930/// how Neo4j's planner handles the race).
2931struct IndexSeekOp {
2932    var: String,
2933    label: String,
2934    properties: Vec<String>,
2935    value_exprs: Vec<Expr>,
2936    results: Option<Vec<NodeId>>,
2937    cursor: usize,
2938}
2939
2940impl IndexSeekOp {
2941    fn new(var: String, label: String, properties: Vec<String>, value_exprs: Vec<Expr>) -> Self {
2942        assert_eq!(
2943            properties.len(),
2944            value_exprs.len(),
2945            "IndexSeekOp: properties and values must have equal length"
2946        );
2947        Self {
2948            var,
2949            label,
2950            properties,
2951            value_exprs,
2952            results: None,
2953            cursor: 0,
2954        }
2955    }
2956}
2957
2958impl Operator for IndexSeekOp {
2959    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2960        if self.results.is_none() {
2961            let empty = Row::new();
2962            let mut values: Vec<Property> = Vec::with_capacity(self.value_exprs.len());
2963            for expr in &self.value_exprs {
2964                let value = eval_expr(expr, &ctx.eval_ctx(&empty))?;
2965                let property = match value {
2966                    Value::Property(p) => p,
2967                    Value::Null => Property::Null,
2968                    Value::Node(_)
2969                    | Value::Edge(_)
2970                    | Value::List(_)
2971                    | Value::Map(_)
2972                    | Value::Path { .. } => {
2973                        return Err(Error::InvalidSetValue);
2974                    }
2975                };
2976                values.push(property);
2977            }
2978            let ids = ctx
2979                .store
2980                .nodes_by_properties(&self.label, &self.properties, &values)?;
2981            self.results = Some(ids);
2982        }
2983        let ids = self.results.as_ref().unwrap();
2984        while self.cursor < ids.len() {
2985            let id = ids[self.cursor];
2986            self.cursor += 1;
2987            if let Some(node) = ctx.store.get_node(id)? {
2988                let mut row = Row::new();
2989                row.insert(self.var.clone(), Value::Node(node));
2990                return Ok(Some(row));
2991            }
2992        }
2993        Ok(None)
2994    }
2995}
2996
2997/// Physical operator for [`LogicalPlan::PointIndexSeek`]. Evaluates
2998/// its bounds once on first `next()` and drives
2999/// `reader.nodes_in_bbox(...)`.
3000///
3001/// For [`PointSeekBounds::Corners`] the operator passes the corners
3002/// through as-is and short-circuits on SRID mismatch / null / non-Point
3003/// corners — `point.withinbbox` returns null in those cases, which
3004/// excludes the row from a filter, so "no rows" is the same
3005/// observable outcome.
3006///
3007/// For [`PointSeekBounds::Radius`] the operator derives the enclosing
3008/// bbox from the center's SRID: Cartesian gets a `(cx ± r, cy ± r)`
3009/// square; WGS-84 converts `r` metres to lat/lon spans using the
3010/// `cos(lat)` factor for longitude. The enclosing bbox is always a
3011/// *superset* of the circle, so the planner keeps the original
3012/// distance predicate as a residual `Filter` above the seek — the
3013/// corners of the square that fall outside the circle are culled
3014/// there.
3015struct PointIndexSeekOp {
3016    var: String,
3017    label: String,
3018    property: String,
3019    bounds: PointSeekBounds,
3020    results: Option<Vec<NodeId>>,
3021    cursor: usize,
3022}
3023
3024impl PointIndexSeekOp {
3025    fn new(var: String, label: String, property: String, bounds: PointSeekBounds) -> Self {
3026        Self {
3027            var,
3028            label,
3029            property,
3030            bounds,
3031            results: None,
3032            cursor: 0,
3033        }
3034    }
3035}
3036
3037impl Operator for PointIndexSeekOp {
3038    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3039        if self.results.is_none() {
3040            let empty = Row::new();
3041            let ectx = ctx.eval_ctx(&empty);
3042            let ids = match &self.bounds {
3043                PointSeekBounds::Corners { lo, hi } => {
3044                    let lo_pt = extract_point(&eval_expr(lo, &ectx)?);
3045                    let hi_pt = extract_point(&eval_expr(hi, &ectx)?);
3046                    match (lo_pt, hi_pt) {
3047                        (Some(lo), Some(hi)) if lo.srid == hi.srid => ctx.store.nodes_in_bbox(
3048                            &self.label,
3049                            &self.property,
3050                            lo.srid,
3051                            lo.x,
3052                            lo.y,
3053                            hi.x,
3054                            hi.y,
3055                        )?,
3056                        _ => Vec::new(),
3057                    }
3058                }
3059                PointSeekBounds::Radius { center, radius } => {
3060                    let center_pt = extract_point(&eval_expr(center, &ectx)?);
3061                    let radius_val = extract_f64(&eval_expr(radius, &ectx)?);
3062                    match (center_pt, radius_val) {
3063                        (Some(c), Some(r)) if r.is_finite() && r >= 0.0 => {
3064                            let (xlo, ylo, xhi, yhi) = enclosing_bbox(&c, r);
3065                            ctx.store.nodes_in_bbox(
3066                                &self.label,
3067                                &self.property,
3068                                c.srid,
3069                                xlo,
3070                                ylo,
3071                                xhi,
3072                                yhi,
3073                            )?
3074                        }
3075                        // Null / non-point center or null / negative / NaN radius → no rows.
3076                        _ => Vec::new(),
3077                    }
3078                }
3079            };
3080            self.results = Some(ids);
3081        }
3082        let ids = self.results.as_ref().unwrap();
3083        while self.cursor < ids.len() {
3084            let id = ids[self.cursor];
3085            self.cursor += 1;
3086            if let Some(node) = ctx.store.get_node(id)? {
3087                let mut row = Row::new();
3088                row.insert(self.var.clone(), Value::Node(node));
3089                return Ok(Some(row));
3090            }
3091        }
3092        Ok(None)
3093    }
3094}
3095
3096fn extract_point(v: &Value) -> Option<meshdb_core::Point> {
3097    match v {
3098        Value::Property(Property::Point(p)) => Some(*p),
3099        _ => None,
3100    }
3101}
3102
3103fn extract_f64(v: &Value) -> Option<f64> {
3104    match v {
3105        Value::Property(Property::Float64(f)) => Some(*f),
3106        Value::Property(Property::Int64(i)) => Some(*i as f64),
3107        _ => None,
3108    }
3109}
3110
3111/// Enclosing axis-aligned bbox for a circle of radius `r` around
3112/// `center`. Always a superset of the circle, so callers must still
3113/// apply a precision filter on distance. Cartesian SRIDs use a
3114/// straight `center ± r` square in the CRS's own units. Geographic
3115/// SRIDs (WGS-84, SRID 4326 / 4979) convert `r` metres to lat/lon
3116/// spans via the standard flat-earth approximation with a `cos(lat)`
3117/// longitude-span correction; the factor is clamped away from zero
3118/// so near-pole circles don't collapse into a zero-width bbox (the
3119/// pathological case falls back to the full longitude range).
3120fn enclosing_bbox(center: &meshdb_core::Point, r: f64) -> (f64, f64, f64, f64) {
3121    if center.is_geographic() {
3122        // One degree of latitude ≈ 111_320 metres on a spherical
3123        // Earth. One degree of longitude shrinks by cos(latitude).
3124        const METRES_PER_DEG: f64 = 111_320.0;
3125        let dlat = r / METRES_PER_DEG;
3126        let cos_lat = center.y.to_radians().cos().abs();
3127        // `cos(lat)` goes to 0 at the poles; clamp so a polar
3128        // circle maps to a wide-but-finite longitude span rather
3129        // than a divide-by-zero.
3130        let cos_lat_floor = cos_lat.max(1.0e-6);
3131        let dlon = r / (METRES_PER_DEG * cos_lat_floor);
3132        (
3133            center.x - dlon,
3134            center.y - dlat,
3135            center.x + dlon,
3136            center.y + dlat,
3137        )
3138    } else {
3139        (center.x - r, center.y - r, center.x + r, center.y + r)
3140    }
3141}
3142
3143/// Relationship-scope analogue of [`IndexSeekOp`]. Seeks edges
3144/// through the `(edge_type, property)` index, hydrates each one,
3145/// and emits a row per matching edge binding `edge_var`, `src_var`,
3146/// `dst_var`. For `Direction::Both` each edge emits two rows — one
3147/// per orientation — so an undirected pattern surfaces both
3148/// endpoint assignments. `residual_properties` carries any
3149/// non-indexed pattern-property equalities that still need
3150/// per-edge filtering.
3151struct EdgeSeekOp {
3152    edge_var: String,
3153    src_var: String,
3154    dst_var: String,
3155    edge_type: String,
3156    property: String,
3157    value_expr: Expr,
3158    direction: Direction,
3159    residual_properties: Vec<(String, Expr)>,
3160    /// Pre-materialized output rows. Built lazily on the first
3161    /// `next()` call so the seek + endpoint fetches run once.
3162    results: Option<Vec<Row>>,
3163    cursor: usize,
3164}
3165
3166impl EdgeSeekOp {
3167    #[allow(clippy::too_many_arguments)]
3168    fn new(
3169        edge_var: String,
3170        src_var: String,
3171        dst_var: String,
3172        edge_type: String,
3173        property: String,
3174        value_expr: Expr,
3175        direction: Direction,
3176        residual_properties: Vec<(String, Expr)>,
3177    ) -> Self {
3178        Self {
3179            edge_var,
3180            src_var,
3181            dst_var,
3182            edge_type,
3183            property,
3184            value_expr,
3185            direction,
3186            residual_properties,
3187            results: None,
3188            cursor: 0,
3189        }
3190    }
3191}
3192
3193impl Operator for EdgeSeekOp {
3194    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3195        if self.results.is_none() {
3196            let empty = Row::new();
3197            let seek_value = eval_expr(&self.value_expr, &ctx.eval_ctx(&empty))?;
3198            let property = match seek_value {
3199                Value::Property(p) => p,
3200                Value::Null => Property::Null,
3201                Value::Node(_)
3202                | Value::Edge(_)
3203                | Value::List(_)
3204                | Value::Map(_)
3205                | Value::Path { .. } => {
3206                    return Err(Error::InvalidSetValue);
3207                }
3208            };
3209            let ids = ctx
3210                .store
3211                .edges_by_property(&self.edge_type, &self.property, &property)?;
3212            let mut rows: Vec<Row> = Vec::with_capacity(ids.len());
3213            for id in ids {
3214                let Some(edge) = ctx.store.get_edge(id)? else {
3215                    continue;
3216                };
3217                // Residual pattern-property equality filters — same
3218                // shape as `EdgeExpandOp`'s inline edge-properties
3219                // check. Uses an empty row for expr evaluation since
3220                // an EdgeSeek has no upstream input and the seek's
3221                // own bindings aren't visible to its own filters.
3222                let mut residuals_ok = true;
3223                for (key, expr) in &self.residual_properties {
3224                    let wanted = eval_expr(expr, &ctx.eval_ctx(&empty))?;
3225                    let Some(stored) = edge.properties.get(key) else {
3226                        residuals_ok = false;
3227                        break;
3228                    };
3229                    if !values_equal(&Value::Property(stored.clone()), &wanted) {
3230                        residuals_ok = false;
3231                        break;
3232                    }
3233                }
3234                if !residuals_ok {
3235                    continue;
3236                }
3237                // Hydrate both endpoints so downstream operators
3238                // can read labels / properties off the bound vars.
3239                // A deleted endpoint drops the whole row (same as
3240                // `IndexSeekOp` when `get_node` returns None).
3241                let Some(src_node) = ctx.store.get_node(edge.source)? else {
3242                    continue;
3243                };
3244                let Some(dst_node) = ctx.store.get_node(edge.target)? else {
3245                    continue;
3246                };
3247                // Emit output rows per direction. Outgoing / Incoming
3248                // bind a single orientation; Both yields two rows —
3249                // the `-[r]-` pattern in openCypher matches edges in
3250                // either orientation and binds endpoints accordingly.
3251                match self.direction {
3252                    Direction::Outgoing => {
3253                        rows.push(self.make_row(&edge, &src_node, &dst_node));
3254                    }
3255                    Direction::Incoming => {
3256                        rows.push(self.make_row(&edge, &dst_node, &src_node));
3257                    }
3258                    Direction::Both => {
3259                        rows.push(self.make_row(&edge, &src_node, &dst_node));
3260                        // Self-loops still only surface once — the
3261                        // second orientation is the same row.
3262                        if edge.source != edge.target {
3263                            rows.push(self.make_row(&edge, &dst_node, &src_node));
3264                        }
3265                    }
3266                }
3267            }
3268            self.results = Some(rows);
3269        }
3270        let rows = self.results.as_ref().unwrap();
3271        if self.cursor < rows.len() {
3272            let row = rows[self.cursor].clone();
3273            self.cursor += 1;
3274            return Ok(Some(row));
3275        }
3276        Ok(None)
3277    }
3278}
3279
3280impl EdgeSeekOp {
3281    fn make_row(&self, edge: &Edge, src: &Node, dst: &Node) -> Row {
3282        let mut row = Row::new();
3283        row.insert(self.edge_var.clone(), Value::Edge(edge.clone()));
3284        row.insert(self.src_var.clone(), Value::Node(src.clone()));
3285        row.insert(self.dst_var.clone(), Value::Node(dst.clone()));
3286        row
3287    }
3288}
3289
3290/// Relationship-scope analogue of [`PointIndexSeekOp`]. Drives
3291/// `reader.edges_in_bbox(...)` off the bounds, then hydrates each
3292/// matching edge + its endpoints and emits rows binding
3293/// `edge_var`, `src_var`, `dst_var` — same shape as [`EdgeSeekOp`].
3294/// For `Direction::Both`, each edge emits two rows (one per
3295/// orientation), matching the `-[r]-` pattern semantics.
3296struct EdgePointIndexSeekOp {
3297    edge_var: String,
3298    src_var: String,
3299    dst_var: String,
3300    edge_type: String,
3301    property: String,
3302    direction: Direction,
3303    bounds: PointSeekBounds,
3304    results: Option<Vec<Row>>,
3305    cursor: usize,
3306}
3307
3308impl EdgePointIndexSeekOp {
3309    #[allow(clippy::too_many_arguments)]
3310    fn new(
3311        edge_var: String,
3312        src_var: String,
3313        dst_var: String,
3314        edge_type: String,
3315        property: String,
3316        direction: Direction,
3317        bounds: PointSeekBounds,
3318    ) -> Self {
3319        Self {
3320            edge_var,
3321            src_var,
3322            dst_var,
3323            edge_type,
3324            property,
3325            direction,
3326            bounds,
3327            results: None,
3328            cursor: 0,
3329        }
3330    }
3331
3332    fn make_row(&self, edge: &Edge, src: &Node, dst: &Node) -> Row {
3333        let mut row = Row::new();
3334        row.insert(self.edge_var.clone(), Value::Edge(edge.clone()));
3335        row.insert(self.src_var.clone(), Value::Node(src.clone()));
3336        row.insert(self.dst_var.clone(), Value::Node(dst.clone()));
3337        row
3338    }
3339}
3340
3341impl Operator for EdgePointIndexSeekOp {
3342    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3343        if self.results.is_none() {
3344            let empty = Row::new();
3345            let ectx = ctx.eval_ctx(&empty);
3346            let ids = match &self.bounds {
3347                PointSeekBounds::Corners { lo, hi } => {
3348                    let lo_pt = extract_point(&eval_expr(lo, &ectx)?);
3349                    let hi_pt = extract_point(&eval_expr(hi, &ectx)?);
3350                    match (lo_pt, hi_pt) {
3351                        (Some(lo), Some(hi)) if lo.srid == hi.srid => ctx.store.edges_in_bbox(
3352                            &self.edge_type,
3353                            &self.property,
3354                            lo.srid,
3355                            lo.x,
3356                            lo.y,
3357                            hi.x,
3358                            hi.y,
3359                        )?,
3360                        _ => Vec::new(),
3361                    }
3362                }
3363                PointSeekBounds::Radius { center, radius } => {
3364                    let center_pt = extract_point(&eval_expr(center, &ectx)?);
3365                    let radius_val = extract_f64(&eval_expr(radius, &ectx)?);
3366                    match (center_pt, radius_val) {
3367                        (Some(c), Some(r)) if r.is_finite() && r >= 0.0 => {
3368                            let (xlo, ylo, xhi, yhi) = enclosing_bbox(&c, r);
3369                            ctx.store.edges_in_bbox(
3370                                &self.edge_type,
3371                                &self.property,
3372                                c.srid,
3373                                xlo,
3374                                ylo,
3375                                xhi,
3376                                yhi,
3377                            )?
3378                        }
3379                        _ => Vec::new(),
3380                    }
3381                }
3382            };
3383
3384            let mut rows: Vec<Row> = Vec::with_capacity(ids.len());
3385            for id in ids {
3386                let Some(edge) = ctx.store.get_edge(id)? else {
3387                    continue;
3388                };
3389                let Some(src_node) = ctx.store.get_node(edge.source)? else {
3390                    continue;
3391                };
3392                let Some(dst_node) = ctx.store.get_node(edge.target)? else {
3393                    continue;
3394                };
3395                match self.direction {
3396                    Direction::Outgoing => rows.push(self.make_row(&edge, &src_node, &dst_node)),
3397                    Direction::Incoming => rows.push(self.make_row(&edge, &dst_node, &src_node)),
3398                    Direction::Both => {
3399                        rows.push(self.make_row(&edge, &src_node, &dst_node));
3400                        if edge.source != edge.target {
3401                            rows.push(self.make_row(&edge, &dst_node, &src_node));
3402                        }
3403                    }
3404                }
3405            }
3406            self.results = Some(rows);
3407        }
3408        let rows = self.results.as_ref().unwrap();
3409        if self.cursor < rows.len() {
3410            let row = rows[self.cursor].clone();
3411            self.cursor += 1;
3412            return Ok(Some(row));
3413        }
3414        Ok(None)
3415    }
3416}
3417
3418fn matches_pattern_props(node: &Node, props: &[(String, Property)]) -> bool {
3419    props.iter().all(|(k, v)| {
3420        node.properties
3421            .get(k)
3422            .map(|stored| stored == v)
3423            .unwrap_or(false)
3424    })
3425}
3426
3427struct MergeNodeOp {
3428    var: String,
3429    labels: Vec<String>,
3430    /// Pattern property expressions as they came from the planner. These
3431    /// stay as `Expr` because evaluation needs `ExecCtx::params`, which
3432    /// isn't available until `next()` is called.
3433    properties: Vec<(String, Expr)>,
3434    /// `ON CREATE SET ...` assignments — applied only when the
3435    /// merge took the create branch. Evaluated against a row
3436    /// `{var → Node}` so the value expressions can reference the
3437    /// just-created node.
3438    on_create: Vec<SetAssignment>,
3439    /// `ON MATCH SET ...` assignments — applied to every matched
3440    /// node when the merge took the match branch. Same row shape
3441    /// as `on_create`.
3442    on_match: Vec<SetAssignment>,
3443    /// Optional upstream operator. `None` means this is a
3444    /// top-level producer (`MERGE (n) RETURN n`) and emits
3445    /// rows with a fresh empty base. `Some` means this is a
3446    /// mid-chain clause (`MATCH (a) MERGE (b) RETURN a, b`)
3447    /// and each emitted row is a cross-join between an input
3448    /// row and a merge-result node.
3449    input: Option<Box<dyn Operator>>,
3450    /// Cached merge result. Populated on the first `next()`
3451    /// call by running the scan + maybe-create logic *once*,
3452    /// then reused for every input row. Running the merge
3453    /// exactly once sidesteps the read-after-write issue in
3454    /// buffered-writer mode (a node created by the first input
3455    /// row wouldn't be visible to a re-scan on the second).
3456    merged_nodes: Vec<Node>,
3457    /// Whether `merged_nodes` has been populated. The merge
3458    /// logic runs lazily on the first `next()` so
3459    /// `ExecCtx::params` is available.
3460    merge_done: bool,
3461    cursor: usize,
3462    /// Eager-evaluated output for the input-driven case: drain
3463    /// the entire input on first call, run merge for each row
3464    /// (writing to the store as we go), and accumulate the
3465    /// (input_row, merged_node) cross-products. Lazy per-row
3466    /// merging interleaved writes with downstream scan caching
3467    /// for `WITH *`-then-MATCH patterns. Only populated when
3468    /// `input.is_some()`; the standalone case (`input.is_none()`)
3469    /// is already eager via `merge_done`.
3470    input_buffered: Option<Vec<Row>>,
3471    input_cursor: usize,
3472}
3473
3474impl MergeNodeOp {
3475    fn new(
3476        input: Option<Box<dyn Operator>>,
3477        var: String,
3478        labels: Vec<String>,
3479        properties: Vec<(String, Expr)>,
3480        on_create: Vec<SetAssignment>,
3481        on_match: Vec<SetAssignment>,
3482    ) -> Self {
3483        Self {
3484            var,
3485            labels,
3486            properties,
3487            on_create,
3488            on_match,
3489            input,
3490            merged_nodes: Vec::new(),
3491            merge_done: false,
3492            cursor: 0,
3493            input_buffered: None,
3494            input_cursor: 0,
3495        }
3496    }
3497
3498    /// Run the MERGE logic exactly once: scan the store for
3499    /// existing matches, apply ON MATCH SET to each; or create
3500    /// a fresh node and apply ON CREATE SET; persist everything
3501    /// via `ctx.writer`; stash the resulting nodes in
3502    /// `self.merged_nodes`. Idempotent — subsequent calls are
3503    /// no-ops once `self.merge_done` is set.
3504    /// Resolve the pattern properties against `base`, scan the
3505    /// store, and either match existing nodes or create a fresh
3506    /// one. Returns the resulting node set — can be called
3507    /// multiple times with different `base` rows for
3508    /// input-driven merges.
3509    fn run_merge_for(&mut self, ctx: &ExecCtx, base: &Row) -> Result<Vec<Node>> {
3510        let resolved_props: Vec<(String, Property)> = self
3511            .properties
3512            .iter()
3513            .map(|(k, expr)| {
3514                let v = eval_expr(expr, &ctx.eval_ctx(base))?;
3515                Ok((k.clone(), value_to_property(v)?))
3516            })
3517            .collect::<Result<Vec<_>>>()?;
3518
3519        let candidate_ids: Vec<NodeId> = if let Some(primary) = self.labels.first() {
3520            ctx.store.nodes_by_label(primary)?
3521        } else {
3522            ctx.store.all_node_ids()?
3523        };
3524        let mut merged_nodes: Vec<Node> = Vec::new();
3525        for id in candidate_ids {
3526            if let Some(node) = ctx.store.get_node(id)? {
3527                if has_all_labels(&node, &self.labels)
3528                    && matches_pattern_props(&node, &resolved_props)
3529                {
3530                    merged_nodes.push(node);
3531                }
3532            }
3533        }
3534
3535        if merged_nodes.is_empty() {
3536            let mut node = Node::new();
3537            for label in &self.labels {
3538                node.labels.push(label.clone());
3539            }
3540            for (k, prop) in resolved_props {
3541                node.properties.insert(k, prop);
3542            }
3543            apply_merge_actions(&mut node, &self.on_create, &self.var, ctx, base)?;
3544            ctx.writer.put_node(&node)?;
3545            merged_nodes.push(node);
3546        } else if !self.on_match.is_empty() {
3547            for node in merged_nodes.iter_mut() {
3548                apply_merge_actions(node, &self.on_match, &self.var, ctx, base)?;
3549                ctx.writer.put_node(node)?;
3550            }
3551        }
3552        Ok(merged_nodes)
3553    }
3554}
3555
3556impl Operator for MergeNodeOp {
3557    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3558        // Top-level producer: no upstream context, so the pattern
3559        // properties can only reference literals / parameters.
3560        // Run the merge once against an empty row, then emit
3561        // each result.
3562        if self.input.is_none() {
3563            if !self.merge_done {
3564                let empty = Row::new();
3565                let nodes = self.run_merge_for(ctx, &empty)?;
3566                self.merged_nodes = nodes;
3567                self.merge_done = true;
3568            }
3569            if self.cursor < self.merged_nodes.len() {
3570                let node = self.merged_nodes[self.cursor].clone();
3571                self.cursor += 1;
3572                let mut row = Row::new();
3573                row.insert(self.var.clone(), Value::Node(node));
3574                return Ok(Some(row));
3575            }
3576            return Ok(None);
3577        }
3578
3579        // Input-driven case: evaluate pattern properties *per
3580        // input row* so references like `MERGE (:City {name:
3581        // person.bornIn})` resolve against the bound `person`.
3582        // Eager evaluation — drain all input + run all merges +
3583        // cross-join with input rows before yielding any. Lazy
3584        // per-row merging interleaved writes with downstream
3585        // scan caching, which broke `WITH *`-then-MATCH chains.
3586        if self.input_buffered.is_none() {
3587            let mut input_rows: Vec<Row> = Vec::new();
3588            while let Some(row) = self.input.as_mut().unwrap().next(ctx)? {
3589                input_rows.push(row);
3590            }
3591            let mut output: Vec<Row> = Vec::new();
3592            for input_row in input_rows {
3593                let nodes = self.run_merge_for(ctx, &input_row)?;
3594                for node in nodes {
3595                    let mut out = input_row.clone();
3596                    out.insert(self.var.clone(), Value::Node(node));
3597                    output.push(out);
3598                }
3599            }
3600            self.input_buffered = Some(output);
3601            self.input_cursor = 0;
3602        }
3603        let rows = self.input_buffered.as_ref().unwrap();
3604        if self.input_cursor < rows.len() {
3605            let row = rows[self.input_cursor].clone();
3606            self.input_cursor += 1;
3607            return Ok(Some(row));
3608        }
3609        Ok(None)
3610    }
3611}
3612
3613/// Apply MERGE-conditional SET assignments (`ON CREATE` or
3614/// Find-or-create executor for edge MERGE
3615/// (`MERGE (a)-[r:KNOWS]->(b)`).
3616///
3617/// For every row pulled from `input`, looks up the `src_var`
3618/// and `dst_var` bindings (which must be `Value::Node` — the
3619/// planner enforces that they came from a prior MATCH or
3620/// MERGE), scans `src`'s outgoing edges, and either:
3621///
3622/// - Picks the first edge of type `edge_type` whose target is
3623///   `dst` and applies `on_match` to it, or
3624/// - Creates a fresh `Edge::new(edge_type, src, dst)`, applies
3625///   `on_create`, and persists it via `ctx.writer.put_edge`.
3626///
3627/// Either way, the resulting edge is bound into `edge_var` in
3628/// the output row and the row is emitted. v1 restrictions:
3629/// single directed hop, both endpoints already bound,
3630/// explicit relationship type.
3631struct MergeEdgeOp {
3632    input: Box<dyn Operator>,
3633    edge_var: String,
3634    src_var: String,
3635    dst_var: String,
3636    edge_type: String,
3637    undirected: bool,
3638    /// Inline edge property filter from the MERGE pattern
3639    /// (`[r:T {k: v}]`). Matched edges must satisfy every entry;
3640    /// the create branch stamps them onto the new edge.
3641    properties: Vec<(String, Expr)>,
3642    on_create: Vec<SetAssignment>,
3643    on_match: Vec<SetAssignment>,
3644    /// All output rows from the eager merge pass, queued for
3645    /// streaming. Populated on the first `next()` call: drain
3646    /// input, run merge for each row (writing to the store as
3647    /// we go), accumulate every (input_row × matched_edge)
3648    /// cross-product. Eager evaluation is required for
3649    /// `WITH *`-then-MATCH chains — see `next` impl for why.
3650    pending: std::collections::VecDeque<Row>,
3651    /// Set after the eager merge pass populates `pending` so
3652    /// subsequent `next()` calls don't re-drain the (already
3653    /// exhausted) input.
3654    drained: bool,
3655}
3656
3657impl MergeEdgeOp {
3658    #[allow(clippy::too_many_arguments)]
3659    fn new(
3660        input: Box<dyn Operator>,
3661        edge_var: String,
3662        src_var: String,
3663        dst_var: String,
3664        edge_type: String,
3665        undirected: bool,
3666        properties: Vec<(String, Expr)>,
3667        on_create: Vec<SetAssignment>,
3668        on_match: Vec<SetAssignment>,
3669    ) -> Self {
3670        Self {
3671            input,
3672            edge_var,
3673            src_var,
3674            dst_var,
3675            edge_type,
3676            undirected,
3677            properties,
3678            on_create,
3679            on_match,
3680            pending: std::collections::VecDeque::new(),
3681            drained: false,
3682        }
3683    }
3684}
3685
3686impl MergeEdgeOp {
3687    /// Per-input-row merge step: scan for matching edges, take
3688    /// either the match or create branch, push resulting output
3689    /// rows onto `out`. Factored out so the eager `next()` can
3690    /// run this for every drained input row before yielding.
3691    fn merge_for(&self, ctx: &ExecCtx, row: Row, out: &mut Vec<Row>) -> Result<()> {
3692        // Resolve src/dst. Both must be Value::Node — the
3693        // planner enforces that the variables came from an
3694        // earlier producer, so anything else is a bug or a
3695        // later-added feature that didn't update the check.
3696        let src_node = match row.get(&self.src_var) {
3697            Some(Value::Node(n)) => n.clone(),
3698            _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3699        };
3700        let dst_node = match row.get(&self.dst_var) {
3701            Some(Value::Node(n)) => n.clone(),
3702            _ => return Err(Error::UnboundVariable(self.dst_var.clone())),
3703        };
3704
3705        // Evaluate the inline edge property filter once per
3706        // input row. These are AST expressions so they can
3707        // reference outer bindings (`MERGE (a)-[r:T {k: a.v}]->(b)`).
3708        let required_props: Vec<(String, Property)> = self
3709            .properties
3710            .iter()
3711            .map(|(k, expr)| {
3712                let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
3713                Ok((k.clone(), value_to_property(v)?))
3714            })
3715            .collect::<Result<Vec<_>>>()?;
3716        let edge_matches = |edge: &Edge| -> bool {
3717            required_props.iter().all(|(k, want)| {
3718                edge.properties
3719                    .get(k)
3720                    .map(|have| have == want)
3721                    .unwrap_or(false)
3722            })
3723        };
3724
3725        // Collect every edge of type `edge_type` from src to
3726        // dst (and, for undirected patterns, dst to src) that
3727        // also satisfies the inline property filter.
3728        let mut matched: Vec<Edge> = Vec::new();
3729        for (edge_id, neighbor_id) in ctx.store.outgoing(src_node.id)? {
3730            if neighbor_id != dst_node.id {
3731                continue;
3732            }
3733            if let Some(edge) = ctx.store.get_edge(edge_id)? {
3734                if edge.edge_type == self.edge_type && edge_matches(&edge) {
3735                    matched.push(edge);
3736                }
3737            }
3738        }
3739        if self.undirected {
3740            for (edge_id, neighbor_id) in ctx.store.incoming(src_node.id)? {
3741                if neighbor_id != dst_node.id {
3742                    continue;
3743                }
3744                if let Some(edge) = ctx.store.get_edge(edge_id)? {
3745                    if edge.edge_type == self.edge_type && edge_matches(&edge) {
3746                        matched.push(edge);
3747                    }
3748                }
3749            }
3750        }
3751
3752        if matched.is_empty() {
3753            let mut new_edge = Edge::new(&self.edge_type, src_node.id, dst_node.id);
3754            for (k, p) in &required_props {
3755                new_edge.properties.insert(k.clone(), p.clone());
3756            }
3757            let mut row_out = row.clone();
3758            apply_merge_edge_actions(
3759                &mut new_edge,
3760                &self.on_create,
3761                &self.edge_var,
3762                ctx,
3763                &mut row_out,
3764            )?;
3765            ctx.writer.put_edge(&new_edge)?;
3766            row_out.insert(self.edge_var.clone(), Value::Edge(new_edge));
3767            out.push(row_out);
3768        } else {
3769            for mut existing in matched {
3770                let mut row_out = row.clone();
3771                if !self.on_match.is_empty() {
3772                    apply_merge_edge_actions(
3773                        &mut existing,
3774                        &self.on_match,
3775                        &self.edge_var,
3776                        ctx,
3777                        &mut row_out,
3778                    )?;
3779                    ctx.writer.put_edge(&existing)?;
3780                }
3781                row_out.insert(self.edge_var.clone(), Value::Edge(existing));
3782                out.push(row_out);
3783            }
3784        }
3785        Ok(())
3786    }
3787}
3788
3789impl Operator for MergeEdgeOp {
3790    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3791        // Eager evaluation — drain all input + run all merges
3792        // (writing to the store) before yielding any row. Lazy
3793        // per-input-row merging interleaved writes with
3794        // downstream scan caching for `WITH *`-then-MATCH chains.
3795        if self.pending.is_empty() && !self.drained {
3796            let mut input_rows: Vec<Row> = Vec::new();
3797            while let Some(row) = self.input.next(ctx)? {
3798                input_rows.push(row);
3799            }
3800            let mut out: Vec<Row> = Vec::new();
3801            for row in input_rows {
3802                self.merge_for(ctx, row, &mut out)?;
3803            }
3804            self.pending.extend(out);
3805            self.drained = true;
3806        }
3807        Ok(self.pending.pop_front())
3808    }
3809}
3810
3811/// Edge-side counterpart of [`apply_merge_actions`]. Evaluates
3812/// each `SetAssignment` against the outer `row` (augmented with
3813/// the current edge binding) and mutates either the edge itself
3814/// or a non-edge target node from the outer row. MERGE's ON
3815/// CREATE / ON MATCH clauses are scoped against the whole input
3816/// row, so `MERGE (a)-[:T]->(b) ON CREATE SET b.k = 1` has to
3817/// reach outside the edge-local binding. Non-edge mutations
3818/// are persisted via `ctx.writer.put_node` so the change is
3819/// visible to later clauses.
3820fn apply_merge_edge_actions(
3821    edge: &mut Edge,
3822    actions: &[SetAssignment],
3823    var: &str,
3824    exec_ctx: &ExecCtx,
3825    outer: &mut Row,
3826) -> Result<()> {
3827    if actions.is_empty() {
3828        return Ok(());
3829    }
3830    // Edge binding is live in `outer` while we evaluate — RHS can
3831    // reference both the edge and any sibling node binding.
3832    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3833    for action in actions {
3834        match action {
3835            SetAssignment::Property {
3836                var: target,
3837                key,
3838                value,
3839            } => {
3840                let sub_ctx = exec_ctx.eval_ctx(outer);
3841                let evaluated = eval_expr(value, &sub_ctx)?;
3842                let prop = value_to_property(evaluated)?;
3843                if target == var {
3844                    if matches!(prop, Property::Null) {
3845                        edge.properties.remove(key);
3846                    } else {
3847                        edge.properties.insert(key.clone(), prop);
3848                    }
3849                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3850                } else {
3851                    apply_set_prop_to_outer(outer, exec_ctx, target, key, prop)?;
3852                }
3853            }
3854            SetAssignment::Merge {
3855                var: target,
3856                properties,
3857            } => {
3858                let sub_ctx = exec_ctx.eval_ctx(outer);
3859                let resolved: Vec<(String, Property)> = properties
3860                    .iter()
3861                    .map(|(k, expr)| {
3862                        let v = eval_expr(expr, &sub_ctx)?;
3863                        Ok((k.clone(), value_to_property(v)?))
3864                    })
3865                    .collect::<Result<Vec<_>>>()?;
3866                if target == var {
3867                    for (k, p) in resolved {
3868                        edge.properties.insert(k, p);
3869                    }
3870                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3871                } else {
3872                    apply_set_map_to_outer(outer, exec_ctx, target, resolved, false)?;
3873                }
3874            }
3875            SetAssignment::Replace {
3876                var: target,
3877                properties,
3878            } => {
3879                let sub_ctx = exec_ctx.eval_ctx(outer);
3880                let resolved: Vec<(String, Property)> = properties
3881                    .iter()
3882                    .map(|(k, expr)| {
3883                        let v = eval_expr(expr, &sub_ctx)?;
3884                        Ok((k.clone(), value_to_property(v)?))
3885                    })
3886                    .collect::<Result<Vec<_>>>()?;
3887                if target == var {
3888                    edge.properties.clear();
3889                    for (k, p) in resolved {
3890                        edge.properties.insert(k, p);
3891                    }
3892                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3893                } else {
3894                    apply_set_map_to_outer(outer, exec_ctx, target, resolved, true)?;
3895                }
3896            }
3897            SetAssignment::Labels {
3898                var: target,
3899                labels,
3900            } => {
3901                if target == var {
3902                    // Edges don't carry labels.
3903                    return Err(Error::UnboundVariable(target.clone()));
3904                }
3905                apply_set_labels_to_outer(outer, exec_ctx, target, labels)?;
3906            }
3907            SetAssignment::ReplaceFromExpr {
3908                var: target,
3909                source,
3910                replace,
3911            } => {
3912                let sub_ctx = exec_ctx.eval_ctx(outer);
3913                let v = eval_expr(source, &sub_ctx)?;
3914                let props = extract_property_map(&v)?;
3915                if target == var {
3916                    if *replace {
3917                        edge.properties.clear();
3918                    }
3919                    for (k, p) in props {
3920                        edge.properties.insert(k, p);
3921                    }
3922                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3923                } else {
3924                    apply_set_map_to_outer(outer, exec_ctx, target, props, *replace)?;
3925                }
3926            }
3927        }
3928    }
3929    Ok(())
3930}
3931
3932/// Apply a single `SET target.key = prop` to a node or edge bound
3933/// in the outer row. Used by MERGE's ON CREATE / ON MATCH when
3934/// the target isn't the merge edge itself (the common case being
3935/// `MERGE (a)-[:R]->(b) ON CREATE SET b.k = v`).
3936fn apply_set_prop_to_outer(
3937    outer: &mut Row,
3938    exec_ctx: &ExecCtx,
3939    target: &str,
3940    key: &str,
3941    prop: Property,
3942) -> Result<()> {
3943    match outer.get_mut(target) {
3944        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
3945            // SET on a null target (typically an unmatched OPTIONAL
3946            // MATCH binding) is a silent no-op in openCypher.
3947            return Ok(());
3948        }
3949        Some(Value::Node(n)) => {
3950            if matches!(prop, Property::Null) {
3951                n.properties.remove(key);
3952            } else {
3953                n.properties.insert(key.to_string(), prop);
3954            }
3955            exec_ctx.writer.put_node(n)?;
3956        }
3957        Some(Value::Edge(e)) => {
3958            if matches!(prop, Property::Null) {
3959                e.properties.remove(key);
3960            } else {
3961                e.properties.insert(key.to_string(), prop);
3962            }
3963            exec_ctx.writer.put_edge(e)?;
3964        }
3965        _ => return Err(Error::UnboundVariable(target.to_string())),
3966    }
3967    Ok(())
3968}
3969
3970/// Apply a property-map assignment (`SET target = {..}` when
3971/// `replace`, or `SET target += {..}` when not) to a node or
3972/// edge bound in the outer row.
3973fn apply_set_map_to_outer(
3974    outer: &mut Row,
3975    exec_ctx: &ExecCtx,
3976    target: &str,
3977    props: Vec<(String, Property)>,
3978    replace: bool,
3979) -> Result<()> {
3980    match outer.get_mut(target) {
3981        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
3982        Some(Value::Node(n)) => {
3983            if replace {
3984                n.properties.clear();
3985            }
3986            for (k, p) in props {
3987                if replace || !matches!(p, Property::Null) {
3988                    n.properties.insert(k, p);
3989                } else {
3990                    n.properties.remove(&k);
3991                }
3992            }
3993            exec_ctx.writer.put_node(n)?;
3994            Ok(())
3995        }
3996        Some(Value::Edge(e)) => {
3997            if replace {
3998                e.properties.clear();
3999            }
4000            for (k, p) in props {
4001                if replace || !matches!(p, Property::Null) {
4002                    e.properties.insert(k, p);
4003                } else {
4004                    e.properties.remove(&k);
4005                }
4006            }
4007            exec_ctx.writer.put_edge(e)?;
4008            Ok(())
4009        }
4010        _ => Err(Error::UnboundVariable(target.to_string())),
4011    }
4012}
4013
4014/// Apply a labels assignment to a node bound in the outer row.
4015fn apply_set_labels_to_outer(
4016    outer: &mut Row,
4017    exec_ctx: &ExecCtx,
4018    target: &str,
4019    labels: &[String],
4020) -> Result<()> {
4021    match outer.get_mut(target) {
4022        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
4023        Some(Value::Node(n)) => {
4024            for label in labels {
4025                if !n.labels.contains(label) {
4026                    n.labels.push(label.clone());
4027                }
4028            }
4029            exec_ctx.writer.put_node(n)?;
4030            Ok(())
4031        }
4032        _ => Err(Error::UnboundVariable(target.to_string())),
4033    }
4034}
4035
4036/// `ON MATCH`) to `node` in place. Mirrors the `SetPropertyOp`
4037/// dispatch but specialized to a single bound variable so we
4038/// don't have to materialize a full row dispatcher.
4039///
4040/// Value expressions are evaluated against a temporary row
4041/// `{var → Node(node.clone())}` so the RHS can reference the
4042/// node's existing properties — `MERGE (n) ON MATCH SET n.hits = n.hits + 1`
4043/// works the same as if the SET ran in a SetPropertyOp pipeline.
4044fn apply_merge_actions(
4045    node: &mut Node,
4046    actions: &[SetAssignment],
4047    var: &str,
4048    exec_ctx: &ExecCtx,
4049    base_row: &Row,
4050) -> Result<()> {
4051    if actions.is_empty() {
4052        return Ok(());
4053    }
4054    // Start from the upstream row so `ON CREATE SET n.prop = other.field`
4055    // can resolve `other` — then overlay the merged node under `var`.
4056    let mut row = base_row.clone();
4057    row.insert(var.to_string(), Value::Node(node.clone()));
4058    for action in actions {
4059        let sub_ctx = exec_ctx.eval_ctx(&row);
4060        match action {
4061            SetAssignment::Property {
4062                var: target,
4063                key,
4064                value,
4065            } => {
4066                if target != var {
4067                    return Err(Error::UnboundVariable(target.clone()));
4068                }
4069                let evaluated = eval_expr(value, &sub_ctx)?;
4070                let prop = value_to_property(evaluated)?;
4071                node.properties.insert(key.clone(), prop);
4072                row.insert(var.to_string(), Value::Node(node.clone()));
4073            }
4074            SetAssignment::Labels {
4075                var: target,
4076                labels,
4077            } => {
4078                if target != var {
4079                    return Err(Error::UnboundVariable(target.clone()));
4080                }
4081                for label in labels {
4082                    if !node.labels.contains(label) {
4083                        node.labels.push(label.clone());
4084                    }
4085                }
4086                row.insert(var.to_string(), Value::Node(node.clone()));
4087            }
4088            SetAssignment::Replace {
4089                var: target,
4090                properties,
4091            } => {
4092                if target != var {
4093                    return Err(Error::UnboundVariable(target.clone()));
4094                }
4095                let resolved: Vec<(String, Property)> = properties
4096                    .iter()
4097                    .map(|(k, expr)| {
4098                        let v = eval_expr(expr, &sub_ctx)?;
4099                        Ok((k.clone(), value_to_property(v)?))
4100                    })
4101                    .collect::<Result<Vec<_>>>()?;
4102                node.properties.clear();
4103                for (k, p) in resolved {
4104                    node.properties.insert(k, p);
4105                }
4106                row.insert(var.to_string(), Value::Node(node.clone()));
4107            }
4108            SetAssignment::Merge {
4109                var: target,
4110                properties,
4111            } => {
4112                if target != var {
4113                    return Err(Error::UnboundVariable(target.clone()));
4114                }
4115                let resolved: Vec<(String, Property)> = properties
4116                    .iter()
4117                    .map(|(k, expr)| {
4118                        let v = eval_expr(expr, &sub_ctx)?;
4119                        Ok((k.clone(), value_to_property(v)?))
4120                    })
4121                    .collect::<Result<Vec<_>>>()?;
4122                for (k, p) in resolved {
4123                    node.properties.insert(k, p);
4124                }
4125                row.insert(var.to_string(), Value::Node(node.clone()));
4126            }
4127            SetAssignment::ReplaceFromExpr {
4128                var: target,
4129                source,
4130                replace,
4131            } => {
4132                if target != var {
4133                    return Err(Error::UnboundVariable(target.clone()));
4134                }
4135                let v = eval_expr(source, &sub_ctx)?;
4136                let props = extract_property_map(&v)?;
4137                if *replace {
4138                    node.properties.clear();
4139                }
4140                for (k, p) in props {
4141                    node.properties.insert(k, p);
4142                }
4143                row.insert(var.to_string(), Value::Node(node.clone()));
4144            }
4145        }
4146    }
4147    Ok(())
4148}
4149
/// Single-hop relationship expansion. For every input row the
/// operator reads the node bound at `src_var`, walks its adjacency
/// in `direction`, and emits one output row per edge / neighbor
/// pair that passes the type, constraint, property and label
/// filters. Input rows whose source is null are dropped.
struct EdgeExpandOp {
    /// Upstream operator supplying the rows to expand.
    input: Box<dyn Operator>,
    /// Row variable holding the source node of the hop.
    src_var: String,
    /// Row variable the traversed edge is bound to in the output
    /// row; `None` leaves the edge unbound.
    edge_var: Option<String>,
    /// Row variable the neighbor node is bound to in the output row.
    dst_var: String,
    /// Labels the neighbor must carry (checked via `has_all_labels`).
    dst_labels: Vec<String>,
    /// Inline edge property filter: each `(key, expr)` must equal
    /// the edge's property of the same name; a missing key fails.
    edge_properties: Vec<(String, Expr)>,
    /// Accepted edge types; an empty list accepts every type.
    edge_types: Vec<String>,
    /// Which adjacency to walk: outgoing, incoming, or both.
    direction: Direction,
    /// When set, only the specific edge whose id matches the
    /// row-bound value counts as a match. Used by fresh-scan
    /// MATCH patterns that reuse an edge variable from a prior
    /// clause so the new hop stays an existence check instead of
    /// rebinding and clobbering the outer edge.
    edge_constraint_var: Option<String>,
    /// Input row currently being expanded; set together with `pending`.
    current_row: Option<Row>,
    /// `(edge id, neighbor id)` candidates fetched for `current_row`.
    pending: Vec<(EdgeId, NodeId)>,
    /// Index of the next unvisited entry in `pending`.
    pending_idx: usize,
}
4169
4170impl EdgeExpandOp {
4171    #[allow(clippy::too_many_arguments)]
4172    fn new(
4173        input: Box<dyn Operator>,
4174        src_var: String,
4175        edge_var: Option<String>,
4176        dst_var: String,
4177        dst_labels: Vec<String>,
4178        edge_properties: Vec<(String, Expr)>,
4179        edge_types: Vec<String>,
4180        direction: Direction,
4181        edge_constraint_var: Option<String>,
4182    ) -> Self {
4183        Self {
4184            input,
4185            src_var,
4186            edge_var,
4187            dst_var,
4188            dst_labels,
4189            edge_properties,
4190            edge_types,
4191            direction,
4192            edge_constraint_var,
4193            current_row: None,
4194            pending: Vec::new(),
4195            pending_idx: 0,
4196        }
4197    }
4198}
4199
4200impl Operator for EdgeExpandOp {
4201    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4202        loop {
4203            while self.pending_idx < self.pending.len() {
4204                let (edge_id, neighbor_id) = self.pending[self.pending_idx];
4205                self.pending_idx += 1;
4206
4207                let edge = match ctx.store.get_edge(edge_id)? {
4208                    Some(e) => e,
4209                    None => continue,
4210                };
4211                if !self.edge_types.is_empty()
4212                    && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4213                {
4214                    continue;
4215                }
4216                // Pre-bound edge constraint: only accept the edge
4217                // whose id matches the row-bound value. Falls
4218                // through to outer scopes so a fresh right-side
4219                // scan of a CartesianProduct can still see the
4220                // bound edge from the left side. Non-edge / null
4221                // bindings trigger no matches at all — the
4222                // expansion yields nothing for that input row.
4223                if let Some(constraint_var) = &self.edge_constraint_var {
4224                    let base = self
4225                        .current_row
4226                        .as_ref()
4227                        .expect("pending edges without source row");
4228                    let expected = match ctx.lookup_binding(base, constraint_var) {
4229                        Some(Value::Edge(e)) => Some(e.id),
4230                        _ => None,
4231                    };
4232                    match expected {
4233                        Some(id) if id != edge.id => continue,
4234                        None => continue,
4235                        _ => {}
4236                    }
4237                }
4238                // Inline edge property filter: every (key, value)
4239                // must match the traversed edge's property of the
4240                // same name via `=` equality. Missing keys fail the
4241                // check (matching Neo4j).
4242                if !self.edge_properties.is_empty() {
4243                    let base = self
4244                        .current_row
4245                        .as_ref()
4246                        .expect("pending edges without source row");
4247                    let ectx = ctx.eval_ctx(base);
4248                    let mut ok = true;
4249                    for (key, expr) in &self.edge_properties {
4250                        let expected = eval_expr(expr, &ectx)?;
4251                        let actual = match edge.properties.get(key) {
4252                            Some(v) => Value::Property(v.clone()),
4253                            None => {
4254                                ok = false;
4255                                break;
4256                            }
4257                        };
4258                        if !values_equal(&actual, &expected) {
4259                            ok = false;
4260                            break;
4261                        }
4262                    }
4263                    if !ok {
4264                        continue;
4265                    }
4266                }
4267
4268                let neighbor = match ctx.store.get_node(neighbor_id)? {
4269                    Some(n) => n,
4270                    None => continue,
4271                };
4272                if !has_all_labels(&neighbor, &self.dst_labels) {
4273                    continue;
4274                }
4275
4276                let base = self
4277                    .current_row
4278                    .as_ref()
4279                    .expect("pending edges without source row");
4280                let mut out = base.clone();
4281                if let Some(ev) = &self.edge_var {
4282                    out.insert(ev.clone(), Value::Edge(edge));
4283                }
4284                out.insert(self.dst_var.clone(), Value::Node(neighbor));
4285                return Ok(Some(out));
4286            }
4287
4288            match self.input.next(ctx)? {
4289                None => return Ok(None),
4290                Some(row) => {
4291                    let src_id = match row.get(&self.src_var) {
4292                        Some(Value::Node(n)) => n.id,
4293                        // A null source (e.g. from OPTIONAL MATCH
4294                        // that matched nothing) drops the input
4295                        // row — `MATCH (a)-->(b)` against a null
4296                        // `a` is just empty, not an error.
4297                        Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
4298                            continue
4299                        }
4300                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4301                    };
4302                    self.pending = match self.direction {
4303                        Direction::Outgoing => ctx.store.outgoing(src_id)?,
4304                        Direction::Incoming => ctx.store.incoming(src_id)?,
4305                        Direction::Both => {
4306                            // For undirected traversal, a self-loop
4307                            // appears once as outgoing and once as
4308                            // incoming. The Cypher spec visits each
4309                            // physical edge once, so dedupe by
4310                            // edge_id before emitting.
4311                            let mut all = ctx.store.outgoing(src_id)?;
4312                            let mut seen: std::collections::HashSet<EdgeId> =
4313                                all.iter().map(|(e, _)| *e).collect();
4314                            for (e, n) in ctx.store.incoming(src_id)? {
4315                                if seen.insert(e) {
4316                                    all.push((e, n));
4317                                }
4318                            }
4319                            all
4320                        }
4321                    };
4322                    self.pending_idx = 0;
4323                    self.current_row = Some(row);
4324                }
4325            }
4326        }
4327    }
4328}
4329
/// Left-join variant of [`EdgeExpandOp`]. For each input row,
/// expands the adjacency in the configured direction and
/// filters by `edge_type` / `dst_labels` — if **any** neighbor
/// survives the filters, emits rows exactly like
/// `EdgeExpandOp`. If **zero** neighbors survive, emits one row
/// that carries the input row's bindings plus `edge_var` /
/// `dst_var` set to `Value::Null`, preserving the input row in
/// the output stream. This is the left-outer-join semantics
/// OPTIONAL MATCH needs.
///
/// Tracks per-input-row whether any output was produced so the
/// fallback Null row is only emitted after the pending buffer
/// drains without yielding anything. The `yielded_for_current`
/// flag is reset whenever a new input row is pulled.
struct OptionalEdgeExpandOp {
    /// Upstream operator supplying the rows to expand.
    input: Box<dyn Operator>,
    /// Row variable holding the source node of the hop.
    src_var: String,
    /// Row variable the traversed edge is bound to (or Null in the
    /// fallback row); `None` leaves the edge unbound.
    edge_var: Option<String>,
    /// Row variable the neighbor node is bound to (or Null in the
    /// fallback row).
    dst_var: String,
    /// Labels the neighbor must carry (checked via `has_all_labels`).
    dst_labels: Vec<String>,
    /// Inline neighbor property filter: each `(key, expr)` must
    /// equal the neighbor's property of the same name; a missing
    /// property is compared as `Value::Null`.
    dst_properties: Vec<(String, Expr)>,
    /// Accepted edge types; an empty list accepts every type.
    edge_types: Vec<String>,
    /// Which adjacency to walk: outgoing, incoming, or both.
    direction: Direction,
    /// When set, edges whose target id differs from the node
    /// bound at this variable in the current row are skipped
    /// inside the expansion loop. A row whose outgoing edges all
    /// fail the constraint then triggers the same null-fallback
    /// as having no edges at all.
    dst_constraint_var: Option<String>,
    /// When set, the expansion only considers the single edge
    /// whose id matches the edge already bound at this row
    /// variable — used when the OPTIONAL MATCH pattern reuses
    /// an edge variable from a prior clause.
    edge_constraint_var: Option<String>,
    /// Input row currently being expanded; taken (set to `None`)
    /// once its candidates are drained.
    current_row: Option<Row>,
    /// `(edge id, neighbor id)` candidates fetched for `current_row`.
    pending: Vec<(EdgeId, NodeId)>,
    /// Index of the next unvisited entry in `pending`.
    pending_idx: usize,
    /// Whether a row (match or fallback) has already been emitted
    /// for `current_row`.
    yielded_for_current: bool,
}
4369
4370impl OptionalEdgeExpandOp {
4371    #[allow(clippy::too_many_arguments)]
4372    fn new(
4373        input: Box<dyn Operator>,
4374        src_var: String,
4375        edge_var: Option<String>,
4376        dst_var: String,
4377        dst_labels: Vec<String>,
4378        dst_properties: Vec<(String, Expr)>,
4379        edge_types: Vec<String>,
4380        direction: Direction,
4381        dst_constraint_var: Option<String>,
4382        edge_constraint_var: Option<String>,
4383    ) -> Self {
4384        Self {
4385            input,
4386            src_var,
4387            edge_var,
4388            dst_var,
4389            dst_labels,
4390            dst_properties,
4391            edge_types,
4392            direction,
4393            dst_constraint_var,
4394            edge_constraint_var,
4395            current_row: None,
4396            pending: Vec::new(),
4397            pending_idx: 0,
4398            yielded_for_current: false,
4399        }
4400    }
4401}
4402
4403impl Operator for OptionalEdgeExpandOp {
4404    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4405        loop {
4406            while self.pending_idx < self.pending.len() {
4407                let (edge_id, neighbor_id) = self.pending[self.pending_idx];
4408                self.pending_idx += 1;
4409
4410                let edge = match ctx.store.get_edge(edge_id)? {
4411                    Some(e) => e,
4412                    None => continue,
4413                };
4414                if !self.edge_types.is_empty()
4415                    && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4416                {
4417                    continue;
4418                }
4419                // Pre-bound edge constraint: only the specific
4420                // edge whose id matches the row-bound value
4421                // counts as a match. Falls through to outer
4422                // scopes so the constraint works for fresh
4423                // scans on the right side of a CartesianProduct.
4424                if let Some(constraint_var) = &self.edge_constraint_var {
4425                    let base = self
4426                        .current_row
4427                        .as_ref()
4428                        .expect("pending without source row");
4429                    let expected = match ctx.lookup_binding(base, constraint_var) {
4430                        Some(Value::Edge(e)) => Some(e.id),
4431                        _ => None,
4432                    };
4433                    match expected {
4434                        Some(id) if id != edge.id => continue,
4435                        None => continue,
4436                        _ => {}
4437                    }
4438                }
4439
4440                let neighbor = match ctx.store.get_node(neighbor_id)? {
4441                    Some(n) => n,
4442                    None => continue,
4443                };
4444                if !has_all_labels(&neighbor, &self.dst_labels) {
4445                    continue;
4446                }
4447                // Bound-endpoint constraint: when the declared
4448                // target is already bound in the row, only edges
4449                // that lead to that exact node count as a match.
4450                // Edges failing the constraint are silently
4451                // skipped — if every candidate fails, the
4452                // per-row left-join fallback below still fires.
4453                if let Some(constraint_var) = &self.dst_constraint_var {
4454                    let base = self
4455                        .current_row
4456                        .as_ref()
4457                        .expect("pending without source row");
4458                    let bound_id = match base.get(constraint_var) {
4459                        Some(Value::Node(n)) => Some(n.id),
4460                        Some(Value::Null)
4461                        | Some(Value::Property(meshdb_core::Property::Null))
4462                        | None => None,
4463                        _ => None,
4464                    };
4465                    match bound_id {
4466                        Some(id) if id != neighbor.id => continue,
4467                        None => continue,
4468                        _ => {}
4469                    }
4470                }
4471                if !self.dst_properties.is_empty() {
4472                    let base = self
4473                        .current_row
4474                        .as_ref()
4475                        .expect("pending without source row");
4476                    let ectx = ctx.eval_ctx(base);
4477                    let mut props_ok = true;
4478                    for (key, expr) in &self.dst_properties {
4479                        let expected = eval_expr(expr, &ectx)?;
4480                        let actual = neighbor
4481                            .properties
4482                            .get(key)
4483                            .cloned()
4484                            .map(Value::Property)
4485                            .unwrap_or(Value::Null);
4486                        if !values_equal(&expected, &actual) {
4487                            props_ok = false;
4488                            break;
4489                        }
4490                    }
4491                    if !props_ok {
4492                        continue;
4493                    }
4494                }
4495
4496                let base = self
4497                    .current_row
4498                    .as_ref()
4499                    .expect("pending edges without source row");
4500                let mut out = base.clone();
4501                if let Some(ev) = &self.edge_var {
4502                    out.insert(ev.clone(), Value::Edge(edge));
4503                }
4504                out.insert(self.dst_var.clone(), Value::Node(neighbor));
4505                self.yielded_for_current = true;
4506                return Ok(Some(out));
4507            }
4508
4509            // Pending drained for the current row. If nothing was
4510            // yielded, emit the left-join fallback: preserve the
4511            // input row with the optional variables set to Null.
4512            //
4513            // Exception: when an edge / dst constraint names a
4514            // variable that was pre-bound in the input row, the
4515            // fallback must NOT clobber that variable — the
4516            // constraint turns the expansion into an existence
4517            // check and the outer value should survive through.
4518            if let Some(base) = self.current_row.take() {
4519                if !self.yielded_for_current {
4520                    let mut out = base;
4521                    if let Some(ev) = &self.edge_var {
4522                        let preserve = self
4523                            .edge_constraint_var
4524                            .as_ref()
4525                            .map(|c| c == ev)
4526                            .unwrap_or(false);
4527                        if !preserve {
4528                            out.insert(ev.clone(), Value::Null);
4529                        }
4530                    }
4531                    let preserve_dst = self
4532                        .dst_constraint_var
4533                        .as_ref()
4534                        .map(|c| c == &self.dst_var)
4535                        .unwrap_or(false);
4536                    if !preserve_dst {
4537                        out.insert(self.dst_var.clone(), Value::Null);
4538                    }
4539                    self.yielded_for_current = true;
4540                    return Ok(Some(out));
4541                }
4542            }
4543
4544            match self.input.next(ctx)? {
4545                None => return Ok(None),
4546                Some(row) => {
4547                    let src_id = match row.get(&self.src_var) {
4548                        Some(Value::Node(n)) => n.id,
4549                        // src_var is Null (because a prior
4550                        // OPTIONAL MATCH chained before this one
4551                        // Null-bound it). Skip adjacency entirely
4552                        // and fall through to the fallback Null
4553                        // row so downstream clauses see the
4554                        // preserved input.
4555                        Some(Value::Null) => {
4556                            self.pending = Vec::new();
4557                            self.pending_idx = 0;
4558                            self.yielded_for_current = false;
4559                            self.current_row = Some(row);
4560                            continue;
4561                        }
4562                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4563                    };
4564                    self.pending = match self.direction {
4565                        Direction::Outgoing => ctx.store.outgoing(src_id)?,
4566                        Direction::Incoming => ctx.store.incoming(src_id)?,
4567                        Direction::Both => {
4568                            // For undirected traversal, a self-loop
4569                            // appears once as outgoing and once as
4570                            // incoming. The Cypher spec visits each
4571                            // physical edge once, so dedupe by
4572                            // edge_id before emitting.
4573                            let mut all = ctx.store.outgoing(src_id)?;
4574                            let mut seen: std::collections::HashSet<EdgeId> =
4575                                all.iter().map(|(e, _)| *e).collect();
4576                            for (e, n) in ctx.store.incoming(src_id)? {
4577                                if seen.insert(e) {
4578                                    all.push((e, n));
4579                                }
4580                            }
4581                            all
4582                        }
4583                    };
4584                    self.pending_idx = 0;
4585                    self.yielded_for_current = false;
4586                    self.current_row = Some(row);
4587                }
4588            }
4589        }
4590    }
4591}
4592
/// Variable-length relationship expansion (`(a)-[*]->(b)` style
/// patterns). Walks paths from the node bound at `src_var` and
/// buffers the results per input row.
// NOTE(review): the `Operator` impl is not visible in this chunk;
// field notes added below that go beyond the pre-existing docs are
// inferred from field names and the `enumerate` signature — confirm.
struct VarLengthExpandOp {
    /// Upstream operator supplying the rows to expand.
    input: Box<dyn Operator>,
    /// Row variable holding the start node of the walk.
    src_var: String,
    /// Row variable for the walked edges, if the pattern names one
    /// — presumably bound to the edge list; confirm.
    edge_var: Option<String>,
    /// Row variable for the terminal node of each path.
    dst_var: String,
    /// Labels required on the terminal node — presumably checked
    /// the same way as in `EdgeExpandOp`; confirm.
    dst_labels: Vec<String>,
    /// Accepted edge types for every hop of the walk.
    edge_types: Vec<String>,
    /// Per-edge property filter — every edge along the walked
    /// path must have these `(key, value)` pairs. Mirrors the
    /// inline filter on `EdgeExpandOp`; applied during DFS so
    /// failing edges prune the branch instead of generating
    /// wrong-length results.
    edge_properties: Vec<(String, Expr)>,
    /// Which adjacency to walk at each hop.
    direction: Direction,
    /// Lower bound on path length in hops — TODO confirm inclusivity.
    min_hops: u64,
    /// Upper bound on path length in hops — TODO confirm inclusivity.
    max_hops: u64,
    /// Row variable for the whole path value, if the pattern names one.
    path_var: Option<String>,
    /// Per-row left-join mode: when `true`, an input row that
    /// produces no matching paths still emits one row with the
    /// expansion's output vars bound to Null. Set for
    /// `OPTIONAL MATCH (a)-[*]->(b)` so the outer row survives
    /// even when the path search is empty.
    optional: bool,
    /// When set, paths whose terminal node id differs from the
    /// node bound at this variable in the current row are
    /// filtered out before counting as a match. Combined with
    /// `optional`, an input row whose candidate paths all miss
    /// the bound target triggers the null-fallback instead of
    /// silently dropping.
    dst_constraint_var: Option<String>,
    /// Replay mode: read the walked edge sequence from the list
    /// already bound at this row variable instead of doing DFS.
    /// Used by openCypher's "walk a pre-bound edge list" form.
    bound_edge_list_var: Option<String>,
    /// Row variables whose bound edges (or edge lists) must not
    /// appear in the walked path — enforces openCypher's
    /// relationship-uniqueness rule across hops within the same
    /// MATCH pattern. Each entry resolves against the current row
    /// (falling through to outer-scope rows) at run time; the
    /// union of every referenced edge id becomes the DFS
    /// exclusion set.
    excluded_edge_vars: Vec<String>,
    /// Input row currently being expanded.
    current_row: Option<Row>,
    /// Edge sequences of the paths buffered for `current_row` —
    /// presumably the first component returned by `enumerate`.
    pending_paths: Vec<Vec<Edge>>,
    /// Node-id sequences of the buffered paths — presumably parallel
    /// to `pending_paths`; confirm.
    pending_node_paths: Vec<Vec<NodeId>>,
    /// Terminal node ids of the buffered paths — presumably parallel
    /// to `pending_paths`; confirm.
    pending_targets: Vec<NodeId>,
    /// Index of the next unvisited buffered path.
    pending_idx: usize,
}
4641
4642impl VarLengthExpandOp {
4643    #[allow(clippy::too_many_arguments)]
4644    fn new(
4645        input: Box<dyn Operator>,
4646        src_var: String,
4647        edge_var: Option<String>,
4648        dst_var: String,
4649        dst_labels: Vec<String>,
4650        edge_types: Vec<String>,
4651        edge_properties: Vec<(String, Expr)>,
4652        direction: Direction,
4653        min_hops: u64,
4654        max_hops: u64,
4655        path_var: Option<String>,
4656        optional: bool,
4657        dst_constraint_var: Option<String>,
4658        bound_edge_list_var: Option<String>,
4659        excluded_edge_vars: Vec<String>,
4660    ) -> Self {
4661        Self {
4662            input,
4663            src_var,
4664            edge_var,
4665            dst_var,
4666            dst_labels,
4667            edge_types,
4668            edge_properties,
4669            direction,
4670            min_hops,
4671            max_hops,
4672            path_var,
4673            optional,
4674            dst_constraint_var,
4675            bound_edge_list_var,
4676            excluded_edge_vars,
4677            current_row: None,
4678            pending_paths: Vec::new(),
4679            pending_node_paths: Vec::new(),
4680            pending_targets: Vec::new(),
4681            pending_idx: 0,
4682        }
4683    }
4684
4685    fn enumerate(
4686        &self,
4687        ctx: &ExecCtx,
4688        start: NodeId,
4689        input_row: &Row,
4690    ) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
4691        let mut paths: Vec<Vec<Edge>> = Vec::new();
4692        let mut node_paths: Vec<Vec<NodeId>> = Vec::new();
4693        let mut targets: Vec<NodeId> = Vec::new();
4694        let mut edge_buf: Vec<Edge> = Vec::new();
4695        let mut node_buf: Vec<NodeId> = vec![start];
4696        // Seed `used` with edges from outer-scope bindings the
4697        // pattern flags as exclusions (e.g. the other hops'
4698        // edge / edge-list vars). A fresh walk can't revisit an
4699        // edge that's already bound as a relationship variable
4700        // elsewhere in the MATCH — that's openCypher's
4701        // relationship-uniqueness rule.
4702        let mut used: HashSet<EdgeId> = HashSet::new();
4703        for var in &self.excluded_edge_vars {
4704            match ctx.lookup_binding(input_row, var) {
4705                Some(Value::Edge(e)) => {
4706                    used.insert(e.id);
4707                }
4708                Some(Value::List(items)) => {
4709                    for item in items {
4710                        if let Value::Edge(e) = item {
4711                            used.insert(e.id);
4712                        }
4713                    }
4714                }
4715                _ => {}
4716            }
4717        }
4718        // Evaluate edge-property expected values once per input row
4719        // — they may reference row bindings or `$`-parameters, but
4720        // don't vary per walked edge.
4721        let expected_edge_props: Vec<(String, Value)> = if self.edge_properties.is_empty() {
4722            Vec::new()
4723        } else {
4724            let ectx = ctx.eval_ctx(input_row);
4725            self.edge_properties
4726                .iter()
4727                .map(|(k, expr)| eval_expr(expr, &ectx).map(|v| (k.clone(), v)))
4728                .collect::<Result<Vec<_>>>()?
4729        };
4730        self.dfs(
4731            ctx,
4732            start,
4733            &expected_edge_props,
4734            &mut edge_buf,
4735            &mut node_buf,
4736            &mut used,
4737            &mut paths,
4738            &mut node_paths,
4739            &mut targets,
4740        )?;
4741        Ok((paths, node_paths, targets))
4742    }
4743
4744    #[allow(clippy::too_many_arguments)]
4745    fn dfs(
4746        &self,
4747        ctx: &ExecCtx,
4748        current_node: NodeId,
4749        expected_edge_props: &[(String, Value)],
4750        edge_buf: &mut Vec<Edge>,
4751        node_buf: &mut Vec<NodeId>,
4752        used: &mut HashSet<EdgeId>,
4753        out_paths: &mut Vec<Vec<Edge>>,
4754        out_node_paths: &mut Vec<Vec<NodeId>>,
4755        out_targets: &mut Vec<NodeId>,
4756    ) -> Result<()> {
4757        let depth = edge_buf.len() as u64;
4758
4759        if depth >= self.min_hops && depth <= self.max_hops {
4760            let terminal_ok = match ctx.store.get_node(current_node)? {
4761                Some(node) => has_all_labels(&node, &self.dst_labels),
4762                None => false,
4763            };
4764            if terminal_ok {
4765                out_paths.push(edge_buf.clone());
4766                out_node_paths.push(node_buf.clone());
4767                out_targets.push(current_node);
4768            }
4769        }
4770
4771        if depth >= self.max_hops {
4772            return Ok(());
4773        }
4774
4775        let neighbors = match self.direction {
4776            Direction::Outgoing => ctx.store.outgoing(current_node)?,
4777            Direction::Incoming => ctx.store.incoming(current_node)?,
4778            Direction::Both => {
4779                let mut all = ctx.store.outgoing(current_node)?;
4780                all.extend(ctx.store.incoming(current_node)?);
4781                all
4782            }
4783        };
4784
4785        for (eid, neighbor_id) in neighbors {
4786            if used.contains(&eid) {
4787                continue;
4788            }
4789            let edge = match ctx.store.get_edge(eid)? {
4790                Some(e) => e,
4791                None => continue,
4792            };
4793            if !self.edge_types.is_empty() && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4794            {
4795                continue;
4796            }
4797            // Inline edge property filter: skip edges whose
4798            // properties don't equal the expected values computed
4799            // per input row. A missing property fails the check,
4800            // matching `EdgeExpandOp`.
4801            if !expected_edge_props.is_empty() {
4802                let mut ok = true;
4803                for (key, expected) in expected_edge_props {
4804                    let actual = match edge.properties.get(key) {
4805                        Some(v) => Value::Property(v.clone()),
4806                        None => {
4807                            ok = false;
4808                            break;
4809                        }
4810                    };
4811                    if !values_equal(&actual, expected) {
4812                        ok = false;
4813                        break;
4814                    }
4815                }
4816                if !ok {
4817                    continue;
4818                }
4819            }
4820            used.insert(eid);
4821            edge_buf.push(edge);
4822            node_buf.push(neighbor_id);
4823            self.dfs(
4824                ctx,
4825                neighbor_id,
4826                expected_edge_props,
4827                edge_buf,
4828                node_buf,
4829                used,
4830                out_paths,
4831                out_node_paths,
4832                out_targets,
4833            )?;
4834            edge_buf.pop();
4835            node_buf.pop();
4836            used.remove(&eid);
4837        }
4838
4839        Ok(())
4840    }
4841}
4842
4843/// Replay the edge sequence stored at `list_var` in `row` as a
4844/// var-length walk starting from `src_id`. Returns `(paths,
4845/// node_paths, targets)` in the same shape `enumerate` produces
4846/// — either one entry when the list reads as a valid connected
4847/// walk in `direction`, or empty when it doesn't.
4848///
4849/// Validation per step:
4850/// * the list element is a `Value::Edge`,
4851/// * the edge's optional type filter matches `edge_types`,
4852/// * the edge actually touches the current node and proceeds to
4853///   the other endpoint in the requested direction (undirected
4854///   accepts either).
4855///
4856/// A null / missing / non-list value, a null source, or any
4857/// failed step all produce an empty result — which, when the
4858/// caller is in optional mode, triggers the left-join null
4859/// fallback.
4860fn replay_edge_list(
4861    ctx: &ExecCtx,
4862    row: &Row,
4863    list_var: &str,
4864    src_id: Option<NodeId>,
4865    direction: Direction,
4866    edge_types: &[String],
4867) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
4868    let start = match src_id {
4869        Some(id) => id,
4870        None => return Ok((Vec::new(), Vec::new(), Vec::new())),
4871    };
4872    let list = match ctx.lookup_binding(row, list_var) {
4873        Some(Value::List(items)) => items.clone(),
4874        Some(Value::Property(meshdb_core::Property::List(items))) => items
4875            .iter()
4876            .cloned()
4877            .map(Value::Property)
4878            .collect::<Vec<_>>(),
4879        _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
4880    };
4881    let mut edge_buf: Vec<Edge> = Vec::with_capacity(list.len());
4882    let mut node_buf: Vec<NodeId> = Vec::with_capacity(list.len() + 1);
4883    node_buf.push(start);
4884    let mut current = start;
4885    for item in list {
4886        let edge = match item {
4887            Value::Edge(e) => e,
4888            _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
4889        };
4890        if !edge_types.is_empty() && !edge_types.iter().any(|t| t == &edge.edge_type) {
4891            return Ok((Vec::new(), Vec::new(), Vec::new()));
4892        }
4893        let next_node = match direction {
4894            Direction::Outgoing => {
4895                if edge.source != current {
4896                    return Ok((Vec::new(), Vec::new(), Vec::new()));
4897                }
4898                edge.target
4899            }
4900            Direction::Incoming => {
4901                if edge.target != current {
4902                    return Ok((Vec::new(), Vec::new(), Vec::new()));
4903                }
4904                edge.source
4905            }
4906            Direction::Both => {
4907                if edge.source == current {
4908                    edge.target
4909                } else if edge.target == current {
4910                    edge.source
4911                } else {
4912                    return Ok((Vec::new(), Vec::new(), Vec::new()));
4913                }
4914            }
4915        };
4916        // The stored edge shape should still exist in the graph;
4917        // a missing node mid-walk means the snapshot shifted and
4918        // the list is no longer valid.
4919        if ctx.store.get_node(next_node)?.is_none() {
4920            return Ok((Vec::new(), Vec::new(), Vec::new()));
4921        }
4922        edge_buf.push(edge);
4923        node_buf.push(next_node);
4924        current = next_node;
4925    }
4926    Ok((vec![edge_buf], vec![node_buf], vec![current]))
4927}
4928
4929impl Operator for VarLengthExpandOp {
4930    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4931        loop {
4932            while self.pending_idx < self.pending_paths.len() {
4933                let i = self.pending_idx;
4934                self.pending_idx += 1;
4935
4936                let target_id = self.pending_targets[i];
4937                let target = match ctx.store.get_node(target_id)? {
4938                    Some(n) => n,
4939                    None => continue,
4940                };
4941
4942                let base = self
4943                    .current_row
4944                    .as_ref()
4945                    .expect("pending without source row");
4946                let mut out = base.clone();
4947                out.insert(self.dst_var.clone(), Value::Node(target.clone()));
4948                if let Some(ev) = &self.edge_var {
4949                    let edges: Vec<Value> = self.pending_paths[i]
4950                        .iter()
4951                        .cloned()
4952                        .map(Value::Edge)
4953                        .collect();
4954                    out.insert(ev.clone(), Value::List(edges));
4955                }
4956                if let Some(pv) = &self.path_var {
4957                    let mut nodes = Vec::with_capacity(self.pending_node_paths[i].len());
4958                    for nid in &self.pending_node_paths[i] {
4959                        match ctx.store.get_node(*nid)? {
4960                            Some(n) => nodes.push(n),
4961                            None => continue,
4962                        }
4963                    }
4964                    let edges = self.pending_paths[i].clone();
4965                    out.insert(pv.clone(), Value::Path { nodes, edges });
4966                }
4967                return Ok(Some(out));
4968            }
4969
4970            match self.input.next(ctx)? {
4971                None => return Ok(None),
4972                Some(row) => {
4973                    let src_id = match row.get(&self.src_var) {
4974                        Some(Value::Node(n)) => Some(n.id),
4975                        // Null source → no paths. Same
4976                        // null-propagating semantics as
4977                        // `EdgeExpandOp`. In optional mode we
4978                        // still emit the left-join fallback;
4979                        // otherwise we just skip.
4980                        Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
4981                            None
4982                        }
4983                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4984                    };
4985                    // Replay path: when the hop names an already-
4986                    // bound edge list (`MATCH (a)-[rs*]->(b)` with
4987                    // `rs = [r1, r2]` from an earlier WITH), skip
4988                    // DFS entirely and verify the listed edges
4989                    // form a connected walk from `src_id` in the
4990                    // required direction. Produces at most one
4991                    // path — the exact list that was supplied.
4992                    let (mut paths, mut node_paths, mut targets) =
4993                        if let Some(list_var) = &self.bound_edge_list_var {
4994                            replay_edge_list(
4995                                ctx,
4996                                &row,
4997                                list_var,
4998                                src_id,
4999                                self.direction,
5000                                &self.edge_types,
5001                            )?
5002                        } else {
5003                            match src_id {
5004                                Some(id) => self.enumerate(ctx, id, &row)?,
5005                                None => (Vec::new(), Vec::new(), Vec::new()),
5006                            }
5007                        };
5008                    // Bound-endpoint constraint: drop paths whose
5009                    // terminal node doesn't match the node already
5010                    // bound at the constraint var. When combined
5011                    // with `optional`, an input whose paths are
5012                    // all filtered out still triggers the
5013                    // null-fallback below.
5014                    if let Some(constraint_var) = &self.dst_constraint_var {
5015                        let target_id = match row.get(constraint_var) {
5016                            Some(Value::Node(n)) => Some(n.id),
5017                            _ => None,
5018                        };
5019                        match target_id {
5020                            Some(id) => {
5021                                let mut kept_paths = Vec::new();
5022                                let mut kept_node_paths = Vec::new();
5023                                let mut kept_targets = Vec::new();
5024                                for ((p, np), t) in paths
5025                                    .drain(..)
5026                                    .zip(node_paths.drain(..))
5027                                    .zip(targets.drain(..))
5028                                {
5029                                    if t == id {
5030                                        kept_paths.push(p);
5031                                        kept_node_paths.push(np);
5032                                        kept_targets.push(t);
5033                                    }
5034                                }
5035                                paths = kept_paths;
5036                                node_paths = kept_node_paths;
5037                                targets = kept_targets;
5038                            }
5039                            None => {
5040                                paths.clear();
5041                                node_paths.clear();
5042                                targets.clear();
5043                            }
5044                        }
5045                    }
5046                    if paths.is_empty() && self.optional {
5047                        // Left-join fallback: emit one row with
5048                        // the expansion's output vars set to Null
5049                        // so the outer OPTIONAL MATCH preserves
5050                        // this input row.
5051                        let mut out = row;
5052                        if let Some(ev) = &self.edge_var {
5053                            out.insert(ev.clone(), Value::Null);
5054                        }
5055                        out.insert(self.dst_var.clone(), Value::Null);
5056                        if let Some(pv) = &self.path_var {
5057                            out.insert(pv.clone(), Value::Null);
5058                        }
5059                        return Ok(Some(out));
5060                    }
5061                    self.pending_paths = paths;
5062                    self.pending_node_paths = node_paths;
5063                    self.pending_targets = targets;
5064                    self.pending_idx = 0;
5065                    self.current_row = Some(row);
5066                }
5067            }
5068        }
5069    }
5070}
5071
5072struct FilterOp {
5073    input: Box<dyn Operator>,
5074    predicate: Expr,
5075}
5076
5077impl FilterOp {
5078    fn new(input: Box<dyn Operator>, predicate: Expr) -> Self {
5079        Self { input, predicate }
5080    }
5081}
5082
5083impl Operator for FilterOp {
5084    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5085        while let Some(row) = self.input.next(ctx)? {
5086            let v = match eval_expr(&self.predicate, &ctx.eval_ctx(&row)) {
5087                Ok(v) => v,
5088                // Type mismatches and non-boolean errors in filter predicates
5089                // are treated as false (row filtered out), not hard errors
5090                Err(Error::TypeMismatch) | Err(Error::NotBoolean) => Value::Null,
5091                Err(e) => return Err(e),
5092            };
5093            if to_bool(&v).unwrap_or(false) {
5094                return Ok(Some(row));
5095            }
5096        }
5097        Ok(None)
5098    }
5099}
5100
5101/// Pass-through operator for `RETURN *` / `WITH *`. Forwards every
5102/// row from the input unchanged.
5103struct IdentityOp {
5104    input: Box<dyn Operator>,
5105}
5106
5107impl IdentityOp {
5108    fn new(input: Box<dyn Operator>) -> Self {
5109        Self { input }
5110    }
5111}
5112
5113impl Operator for IdentityOp {
5114    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5115        self.input.next(ctx)
5116    }
5117}
5118
5119/// Passes input rows through; if the input produces zero rows,
5120/// emits exactly one row with `null_vars` bound to Value::Null.
5121/// Implements standalone OPTIONAL MATCH semantics (e.g.
5122/// `OPTIONAL MATCH (n) RETURN n` on an empty graph yields one
5123/// row with n=null rather than the empty result set).
5124struct CoalesceNullRowOp {
5125    input: Box<dyn Operator>,
5126    null_vars: Vec<String>,
5127    produced_any: bool,
5128    done: bool,
5129}
5130
5131impl CoalesceNullRowOp {
5132    fn new(input: Box<dyn Operator>, null_vars: Vec<String>) -> Self {
5133        Self {
5134            input,
5135            null_vars,
5136            produced_any: false,
5137            done: false,
5138        }
5139    }
5140}
5141
5142impl Operator for CoalesceNullRowOp {
5143    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5144        if self.done {
5145            return Ok(None);
5146        }
5147        match self.input.next(ctx)? {
5148            Some(row) => {
5149                self.produced_any = true;
5150                Ok(Some(row))
5151            }
5152            None => {
5153                self.done = true;
5154                if self.produced_any {
5155                    Ok(None)
5156                } else {
5157                    let mut row = Row::new();
5158                    for v in &self.null_vars {
5159                        row.insert(v.clone(), Value::Null);
5160                    }
5161                    Ok(Some(row))
5162                }
5163            }
5164        }
5165    }
5166}
5167
5168struct ProjectOp {
5169    input: Box<dyn Operator>,
5170    items: Vec<ReturnItem>,
5171}
5172
5173impl ProjectOp {
5174    fn new(input: Box<dyn Operator>, items: Vec<ReturnItem>) -> Self {
5175        Self { input, items }
5176    }
5177}
5178
5179impl Operator for ProjectOp {
5180    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5181        match self.input.next(ctx)? {
5182            Some(row) => {
5183                let mut out = Row::new();
5184                for (i, item) in self.items.iter().enumerate() {
5185                    let name = item.alias.clone().unwrap_or_else(|| {
5186                        item.raw_text
5187                            .clone()
5188                            .unwrap_or_else(|| default_name(&item.expr, i))
5189                    });
5190                    let value = eval_expr(&item.expr, &ctx.eval_ctx(&row))?;
5191                    out.insert(name, value);
5192                }
5193                Ok(Some(out))
5194            }
5195            None => Ok(None),
5196        }
5197    }
5198}
5199
5200fn default_name(expr: &Expr, idx: usize) -> String {
5201    render_expr_name(expr).unwrap_or_else(|| format!("col{}", idx))
5202}
5203
5204fn render_expr_name(expr: &Expr) -> Option<String> {
5205    Some(match expr {
5206        Expr::Identifier(s) => s.clone(),
5207        Expr::Property { var, key } => format!("{var}.{key}"),
5208        Expr::PropertyAccess { base, key } => {
5209            // Match the source syntax's parenthesisation of a
5210            // bracketed base: `(list[1]).k` round-trips, while a
5211            // plain identifier base stays bare (`a.b`).
5212            if matches!(
5213                base.as_ref(),
5214                Expr::IndexAccess { .. } | Expr::SliceAccess { .. }
5215            ) {
5216                format!("({}).{key}", render_expr_name(base)?)
5217            } else {
5218                format!("{}.{key}", render_expr_name(base)?)
5219            }
5220        }
5221        Expr::Parameter(name) => format!("${name}"),
5222        Expr::Literal(Literal::String(s)) => format!("'{s}'"),
5223        Expr::Literal(Literal::Integer(i)) => i.to_string(),
5224        Expr::Literal(Literal::Float(f)) => f.to_string(),
5225        Expr::Literal(Literal::Boolean(b)) => b.to_string(),
5226        Expr::Literal(Literal::Null) => "NULL".into(),
5227        Expr::Call { name, args } => {
5228            let arg_str = match args {
5229                CallArgs::Star => "*".into(),
5230                CallArgs::Exprs(es) | CallArgs::DistinctExprs(es) => {
5231                    let prefix = if matches!(args, CallArgs::DistinctExprs(_)) {
5232                        "DISTINCT "
5233                    } else {
5234                        ""
5235                    };
5236                    let inner: Vec<String> = es.iter().filter_map(render_expr_name).collect();
5237                    if inner.len() != es.len() {
5238                        return None;
5239                    }
5240                    format!("{prefix}{}", inner.join(", "))
5241                }
5242            };
5243            format!("{name}({arg_str})")
5244        }
5245        Expr::BinaryOp { op, left, right } => {
5246            let op_str = match op {
5247                BinaryOp::Add => " + ",
5248                BinaryOp::Sub => " - ",
5249                BinaryOp::Mul => " * ",
5250                BinaryOp::Div => " / ",
5251                BinaryOp::Mod => " % ",
5252                BinaryOp::Pow => " ^ ",
5253            };
5254            format!(
5255                "{}{op_str}{}",
5256                render_expr_name(left)?,
5257                render_expr_name(right)?
5258            )
5259        }
5260        Expr::UnaryOp { op, operand } => {
5261            let op_str = match op {
5262                UnaryOp::Neg => "-",
5263            };
5264            format!("{op_str}{}", render_expr_name(operand)?)
5265        }
5266        Expr::Not(inner) => format!("NOT {}", render_expr_name(inner)?),
5267        Expr::IsNull { negated, inner } => {
5268            if *negated {
5269                format!("{} IS NOT NULL", render_expr_name(inner)?)
5270            } else {
5271                format!("{} IS NULL", render_expr_name(inner)?)
5272            }
5273        }
5274        Expr::Compare { op, left, right } => {
5275            let op_str = match op {
5276                CompareOp::Eq => " = ",
5277                CompareOp::Ne => " <> ",
5278                CompareOp::Lt => " < ",
5279                CompareOp::Le => " <= ",
5280                CompareOp::Gt => " > ",
5281                CompareOp::Ge => " >= ",
5282                CompareOp::StartsWith => " STARTS WITH ",
5283                CompareOp::EndsWith => " ENDS WITH ",
5284                CompareOp::Contains => " CONTAINS ",
5285                CompareOp::RegexMatch => " =~ ",
5286            };
5287            format!(
5288                "{}{op_str}{}",
5289                render_expr_name(left)?,
5290                render_expr_name(right)?
5291            )
5292        }
5293        Expr::List(items) => {
5294            let inner: Vec<String> = items.iter().filter_map(render_expr_name).collect();
5295            if inner.len() != items.len() {
5296                return None;
5297            }
5298            format!("[{}]", inner.join(", "))
5299        }
5300        Expr::Map(entries) => {
5301            let inner: Vec<String> = entries
5302                .iter()
5303                .map(|(k, v)| render_expr_name(v).map(|vn| format!("{k}: {vn}")))
5304                .collect::<Option<Vec<_>>>()?;
5305            format!("{{{}}}", inner.join(", "))
5306        }
5307        Expr::IndexAccess { base, index } => {
5308            format!("{}[{}]", render_expr_name(base)?, render_expr_name(index)?)
5309        }
5310        Expr::InList { element, list } => {
5311            format!(
5312                "{} IN {}",
5313                render_expr_name(element)?,
5314                render_expr_name(list)?
5315            )
5316        }
5317        Expr::HasLabels { expr, labels } => {
5318            let mut s = format!("({}", render_expr_name(expr)?);
5319            for l in labels {
5320                s.push(':');
5321                s.push_str(l);
5322            }
5323            s.push(')');
5324            s
5325        }
5326        _ => return None,
5327    })
5328}
5329
5330struct DistinctOp {
5331    input: Box<dyn Operator>,
5332    seen: HashSet<String>,
5333}
5334
5335impl DistinctOp {
5336    fn new(input: Box<dyn Operator>) -> Self {
5337        Self {
5338            input,
5339            seen: HashSet::new(),
5340        }
5341    }
5342}
5343
5344impl Operator for DistinctOp {
5345    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5346        while let Some(row) = self.input.next(ctx)? {
5347            let key = row_key(&row);
5348            if self.seen.insert(key) {
5349                return Ok(Some(row));
5350            }
5351        }
5352        Ok(None)
5353    }
5354}
5355
5356/// Assemble a `Value::Path` from a row's already-bound node and
5357/// edge variables. Emitted by `plan_pattern` when the source
5358/// pattern carries `path_var`. The operator pulls `node_vars[i]`
5359/// and `edge_vars[i]` out of every row, walks them in order, and
5360/// inserts the result into the row under `path_var` before
5361/// forwarding downstream.
5362///
5363/// Rows where any referenced variable is missing or not the
5364/// expected Node/Edge shape get a `Value::Null` at `path_var` —
5365/// matching how `OPTIONAL MATCH` flows null bindings through the
5366/// downstream projection without hard-erroring. Missing bindings
5367/// shouldn't normally happen (the planner fills in synthetic
5368/// names via `ensure_path_bindings`), but the null fallback
5369/// means an unexpected upstream shape degrades gracefully
5370/// instead of crashing the whole query.
5371struct BindPathOp {
5372    input: Box<dyn Operator>,
5373    path_var: String,
5374    node_vars: Vec<String>,
5375    edge_vars: Vec<String>,
5376}
5377
5378impl BindPathOp {
5379    fn new(
5380        input: Box<dyn Operator>,
5381        path_var: String,
5382        node_vars: Vec<String>,
5383        edge_vars: Vec<String>,
5384    ) -> Self {
5385        Self {
5386            input,
5387            path_var,
5388            node_vars,
5389            edge_vars,
5390        }
5391    }
5392}
5393
5394impl Operator for BindPathOp {
5395    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5396        let Some(mut row) = self.input.next(ctx)? else {
5397            return Ok(None);
5398        };
5399        // Extract the ordered node + edge sequence from the row.
5400        // Any missing or wrong-shaped entry collapses the whole
5401        // path binding to null and falls through.
5402        let mut nodes: Vec<meshdb_core::Node> = Vec::new();
5403        let mut edges: Vec<meshdb_core::Edge> = Vec::new();
5404        let mut abort = false;
5405        // Interleave node/edge vars. For each hop i:
5406        //   node_vars[i] = start/intermediate node
5407        //   edge_vars[i] = edge (or sub-path for var-length)
5408        //   node_vars[i+1] = target node
5409        // For var-length hops, edge_vars[i] may contain a
5410        // Value::Path — splice its interior into the running path.
5411        if let Some(Value::Node(n)) = row.get(&self.node_vars[0]) {
5412            nodes.push(n.clone());
5413        } else {
5414            abort = true;
5415        }
5416        if !abort {
5417            for (i, ev) in self.edge_vars.iter().enumerate() {
5418                match row.get(ev) {
5419                    Some(Value::Edge(e)) => {
5420                        edges.push(e.clone());
5421                        match row.get(&self.node_vars[i + 1]) {
5422                            Some(Value::Node(n)) => nodes.push(n.clone()),
5423                            _ => {
5424                                abort = true;
5425                                break;
5426                            }
5427                        }
5428                    }
5429                    Some(Value::Path {
5430                        nodes: sub_nodes,
5431                        edges: sub_edges,
5432                    }) => {
5433                        // Splice the sub-path. The sub-path's first
5434                        // node is the same as nodes.last() (already
5435                        // pushed), so skip it. All sub-edges go in.
5436                        // Sub-path interior nodes go in. The sub-path's
5437                        // last node is the target for this hop.
5438                        edges.extend(sub_edges.iter().cloned());
5439                        if sub_nodes.len() > 1 {
5440                            nodes.extend(sub_nodes[1..].iter().cloned());
5441                        }
5442                    }
5443                    _ => {
5444                        abort = true;
5445                        break;
5446                    }
5447                }
5448            }
5449        }
5450        if abort {
5451            row.insert(self.path_var.clone(), Value::Null);
5452        } else {
5453            row.insert(self.path_var.clone(), Value::Path { nodes, edges });
5454        }
5455        Ok(Some(row))
5456    }
5457}
5458
5459/// `MATCH p = shortestPath((a)-[:R*..N]->(b))`. For each input
5460/// row (which must already bind both `src_var` and `dst_var`
5461/// to `Value::Node`), runs a breadth-first search from the
5462/// source node toward the target, filtering edges by
5463/// `edge_type` and walking up to `max_hops` steps. Emits one
5464/// row per successful search with `path_var` set to a
5465/// `Value::Path` carrying the traversed node/edge sequence;
5466/// rows where BFS finds no path are dropped entirely (matching
5467/// Cypher's `MATCH` semantics for an unsatisfiable pattern).
5468///
5469/// The BFS uses classic parent-pointer reconstruction: each
5470/// visited node's `(parent_node_id, edge_id)` pair is stored
5471/// in a hashmap, and once the target is reached we walk back
5472/// to the source, reversing the accumulated edges to produce
5473/// a forward-order path. Cycle detection is a side-effect of
5474/// the visited set — the first time BFS sees a node is
5475/// necessarily via a shortest path, so later visits are
5476/// ignored.
5477struct ShortestPathOp {
5478    input: Box<dyn Operator>,
5479    src_var: String,
5480    dst_var: String,
5481    path_var: String,
5482    edge_types: Vec<String>,
5483    direction: meshdb_cypher::Direction,
5484    max_hops: u64,
5485    kind: meshdb_cypher::ShortestKind,
5486    /// Buffered `(base_row, path)` pairs from the current
5487    /// input row, used only by `AllShortest` mode where a
5488    /// single input can expand into multiple shortest paths
5489    /// of the same length. Each call to `next` drains one
5490    /// entry before pulling a fresh input row; in `Shortest`
5491    /// mode the buffer is always empty or has one element.
5492    pending: std::collections::VecDeque<(Row, Value)>,
5493}
5494
5495impl ShortestPathOp {
5496    #[allow(clippy::too_many_arguments)]
5497    fn new(
5498        input: Box<dyn Operator>,
5499        src_var: String,
5500        dst_var: String,
5501        path_var: String,
5502        edge_types: Vec<String>,
5503        direction: meshdb_cypher::Direction,
5504        max_hops: u64,
5505        kind: meshdb_cypher::ShortestKind,
5506    ) -> Self {
5507        Self {
5508            input,
5509            src_var,
5510            dst_var,
5511            path_var,
5512            edge_types,
5513            direction,
5514            max_hops,
5515            kind,
5516            pending: std::collections::VecDeque::new(),
5517        }
5518    }
5519}
5520
5521impl Operator for ShortestPathOp {
5522    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5523        loop {
5524            // Drain buffered paths from a prior input row
5525            // before pulling the next one. Only reachable in
5526            // `AllShortest` mode; `Shortest` mode's buffer
5527            // never accumulates more than one entry.
5528            if let Some((mut row, path)) = self.pending.pop_front() {
5529                row.insert(self.path_var.clone(), path);
5530                return Ok(Some(row));
5531            }
5532            let Some(row) = self.input.next(ctx)? else {
5533                return Ok(None);
5534            };
5535            let src = match row.get(&self.src_var) {
5536                Some(Value::Node(n)) => n.clone(),
5537                _ => continue,
5538            };
5539            let dst = match row.get(&self.dst_var) {
5540                Some(Value::Node(n)) => n.clone(),
5541                _ => continue,
5542            };
5543            let paths = bfs_shortest_paths(
5544                &src,
5545                &dst,
5546                &self.edge_types,
5547                self.direction,
5548                self.max_hops,
5549                self.kind,
5550                ctx.store,
5551            )?;
5552            if paths.is_empty() {
5553                // No path of length ≤ max_hops — drop the row.
5554                continue;
5555            }
5556            for path in paths {
5557                self.pending.push_back((row.clone(), path));
5558            }
5559        }
5560    }
5561}
5562
5563/// Layered breadth-first search from `src` toward `dst`,
5564/// constrained by edge type, direction, and max hop count.
5565/// Returns every path of minimum length when `kind` is
5566/// `AllShortest`, or just the first discovered shortest path
5567/// when `kind` is `Shortest`. An empty vector means no path
5568/// of length ≤ `max_hops` exists.
5569///
5570/// `src == dst` is a zero-hop special case that always
5571/// returns a singleton path regardless of `max_hops`.
5572///
5573/// The algorithm builds a parent DAG as it expands each
5574/// level: every node that was first reached at depth `d+1`
5575/// records every `(parent, edge)` pair from frontier[d] that
5576/// reaches it, not just the first one. This lets the
5577/// reconstruction walk enumerate all source-to-target paths
5578/// of the discovered shortest length. For `Shortest` mode
5579/// the reconstruction short-circuits on the first complete
5580/// path.
5581fn bfs_shortest_paths(
5582    src: &Node,
5583    dst: &Node,
5584    edge_types: &[String],
5585    direction: meshdb_cypher::Direction,
5586    max_hops: u64,
5587    kind: meshdb_cypher::ShortestKind,
5588    reader: &dyn crate::reader::GraphReader,
5589) -> Result<Vec<Value>> {
5590    use meshdb_cypher::Direction;
5591
5592    if src.id == dst.id {
5593        return Ok(vec![Value::Path {
5594            nodes: vec![src.clone()],
5595            edges: vec![],
5596        }]);
5597    }
5598
5599    // `dist` tracks the shortest distance from `src` to each
5600    // reached node. `parents` maps each reached node id to
5601    // every `(parent_id, edge_id)` pair that reached it at
5602    // that shortest distance — a node may have multiple
5603    // parents in the parent DAG for `AllShortest`.
5604    let mut dist: HashMap<NodeId, u64> = HashMap::new();
5605    dist.insert(src.id, 0);
5606    let mut parents: HashMap<NodeId, Vec<(NodeId, EdgeId)>> = HashMap::new();
5607
5608    let mut frontier: Vec<NodeId> = vec![src.id];
5609    let mut depth: u64 = 0;
5610    let mut found = false;
5611
5612    while !frontier.is_empty() && depth < max_hops && !found {
5613        let mut next_frontier: Vec<NodeId> = Vec::new();
5614        for node_id in &frontier {
5615            let neighbors = match direction {
5616                Direction::Outgoing => reader.outgoing(*node_id)?,
5617                Direction::Incoming => reader.incoming(*node_id)?,
5618                Direction::Both => {
5619                    let mut out = reader.outgoing(*node_id)?;
5620                    out.extend(reader.incoming(*node_id)?);
5621                    out
5622                }
5623            };
5624            for (edge_id, neighbor_id) in neighbors {
5625                // Edge-type filter. Only fetch the edge record
5626                // when a type constraint is present.
5627                if !edge_types.is_empty() {
5628                    let edge = match reader.get_edge(edge_id)? {
5629                        Some(e) => e,
5630                        None => continue,
5631                    };
5632                    if !edge_types.iter().any(|t| t == &edge.edge_type) {
5633                        continue;
5634                    }
5635                }
5636                match dist.get(&neighbor_id) {
5637                    Some(&d) if d == depth + 1 => {
5638                        // Alternate parent at the same
5639                        // shortest-path level — record the
5640                        // additional edge so the reconstruction
5641                        // can enumerate it, but don't re-add to
5642                        // the next frontier.
5643                        parents
5644                            .entry(neighbor_id)
5645                            .or_default()
5646                            .push((*node_id, edge_id));
5647                    }
5648                    Some(_) => {
5649                        // Already reached at a strictly shorter
5650                        // depth; this edge isn't on a shortest
5651                        // path.
5652                    }
5653                    None => {
5654                        dist.insert(neighbor_id, depth + 1);
5655                        parents
5656                            .entry(neighbor_id)
5657                            .or_default()
5658                            .push((*node_id, edge_id));
5659                        if neighbor_id == dst.id {
5660                            found = true;
5661                        } else {
5662                            next_frontier.push(neighbor_id);
5663                        }
5664                    }
5665                }
5666            }
5667        }
5668        depth += 1;
5669        if !found {
5670            frontier = next_frontier;
5671        }
5672    }
5673
5674    if !found {
5675        return Ok(Vec::new());
5676    }
5677
5678    // Enumerate all shortest paths from the parent DAG.
5679    // `Shortest` mode short-circuits after the first complete
5680    // walk; `AllShortest` collects every distinct path.
5681    let mut out: Vec<Value> = Vec::new();
5682    let mut nodes_rev: Vec<Node> = Vec::new();
5683    let mut edges_rev: Vec<Edge> = Vec::new();
5684    let only_first = matches!(kind, meshdb_cypher::ShortestKind::Shortest);
5685    collect_shortest_paths(
5686        src,
5687        dst,
5688        &parents,
5689        reader,
5690        &mut nodes_rev,
5691        &mut edges_rev,
5692        &mut out,
5693        only_first,
5694    )?;
5695    Ok(out)
5696}
5697
5698/// Depth-first walk through the parent DAG built by
5699/// `bfs_shortest_paths`, collecting every source-to-target
5700/// path of the BFS-determined shortest length. The recursion
5701/// carries two scratch stacks (`nodes_rev`, `edges_rev`)
5702/// representing the partially-reconstructed path in reverse
5703/// order; on reaching `src` we copy the reversed accumulators
5704/// into a forward-order `Value::Path` and push it into `out`.
5705///
5706/// `only_first = true` short-circuits after the first
5707/// complete path, giving `Shortest` mode the optimal-case
5708/// early exit.
5709#[allow(clippy::too_many_arguments)]
5710fn collect_shortest_paths(
5711    src: &Node,
5712    current: &Node,
5713    parents: &HashMap<NodeId, Vec<(NodeId, EdgeId)>>,
5714    reader: &dyn crate::reader::GraphReader,
5715    nodes_rev: &mut Vec<Node>,
5716    edges_rev: &mut Vec<Edge>,
5717    out: &mut Vec<Value>,
5718    only_first: bool,
5719) -> Result<()> {
5720    if current.id == src.id {
5721        // Complete walk: `nodes_rev` holds dst, ..., (last
5722        // node before src) in reverse; prepend src and
5723        // reverse to produce the forward-order node list.
5724        // Edges are similarly reversed.
5725        let mut nodes: Vec<Node> = Vec::with_capacity(nodes_rev.len() + 1);
5726        nodes.push(src.clone());
5727        nodes.extend(nodes_rev.iter().rev().cloned());
5728        let edges: Vec<Edge> = edges_rev.iter().rev().cloned().collect();
5729        out.push(Value::Path { nodes, edges });
5730        return Ok(());
5731    }
5732    let Some(parent_edges) = parents.get(&current.id) else {
5733        // Unreachable in normal flow — BFS only inserts dst
5734        // into `parents` when it found at least one incoming
5735        // edge — but handled defensively.
5736        return Ok(());
5737    };
5738    for (parent_id, edge_id) in parent_edges {
5739        if only_first && !out.is_empty() {
5740            return Ok(());
5741        }
5742        let edge = reader
5743            .get_edge(*edge_id)?
5744            .expect("BFS inserted this edge id; it must still exist");
5745        let parent_node = reader
5746            .get_node(*parent_id)?
5747            .expect("BFS visited this node id; it must still exist");
5748        nodes_rev.push(current.clone());
5749        edges_rev.push(edge);
5750        collect_shortest_paths(
5751            src,
5752            &parent_node,
5753            parents,
5754            reader,
5755            nodes_rev,
5756            edges_rev,
5757            out,
5758            only_first,
5759        )?;
5760        nodes_rev.pop();
5761        edges_rev.pop();
5762    }
5763    Ok(())
5764}
5765
5766/// `UNION` / `UNION ALL`. Drains each branch in order, streaming
5767/// its rows through. For plain `UNION` (`all = false`) we
5768/// deduplicate across the combined stream using the same
5769/// `row_key` the `DistinctOp` uses, so the semantics match
5770/// Neo4j's "set union" shape regardless of whether duplicates
5771/// sit inside a single branch or straddle branches. For
5772/// `UNION ALL` the `seen` set stays `None` and every produced
5773/// row is forwarded as-is.
5774struct UnionOp {
5775    branches: Vec<Box<dyn Operator>>,
5776    current: usize,
5777    seen: Option<HashSet<String>>,
5778}
5779
5780impl UnionOp {
5781    fn new(branches: Vec<Box<dyn Operator>>, all: bool) -> Self {
5782        Self {
5783            branches,
5784            current: 0,
5785            seen: if all { None } else { Some(HashSet::new()) },
5786        }
5787    }
5788}
5789
5790impl Operator for UnionOp {
5791    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5792        while self.current < self.branches.len() {
5793            match self.branches[self.current].next(ctx)? {
5794                Some(row) => {
5795                    if let Some(seen) = self.seen.as_mut() {
5796                        let key = row_key(&row);
5797                        if !seen.insert(key) {
5798                            continue;
5799                        }
5800                    }
5801                    return Ok(Some(row));
5802                }
5803                None => {
5804                    self.current += 1;
5805                }
5806            }
5807        }
5808        Ok(None)
5809    }
5810}
5811
5812struct OrderByOp {
5813    input: Box<dyn Operator>,
5814    sort_items: Vec<SortItem>,
5815    sorted: Option<Vec<Row>>,
5816    cursor: usize,
5817}
5818
5819impl OrderByOp {
5820    fn new(input: Box<dyn Operator>, sort_items: Vec<SortItem>) -> Self {
5821        Self {
5822            input,
5823            sort_items,
5824            sorted: None,
5825            cursor: 0,
5826        }
5827    }
5828}
5829
5830impl Operator for OrderByOp {
5831    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5832        if self.sorted.is_none() {
5833            let mut rows: Vec<Row> = Vec::new();
5834            while let Some(row) = self.input.next(ctx)? {
5835                rows.push(row);
5836            }
5837            let mut keyed: Vec<(Vec<Value>, Row)> = Vec::with_capacity(rows.len());
5838            for row in rows {
5839                let mut keys = Vec::with_capacity(self.sort_items.len());
5840                for item in &self.sort_items {
5841                    keys.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
5842                }
5843                keyed.push((keys, row));
5844            }
5845            let descs: Vec<bool> = self.sort_items.iter().map(|s| s.descending).collect();
5846            keyed.sort_by(|a, b| {
5847                for (i, (va, vb)) in a.0.iter().zip(b.0.iter()).enumerate() {
5848                    let ord = compare_values(va, vb);
5849                    let ord = if descs[i] { ord.reverse() } else { ord };
5850                    if ord != Ordering::Equal {
5851                        return ord;
5852                    }
5853                }
5854                Ordering::Equal
5855            });
5856            self.sorted = Some(keyed.into_iter().map(|(_, r)| r).collect());
5857        }
5858        let rows = self.sorted.as_ref().unwrap();
5859        if self.cursor < rows.len() {
5860            let row = rows[self.cursor].clone();
5861            self.cursor += 1;
5862            Ok(Some(row))
5863        } else {
5864            Ok(None)
5865        }
5866    }
5867}
5868
/// Grouping / aggregation operator. A blocking operator: the
/// first pull drains the input, folds every row into the
/// aggregate state of its group, and materialises one output
/// row per group; later pulls serve those rows one by one.
struct AggregateOp {
    /// Upstream operator supplying the rows to aggregate.
    input: Box<dyn Operator>,
    /// Expressions whose values identify a row's group; their
    /// values are also emitted as output columns.
    group_keys: Vec<ReturnItem>,
    /// Aggregate computations performed for every group, each
    /// written to the column named by its `alias`.
    aggregates: Vec<AggregateSpec>,
    /// Materialised output rows; `None` until `compute` has
    /// run.
    results: Option<Vec<Row>>,
    /// Index into `results` of the next row to emit.
    cursor: usize,
}
5876
5877impl AggregateOp {
5878    fn new(
5879        input: Box<dyn Operator>,
5880        group_keys: Vec<ReturnItem>,
5881        aggregates: Vec<AggregateSpec>,
5882    ) -> Self {
5883        Self {
5884            input,
5885            group_keys,
5886            aggregates,
5887            results: None,
5888            cursor: 0,
5889        }
5890    }
5891
5892    fn compute(&mut self, ctx: &ExecCtx) -> Result<()> {
5893        let mut groups: HashMap<String, GroupState> = HashMap::new();
5894        let mut order: Vec<String> = Vec::new();
5895
5896        // If there are no input rows AND no group keys, we still emit one row
5897        // (e.g. `MATCH (n:Missing) RETURN count(*)` must yield one row with 0).
5898        let mut saw_any = false;
5899
5900        while let Some(row) = self.input.next(ctx)? {
5901            saw_any = true;
5902            let mut key_values = Vec::with_capacity(self.group_keys.len());
5903            for item in &self.group_keys {
5904                key_values.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
5905            }
5906            let mut hash_key = String::new();
5907            for v in &key_values {
5908                hash_key.push_str(&value_key(v));
5909                hash_key.push('|');
5910            }
5911            let entry = groups.entry(hash_key.clone()).or_insert_with(|| {
5912                order.push(hash_key.clone());
5913                GroupState {
5914                    key_values: key_values.clone(),
5915                    agg_states: self
5916                        .aggregates
5917                        .iter()
5918                        .map(|a| AggState::initial(a.function))
5919                        .collect(),
5920                    distinct_seen: self.aggregates.iter().map(|_| None).collect(),
5921                }
5922            });
5923            for (i, spec) in self.aggregates.iter().enumerate() {
5924                if let AggregateArg::DistinctExpr(expr) = &spec.arg {
5925                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
5926                    if matches!(v, Value::Null) {
5927                        continue;
5928                    }
5929                    let key = value_key(&v);
5930                    let seen = entry.distinct_seen[i].get_or_insert_with(HashSet::new);
5931                    if !seen.insert(key) {
5932                        continue;
5933                    }
5934                }
5935                // Resolve the extra-arg constant *before* update so
5936                // `apoc.agg.nth` sees its target index on row 0;
5937                // percentile aggregators just collect into a Vec and
5938                // don't read the extra arg during update, so the
5939                // ordering is safe for them too.
5940                if let Some(extra_expr) = &spec.extra_arg {
5941                    let need_resolve_percentile = matches!(
5942                        &entry.agg_states[i],
5943                        AggState::PercentileDisc {
5944                            percentile: None,
5945                            ..
5946                        } | AggState::PercentileCont {
5947                            percentile: None,
5948                            ..
5949                        }
5950                    );
5951                    let need_resolve_nth =
5952                        matches!(&entry.agg_states[i], AggState::ApocNth { target: None, .. });
5953                    if need_resolve_percentile {
5954                        let pv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
5955                        let p = match pv {
5956                            Value::Property(Property::Float64(f)) => f,
5957                            Value::Property(Property::Int64(i)) => i as f64,
5958                            _ => 0.0,
5959                        };
5960                        // openCypher requires the percentile to
5961                        // lie in [0.0, 1.0]; anything else is an
5962                        // `ArgumentError: NumberOutOfRange`.
5963                        if !(0.0..=1.0).contains(&p) || p.is_nan() {
5964                            return Err(Error::Procedure(format!("percentile out of range: {p}")));
5965                        }
5966                        match &mut entry.agg_states[i] {
5967                            AggState::PercentileDisc { percentile, .. }
5968                            | AggState::PercentileCont { percentile, .. } => {
5969                                *percentile = Some(p);
5970                            }
5971                            _ => {}
5972                        }
5973                    }
5974                    if need_resolve_nth {
5975                        let nv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
5976                        let n = match nv {
5977                            Value::Property(Property::Int64(i)) => i,
5978                            _ => {
5979                                return Err(Error::Procedure(
5980                                    "apoc.agg.nth expects an integer index".into(),
5981                                ))
5982                            }
5983                        };
5984                        if n < 0 {
5985                            return Err(Error::Procedure(format!(
5986                                "apoc.agg.nth index out of range: {n}"
5987                            )));
5988                        }
5989                        if let AggState::ApocNth { target, .. } = &mut entry.agg_states[i] {
5990                            *target = Some(n);
5991                        }
5992                    }
5993                }
5994                entry.agg_states[i].update(&spec.arg, &ctx.eval_ctx(&row))?;
5995            }
5996        }
5997
5998        let mut out = Vec::new();
5999        if !saw_any && self.group_keys.is_empty() && !self.aggregates.is_empty() {
6000            // Empty group, single aggregate row
6001            let mut row = Row::new();
6002            for spec in &self.aggregates {
6003                row.insert(
6004                    spec.alias.clone(),
6005                    AggState::initial(spec.function).finalize(),
6006                );
6007            }
6008            out.push(row);
6009        } else {
6010            for key in order {
6011                let state = groups.remove(&key).unwrap();
6012                let mut row = Row::new();
6013                for (i, item) in self.group_keys.iter().enumerate() {
6014                    let name = item
6015                        .alias
6016                        .clone()
6017                        .unwrap_or_else(|| default_name(&item.expr, i));
6018                    row.insert(name, state.key_values[i].clone());
6019                }
6020                for (i, spec) in self.aggregates.iter().enumerate() {
6021                    row.insert(spec.alias.clone(), state.agg_states[i].finalize());
6022                }
6023                out.push(row);
6024            }
6025        }
6026        self.results = Some(out);
6027        Ok(())
6028    }
6029}
6030
6031impl Operator for AggregateOp {
6032    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
6033        if self.results.is_none() {
6034            self.compute(ctx)?;
6035        }
6036        let rows = self.results.as_ref().unwrap();
6037        if self.cursor < rows.len() {
6038            let row = rows[self.cursor].clone();
6039            self.cursor += 1;
6040            Ok(Some(row))
6041        } else {
6042            Ok(None)
6043        }
6044    }
6045}
6046
/// Accumulated state of one group inside `AggregateOp`.
struct GroupState {
    /// Evaluated group-key values of the group's first row,
    /// in `group_keys` order; emitted as the key columns.
    key_values: Vec<Value>,
    /// One running aggregate state per entry of the
    /// operator's `aggregates`, in the same order.
    agg_states: Vec<AggState>,
    /// Per aggregate, the `value_key`s already fed to a
    /// DISTINCT aggregate; stays `None` until a DISTINCT
    /// aggregate sees its first non-null value.
    distinct_seen: Vec<Option<HashSet<String>>>,
}
6052
/// Running state of a single aggregate function for a single
/// group. A fresh state is created by `AggState::initial`,
/// fed one row at a time through `update`, and turned into
/// the output value by `finalize`.
enum AggState {
    /// `count` — number of rows for `count(*)`, or number of
    /// non-null argument values otherwise.
    Count(i64),
    /// `sum` — integer and float inputs are accumulated
    /// separately; `is_float` records whether any float was
    /// seen.
    Sum {
        int_part: i64,
        float_part: f64,
        is_float: bool,
    },
    /// `avg` — running total and number of non-null numeric
    /// inputs.
    Avg {
        total: f64,
        count: i64,
    },
    /// `min` — smallest non-null value seen so far under
    /// `compare_values`, or `None` before the first one.
    Min(Option<Value>),
    /// `max` — largest non-null value seen so far under
    /// `compare_values`, or `None` before the first one.
    Max(Option<Value>),
    /// `collect` — the non-null inputs in arrival order.
    Collect(Vec<Value>),
    /// `stDev` — running sum, sum of squares and count of the
    /// numeric inputs.
    StDev {
        sum: f64,
        sum_sq: f64,
        count: i64,
    },
    /// `stDevP` — same accumulators as `StDev`; the two
    /// presumably differ only in how they are finalized.
    StDevP {
        sum: f64,
        sum_sq: f64,
        count: i64,
    },
    /// `percentileDisc` — collects the non-null inputs;
    /// `percentile` is filled in from the spec's `extra_arg`
    /// the first time a row reaches this aggregate.
    PercentileDisc {
        items: Vec<Value>,
        percentile: Option<f64>,
    },
    /// `percentileCont` — same bookkeeping as
    /// `PercentileDisc`.
    PercentileCont {
        items: Vec<Value>,
        percentile: Option<f64>,
    },
    /// `apoc.agg.first` — latches the first non-null value seen.
    ApocFirst(Option<Value>),
    /// `apoc.agg.last` — overwrites the slot on every non-null value.
    ApocLast(Option<Value>),
    /// `apoc.agg.nth(expr, n)` — `target` is the constant `n`, resolved
    /// once from the spec's `extra_arg` on the first row (same shape
    /// as percentiles). `count` tracks non-null inputs; `slot` is set
    /// exactly when `count` reaches `target`.
    ApocNth {
        target: Option<i64>,
        count: i64,
        slot: Option<Value>,
    },
    /// `apoc.agg.median` — collects numeric values, sort + pick middle
    /// (or average of the two middles) at finalize.
    ApocMedian(Vec<f64>),
    /// `apoc.agg.product` — running product. Mirrors the Sum
    /// int-vs-float split so integer-only streams stay in Int64.
    ApocProduct {
        int_part: i64,
        float_part: f64,
        is_float: bool,
        seen: bool,
    },
}
6110
6111impl AggState {
6112    fn initial(func: AggregateFn) -> Self {
6113        match func {
6114            AggregateFn::Count => AggState::Count(0),
6115            AggregateFn::Sum => AggState::Sum {
6116                int_part: 0,
6117                float_part: 0.0,
6118                is_float: false,
6119            },
6120            AggregateFn::Avg => AggState::Avg {
6121                total: 0.0,
6122                count: 0,
6123            },
6124            AggregateFn::Min => AggState::Min(None),
6125            AggregateFn::Max => AggState::Max(None),
6126            AggregateFn::Collect => AggState::Collect(Vec::new()),
6127            AggregateFn::StDev => AggState::StDev {
6128                sum: 0.0,
6129                sum_sq: 0.0,
6130                count: 0,
6131            },
6132            AggregateFn::StDevP => AggState::StDevP {
6133                sum: 0.0,
6134                sum_sq: 0.0,
6135                count: 0,
6136            },
6137            AggregateFn::PercentileDisc => AggState::PercentileDisc {
6138                items: Vec::new(),
6139                percentile: None,
6140            },
6141            AggregateFn::PercentileCont => AggState::PercentileCont {
6142                items: Vec::new(),
6143                percentile: None,
6144            },
6145            AggregateFn::ApocFirst => AggState::ApocFirst(None),
6146            AggregateFn::ApocLast => AggState::ApocLast(None),
6147            AggregateFn::ApocNth => AggState::ApocNth {
6148                target: None,
6149                count: 0,
6150                slot: None,
6151            },
6152            AggregateFn::ApocMedian => AggState::ApocMedian(Vec::new()),
6153            AggregateFn::ApocProduct => AggState::ApocProduct {
6154                int_part: 1,
6155                float_part: 1.0,
6156                is_float: false,
6157                seen: false,
6158            },
6159        }
6160    }
6161
6162    fn update(&mut self, arg: &AggregateArg, ctx: &EvalCtx) -> Result<()> {
6163        match self {
6164            AggState::Count(c) => match arg {
6165                AggregateArg::Star => *c += 1,
6166                AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => {
6167                    if !matches!(eval_expr(e, ctx)?, Value::Null) {
6168                        *c += 1;
6169                    }
6170                }
6171            },
6172            AggState::Sum {
6173                int_part,
6174                float_part,
6175                is_float,
6176            } => {
6177                let v = expr_arg_value(arg, ctx)?;
6178                match v {
6179                    Value::Null => {}
6180                    Value::Property(Property::Int64(i)) => *int_part += i,
6181                    Value::Property(Property::Float64(f)) => {
6182                        *float_part += f;
6183                        *is_float = true;
6184                    }
6185                    _ => return Err(Error::AggregateTypeError),
6186                }
6187            }
6188            AggState::Avg { total, count } => {
6189                let v = expr_arg_value(arg, ctx)?;
6190                match v {
6191                    Value::Null => {}
6192                    Value::Property(Property::Int64(i)) => {
6193                        *total += i as f64;
6194                        *count += 1;
6195                    }
6196                    Value::Property(Property::Float64(f)) => {
6197                        *total += f;
6198                        *count += 1;
6199                    }
6200                    _ => return Err(Error::AggregateTypeError),
6201                }
6202            }
6203            AggState::Min(slot) => {
6204                // `min` / `max` ignore null inputs and operate
6205                // over any `Value` (including lists, nodes etc.),
6206                // not just scalar Properties. The ordering is
6207                // `compare_values`: within a single type the
6208                // natural order, across types the
6209                // `type_order_value` rank.
6210                let v = expr_arg_value(arg, ctx)?;
6211                if matches!(v, Value::Null | Value::Property(Property::Null)) {
6212                    // skip
6213                } else {
6214                    match slot {
6215                        None => *slot = Some(v),
6216                        Some(cur) => {
6217                            if compare_values(&v, cur) == Ordering::Less {
6218                                *cur = v;
6219                            }
6220                        }
6221                    }
6222                }
6223            }
6224            AggState::Max(slot) => {
6225                let v = expr_arg_value(arg, ctx)?;
6226                if matches!(v, Value::Null | Value::Property(Property::Null)) {
6227                    // skip
6228                } else {
6229                    match slot {
6230                        None => *slot = Some(v),
6231                        Some(cur) => {
6232                            if compare_values(&v, cur) == Ordering::Greater {
6233                                *cur = v;
6234                            }
6235                        }
6236                    }
6237                }
6238            }
6239            AggState::Collect(items) => {
6240                let v = expr_arg_value(arg, ctx)?;
6241                if !matches!(v, Value::Null) {
6242                    items.push(v);
6243                }
6244            }
6245            AggState::PercentileDisc { items, .. } | AggState::PercentileCont { items, .. } => {
6246                let v = expr_arg_value(arg, ctx)?;
6247                if !matches!(v, Value::Null) {
6248                    items.push(v);
6249                }
6250            }
6251            AggState::StDev { sum, sum_sq, count } | AggState::StDevP { sum, sum_sq, count } => {
6252                let v = expr_arg_value(arg, ctx)?;
6253                match v {
6254                    Value::Null => {}
6255                    Value::Property(Property::Int64(i)) => {
6256                        let f = i as f64;
6257                        *sum += f;
6258                        *sum_sq += f * f;
6259                        *count += 1;
6260                    }
6261                    Value::Property(Property::Float64(f)) => {
6262                        *sum += f;
6263                        *sum_sq += f * f;
6264                        *count += 1;
6265                    }
6266                    _ => return Err(Error::AggregateTypeError),
6267                }
6268            }
6269            AggState::ApocFirst(slot) => {
6270                if slot.is_some() {
6271                    return Ok(());
6272                }
6273                let v = expr_arg_value(arg, ctx)?;
6274                if !matches!(v, Value::Null | Value::Property(Property::Null)) {
6275                    *slot = Some(v);
6276                }
6277            }
6278            AggState::ApocLast(slot) => {
6279                let v = expr_arg_value(arg, ctx)?;
6280                if !matches!(v, Value::Null | Value::Property(Property::Null)) {
6281                    *slot = Some(v);
6282                }
6283            }
6284            AggState::ApocNth {
6285                target,
6286                count,
6287                slot,
6288            } => {
6289                if slot.is_some() {
6290                    return Ok(());
6291                }
6292                let v = expr_arg_value(arg, ctx)?;
6293                if matches!(v, Value::Null | Value::Property(Property::Null)) {
6294                    return Ok(());
6295                }
6296                if let Some(t) = *target {
6297                    if *count == t {
6298                        *slot = Some(v);
6299                    }
6300                    *count += 1;
6301                }
6302            }
6303            AggState::ApocMedian(items) => {
6304                let v = expr_arg_value(arg, ctx)?;
6305                match v {
6306                    Value::Null | Value::Property(Property::Null) => {}
6307                    Value::Property(Property::Int64(i)) => items.push(i as f64),
6308                    Value::Property(Property::Float64(f)) => items.push(f),
6309                    _ => return Err(Error::AggregateTypeError),
6310                }
6311            }
6312            AggState::ApocProduct {
6313                int_part,
6314                float_part,
6315                is_float,
6316                seen,
6317            } => {
6318                let v = expr_arg_value(arg, ctx)?;
6319                match v {
6320                    Value::Null | Value::Property(Property::Null) => {}
6321                    Value::Property(Property::Int64(i)) => {
6322                        *int_part = int_part.saturating_mul(i);
6323                        *seen = true;
6324                    }
6325                    Value::Property(Property::Float64(f)) => {
6326                        *float_part *= f;
6327                        *is_float = true;
6328                        *seen = true;
6329                    }
6330                    _ => return Err(Error::AggregateTypeError),
6331                }
6332            }
6333        }
6334        Ok(())
6335    }
6336
6337    fn finalize(&self) -> Value {
6338        match self {
6339            AggState::Count(c) => Value::Property(Property::Int64(*c)),
6340            AggState::Sum {
6341                int_part,
6342                float_part,
6343                is_float,
6344            } => {
6345                if *is_float {
6346                    Value::Property(Property::Float64(*float_part + *int_part as f64))
6347                } else {
6348                    Value::Property(Property::Int64(*int_part))
6349                }
6350            }
6351            AggState::Avg { total, count } => {
6352                if *count == 0 {
6353                    Value::Null
6354                } else {
6355                    Value::Property(Property::Float64(*total / *count as f64))
6356                }
6357            }
6358            AggState::Min(slot) | AggState::Max(slot) => match slot {
6359                Some(v) => v.clone(),
6360                None => Value::Null,
6361            },
6362            AggState::Collect(items) => Value::List(items.clone()),
6363            AggState::StDevP { sum, sum_sq, count } => {
6364                if *count == 0 {
6365                    Value::Property(Property::Float64(0.0))
6366                } else {
6367                    let n = *count as f64;
6368                    let variance = *sum_sq / n - (*sum / n).powi(2);
6369                    Value::Property(Property::Float64(variance.max(0.0).sqrt()))
6370                }
6371            }
6372            AggState::StDev { sum, sum_sq, count } => {
6373                if *count < 2 {
6374                    Value::Property(Property::Float64(0.0))
6375                } else {
6376                    let n = *count as f64;
6377                    let variance = (*sum_sq - *sum * *sum / n) / (n - 1.0);
6378                    Value::Property(Property::Float64(variance.max(0.0).sqrt()))
6379                }
6380            }
6381            AggState::PercentileDisc { items, percentile } => {
6382                percentile_disc(items, percentile.unwrap_or(0.0))
6383            }
6384            AggState::PercentileCont { items, percentile } => {
6385                percentile_cont(items, percentile.unwrap_or(0.0))
6386            }
6387            AggState::ApocFirst(slot) | AggState::ApocLast(slot) => match slot {
6388                Some(v) => v.clone(),
6389                None => Value::Null,
6390            },
6391            AggState::ApocNth { slot, .. } => match slot {
6392                Some(v) => v.clone(),
6393                None => Value::Null,
6394            },
6395            AggState::ApocMedian(items) => {
6396                if items.is_empty() {
6397                    return Value::Null;
6398                }
6399                let mut sorted = items.clone();
6400                sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
6401                let n = sorted.len();
6402                let median = if n % 2 == 1 {
6403                    sorted[n / 2]
6404                } else {
6405                    (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0
6406                };
6407                Value::Property(Property::Float64(median))
6408            }
6409            AggState::ApocProduct {
6410                int_part,
6411                float_part,
6412                is_float,
6413                seen,
6414            } => {
6415                // Empty stream: null, matching the convention for
6416                // avg/min/max (and unlike sum, which folds to 0).
6417                if !*seen {
6418                    return Value::Null;
6419                }
6420                if *is_float {
6421                    Value::Property(Property::Float64(*float_part * *int_part as f64))
6422                } else {
6423                    Value::Property(Property::Int64(*int_part))
6424                }
6425            }
6426        }
6427    }
6428}
6429
6430fn expr_arg_value(arg: &AggregateArg, ctx: &EvalCtx) -> Result<Value> {
6431    match arg {
6432        AggregateArg::Star => Err(Error::AggregateTypeError),
6433        AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => eval_expr(e, ctx),
6434    }
6435}
6436
6437/// Coerce a collected aggregate value into an f64 for percentile
6438/// math. Unhandled types fall back to NaN; the caller sorts NaN
6439/// out of the stream before computing the percentile.
6440fn value_to_f64(v: &Value) -> f64 {
6441    match v {
6442        Value::Property(Property::Int64(i)) => *i as f64,
6443        Value::Property(Property::Float64(f)) => *f,
6444        _ => f64::NAN,
6445    }
6446}
6447
6448/// `percentileDisc(expr, p)` — discrete percentile. Returns the
6449/// smallest value at or above the `p`-ranked position. Numbers only;
6450/// non-numeric values get sorted to the end and are effectively
6451/// ignored unless the percentile lands on one.
6452fn percentile_disc(items: &[Value], p: f64) -> Value {
6453    let mut nums: Vec<(f64, Value)> = items
6454        .iter()
6455        .map(|v| (value_to_f64(v), v.clone()))
6456        .filter(|(f, _)| !f.is_nan())
6457        .collect();
6458    if nums.is_empty() {
6459        return Value::Null;
6460    }
6461    nums.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
6462    let p = p.clamp(0.0, 1.0);
6463    let n = nums.len();
6464    // Neo4j spec: ceil(p * n) - 1, clamped at 0.
6465    let idx = ((p * n as f64).ceil() as isize - 1).max(0) as usize;
6466    nums[idx.min(n - 1)].1.clone()
6467}
6468
6469/// `percentileCont(expr, p)` — continuous percentile. Linearly
6470/// interpolates between the two ranks that bracket the fractional
6471/// position `p * (n - 1)`. Returns a Float64.
6472fn percentile_cont(items: &[Value], p: f64) -> Value {
6473    let mut nums: Vec<f64> = items
6474        .iter()
6475        .map(value_to_f64)
6476        .filter(|f| !f.is_nan())
6477        .collect();
6478    if nums.is_empty() {
6479        return Value::Null;
6480    }
6481    nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
6482    let p = p.clamp(0.0, 1.0);
6483    let n = nums.len();
6484    if n == 1 {
6485        return Value::Property(Property::Float64(nums[0]));
6486    }
6487    let pos = p * (n as f64 - 1.0);
6488    let lo = pos.floor() as usize;
6489    let hi = pos.ceil() as usize;
6490    let frac = pos - lo as f64;
6491    let v = nums[lo] + (nums[hi] - nums[lo]) * frac;
6492    Value::Property(Property::Float64(v))
6493}
6494
6495struct SkipOp {
6496    input: Box<dyn Operator>,
6497    count_expr: Expr,
6498    remaining: Option<i64>,
6499}
6500
6501impl SkipOp {
6502    fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
6503        Self {
6504            input,
6505            count_expr,
6506            remaining: None,
6507        }
6508    }
6509}
6510
6511impl Operator for SkipOp {
6512    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
6513        if self.remaining.is_none() {
6514            let empty = Row::new();
6515            let ectx = ctx.eval_ctx(&empty);
6516            let val = eval_expr(&self.count_expr, &ectx)?;
6517            self.remaining = Some(expr_to_count(val)?);
6518        }
6519        let rem = self.remaining.as_mut().unwrap();
6520        while *rem > 0 {
6521            if self.input.next(ctx)?.is_none() {
6522                return Ok(None);
6523            }
6524            *rem -= 1;
6525        }
6526        self.input.next(ctx)
6527    }
6528}
6529
6530struct LimitOp {
6531    input: Box<dyn Operator>,
6532    count_expr: Expr,
6533    remaining: Option<i64>,
6534    /// True when the input subtree carries a write operator
6535    /// (CREATE / SET / DELETE / etc.). After the limit is hit
6536    /// (including the `LIMIT 0` case where it's hit before the
6537    /// first pull), drain the input fully so eager side effects
6538    /// land — Cypher specifies writes happen regardless of how
6539    /// many rows downstream consumes.
6540    drain_on_complete: bool,
6541    /// Set after the post-limit drain runs so we don't re-drain
6542    /// on every subsequent `next()` call.
6543    drained: bool,
6544}
6545
6546impl LimitOp {
6547    fn new(input: Box<dyn Operator>, count_expr: Expr, drain_on_complete: bool) -> Self {
6548        Self {
6549            input,
6550            count_expr,
6551            remaining: None,
6552            drain_on_complete,
6553            drained: false,
6554        }
6555    }
6556
6557    /// Pull from input until exhausted, discarding rows. Used
6558    /// after the limit is satisfied to ensure write operators
6559    /// in the subtree run their full set of writes.
6560    fn drain_input(&mut self, ctx: &ExecCtx) -> Result<()> {
6561        if self.drained {
6562            return Ok(());
6563        }
6564        while self.input.next(ctx)?.is_some() {}
6565        self.drained = true;
6566        Ok(())
6567    }
6568}
6569
6570impl Operator for LimitOp {
6571    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
6572        if self.remaining.is_none() {
6573            let empty = Row::new();
6574            let ectx = ctx.eval_ctx(&empty);
6575            let val = eval_expr(&self.count_expr, &ectx)?;
6576            self.remaining = Some(expr_to_count(val)?);
6577        }
6578        let rem = self.remaining.as_mut().unwrap();
6579        if *rem <= 0 {
6580            if self.drain_on_complete {
6581                self.drain_input(ctx)?;
6582            }
6583            return Ok(None);
6584        }
6585        match self.input.next(ctx)? {
6586            Some(row) => {
6587                *rem -= 1;
6588                Ok(Some(row))
6589            }
6590            None => Ok(None),
6591        }
6592    }
6593}
6594
6595fn expr_to_count(val: Value) -> Result<i64> {
6596    match val {
6597        Value::Null | Value::Property(Property::Null) => Ok(0),
6598        Value::Property(Property::Int64(n)) if n >= 0 => Ok(n),
6599        // openCypher: SKIP/LIMIT require an integer. Float values
6600        // (including integer-valued floats) are rejected as
6601        // InvalidArgumentType — they'd only be valid after an
6602        // explicit cast, which the user must write themselves.
6603        _ => Err(Error::TypeMismatch),
6604    }
6605}
6606
6607/// Trivial source operator that yields a fixed list of rows in
6608/// order. Used by [`execute_with_in_tx_substitute`] to inject
6609/// the pre-materialised batched-execution outputs from a
6610/// `CALL { } IN TRANSACTIONS` into the regular operator pipeline
6611/// so wrapping clauses (Project, OrderBy, Limit, Aggregate, …)
6612/// can run untouched.
6613struct RowsLiteralOp {
6614    rows: Vec<Row>,
6615    cursor: usize,
6616}
6617
6618impl RowsLiteralOp {
6619    fn new(rows: Vec<Row>) -> Self {
6620        Self { rows, cursor: 0 }
6621    }
6622}
6623
6624impl Operator for RowsLiteralOp {
6625    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
6626        if self.cursor < self.rows.len() {
6627            let row = self.rows[self.cursor].clone();
6628            self.cursor += 1;
6629            Ok(Some(row))
6630        } else {
6631            Ok(None)
6632        }
6633    }
6634}