Skip to main content

meshdb_executor/
ops.rs

1use crate::{
2    error::{Error, Result},
3    eval::{compare_values, eval_expr, row_key, to_bool, value_key, values_equal, EvalCtx},
4    procedures::ProcedureRegistry,
5    reader::GraphReader,
6    value::{ParamMap, Row, Value},
7    writer::GraphWriter,
8};
9use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
10use meshdb_cypher::{
11    AggregateArg, AggregateFn, AggregateSpec, BinaryOp, CallArgs, CompareOp, ConstraintKind,
12    ConstraintScope as CypherConstraintScope, CreateEdgeSpec, CreateNodeSpec, Direction, Expr,
13    Literal, LogicalPlan, PointSeekBounds, PropertyType as CypherPropertyType, RemoveSpec,
14    ReturnItem, SetAssignment, SortItem, UnaryOp, YieldSpec,
15};
16use meshdb_storage::{
17    ConstraintScope as StorageConstraintScope, PropertyConstraintKind,
18    PropertyType as StoragePropertyType, RocksDbStorageEngine,
19};
20use std::cell::RefCell;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23
24/// Shared tombstone set for nodes / edges that have been DELETEd
25/// earlier in the current query. Property / label / type / keys /
26/// id accesses consult this set and raise `DeletedEntityAccess`
27/// so `MATCH (n) DELETE n RETURN n.num` surfaces the error at
28/// runtime instead of reading off a stale clone.
29#[derive(Default)]
30pub struct Tombstones {
31    pub nodes: RefCell<HashSet<meshdb_core::NodeId>>,
32    pub edges: RefCell<HashSet<meshdb_core::EdgeId>>,
33}
34
35pub struct ExecCtx<'a> {
36    pub store: &'a dyn GraphReader,
37    pub writer: &'a dyn GraphWriter,
38    /// Per-query parameter bindings. Empty for the single-node and Raft
39    /// entry points that don't carry a Bolt RUN; populated with the
40    /// driver-supplied params on the Bolt path. Threaded into every
41    /// `eval_expr` call so `Expr::Parameter("name")` resolves.
42    pub params: &'a ParamMap,
43    /// Procedures known to this execution. Defaults to an empty
44    /// registry on call sites that don't care; the TCK harness and
45    /// any future server startup plug in a populated registry so
46    /// `CALL ns.name(...)` resolves.
47    pub procedures: &'a ProcedureRegistry,
48    /// Outer-scope rows contributed by enclosing operators. The
49    /// innermost slot (first entry) is the nearest outer. Set by
50    /// [`CartesianProductOp`] when running its right side so that
51    /// operators inside — typically `EdgeExpandOp` /
52    /// `VarLengthExpandOp` constraint lookups — can resolve
53    /// variables the right-side scan didn't bind directly. Empty
54    /// at the top level.
55    pub outer_rows: &'a [&'a Row],
56    /// Tombstones for entities deleted earlier in the same query.
57    /// Shared by reference so DeleteOp can insert and downstream
58    /// eval can check.
59    pub tombstones: &'a Tombstones,
60}
61
62pub(crate) struct NoOpWriter;
63impl GraphWriter for NoOpWriter {
64    fn put_node(&self, _: &Node) -> Result<()> {
65        Ok(())
66    }
67    fn put_edge(&self, _: &Edge) -> Result<()> {
68        Ok(())
69    }
70    fn delete_edge(&self, _: EdgeId) -> Result<()> {
71        Ok(())
72    }
73    fn detach_delete_node(&self, _: NodeId) -> Result<()> {
74        Ok(())
75    }
76}
77
78impl<'a> ExecCtx<'a> {
79    /// Build an `EvalCtx` wrapping the given row alongside this
80    /// exec context's params and reader. Used by operators to
81    /// bridge the per-operator context into the expression
82    /// evaluator without repeating the field list at every
83    /// `eval_expr` call site.
84    pub(crate) fn eval_ctx<'b>(&self, row: &'b Row) -> EvalCtx<'b>
85    where
86        'a: 'b,
87    {
88        EvalCtx {
89            row,
90            params: self.params,
91            reader: self.store,
92            procedures: self.procedures,
93            outer_rows: self.outer_rows,
94            tombstones: self.tombstones,
95        }
96    }
97
98    /// Look up a variable in `row` first, then in each outer-scope
99    /// row in order. Returns the nearest match. Used for constraint
100    /// lookups inside right-side operators of a `CartesianProduct`
101    /// so a fresh scan that references an outer binding
102    /// (`MATCH ()-[r]-() MATCH (n)-[r]-(m)`) can still resolve it.
103    pub(crate) fn lookup_binding<'r>(&'r self, row: &'r Row, name: &str) -> Option<&'r Value> {
104        if let Some(v) = row.get(name) {
105            return Some(v);
106        }
107        for outer in self.outer_rows {
108            if let Some(v) = outer.get(name) {
109                return Some(v);
110            }
111        }
112        None
113    }
114}
115
116pub trait Operator {
117    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>>;
118}
119
120/// Execute a plan using the given store for both reads and writes.
121/// Equivalent to [`execute_with_reader`] with the store acting as both
122/// and an empty parameter map.
123pub fn execute(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
124    let params = ParamMap::new();
125    execute_with_reader(
126        plan,
127        store as &dyn GraphReader,
128        store as &dyn GraphWriter,
129        &params,
130    )
131}
132
133/// Execute a plan against a local [`RocksDbStorageEngine`] for reads,
134/// sending mutations to a separate [`GraphWriter`]. Used by in-process
135/// setups where reads are always cheap-local (single node or
136/// full-replica Raft). No parameters.
137pub fn execute_with_writer(
138    plan: &LogicalPlan,
139    store: &RocksDbStorageEngine,
140    writer: &dyn GraphWriter,
141) -> Result<Vec<Row>> {
142    let params = ParamMap::new();
143    execute_with_reader(plan, store as &dyn GraphReader, writer, &params)
144}
145
146/// Execute a plan with an arbitrary [`GraphReader`] for reads and a
147/// parameter map for `$param` resolution. Routing mode uses a
148/// partitioned reader that fans out reads across peers; the Bolt
149/// listener supplies the driver-bound param map.
150pub fn explain(plan: &LogicalPlan) -> Vec<Row> {
151    let text = meshdb_cypher::format_plan(plan);
152    let mut row = Row::new();
153    row.insert("plan".to_string(), Value::Property(Property::String(text)));
154    vec![row]
155}
156
157pub fn profile(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
158    let result_rows = execute(plan, store)?;
159    let row_count = result_rows.len() as i64;
160    let plan_text = meshdb_cypher::format_plan(plan);
161    let summary = format!("{plan_text}\nRows: {row_count}");
162    let mut row = Row::new();
163    row.insert(
164        "profile".to_string(),
165        Value::Property(Property::String(summary)),
166    );
167    row.insert(
168        "rows".to_string(),
169        Value::Property(Property::Int64(row_count)),
170    );
171    Ok(vec![row])
172}
173
174pub fn execute_with_reader(
175    plan: &LogicalPlan,
176    reader: &dyn GraphReader,
177    writer: &dyn GraphWriter,
178    params: &ParamMap,
179) -> Result<Vec<Row>> {
180    let empty_procs = ProcedureRegistry::new();
181    execute_with_reader_and_procs(plan, reader, writer, params, &empty_procs)
182}
183
184/// Like [`execute_with_reader`] but with an explicit procedure
185/// registry in scope. Used by the TCK harness to mount mock
186/// procedures declared by `there exists a procedure ...` steps,
187/// and reserved for the server startup path once built-in
188/// procedures land.
189/// Run `plan` once for a caller-supplied "outer" row that's
190/// folded into the operator pipeline as a seed. Used by
191/// `CALL { ... } IN TRANSACTIONS` to evaluate the body once per
192/// input row (with the row's bindings visible inside the body)
193/// while letting the caller batch writes across multiple
194/// invocations into a shared `BufferingGraphWriter`.
195///
196/// The `seed` row is threaded into every operator's
197/// `outer_rows` list, mirroring what `CallSubqueryOp` does
198/// internally for the non-batched form. Pass `None` to invoke
199/// without a seed (equivalent to
200/// [`execute_with_reader_and_procs`]).
201pub fn execute_with_seed(
202    plan: &LogicalPlan,
203    seed: Option<&Row>,
204    reader: &dyn GraphReader,
205    writer: &dyn GraphWriter,
206    params: &ParamMap,
207    procedures: &ProcedureRegistry,
208) -> Result<Vec<Row>> {
209    crate::eval::reset_statement_time();
210    if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
211        return Ok(rows);
212    }
213    let suppress_output = is_write_only_plan(plan);
214    let mut op = build_op_inner(plan, seed, &mut None);
215    let tombstones = Tombstones::default();
216    let outer_rows: Vec<&Row> = seed.into_iter().collect();
217    let ctx = ExecCtx {
218        store: reader,
219        writer,
220        params,
221        procedures,
222        outer_rows: &outer_rows,
223        tombstones: &tombstones,
224    };
225    let mut rows = Vec::new();
226    while let Some(row) = op.next(&ctx)? {
227        rows.push(row);
228    }
229    if suppress_output {
230        Ok(Vec::new())
231    } else {
232        Ok(rows)
233    }
234}
235
236/// Run `plan` with a pre-materialised set of rows substituted
237/// for the (single) `LogicalPlan::CallSubqueryInTransactions`
238/// node in the tree. Used by
239/// `MeshService::execute_call_in_transactions` to fold its
240/// already-batched-and-committed body output back into the
241/// regular pipeline so wrapping clauses (Project / OrderBy /
242/// Limit / Aggregate / Filter) can run untouched.
243///
244/// Panics at build time if the plan doesn't contain exactly one
245/// IN TRANSACTIONS node. The dispatcher detects the wrapping
246/// shape before calling this, so reaching that panic indicates a
247/// wiring bug.
248pub fn execute_with_in_tx_substitute(
249    plan: &LogicalPlan,
250    in_tx_rows: Vec<Row>,
251    reader: &dyn GraphReader,
252    writer: &dyn GraphWriter,
253    params: &ParamMap,
254    procedures: &ProcedureRegistry,
255) -> Result<Vec<Row>> {
256    crate::eval::reset_statement_time();
257    if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
258        return Ok(rows);
259    }
260    let suppress_output = is_write_only_plan(plan);
261    let mut substitute = Some(in_tx_rows);
262    let mut op = build_op_inner(plan, None, &mut substitute);
263    let tombstones = Tombstones::default();
264    let ctx = ExecCtx {
265        store: reader,
266        writer,
267        params,
268        procedures,
269        outer_rows: &[],
270        tombstones: &tombstones,
271    };
272    let mut rows = Vec::new();
273    while let Some(row) = op.next(&ctx)? {
274        rows.push(row);
275    }
276    if suppress_output {
277        Ok(Vec::new())
278    } else {
279        Ok(rows)
280    }
281}
282
283pub fn execute_with_reader_and_procs(
284    plan: &LogicalPlan,
285    reader: &dyn GraphReader,
286    writer: &dyn GraphWriter,
287    params: &ParamMap,
288    procedures: &ProcedureRegistry,
289) -> Result<Vec<Row>> {
290    // `datetime()`, `localtime()`, and friends cache "now" in a
291    // thread-local so multiple calls inside the same statement see
292    // the same instant. Clear it at the start of every execution so
293    // the next query gets a fresh reading.
294    crate::eval::reset_statement_time();
295    // Schema DDL never enters the operator pipeline — it's a
296    // side-effect on the backing store, not a row-producing query.
297    // Short-circuit here so `build_op` stays a pure plan-to-operator
298    // mapping and doesn't need DDL-specific cases.
299    if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
300        return Ok(rows);
301    }
302    let suppress_output = is_write_only_plan(plan);
303    let mut op = build_op(plan);
304    let tombstones = Tombstones::default();
305    let ctx = ExecCtx {
306        store: reader,
307        writer,
308        params,
309        procedures,
310        outer_rows: &[],
311        tombstones: &tombstones,
312    };
313    let mut rows = Vec::new();
314    while let Some(row) = op.next(&ctx)? {
315        rows.push(row);
316    }
317    if suppress_output {
318        Ok(Vec::new())
319    } else {
320        Ok(rows)
321    }
322}
323
324fn is_write_only_plan(plan: &LogicalPlan) -> bool {
325    // A plan is write-only when its top-level operator is a mutation
326    // with no projection (Project/Identity) wrapping it. Mutations
327    // with RETURN have a Project on top.
328    match plan {
329        LogicalPlan::CreatePath { .. }
330        | LogicalPlan::Delete { .. }
331        | LogicalPlan::SetProperty { .. }
332        | LogicalPlan::Remove { .. }
333        | LogicalPlan::Foreach { .. }
334        | LogicalPlan::MergeNode { .. }
335        | LogicalPlan::MergeEdge { .. } => true,
336        _ => false,
337    }
338}
339
340/// True when `plan` contains any write-bearing operator anywhere
341/// in its subtree. Used by [`build_op`] to flag `Limit` operators
342/// that need to drain their input to ensure side effects happen
343/// even when the limit suppresses all output rows — Cypher
344/// specifies write operators are eager regardless of how many
345/// rows downstream `LIMIT 0` / `LIMIT N` consumes.
346fn plan_contains_writes(plan: &LogicalPlan) -> bool {
347    use LogicalPlan::*;
348    match plan {
349        // Write-bearing variants — terminate true.
350        CreatePath { .. }
351        | Delete { .. }
352        | SetProperty { .. }
353        | Remove { .. }
354        | MergeNode { .. }
355        | MergeEdge { .. }
356        | Foreach { .. } => true,
357
358        // Container variants with a single required `input`.
359        Filter { input, .. }
360        | Project { input, .. }
361        | Aggregate { input, .. }
362        | Distinct { input, .. }
363        | OrderBy { input, .. }
364        | Skip { input, .. }
365        | Limit { input, .. }
366        | EdgeExpand { input, .. }
367        | OptionalEdgeExpand { input, .. }
368        | VarLengthExpand { input, .. }
369        | Identity { input, .. }
370        | CoalesceNullRow { input, .. }
371        | UnwindChain { input, .. }
372        | BindPath { input, .. }
373        | ShortestPath { input, .. } => plan_contains_writes(input),
374
375        // Two-input variant.
376        CartesianProduct { left, right } => {
377            plan_contains_writes(left) || plan_contains_writes(right)
378        }
379
380        // UNION carries N branches; any can be write-bearing.
381        Union { branches, .. } => branches.iter().any(plan_contains_writes),
382
383        // Subquery wrappers carry both an `input` row source and a
384        // `body` plan; either can be write-bearing.
385        CallSubquery { input, body } | OptionalApply { input, body, .. } => {
386            plan_contains_writes(input) || plan_contains_writes(body)
387        }
388        // CALL ... IN TRANSACTIONS commits each batch internally, so
389        // its writes don't leak into a surrounding LimitOp's drain
390        // logic — but the input subtree can still be write-bearing
391        // (e.g. a preceding CREATE before the CALL). Recurse only
392        // into the input.
393        CallSubqueryInTransactions { input, .. } => plan_contains_writes(input),
394        // apoc.periodic.iterate is fully self-contained: both
395        // iterate and action subtrees are run by the dispatcher,
396        // not by the surrounding operator pipeline. From a
397        // wrapping LimitOp's perspective, no writes pass
398        // through this node.
399        ApocPeriodicIterate { .. } => false,
400
401        // Variants whose `input` is optional (top-level producer
402        // when `None`).
403        ProcedureCall { input, .. } | LoadCsv { input, .. } => {
404            input.as_ref().map_or(false, |i| plan_contains_writes(i))
405        }
406
407        // Leaves and DDL — no writes flow through here.
408        _ => false,
409    }
410}
411
412/// DDL dispatch. Returns `Ok(Some(rows))` when `plan` is a schema
413/// statement and was handled; `Ok(None)` when it's a regular graph
414/// operation that the operator pipeline should execute normally.
415///
416/// `CREATE INDEX` / `DROP INDEX` return one row carrying an `ok`
417/// value so Bolt clients see a non-empty RECORD stream rather than
418/// an empty SUCCESS — mirrors how Neo4j reports schema writes.
419/// `SHOW INDEXES` returns one row per registered index with
420/// `label`, `property`, and `state` columns.
421fn try_execute_ddl(
422    plan: &LogicalPlan,
423    reader: &dyn GraphReader,
424    writer: &dyn GraphWriter,
425) -> Result<Option<Vec<Row>>> {
426    match plan {
427        LogicalPlan::CreatePropertyIndex { label, properties } => {
428            writer.create_property_index(label, properties)?;
429            Ok(Some(vec![node_index_ddl_ack_row(
430                "created", label, properties,
431            )]))
432        }
433        LogicalPlan::DropPropertyIndex { label, properties } => {
434            writer.drop_property_index(label, properties)?;
435            Ok(Some(vec![node_index_ddl_ack_row(
436                "dropped", label, properties,
437            )]))
438        }
439        LogicalPlan::CreateEdgePropertyIndex {
440            edge_type,
441            properties,
442        } => {
443            writer.create_edge_property_index(edge_type, properties)?;
444            Ok(Some(vec![edge_index_ddl_ack_row(
445                "created", edge_type, properties,
446            )]))
447        }
448        LogicalPlan::DropEdgePropertyIndex {
449            edge_type,
450            properties,
451        } => {
452            writer.drop_edge_property_index(edge_type, properties)?;
453            Ok(Some(vec![edge_index_ddl_ack_row(
454                "dropped", edge_type, properties,
455            )]))
456        }
457        LogicalPlan::ShowPropertyIndexes => {
458            // SHOW is a pure read — source it from the reader so
459            // overlay/partitioned readers deliver the right view
460            // without round-tripping through a writer. Both
461            // node-scoped and edge-scoped indexes land in the same
462            // row stream, distinguished by the `scope` column.
463            let mut rows: Vec<Row> = Vec::new();
464            for (label, properties) in reader.list_property_indexes()? {
465                rows.push(show_index_row("NODE", label, properties));
466            }
467            for (edge_type, properties) in reader.list_edge_property_indexes()? {
468                rows.push(show_index_row("RELATIONSHIP", edge_type, properties));
469            }
470            Ok(Some(rows))
471        }
472        LogicalPlan::CreatePointIndex { label, property } => {
473            writer.create_point_index(label, property)?;
474            Ok(Some(vec![point_index_ddl_ack_row(
475                "created", "NODE", label, property,
476            )]))
477        }
478        LogicalPlan::DropPointIndex { label, property } => {
479            writer.drop_point_index(label, property)?;
480            Ok(Some(vec![point_index_ddl_ack_row(
481                "dropped", "NODE", label, property,
482            )]))
483        }
484        LogicalPlan::CreateEdgePointIndex {
485            edge_type,
486            property,
487        } => {
488            writer.create_edge_point_index(edge_type, property)?;
489            Ok(Some(vec![point_index_ddl_ack_row(
490                "created",
491                "RELATIONSHIP",
492                edge_type,
493                property,
494            )]))
495        }
496        LogicalPlan::DropEdgePointIndex {
497            edge_type,
498            property,
499        } => {
500            writer.drop_edge_point_index(edge_type, property)?;
501            Ok(Some(vec![point_index_ddl_ack_row(
502                "dropped",
503                "RELATIONSHIP",
504                edge_type,
505                property,
506            )]))
507        }
508        LogicalPlan::ShowPointIndexes => {
509            // Kept on its own row stream rather than folded into
510            // `SHOW INDEXES` so the point-specific `type` column
511            // stays a stable part of the shape and callers can
512            // filter without depending on string-matching `type`
513            // across mixed rows. Node- and edge-scope rows are
514            // merged here, disambiguated by the `scope` column.
515            let mut rows: Vec<Row> = Vec::new();
516            for (label, property) in reader.list_point_indexes()? {
517                rows.push(show_point_index_row("NODE", label, property));
518            }
519            for (edge_type, property) in reader.list_edge_point_indexes()? {
520                rows.push(show_point_index_row("RELATIONSHIP", edge_type, property));
521            }
522            Ok(Some(rows))
523        }
524        LogicalPlan::CreatePropertyConstraint {
525            name,
526            scope,
527            properties,
528            kind,
529            if_not_exists,
530        } => {
531            let storage_kind = match kind {
532                ConstraintKind::Unique => PropertyConstraintKind::Unique,
533                ConstraintKind::NotNull => PropertyConstraintKind::NotNull,
534                ConstraintKind::NodeKey => PropertyConstraintKind::NodeKey,
535                ConstraintKind::PropertyType(t) => {
536                    PropertyConstraintKind::PropertyType(cypher_to_storage_property_type(*t))
537                }
538            };
539            let storage_scope = cypher_to_storage_scope(scope);
540            let spec = writer.create_property_constraint(
541                name.as_deref(),
542                &storage_scope,
543                properties,
544                storage_kind,
545                *if_not_exists,
546            )?;
547            Ok(Some(vec![constraint_ack_row("created", &spec)]))
548        }
549        LogicalPlan::DropPropertyConstraint { name, if_exists } => {
550            writer.drop_property_constraint(name, *if_exists)?;
551            let mut row = Row::default();
552            row.insert(
553                "state".into(),
554                Value::Property(Property::String("dropped".into())),
555            );
556            row.insert(
557                "name".into(),
558                Value::Property(Property::String(name.clone())),
559            );
560            Ok(Some(vec![row]))
561        }
562        LogicalPlan::ShowPropertyConstraints => {
563            let specs = reader.list_property_constraints()?;
564            let rows = specs.into_iter().map(constraint_show_row).collect();
565            Ok(Some(rows))
566        }
567        _ => Ok(None),
568    }
569}
570
571/// One-row acknowledgement emitted after `CREATE CONSTRAINT`. Carries
572/// the resolved spec so callers see the final name when they omitted
573/// it. Columns match `SHOW CONSTRAINTS` (plus `state`) so Bolt clients
574/// can uniformly format DDL responses.
575fn constraint_ack_row(state: &str, spec: &meshdb_storage::PropertyConstraintSpec) -> Row {
576    let mut row = constraint_show_row(spec.clone());
577    row.insert(
578        "state".into(),
579        Value::Property(Property::String(state.into())),
580    );
581    row
582}
583
584/// Row shape for `SHOW CONSTRAINTS` and `db.constraints()`. Columns:
585/// `name`, `scope` ("NODE" / "RELATIONSHIP"), `label` (label or edge
586/// type, depending on scope), `properties` (list), and `type`.
587/// Single-property constraints still carry a one-element `properties`
588/// list; composite kinds (e.g. `NodeKey`) carry the full tuple.
589fn constraint_show_row(spec: meshdb_storage::PropertyConstraintSpec) -> Row {
590    let mut row = Row::default();
591    row.insert("name".into(), Value::Property(Property::String(spec.name)));
592    let (scope_tag, target) = match spec.scope {
593        meshdb_storage::ConstraintScope::Node(l) => ("NODE", l),
594        meshdb_storage::ConstraintScope::Relationship(t) => ("RELATIONSHIP", t),
595    };
596    row.insert(
597        "scope".into(),
598        Value::Property(Property::String(scope_tag.into())),
599    );
600    // `label` stays for backwards compatibility — it carries the
601    // scope's target regardless of node vs relationship, matching
602    // the index DDL row's column for historical consistency. A
603    // separate `scope` column tells you what kind of target it is.
604    row.insert("label".into(), Value::Property(Property::String(target)));
605    let props: Vec<Property> = spec.properties.into_iter().map(Property::String).collect();
606    row.insert("properties".into(), Value::Property(Property::List(props)));
607    row.insert(
608        "type".into(),
609        Value::Property(Property::String(spec.kind.as_string())),
610    );
611    row
612}
613
614/// Convert the Cypher AST's `ConstraintScope` into the storage-layer
615/// enum. Narrow bridge — same shape on both sides; kept in
616/// different crates to preserve the dependency direction.
617fn cypher_to_storage_scope(scope: &CypherConstraintScope) -> StorageConstraintScope {
618    match scope {
619        CypherConstraintScope::Node(l) => StorageConstraintScope::Node(l.clone()),
620        CypherConstraintScope::Relationship(t) => StorageConstraintScope::Relationship(t.clone()),
621    }
622}
623
624/// Convert the Cypher AST's `PropertyType` into the storage-layer
625/// enum. Narrow bridge — the two enums carry the same four variants
626/// but live in different crates to keep the dependency direction
627/// clean (cypher → executor → storage, never the reverse).
628fn cypher_to_storage_property_type(t: CypherPropertyType) -> StoragePropertyType {
629    match t {
630        CypherPropertyType::String => StoragePropertyType::String,
631        CypherPropertyType::Integer => StoragePropertyType::Integer,
632        CypherPropertyType::Float => StoragePropertyType::Float,
633        CypherPropertyType::Boolean => StoragePropertyType::Boolean,
634    }
635}
636
637/// One-row acknowledgement emitted after `CREATE INDEX` /
638/// `DROP INDEX` on a node scope. Columns mirror the `SHOW INDEXES`
639/// shape (`scope`, `label`, `property`) so Bolt clients can format
640/// DDL responses uniformly with schema-read responses. `label` is
641/// kept as a column name for backwards-compat with pre-edge-index
642/// clients; the `scope` column disambiguates node from edge.
643fn node_index_ddl_ack_row(state: &str, label: &str, properties: &[String]) -> Row {
644    let mut row = Row::default();
645    row.insert(
646        "state".into(),
647        Value::Property(Property::String(state.into())),
648    );
649    row.insert(
650        "scope".into(),
651        Value::Property(Property::String("NODE".into())),
652    );
653    row.insert(
654        "label".into(),
655        Value::Property(Property::String(label.into())),
656    );
657    // `property` stays a single string for backward compat with
658    // drivers that read it directly. Composite specs render as a
659    // comma-joined preview; the `properties` column carries the
660    // authoritative list.
661    row.insert(
662        "property".into(),
663        Value::Property(Property::String(properties.join(","))),
664    );
665    row.insert("properties".into(), properties_list_value(properties));
666    row
667}
668
669/// One-row acknowledgement emitted after `CREATE INDEX` /
670/// `DROP INDEX` on a relationship scope. Mirrors
671/// [`node_index_ddl_ack_row`] but the target lives under
672/// `edge_type` (and `label` carries the same string so existing
673/// driver code that read `label` still sees the target name).
674fn edge_index_ddl_ack_row(state: &str, edge_type: &str, properties: &[String]) -> Row {
675    let mut row = Row::default();
676    row.insert(
677        "state".into(),
678        Value::Property(Property::String(state.into())),
679    );
680    row.insert(
681        "scope".into(),
682        Value::Property(Property::String("RELATIONSHIP".into())),
683    );
684    row.insert(
685        "label".into(),
686        Value::Property(Property::String(edge_type.into())),
687    );
688    row.insert(
689        "edge_type".into(),
690        Value::Property(Property::String(edge_type.into())),
691    );
692    row.insert(
693        "property".into(),
694        Value::Property(Property::String(properties.join(","))),
695    );
696    row.insert("properties".into(), properties_list_value(properties));
697    row
698}
699
700fn properties_list_value(properties: &[String]) -> Value {
701    Value::List(
702        properties
703            .iter()
704            .map(|p| Value::Property(Property::String(p.clone())))
705            .collect(),
706    )
707}
708
709/// One-row ack emitted after `CREATE POINT INDEX` / `DROP POINT
710/// INDEX`. `scope` disambiguates node vs relationship; `label`
711/// carries the scope target string (label for node, edge type for
712/// relationship) and — when relationship — `edge_type` carries it
713/// too so drivers can read either column. Matches the two-column
714/// convention from `show_index_row`.
715fn point_index_ddl_ack_row(state: &str, scope: &str, target: &str, property: &str) -> Row {
716    let mut row = Row::default();
717    row.insert(
718        "state".into(),
719        Value::Property(Property::String(state.into())),
720    );
721    row.insert(
722        "scope".into(),
723        Value::Property(Property::String(scope.into())),
724    );
725    row.insert(
726        "type".into(),
727        Value::Property(Property::String("POINT".into())),
728    );
729    row.insert(
730        "label".into(),
731        Value::Property(Property::String(target.into())),
732    );
733    if scope == "RELATIONSHIP" {
734        row.insert(
735            "edge_type".into(),
736            Value::Property(Property::String(target.into())),
737        );
738    }
739    row.insert(
740        "property".into(),
741        Value::Property(Property::String(property.into())),
742    );
743    row
744}
745
746/// Row shape for `SHOW POINT INDEXES`. One row per registered point
747/// index under both scopes, disambiguated by the `scope` column.
748/// Same column shape as `point_index_ddl_ack_row` minus `state`'s
749/// DDL verb (it carries `"online"` here instead).
750fn show_point_index_row(scope: &str, target: String, property: String) -> Row {
751    let mut row = Row::default();
752    row.insert(
753        "scope".into(),
754        Value::Property(Property::String(scope.into())),
755    );
756    row.insert(
757        "type".into(),
758        Value::Property(Property::String("POINT".into())),
759    );
760    row.insert(
761        "label".into(),
762        Value::Property(Property::String(target.clone())),
763    );
764    if scope == "RELATIONSHIP" {
765        row.insert(
766            "edge_type".into(),
767            Value::Property(Property::String(target)),
768        );
769    }
770    row.insert(
771        "property".into(),
772        Value::Property(Property::String(property)),
773    );
774    row.insert(
775        "state".into(),
776        Value::Property(Property::String("online".into())),
777    );
778    row
779}
780
781/// Row shape for `SHOW INDEXES` output. Columns: `scope`
782/// (`"NODE"` / `"RELATIONSHIP"`), `label` (scope target — label for
783/// node, edge type for relationship; kept for backwards-compat),
784/// `property` (comma-joined preview of the properties for composite
785/// specs), `properties` (full list — authoritative), and `state`.
786/// `label` and `edge_type` carry the same string when
787/// `scope == "RELATIONSHIP"` so clients can read either column.
788fn show_index_row(scope: &str, target: String, properties: Vec<String>) -> Row {
789    let mut row = Row::default();
790    row.insert(
791        "scope".into(),
792        Value::Property(Property::String(scope.into())),
793    );
794    row.insert(
795        "label".into(),
796        Value::Property(Property::String(target.clone())),
797    );
798    if scope == "RELATIONSHIP" {
799        row.insert(
800            "edge_type".into(),
801            Value::Property(Property::String(target)),
802        );
803    }
804    row.insert(
805        "property".into(),
806        Value::Property(Property::String(properties.join(","))),
807    );
808    row.insert("properties".into(), properties_list_value(&properties));
809    row.insert(
810        "state".into(),
811        Value::Property(Property::String("online".into())),
812    );
813    row
814}
815
816fn build_op(plan: &LogicalPlan) -> Box<dyn Operator> {
817    build_op_inner(plan, None, &mut None)
818}
819
/// Build the operator tree for `plan`.
///
/// `seed` is the optional outer row, threaded into operators that can
/// consume one (used by `CallSubqueryOp` for per-row body evaluation).
/// When it is `Some`, a `LogicalPlan::SeedRow` leaf becomes a
/// `SeededRowOp` holding a clone of that row; otherwise a plain
/// `SeedRowOp` is used.
///
/// `in_tx_substitute` lets the caller pre-materialize the rows that a
/// batched-commit node (`CallSubqueryInTransactions` or
/// `ApocPeriodicIterate`) would normally produce. Those rows are
/// injected as a `RowsLiteralOp` instead. The server-side dispatcher
/// (`MeshService::execute_call_in_transactions` /
/// `execute_apoc_periodic_iterate`) uses this to fold its
/// already-batched-and-committed outputs back into the regular
/// pipeline, so wrapping clauses (Project / OrderBy / Limit /
/// Aggregate / Filter) can run untouched.
///
/// The substitute is `take`n by the first batched-commit node
/// encountered.
///
/// # Panics
///
/// - A batched-commit node is reached while `in_tx_substitute` is
///   `None`. This surfaces missing dispatcher wiring at build time
///   instead of as a silent semantic regression.
/// - A schema DDL plan reaches this function. DDL must be dispatched
///   via `try_execute_ddl` first.
pub(crate) fn build_op_inner(
    plan: &LogicalPlan,
    seed: Option<&Row>,
    in_tx_substitute: &mut Option<Vec<Row>>,
) -> Box<dyn Operator> {
    // Recursively build a child subtree, forwarding the same `seed`
    // and `in_tx_substitute` so nested SeedRow / batched-commit
    // leaves see them.
    macro_rules! child {
        ($p:expr) => {
            build_op_inner($p, seed, in_tx_substitute)
        };
    }
    match plan {
        LogicalPlan::CreatePath {
            input,
            nodes,
            edges,
        } => Box::new(CreatePathOp::new(
            input.as_ref().map(|p| child!(p)),
            nodes.clone(),
            edges.clone(),
        )),
        // Only the left side is built here. The right side is handed
        // over as a plan, which `CartesianProductOp` rebuilds via
        // `build_op` per left row. As a result, `seed` and
        // `in_tx_substitute` are not forwarded to it.
        LogicalPlan::CartesianProduct { left, right } => {
            Box::new(CartesianProductOp::new(child!(left), (**right).clone()))
        }
        LogicalPlan::Delete {
            input,
            detach,
            vars,
            exprs,
        } => Box::new(DeleteOp::new(
            child!(input),
            *detach,
            vars.clone(),
            exprs.clone(),
        )),
        LogicalPlan::SetProperty { input, assignments } => {
            Box::new(SetPropertyOp::new(child!(input), assignments.clone()))
        }
        LogicalPlan::Remove { input, items } => {
            Box::new(RemoveOp::new(child!(input), items.clone()))
        }
        LogicalPlan::LoadCsv {
            input,
            path_expr,
            var,
            with_headers,
        } => Box::new(LoadCsvOp::new(
            input.as_ref().map(|p| child!(p)),
            path_expr.clone(),
            var.clone(),
            *with_headers,
        )),
        LogicalPlan::Foreach {
            input,
            var,
            list_expr,
            set_assignments,
            remove_items,
        } => Box::new(ForeachOp::new(
            child!(input),
            var.clone(),
            list_expr.clone(),
            set_assignments.clone(),
            remove_items.clone(),
        )),
        // The subquery body is passed as a plan. The operator builds
        // it itself, per outer row.
        LogicalPlan::CallSubquery { input, body } => {
            Box::new(CallSubqueryOp::new(child!(input), (**body).clone()))
        }
        LogicalPlan::CallSubqueryInTransactions { .. }
        | LogicalPlan::ApocPeriodicIterate { .. } => {
            // CALL ... IN TRANSACTIONS and apoc.periodic.iterate
            // both commit each batch in its own transaction,
            // which the operator pipeline can't do (it has no
            // commit handle). The dispatcher above
            // (`MeshService::execute_cypher_in_tx`) does the
            // batched execution itself, then folds the
            // resulting rows back into the pipeline by passing
            // them via `in_tx_substitute`. We swap a
            // `RowsLiteralOp` in place of the batched node so
            // any wrapping Project / OrderBy / Limit / etc.
            // operators run untouched against those
            // pre-materialized rows.
            //
            // Reaching this arm with `in_tx_substitute = None`
            // means the dispatcher was bypassed — panic loudly
            // so the missing wiring surfaces at build time.
            match in_tx_substitute.take() {
                Some(rows) => Box::new(RowsLiteralOp::new(rows)),
                None => panic!(
                    "Batched-commit LogicalPlan variant reached `build_op` without a row \
                     substitute — the server-side dispatcher \
                     (execute_cypher_in_tx → execute_call_in_transactions / \
                     execute_apoc_periodic_iterate) must inject one before invoking the \
                     operator pipeline"
                ),
            }
        }
        LogicalPlan::OptionalApply {
            input,
            body,
            null_vars,
        } => Box::new(OptionalApplyOp::new(
            child!(input),
            (**body).clone(),
            null_vars.clone(),
        )),
        LogicalPlan::ProcedureCall {
            input,
            qualified_name,
            args,
            yield_spec,
            standalone,
        } => Box::new(ProcedureCallOp::new(
            input.as_ref().map(|p| child!(p)),
            qualified_name.clone(),
            args.clone(),
            yield_spec.clone(),
            *standalone,
        )),
        // With an outer seed row, the leaf replays that row.
        // Otherwise it falls back to the plain seed operator.
        LogicalPlan::SeedRow => match seed {
            Some(r) => Box::new(SeededRowOp {
                row: Some(r.clone()),
            }),
            None => Box::new(SeedRowOp { done: false }),
        },
        LogicalPlan::NodeScanAll { var } => Box::new(NodeScanAllOp::new(var.clone())),
        LogicalPlan::NodeScanByLabels { var, labels } => {
            Box::new(NodeScanByLabelsOp::new(var.clone(), labels.clone()))
        }
        LogicalPlan::EdgeExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            edge_properties,
            edge_types,
            direction,
            edge_constraint_var,
        } => Box::new(EdgeExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            edge_properties.clone(),
            edge_types.clone(),
            *direction,
            edge_constraint_var.clone(),
        )),
        LogicalPlan::OptionalEdgeExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            dst_properties,
            edge_types,
            direction,
            dst_constraint_var,
            edge_constraint_var,
        } => Box::new(OptionalEdgeExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            dst_properties.clone(),
            edge_types.clone(),
            *direction,
            dst_constraint_var.clone(),
            edge_constraint_var.clone(),
        )),
        LogicalPlan::VarLengthExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            edge_types,
            edge_properties,
            direction,
            min_hops,
            max_hops,
            path_var,
            optional,
            dst_constraint_var,
            bound_edge_list_var,
            excluded_edge_vars,
        } => Box::new(VarLengthExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            edge_types.clone(),
            edge_properties.clone(),
            *direction,
            *min_hops,
            *max_hops,
            path_var.clone(),
            *optional,
            dst_constraint_var.clone(),
            bound_edge_list_var.clone(),
            excluded_edge_vars.clone(),
        )),
        LogicalPlan::Filter { input, predicate } => {
            Box::new(FilterOp::new(child!(input), predicate.clone()))
        }
        LogicalPlan::Project { input, items } => {
            Box::new(ProjectOp::new(child!(input), items.clone()))
        }
        LogicalPlan::Aggregate {
            input,
            group_keys,
            aggregates,
        } => Box::new(AggregateOp::new(
            child!(input),
            group_keys.clone(),
            aggregates.clone(),
        )),
        LogicalPlan::Identity { input } => Box::new(IdentityOp::new(child!(input))),
        LogicalPlan::CoalesceNullRow { input, null_vars } => {
            Box::new(CoalesceNullRowOp::new(child!(input), null_vars.clone()))
        }
        LogicalPlan::Distinct { input } => Box::new(DistinctOp::new(child!(input))),
        LogicalPlan::OrderBy { input, sort_items } => {
            Box::new(OrderByOp::new(child!(input), sort_items.clone()))
        }
        LogicalPlan::Skip { input, count } => Box::new(SkipOp::new(child!(input), count.clone())),
        LogicalPlan::Limit { input, count } => {
            // When the input subtree contains a write, we must
            // drain it after the limit is exhausted (or before
            // returning None on `LIMIT 0`) — Cypher specifies
            // write side effects are eager regardless of how
            // many output rows downstream consumes.
            let drain_on_complete = plan_contains_writes(input);
            Box::new(LimitOp::new(
                child!(input),
                count.clone(),
                drain_on_complete,
            ))
        }
        LogicalPlan::MergeNode {
            input,
            var,
            labels,
            properties,
            on_create,
            on_match,
        } => Box::new(MergeNodeOp::new(
            input.as_ref().map(|p| child!(p)),
            var.clone(),
            labels.clone(),
            properties.clone(),
            on_create.clone(),
            on_match.clone(),
        )),
        LogicalPlan::MergeEdge {
            input,
            edge_var,
            src_var,
            dst_var,
            edge_type,
            undirected,
            properties,
            on_create,
            on_match,
        } => Box::new(MergeEdgeOp::new(
            child!(input),
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            *undirected,
            properties.clone(),
            on_create.clone(),
            on_match.clone(),
        )),
        // `Unwind` is a leaf: it has no input subtree. `UnwindChain` is
        // the per-input-row form.
        LogicalPlan::Unwind { var, expr } => Box::new(UnwindOp::new(var.clone(), expr.clone())),
        LogicalPlan::UnwindChain { input, var, expr } => {
            Box::new(UnwindChainOp::new(child!(input), var.clone(), expr.clone()))
        }
        LogicalPlan::IndexSeek {
            var,
            label,
            properties,
            values,
        } => Box::new(IndexSeekOp::new(
            var.clone(),
            label.clone(),
            properties.clone(),
            values.clone(),
        )),
        LogicalPlan::PointIndexSeek {
            var,
            label,
            property,
            bounds,
        } => Box::new(PointIndexSeekOp::new(
            var.clone(),
            label.clone(),
            property.clone(),
            bounds.clone(),
        )),
        LogicalPlan::EdgeSeek {
            edge_var,
            src_var,
            dst_var,
            edge_type,
            property,
            value,
            direction,
            residual_properties,
        } => Box::new(EdgeSeekOp::new(
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            property.clone(),
            value.clone(),
            *direction,
            residual_properties.clone(),
        )),
        LogicalPlan::EdgePointIndexSeek {
            edge_var,
            src_var,
            dst_var,
            edge_type,
            property,
            direction,
            bounds,
        } => Box::new(EdgePointIndexSeekOp::new(
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            property.clone(),
            *direction,
            bounds.clone(),
        )),
        // Every branch is built eagerly through `child!`. `all` is
        // forwarded to `UnionOp` unchanged (presumably UNION ALL vs
        // de-duplicating UNION).
        LogicalPlan::Union { branches, all } => {
            let branch_ops: Vec<Box<dyn Operator>> = branches.iter().map(|b| child!(b)).collect();
            Box::new(UnionOp::new(branch_ops, *all))
        }
        LogicalPlan::BindPath {
            input,
            path_var,
            node_vars,
            edge_vars,
        } => Box::new(BindPathOp::new(
            child!(input),
            path_var.clone(),
            node_vars.clone(),
            edge_vars.clone(),
        )),
        LogicalPlan::ShortestPath {
            input,
            src_var,
            dst_var,
            path_var,
            edge_types,
            direction,
            max_hops,
            kind,
        } => Box::new(ShortestPathOp::new(
            child!(input),
            src_var.clone(),
            dst_var.clone(),
            path_var.clone(),
            edge_types.clone(),
            *direction,
            *max_hops,
            *kind,
        )),
        // DDL plans are handled by `try_execute_ddl` before the
        // operator pipeline is built, so they should never reach
        // this point. A panic here catches any future refactor
        // that forgets to gate them.
        LogicalPlan::CreatePropertyIndex { .. }
        | LogicalPlan::DropPropertyIndex { .. }
        | LogicalPlan::CreateEdgePropertyIndex { .. }
        | LogicalPlan::DropEdgePropertyIndex { .. }
        | LogicalPlan::ShowPropertyIndexes
        | LogicalPlan::CreatePointIndex { .. }
        | LogicalPlan::DropPointIndex { .. }
        | LogicalPlan::CreateEdgePointIndex { .. }
        | LogicalPlan::DropEdgePointIndex { .. }
        | LogicalPlan::ShowPointIndexes
        | LogicalPlan::CreatePropertyConstraint { .. }
        | LogicalPlan::DropPropertyConstraint { .. }
        | LogicalPlan::ShowPropertyConstraints => {
            panic!("schema DDL must be dispatched via try_execute_ddl before build_op")
        }
    }
}
1229
1230struct UnwindOp {
1231    var: String,
1232    expr: Expr,
1233    items: Option<Vec<Value>>,
1234    cursor: usize,
1235}
1236
1237impl UnwindOp {
1238    fn new(var: String, expr: Expr) -> Self {
1239        Self {
1240            var,
1241            expr,
1242            items: None,
1243            cursor: 0,
1244        }
1245    }
1246}
1247
1248impl Operator for UnwindOp {
1249    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1250        if self.items.is_none() {
1251            let empty = Row::new();
1252            let ectx = EvalCtx {
1253                row: &empty,
1254                params: ctx.params,
1255                reader: ctx.store,
1256                procedures: ctx.procedures,
1257                outer_rows: ctx.outer_rows,
1258                tombstones: ctx.tombstones,
1259            };
1260            let val = eval_expr(&self.expr, &ectx)?;
1261            self.items = Some(coerce_unwind_list(val)?);
1262        }
1263        let items = self.items.as_ref().unwrap();
1264        if self.cursor < items.len() {
1265            let v = items[self.cursor].clone();
1266            self.cursor += 1;
1267            let mut row = Row::new();
1268            row.insert(self.var.clone(), v);
1269            Ok(Some(row))
1270        } else {
1271            Ok(None)
1272        }
1273    }
1274}
1275
1276/// Per-row UNWIND: pulls one input row, evaluates `expr` against it
1277/// to produce a list, and emits one output row per list element.
1278/// Each output row inherits every binding from the input row plus
1279/// a new `var` binding. Empty / null lists drop the input row and
1280/// the operator immediately pulls the next input row.
1281struct UnwindChainOp {
1282    input: Box<dyn Operator>,
1283    var: String,
1284    expr: Expr,
1285    current_row: Option<Row>,
1286    items: Vec<Value>,
1287    cursor: usize,
1288}
1289
1290impl UnwindChainOp {
1291    fn new(input: Box<dyn Operator>, var: String, expr: Expr) -> Self {
1292        Self {
1293            input,
1294            var,
1295            expr,
1296            current_row: None,
1297            items: Vec::new(),
1298            cursor: 0,
1299        }
1300    }
1301}
1302
1303impl Operator for UnwindChainOp {
1304    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1305        loop {
1306            if let Some(base) = &self.current_row {
1307                if self.cursor < self.items.len() {
1308                    let v = self.items[self.cursor].clone();
1309                    self.cursor += 1;
1310                    let mut row = base.clone();
1311                    row.insert(self.var.clone(), v);
1312                    return Ok(Some(row));
1313                }
1314                self.current_row = None;
1315                self.items.clear();
1316                self.cursor = 0;
1317            }
1318            let base = match self.input.next(ctx)? {
1319                Some(r) => r,
1320                None => return Ok(None),
1321            };
1322            let ectx = EvalCtx {
1323                row: &base,
1324                params: ctx.params,
1325                reader: ctx.store,
1326                procedures: ctx.procedures,
1327                outer_rows: ctx.outer_rows,
1328                tombstones: ctx.tombstones,
1329            };
1330            let val = eval_expr(&self.expr, &ectx)?;
1331            self.items = coerce_unwind_list(val)?;
1332            self.current_row = Some(base);
1333        }
1334    }
1335}
1336
1337/// Shared list-coercion used by both UNWIND operators. Accepts a
1338/// native executor `Value::List`, a property-list
1339/// `Property::List`, or `Null` (treated as an empty list). Any
1340/// other shape is a type error — UNWIND is defined over lists.
1341fn coerce_unwind_list(val: Value) -> Result<Vec<Value>> {
1342    match val {
1343        Value::List(items) => Ok(items),
1344        Value::Property(Property::List(props)) => {
1345            Ok(props.into_iter().map(Value::Property).collect())
1346        }
1347        Value::Null => Ok(Vec::new()),
1348        _ => Err(Error::TypeMismatch),
1349    }
1350}
1351
1352struct CreatePathOp {
1353    input: Option<Box<dyn Operator>>,
1354    nodes: Vec<CreateNodeSpec>,
1355    edges: Vec<CreateEdgeSpec>,
1356    done: bool,
1357    buffered: Option<Vec<Row>>,
1358    cursor: usize,
1359}
1360
1361impl CreatePathOp {
1362    fn new(
1363        input: Option<Box<dyn Operator>>,
1364        nodes: Vec<CreateNodeSpec>,
1365        edges: Vec<CreateEdgeSpec>,
1366    ) -> Self {
1367        Self {
1368            input,
1369            nodes,
1370            edges,
1371            done: false,
1372            buffered: None,
1373            cursor: 0,
1374        }
1375    }
1376
1377    fn apply(&self, ctx: &ExecCtx, row: &Row) -> Result<Row> {
1378        let mut out = row.clone();
1379        let mut node_ids: Vec<NodeId> = Vec::with_capacity(self.nodes.len());
1380        for spec in &self.nodes {
1381            match spec {
1382                CreateNodeSpec::New {
1383                    var,
1384                    labels,
1385                    properties,
1386                } => {
1387                    let mut node = Node::new();
1388                    for label in labels {
1389                        node.labels.push(label.clone());
1390                    }
1391                    // Property values are `Expr` (literal | parameter
1392                    // per the grammar). Evaluate against the current row
1393                    // + params and convert to a stored Property via the
1394                    // existing helper, which rejects Node/Edge values.
1395                    // Null-valued properties aren't stored — openCypher
1396                    // treats `{k: null}` as "no such property," so
1397                    // `keys(n)` and `n.k IS NOT NULL` both skip it.
1398                    for (k, expr) in properties {
1399                        let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1400                        let prop = value_to_property(value)?;
1401                        if matches!(prop, Property::Null) {
1402                            continue;
1403                        }
1404                        node.properties.insert(k.clone(), prop);
1405                    }
1406                    ctx.writer.put_node(&node)?;
1407                    node_ids.push(node.id);
1408                    if let Some(v) = var {
1409                        out.insert(v.clone(), Value::Node(node));
1410                    }
1411                }
1412                CreateNodeSpec::Reference(name) => {
1413                    let id = match out.get(name) {
1414                        Some(Value::Node(n)) => n.id,
1415                        _ => return Err(Error::UnboundVariable(name.clone())),
1416                    };
1417                    node_ids.push(id);
1418                }
1419            }
1420        }
1421        for spec in &self.edges {
1422            let src = node_ids[spec.src_idx];
1423            let dst = node_ids[spec.dst_idx];
1424            let mut edge = Edge::new(spec.edge_type.clone(), src, dst);
1425            for (k, expr) in &spec.properties {
1426                let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1427                let prop = value_to_property(value)?;
1428                if matches!(prop, Property::Null) {
1429                    continue;
1430                }
1431                edge.properties.insert(k.clone(), prop);
1432            }
1433            ctx.writer.put_edge(&edge)?;
1434            if let Some(v) = &spec.var {
1435                out.insert(v.clone(), Value::Edge(edge));
1436            }
1437        }
1438        Ok(out)
1439    }
1440}
1441
1442impl Operator for CreatePathOp {
1443    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1444        if self.input.is_some() {
1445            // Eager evaluation: drain the entire input *and* apply
1446            // all writes on the first call, then stream the
1447            // already-applied rows. This is required by Cypher's
1448            // "writes are eager" semantics — subsequent operators
1449            // (especially fresh scans inside a CartesianProduct
1450            // for `WITH * MATCH ...`) must see the post-write
1451            // graph state. The previous lazy per-`next()` apply
1452            // interleaved writes with downstream scan caching,
1453            // so a `MATCH () CREATE () WITH * MATCH () CREATE ()`
1454            // pattern undercounted: the right-side scan rebuilt
1455            // for each left row only saw writes done before that
1456            // particular iteration.
1457            if let Some(buffered) = self.buffered.as_mut() {
1458                if self.cursor < buffered.len() {
1459                    let row = buffered[self.cursor].clone();
1460                    self.cursor += 1;
1461                    return Ok(Some(row));
1462                }
1463                return Ok(None);
1464            }
1465            let input_rows: Vec<Row> = {
1466                let input = self.input.as_mut().unwrap();
1467                let mut acc = Vec::new();
1468                while let Some(row) = input.next(ctx)? {
1469                    acc.push(row);
1470                }
1471                acc
1472            };
1473            let mut applied: Vec<Row> = Vec::with_capacity(input_rows.len());
1474            for row in input_rows {
1475                applied.push(self.apply(ctx, &row)?);
1476            }
1477            self.buffered = Some(applied);
1478            self.cursor = 0;
1479            // Fall through to next call via recursion to yield
1480            // the first buffered row.
1481            self.next(ctx)
1482        } else {
1483            if self.done {
1484                return Ok(None);
1485            }
1486            self.done = true;
1487            let empty = Row::new();
1488            Ok(Some(self.apply(ctx, &empty)?))
1489        }
1490    }
1491}
1492
1493struct CartesianProductOp {
1494    left: Box<dyn Operator>,
1495    right_plan: LogicalPlan,
1496    left_row: Option<Row>,
1497    right_op: Option<Box<dyn Operator>>,
1498}
1499
1500impl CartesianProductOp {
1501    fn new(left: Box<dyn Operator>, right_plan: LogicalPlan) -> Self {
1502        Self {
1503            left,
1504            right_plan,
1505            left_row: None,
1506            right_op: None,
1507        }
1508    }
1509}
1510
1511impl Operator for CartesianProductOp {
1512    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1513        loop {
1514            if self.left_row.is_none() {
1515                match self.left.next(ctx)? {
1516                    None => return Ok(None),
1517                    Some(row) => {
1518                        self.left_row = Some(row);
1519                        self.right_op = Some(build_op(&self.right_plan));
1520                    }
1521                }
1522            }
1523            let right_op = self.right_op.as_mut().expect("right_op set");
1524            // Expose the current left row to right-side operators
1525            // via a shadow context so constraint lookups (for
1526            // bound edges / bound edge lists) can find vars that
1527            // the right-side scan itself doesn't bind.
1528            let left_ref = self.left_row.as_ref().unwrap();
1529            let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1530            stacked.push(left_ref);
1531            stacked.extend_from_slice(ctx.outer_rows);
1532            let inner_ctx = ExecCtx {
1533                store: ctx.store,
1534                writer: ctx.writer,
1535                params: ctx.params,
1536                procedures: ctx.procedures,
1537                outer_rows: &stacked,
1538                tombstones: ctx.tombstones,
1539            };
1540            match right_op.next(&inner_ctx)? {
1541                Some(right_row) => {
1542                    let mut combined = left_ref.clone();
1543                    for (k, v) in right_row {
1544                        combined.insert(k, v);
1545                    }
1546                    return Ok(Some(combined));
1547                }
1548                None => {
1549                    self.left_row = None;
1550                    self.right_op = None;
1551                }
1552            }
1553        }
1554    }
1555}
1556
1557struct DeleteOp {
1558    input: Box<dyn Operator>,
1559    detach: bool,
1560    #[allow(dead_code)]
1561    vars: Vec<String>,
1562    exprs: Vec<Expr>,
1563    /// Rows drained from `input` before any deletes have run.
1564    /// Populated on first call to `next`. openCypher treats DELETE
1565    /// as a batch mutation: the whole preceding row set is
1566    /// materialised, then the mutations happen, then rows are
1567    /// forwarded. Without buffering, `MATCH (a)-[r]-(b) DELETE r,
1568    /// a, b RETURN count(*)` deletes the first row's nodes before
1569    /// the pipelined scan reaches the second row.
1570    buffered: Option<Vec<Row>>,
1571    cursor: usize,
1572}
1573
1574impl DeleteOp {
1575    fn new(input: Box<dyn Operator>, detach: bool, vars: Vec<String>, exprs: Vec<Expr>) -> Self {
1576        Self {
1577            input,
1578            detach,
1579            vars,
1580            exprs,
1581            buffered: None,
1582            cursor: 0,
1583        }
1584    }
1585
1586    /// Gather every edge and node id referenced by the row's
1587    /// DELETE expressions and run the delete in two phases —
1588    /// edges first, then nodes. The two-phase ordering is what
1589    /// lets `DELETE p1, p2` succeed when two paths share nodes
1590    /// connected by each other's edges: by the time the node
1591    /// phase runs, the in-batch edges are already gone. The
1592    /// non-detach check probes the graph then filters out
1593    /// any edge we just deleted, so the batch-level detachment
1594    /// is counted as "no longer attached" rather than failing.
1595    fn apply_deletes(
1596        &self,
1597        ctx: &ExecCtx,
1598        row: &Row,
1599        seen_edges: &mut HashSet<meshdb_core::EdgeId>,
1600        seen_nodes: &mut HashSet<meshdb_core::NodeId>,
1601    ) -> Result<()> {
1602        let mut edge_ids: Vec<meshdb_core::EdgeId> = Vec::new();
1603        let mut node_ids: Vec<meshdb_core::NodeId> = Vec::new();
1604        for expr in &self.exprs {
1605            let v = eval_expr(expr, &ctx.eval_ctx(row))?;
1606            match v {
1607                Value::Node(n) => node_ids.push(n.id),
1608                Value::Edge(e) => edge_ids.push(e.id),
1609                Value::Path { nodes, edges } => {
1610                    for e in edges {
1611                        edge_ids.push(e.id);
1612                    }
1613                    for n in nodes {
1614                        node_ids.push(n.id);
1615                    }
1616                }
1617                Value::Null | Value::Property(Property::Null) => continue,
1618                _ => return Err(Error::TypeMismatch),
1619            }
1620        }
1621        for eid in &edge_ids {
1622            if seen_edges.insert(*eid) {
1623                ctx.writer.delete_edge(*eid)?;
1624                ctx.tombstones.edges.borrow_mut().insert(*eid);
1625            }
1626        }
1627        for nid in &node_ids {
1628            if !seen_nodes.insert(*nid) {
1629                continue;
1630            }
1631            if self.detach {
1632                // DETACH DELETE also removes every attached edge;
1633                // tombstone them too so a later projection over a
1634                // formerly-attached edge clone also raises rather
1635                // than reading stale properties.
1636                for (eid, _) in ctx.store.outgoing(*nid)? {
1637                    ctx.tombstones.edges.borrow_mut().insert(eid);
1638                }
1639                for (eid, _) in ctx.store.incoming(*nid)? {
1640                    ctx.tombstones.edges.borrow_mut().insert(eid);
1641                }
1642                ctx.writer.detach_delete_node(*nid)?;
1643            } else {
1644                let out = ctx.store.outgoing(*nid)?;
1645                let inc = ctx.store.incoming(*nid)?;
1646                let still_attached = out
1647                    .iter()
1648                    .chain(inc.iter())
1649                    .any(|(eid, _)| !seen_edges.contains(eid));
1650                if still_attached {
1651                    return Err(Error::CannotDeleteAttachedNode);
1652                }
1653                ctx.writer.detach_delete_node(*nid)?;
1654            }
1655            ctx.tombstones.nodes.borrow_mut().insert(*nid);
1656        }
1657        Ok(())
1658    }
1659}
1660
1661impl Operator for DeleteOp {
1662    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1663        // First call: drain the input, run all deletes, then
1664        // start forwarding rows. Buffering protects against the
1665        // pipelined-scan-vs-mutation race: `MATCH (a)-[r]-(b)
1666        // DELETE r, a, b RETURN count(*)` must observe every
1667        // MATCH row before any node or edge disappears, otherwise
1668        // the undirected expansion hits an already-deleted source
1669        // on the second iteration.
1670        if self.buffered.is_none() {
1671            let mut rows: Vec<Row> = Vec::new();
1672            while let Some(row) = self.input.next(ctx)? {
1673                rows.push(row);
1674            }
1675            let mut seen_edges: HashSet<meshdb_core::EdgeId> = HashSet::new();
1676            let mut seen_nodes: HashSet<meshdb_core::NodeId> = HashSet::new();
1677            for row in &rows {
1678                self.apply_deletes(ctx, row, &mut seen_edges, &mut seen_nodes)?;
1679            }
1680            self.buffered = Some(rows);
1681            self.cursor = 0;
1682        }
1683        let rows = self.buffered.as_ref().unwrap();
1684        if self.cursor < rows.len() {
1685            let row = rows[self.cursor].clone();
1686            self.cursor += 1;
1687            return Ok(Some(row));
1688        }
1689        Ok(None)
1690    }
1691}
1692
1693struct SetPropertyOp {
1694    input: Box<dyn Operator>,
1695    assignments: Vec<SetAssignment>,
1696    /// Eager-evaluated output: drained input with all SET
1697    /// assignments already applied to the store. Populated on
1698    /// the first `next()` call so the writes happen before any
1699    /// downstream operator (e.g. a fresh MATCH inside a
1700    /// CartesianProduct after a `WITH *` boundary) starts
1701    /// scanning. Without this, lazy per-row apply interleaved
1702    /// the writes with downstream scan caching and
1703    /// undercounted side effects in patterns like
1704    /// `MATCH (n) SET n.x = 1 WITH * MATCH (m) ...`.
1705    buffered: Option<Vec<Row>>,
1706    cursor: usize,
1707}
1708
1709impl SetPropertyOp {
1710    fn new(input: Box<dyn Operator>, assignments: Vec<SetAssignment>) -> Self {
1711        Self {
1712            input,
1713            assignments,
1714            buffered: None,
1715            cursor: 0,
1716        }
1717    }
1718
1719    fn apply_one(&self, ctx: &ExecCtx, mut row: Row) -> Result<Row> {
1720        // Phase 1: evaluate any RHSes against the original row bindings.
1721        enum Action {
1722            SetKey {
1723                var: String,
1724                key: String,
1725                prop: Property,
1726            },
1727            AddLabels {
1728                var: String,
1729                labels: Vec<String>,
1730            },
1731            Replace {
1732                var: String,
1733                props: Vec<(String, Property)>,
1734            },
1735            Merge {
1736                var: String,
1737                props: Vec<(String, Property)>,
1738            },
1739        }
1740        let mut actions: Vec<Action> = Vec::with_capacity(self.assignments.len());
1741        for a in &self.assignments {
1742            match a {
1743                SetAssignment::Property { var, key, value } => {
1744                    let evaluated = eval_expr(value, &ctx.eval_ctx(&row))?;
1745                    let prop = value_to_property(evaluated)?;
1746                    actions.push(Action::SetKey {
1747                        var: var.clone(),
1748                        key: key.clone(),
1749                        prop,
1750                    });
1751                }
1752                SetAssignment::Labels { var, labels } => {
1753                    actions.push(Action::AddLabels {
1754                        var: var.clone(),
1755                        labels: labels.clone(),
1756                    });
1757                }
1758                SetAssignment::Replace { var, properties } => {
1759                    // Property values are now `Expr`. Evaluate
1760                    // each against the current row + params and
1761                    // convert via the shared helper, which
1762                    // surfaces InvalidSetValue for Node/Edge.
1763                    let props = properties
1764                        .iter()
1765                        .map(|(k, expr)| {
1766                            let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1767                            Ok((k.clone(), value_to_property(v)?))
1768                        })
1769                        .collect::<Result<Vec<(String, Property)>>>()?;
1770                    actions.push(Action::Replace {
1771                        var: var.clone(),
1772                        props,
1773                    });
1774                }
1775                SetAssignment::Merge { var, properties } => {
1776                    let props = properties
1777                        .iter()
1778                        .map(|(k, expr)| {
1779                            let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1780                            Ok((k.clone(), value_to_property(v)?))
1781                        })
1782                        .collect::<Result<Vec<(String, Property)>>>()?;
1783                    actions.push(Action::Merge {
1784                        var: var.clone(),
1785                        props,
1786                    });
1787                }
1788                SetAssignment::ReplaceFromExpr {
1789                    var,
1790                    source,
1791                    replace,
1792                } => {
1793                    let v = eval_expr(source, &ctx.eval_ctx(&row))?;
1794                    let props = extract_property_map(&v)?;
1795                    if *replace {
1796                        actions.push(Action::Replace {
1797                            var: var.clone(),
1798                            props,
1799                        });
1800                    } else {
1801                        actions.push(Action::Merge {
1802                            var: var.clone(),
1803                            props,
1804                        });
1805                    }
1806                }
1807            }
1808        }
1809
1810        // Phase 2: apply updates in-place to the row bindings.
1811        let mut updated_nodes: HashSet<String> = HashSet::new();
1812        let mut updated_edges: HashSet<String> = HashSet::new();
1813        for action in actions {
1814            match action {
1815                Action::SetKey { var, key, prop } => match row.get_mut(&var) {
1816                    // openCypher: SET on a null target (from
1817                    // OPTIONAL MATCH that didn't bind) is a
1818                    // silent no-op rather than an error.
1819                    Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1820                    // openCypher: SET a.key = null removes the
1821                    // property rather than storing a null value.
1822                    Some(Value::Node(n)) => {
1823                        if matches!(prop, Property::Null) {
1824                            n.properties.remove(&key);
1825                        } else {
1826                            n.properties.insert(key, prop);
1827                        }
1828                        updated_nodes.insert(var);
1829                    }
1830                    Some(Value::Edge(e)) => {
1831                        if matches!(prop, Property::Null) {
1832                            e.properties.remove(&key);
1833                        } else {
1834                            e.properties.insert(key, prop);
1835                        }
1836                        updated_edges.insert(var);
1837                    }
1838                    _ => return Err(Error::UnboundVariable(var)),
1839                },
1840                Action::AddLabels { var, labels } => match row.get_mut(&var) {
1841                    Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1842                    Some(Value::Node(n)) => {
1843                        for label in labels {
1844                            if !n.labels.contains(&label) {
1845                                n.labels.push(label);
1846                            }
1847                        }
1848                        updated_nodes.insert(var);
1849                    }
1850                    _ => return Err(Error::UnboundVariable(var)),
1851                },
1852                Action::Replace { var, props } => match row.get_mut(&var) {
1853                    Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1854                    Some(Value::Node(n)) => {
1855                        n.properties.clear();
1856                        for (k, v) in props {
1857                            if !matches!(v, Property::Null) {
1858                                n.properties.insert(k, v);
1859                            }
1860                        }
1861                        updated_nodes.insert(var);
1862                    }
1863                    Some(Value::Edge(e)) => {
1864                        e.properties.clear();
1865                        for (k, v) in props {
1866                            if !matches!(v, Property::Null) {
1867                                e.properties.insert(k, v);
1868                            }
1869                        }
1870                        updated_edges.insert(var);
1871                    }
1872                    _ => return Err(Error::UnboundVariable(var)),
1873                },
1874                Action::Merge { var, props } => match row.get_mut(&var) {
1875                    Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1876                    Some(Value::Node(n)) => {
1877                        for (k, v) in props {
1878                            if matches!(v, Property::Null) {
1879                                n.properties.remove(&k);
1880                            } else {
1881                                n.properties.insert(k, v);
1882                            }
1883                        }
1884                        updated_nodes.insert(var);
1885                    }
1886                    Some(Value::Edge(e)) => {
1887                        for (k, v) in props {
1888                            if matches!(v, Property::Null) {
1889                                e.properties.remove(&k);
1890                            } else {
1891                                e.properties.insert(k, v);
1892                            }
1893                        }
1894                        updated_edges.insert(var);
1895                    }
1896                    _ => return Err(Error::UnboundVariable(var)),
1897                },
1898            }
1899        }
1900
1901        // Phase 3: flush each mutated entity once to the writer.
1902        for var in &updated_nodes {
1903            if let Some(Value::Node(n)) = row.get(var) {
1904                ctx.writer.put_node(n)?;
1905            }
1906        }
1907        for var in &updated_edges {
1908            if let Some(Value::Edge(e)) = row.get(var) {
1909                ctx.writer.put_edge(e)?;
1910            }
1911        }
1912
1913        Ok(row)
1914    }
1915}
1916
1917impl Operator for SetPropertyOp {
1918    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1919        // Eager: drain input + apply all SETs upfront, then
1920        // stream the mutated rows. See `buffered` doc comment
1921        // for why lazy per-row apply broke `WITH *`-then-MATCH.
1922        if self.buffered.is_none() {
1923            let mut input_rows: Vec<Row> = Vec::new();
1924            while let Some(row) = self.input.next(ctx)? {
1925                input_rows.push(row);
1926            }
1927            let mut applied: Vec<Row> = Vec::with_capacity(input_rows.len());
1928            for row in input_rows {
1929                applied.push(self.apply_one(ctx, row)?);
1930            }
1931            self.buffered = Some(applied);
1932            self.cursor = 0;
1933        }
1934        let rows = self.buffered.as_ref().unwrap();
1935        if self.cursor < rows.len() {
1936            let row = rows[self.cursor].clone();
1937            self.cursor += 1;
1938            return Ok(Some(row));
1939        }
1940        Ok(None)
1941    }
1942}
1943
1944struct RemoveOp {
1945    input: Box<dyn Operator>,
1946    items: Vec<RemoveSpec>,
1947    /// Eager-evaluated output, same rationale as
1948    /// [`SetPropertyOp::buffered`]: drain input and apply all
1949    /// removes to the store before any downstream operator
1950    /// starts. Lazy per-row apply would interleave writes with
1951    /// downstream scan caching.
1952    buffered: Option<Vec<Row>>,
1953    cursor: usize,
1954}
1955
1956impl RemoveOp {
1957    fn new(input: Box<dyn Operator>, items: Vec<RemoveSpec>) -> Self {
1958        Self {
1959            input,
1960            items,
1961            buffered: None,
1962            cursor: 0,
1963        }
1964    }
1965
1966    fn apply_one(&self, ctx: &ExecCtx, mut row: Row) -> Result<Row> {
1967        let mut updated_nodes: HashSet<String> = HashSet::new();
1968        let mut updated_edges: HashSet<String> = HashSet::new();
1969        for item in &self.items {
1970            match item {
1971                RemoveSpec::Property { var, key } => match row.get_mut(var) {
1972                    // Null target (from OPTIONAL MATCH) is a
1973                    // no-op, matching Neo4j's REMOVE semantics.
1974                    Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1975                    Some(Value::Node(n)) => {
1976                        n.properties.remove(key);
1977                        updated_nodes.insert(var.clone());
1978                    }
1979                    Some(Value::Edge(e)) => {
1980                        e.properties.remove(key);
1981                        updated_edges.insert(var.clone());
1982                    }
1983                    _ => return Err(Error::UnboundVariable(var.clone())),
1984                },
1985                RemoveSpec::Labels { var, labels } => match row.get_mut(var) {
1986                    Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1987                    Some(Value::Node(n)) => {
1988                        n.labels.retain(|l| !labels.contains(l));
1989                        updated_nodes.insert(var.clone());
1990                    }
1991                    _ => return Err(Error::UnboundVariable(var.clone())),
1992                },
1993            }
1994        }
1995        for var in &updated_nodes {
1996            if let Some(Value::Node(n)) = row.get(var) {
1997                ctx.writer.put_node(n)?;
1998            }
1999        }
2000        for var in &updated_edges {
2001            if let Some(Value::Edge(e)) = row.get(var) {
2002                ctx.writer.put_edge(e)?;
2003            }
2004        }
2005        Ok(row)
2006    }
2007}
2008
2009impl Operator for RemoveOp {
2010    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2011        if self.buffered.is_none() {
2012            let mut input_rows: Vec<Row> = Vec::new();
2013            while let Some(row) = self.input.next(ctx)? {
2014                input_rows.push(row);
2015            }
2016            let mut applied: Vec<Row> = Vec::with_capacity(input_rows.len());
2017            for row in input_rows {
2018                applied.push(self.apply_one(ctx, row)?);
2019            }
2020            self.buffered = Some(applied);
2021            self.cursor = 0;
2022        }
2023        let rows = self.buffered.as_ref().unwrap();
2024        if self.cursor < rows.len() {
2025            let row = rows[self.cursor].clone();
2026            self.cursor += 1;
2027            return Ok(Some(row));
2028        }
2029        Ok(None)
2030    }
2031}
2032
2033struct LoadCsvOp {
2034    input: Option<Box<dyn Operator>>,
2035    path_expr: Expr,
2036    var: String,
2037    with_headers: bool,
2038    rows: Option<Vec<Value>>,
2039    cursor: usize,
2040}
2041
2042impl LoadCsvOp {
2043    fn new(
2044        input: Option<Box<dyn Operator>>,
2045        path_expr: Expr,
2046        var: String,
2047        with_headers: bool,
2048    ) -> Self {
2049        Self {
2050            input,
2051            path_expr,
2052            var,
2053            with_headers,
2054            rows: None,
2055            cursor: 0,
2056        }
2057    }
2058
2059    fn load(&mut self, ctx: &ExecCtx, base_row: &Row) -> Result<()> {
2060        let ectx = ctx.eval_ctx(base_row);
2061        let path_val = eval_expr(&self.path_expr, &ectx)?;
2062        let path = match path_val {
2063            Value::Property(Property::String(s)) => s,
2064            _ => return Err(Error::TypeMismatch),
2065        };
2066        let content = std::fs::read_to_string(&path).map_err(|e| {
2067            Error::Unsupported(format!("LOAD CSV: cannot read file '{}': {}", path, e))
2068        })?;
2069        let mut lines = content.lines();
2070        let headers: Option<Vec<String>> = if self.with_headers {
2071            lines
2072                .next()
2073                .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
2074        } else {
2075            None
2076        };
2077        let mut csv_rows = Vec::new();
2078        for line in lines {
2079            if line.trim().is_empty() {
2080                continue;
2081            }
2082            let fields: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
2083            if let Some(hdrs) = &headers {
2084                let mut map = std::collections::HashMap::new();
2085                for (i, h) in hdrs.iter().enumerate() {
2086                    let val = fields.get(i).cloned().unwrap_or_default();
2087                    map.insert(h.clone(), Property::String(val));
2088                }
2089                csv_rows.push(Value::Property(Property::Map(map)));
2090            } else {
2091                let list: Vec<Value> = fields
2092                    .into_iter()
2093                    .map(|f| Value::Property(Property::String(f)))
2094                    .collect();
2095                csv_rows.push(Value::List(list));
2096            }
2097        }
2098        self.rows = Some(csv_rows);
2099        self.cursor = 0;
2100        Ok(())
2101    }
2102}
2103
2104impl Operator for LoadCsvOp {
2105    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2106        if self.rows.is_none() {
2107            let base = if let Some(input) = &mut self.input {
2108                match input.next(ctx)? {
2109                    Some(r) => r,
2110                    None => return Ok(None),
2111                }
2112            } else {
2113                Row::new()
2114            };
2115            self.load(ctx, &base)?;
2116        }
2117        let rows = self.rows.as_ref().unwrap();
2118        if self.cursor < rows.len() {
2119            let val = rows[self.cursor].clone();
2120            self.cursor += 1;
2121            let mut row = Row::new();
2122            row.insert(self.var.clone(), val);
2123            Ok(Some(row))
2124        } else {
2125            Ok(None)
2126        }
2127    }
2128}
2129
2130struct ForeachOp {
2131    input: Box<dyn Operator>,
2132    var: String,
2133    list_expr: Expr,
2134    set_assignments: Vec<SetAssignment>,
2135    remove_items: Vec<RemoveSpec>,
2136}
2137
2138impl ForeachOp {
2139    fn new(
2140        input: Box<dyn Operator>,
2141        var: String,
2142        list_expr: Expr,
2143        set_assignments: Vec<SetAssignment>,
2144        remove_items: Vec<RemoveSpec>,
2145    ) -> Self {
2146        Self {
2147            input,
2148            var,
2149            list_expr,
2150            set_assignments,
2151            remove_items,
2152        }
2153    }
2154}
2155
2156impl Operator for ForeachOp {
2157    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2158        let Some(row) = self.input.next(ctx)? else {
2159            return Ok(None);
2160        };
2161        let ectx = ctx.eval_ctx(&row);
2162        let list_val = eval_expr(&self.list_expr, &ectx)?;
2163        let items = match list_val {
2164            Value::List(items) => items,
2165            Value::Property(Property::List(props)) => {
2166                props.into_iter().map(Value::Property).collect()
2167            }
2168            Value::Null | Value::Property(Property::Null) => Vec::new(),
2169            _ => return Err(Error::TypeMismatch),
2170        };
2171        for item in items {
2172            let mut scratch = row.clone();
2173            scratch.insert(self.var.clone(), item);
2174            for a in &self.set_assignments {
2175                match a {
2176                    SetAssignment::Property { var, key, value } => {
2177                        let evaluated = eval_expr(value, &ctx.eval_ctx(&scratch))?;
2178                        let prop = value_to_property(evaluated)?;
2179                        match scratch.get_mut(var) {
2180                            Some(Value::Node(n)) => {
2181                                n.properties.insert(key.clone(), prop);
2182                            }
2183                            Some(Value::Edge(e)) => {
2184                                e.properties.insert(key.clone(), prop);
2185                            }
2186                            _ => return Err(Error::UnboundVariable(var.clone())),
2187                        }
2188                    }
2189                    SetAssignment::Labels { var, labels } => {
2190                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
2191                            for l in labels {
2192                                if !n.labels.contains(l) {
2193                                    n.labels.push(l.clone());
2194                                }
2195                            }
2196                        }
2197                    }
2198                    _ => {}
2199                }
2200            }
2201            for ri in &self.remove_items {
2202                match ri {
2203                    RemoveSpec::Property { var, key } => {
2204                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
2205                            n.properties.remove(key);
2206                        } else if let Some(Value::Edge(e)) = scratch.get_mut(var) {
2207                            e.properties.remove(key);
2208                        }
2209                    }
2210                    RemoveSpec::Labels { var, labels } => {
2211                        if let Some(Value::Node(n)) = scratch.get_mut(var) {
2212                            n.labels.retain(|l| !labels.contains(l));
2213                        }
2214                    }
2215                }
2216            }
2217            // Flush mutated entities for each iteration
2218            for (_, val) in scratch.iter() {
2219                match val {
2220                    Value::Node(n) => ctx.writer.put_node(n)?,
2221                    Value::Edge(e) => ctx.writer.put_edge(e)?,
2222                    _ => {}
2223                }
2224            }
2225        }
2226        Ok(Some(row))
2227    }
2228}
2229
2230struct SeedRowOp {
2231    done: bool,
2232}
2233
2234impl Operator for SeedRowOp {
2235    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
2236        if self.done {
2237            return Ok(None);
2238        }
2239        self.done = true;
2240        Ok(Some(Row::new()))
2241    }
2242}
2243
2244struct SeededRowOp {
2245    row: Option<Row>,
2246}
2247
2248impl Operator for SeededRowOp {
2249    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
2250        Ok(self.row.take())
2251    }
2252}
2253
2254struct CallSubqueryOp {
2255    input: Box<dyn Operator>,
2256    body_plan: LogicalPlan,
2257    pending: Vec<Row>,
2258    pending_idx: usize,
2259}
2260
2261impl CallSubqueryOp {
2262    fn new(input: Box<dyn Operator>, body_plan: LogicalPlan) -> Self {
2263        Self {
2264            input,
2265            body_plan,
2266            pending: Vec::new(),
2267            pending_idx: 0,
2268        }
2269    }
2270}
2271
2272impl Operator for CallSubqueryOp {
2273    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2274        loop {
2275            if self.pending_idx < self.pending.len() {
2276                let row = self.pending[self.pending_idx].clone();
2277                self.pending_idx += 1;
2278                return Ok(Some(row));
2279            }
2280            let outer_row = match self.input.next(ctx)? {
2281                Some(r) => r,
2282                None => return Ok(None),
2283            };
2284            let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row), &mut None);
2285            let mut results = Vec::new();
2286            while let Some(body_row) = body_op.next(ctx)? {
2287                let mut merged = outer_row.clone();
2288                for (k, v) in body_row {
2289                    merged.insert(k, v);
2290                }
2291                results.push(merged);
2292            }
2293            if results.is_empty() {
2294                continue;
2295            }
2296            self.pending = results;
2297            self.pending_idx = 0;
2298        }
2299    }
2300}
2301
2302/// Per-input-row left-join driver (see
2303/// `LogicalPlan::OptionalApply`). Replays `body_plan` once per
2304/// outer row with the row as its seed; forwards all emitted rows
2305/// merged with the outer bindings, and emits one null-fallback
2306/// row (outer bindings plus `null_vars` bound to Null) only when
2307/// the body produced zero rows for that input.
2308struct OptionalApplyOp {
2309    input: Box<dyn Operator>,
2310    body_plan: LogicalPlan,
2311    null_vars: Vec<String>,
2312    pending: Vec<Row>,
2313    pending_idx: usize,
2314}
2315
2316impl OptionalApplyOp {
2317    fn new(input: Box<dyn Operator>, body_plan: LogicalPlan, null_vars: Vec<String>) -> Self {
2318        Self {
2319            input,
2320            body_plan,
2321            null_vars,
2322            pending: Vec::new(),
2323            pending_idx: 0,
2324        }
2325    }
2326}
2327
2328impl Operator for OptionalApplyOp {
2329    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2330        loop {
2331            if self.pending_idx < self.pending.len() {
2332                let row = self.pending[self.pending_idx].clone();
2333                self.pending_idx += 1;
2334                return Ok(Some(row));
2335            }
2336            let outer_row = match self.input.next(ctx)? {
2337                Some(r) => r,
2338                None => return Ok(None),
2339            };
2340            let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row), &mut None);
2341            // Push the outer row onto `outer_rows` so Filter /
2342            // eval_expr inside the body can resolve outer
2343            // bindings (`OPTIONAL MATCH ... WHERE outer_var = x`).
2344            let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
2345            stacked.push(&outer_row);
2346            stacked.extend_from_slice(ctx.outer_rows);
2347            let inner_ctx = ExecCtx {
2348                store: ctx.store,
2349                writer: ctx.writer,
2350                params: ctx.params,
2351                procedures: ctx.procedures,
2352                outer_rows: &stacked,
2353                tombstones: ctx.tombstones,
2354            };
2355            let mut results = Vec::new();
2356            while let Some(body_row) = body_op.next(&inner_ctx)? {
2357                let mut merged = outer_row.clone();
2358                for (k, v) in body_row {
2359                    merged.insert(k, v);
2360                }
2361                results.push(merged);
2362            }
2363            if results.is_empty() {
2364                let mut fallback = outer_row;
2365                for v in &self.null_vars {
2366                    fallback.insert(v.clone(), Value::Null);
2367                }
2368                return Ok(Some(fallback));
2369            }
2370            self.pending = results;
2371            self.pending_idx = 0;
2372        }
2373    }
2374}
2375
2376/// Runtime operator for `CALL ns.name[(args)] [YIELD ...]`.
2377///
2378/// Resolves the procedure against the [`ProcedureRegistry`] at
2379/// `next()` time, validates arity / types / form (implicit args,
2380/// `YIELD *`, in-query YIELD requirement), then scans the registered
2381/// data-table for rows whose input cells match the call arguments.
2382/// Each matching row is projected according to the YIELD spec and
2383/// merged into the upstream row.
2384///
2385/// Two shapes are supported at once:
2386/// * Standalone (`input = None`): the op is the full plan. It
2387///   runs the procedure exactly once (against a synthetic empty row)
2388///   and emits the matching rows directly.
2389/// * In-query (`input = Some(...)`): for each upstream row, runs
2390///   the procedure with args evaluated against that row and emits
2391///   one merged row per match. Procedures with zero declared output
2392///   columns act as a pass-through — the upstream row is forwarded
2393///   unchanged, matching Neo4j's side-effect-only semantics.
2394struct ProcedureCallOp {
2395    input: Option<Box<dyn Operator>>,
2396    qualified_name: Vec<String>,
2397    args: Option<Vec<Expr>>,
2398    yield_spec: Option<YieldSpec>,
2399    standalone: bool,
2400    buffered: Vec<Row>,
2401    buffered_idx: usize,
2402    // Only set for the standalone form, which drives itself off a
2403    // synthetic seed row exactly once.
2404    done: bool,
2405}
2406
2407impl ProcedureCallOp {
2408    fn new(
2409        input: Option<Box<dyn Operator>>,
2410        qualified_name: Vec<String>,
2411        args: Option<Vec<Expr>>,
2412        yield_spec: Option<YieldSpec>,
2413        standalone: bool,
2414    ) -> Self {
2415        Self {
2416            input,
2417            qualified_name,
2418            args,
2419            yield_spec,
2420            standalone,
2421            buffered: Vec::new(),
2422            buffered_idx: 0,
2423            done: false,
2424        }
2425    }
2426
2427    /// Resolve the projection list `(source_column, output_alias)`
2428    /// from the procedure signature and this op's yield spec. Also
2429    /// validates the spec — unknown YIELD columns, duplicate
2430    /// aliases, and `YIELD *` / no-YIELD in disallowed contexts all
2431    /// surface as `Error::Procedure`.
2432    fn resolve_projection(
2433        &self,
2434        proc: &crate::procedures::Procedure,
2435    ) -> Result<Vec<(String, String)>> {
2436        match &self.yield_spec {
2437            None => {
2438                if !self.standalone {
2439                    // In-query CALL with no YIELD: legal only for
2440                    // side-effect-only procedures (zero declared
2441                    // outputs). A procedure with outputs but no
2442                    // YIELD leaves those outputs unbound, so any
2443                    // downstream RETURN that references them would
2444                    // see `UndefinedVariable`; reject here so the
2445                    // error surfaces instead of silently emitting
2446                    // nulls.
2447                    if proc.outputs.is_empty() {
2448                        return Ok(Vec::new());
2449                    }
2450                    return Err(Error::Procedure(format!(
2451                        "procedure '{}' has outputs but no YIELD clause",
2452                        self.qualified_name.join(".")
2453                    )));
2454                }
2455                Ok(proc
2456                    .outputs
2457                    .iter()
2458                    .map(|o| (o.name.clone(), o.name.clone()))
2459                    .collect())
2460            }
2461            Some(YieldSpec::Star) => {
2462                if !self.standalone {
2463                    return Err(Error::Procedure(
2464                        "YIELD * is only allowed on standalone CALL".into(),
2465                    ));
2466                }
2467                Ok(proc
2468                    .outputs
2469                    .iter()
2470                    .map(|o| (o.name.clone(), o.name.clone()))
2471                    .collect())
2472            }
2473            Some(YieldSpec::Items(items)) => {
2474                let mut projection = Vec::with_capacity(items.len());
2475                let mut seen_aliases: std::collections::HashSet<String> =
2476                    std::collections::HashSet::new();
2477                for yi in items {
2478                    if !proc.outputs.iter().any(|o| o.name == yi.column) {
2479                        return Err(Error::Procedure(format!(
2480                            "procedure '{}' has no output column '{}'",
2481                            self.qualified_name.join("."),
2482                            yi.column
2483                        )));
2484                    }
2485                    let alias = yi.alias.clone().unwrap_or_else(|| yi.column.clone());
2486                    if !seen_aliases.insert(alias.clone()) {
2487                        return Err(Error::Procedure(format!(
2488                            "variable '{alias}' already bound by YIELD"
2489                        )));
2490                    }
2491                    projection.push((yi.column.clone(), alias));
2492                }
2493                Ok(projection)
2494            }
2495        }
2496    }
2497
2498    /// Evaluate the call's argument list against `row`. For the
2499    /// implicit-args form (`args = None`), each declared input
2500    /// column's value comes from the per-query parameter map
2501    /// (keyed by the input-column name). Returns the argument
2502    /// values in declaration order. Raises `ProcedureError` on
2503    /// arity mismatch, type mismatch, or missing parameter.
2504    fn evaluate_args(
2505        &self,
2506        ctx: &ExecCtx,
2507        row: &Row,
2508        proc: &crate::procedures::Procedure,
2509    ) -> Result<Vec<Value>> {
2510        match &self.args {
2511            Some(exprs) => {
2512                if exprs.len() != proc.inputs.len() {
2513                    return Err(Error::Procedure(format!(
2514                        "procedure '{}' expects {} argument(s), got {}",
2515                        self.qualified_name.join("."),
2516                        proc.inputs.len(),
2517                        exprs.len()
2518                    )));
2519                }
2520                let eval_ctx = ctx.eval_ctx(row);
2521                let mut values = Vec::with_capacity(exprs.len());
2522                for (expr, spec) in exprs.iter().zip(proc.inputs.iter()) {
2523                    let v = eval_expr(expr, &eval_ctx)?;
2524                    if !spec.ty.accepts(&v) {
2525                        return Err(Error::Procedure(format!(
2526                            "argument '{}' has wrong type for procedure '{}'",
2527                            spec.name,
2528                            self.qualified_name.join(".")
2529                        )));
2530                    }
2531                    values.push(coerce_arg(v, spec.ty));
2532                }
2533                Ok(values)
2534            }
2535            None => {
2536                // Implicit-arg form only valid standalone.
2537                if !self.standalone {
2538                    return Err(Error::Procedure(
2539                        "in-query CALL requires explicit argument list".into(),
2540                    ));
2541                }
2542                let mut values = Vec::with_capacity(proc.inputs.len());
2543                for spec in &proc.inputs {
2544                    let v = ctx.params.get(&spec.name).cloned().ok_or_else(|| {
2545                        Error::Procedure(format!(
2546                            "missing parameter ${} for procedure '{}'",
2547                            spec.name,
2548                            self.qualified_name.join(".")
2549                        ))
2550                    })?;
2551                    if !spec.ty.accepts(&v) {
2552                        return Err(Error::Procedure(format!(
2553                            "parameter '{}' has wrong type",
2554                            spec.name
2555                        )));
2556                    }
2557                    values.push(coerce_arg(v, spec.ty));
2558                }
2559                Ok(values)
2560            }
2561        }
2562    }
2563
2564    /// Invoke the procedure once for `input_row` and emit zero or
2565    /// more output rows into `out`. Handles the "zero-output-column
2566    /// pass-through" case that keeps `MATCH (n) CALL test.doNothing()
2567    /// RETURN n.name` from filtering the match rows.
2568    fn invoke_once(
2569        &self,
2570        ctx: &ExecCtx,
2571        input_row: &Row,
2572        proc: &crate::procedures::Procedure,
2573        projection: &[(String, String)],
2574        out: &mut Vec<Row>,
2575    ) -> Result<()> {
2576        // Zero-output-column procedures are side-effect-only in
2577        // the TCK; they either suppress rows entirely (standalone)
2578        // or pass the input row through unchanged (in-query).
2579        if proc.outputs.is_empty() {
2580            if !self.standalone {
2581                out.push(input_row.clone());
2582            }
2583            return Ok(());
2584        }
2585        let args = self.evaluate_args(ctx, input_row, proc)?;
2586        // Write builtins (apoc.create.node, …) take the writer +
2587        // args directly and produce already-final rows, so we skip
2588        // the row_matches filter that read builtins use to look up
2589        // matching candidate rows from a static / read-derived set.
2590        let is_write = proc.is_write_builtin();
2591        let rows = if is_write {
2592            // The cfg gate must include every feature that owns a
2593            // write builtin — `is_write_builtin` returns true only
2594            // when one of those features is on, so widen the gate
2595            // when a new write namespace lands.
2596            #[cfg(any(feature = "apoc-create", feature = "apoc-refactor"))]
2597            {
2598                proc.resolve_write_rows(ctx.store, ctx.writer, &args)?
2599            }
2600            #[cfg(not(any(feature = "apoc-create", feature = "apoc-refactor")))]
2601            {
2602                let _ = (ctx, &args);
2603                return Err(Error::Procedure(
2604                    "write procedure dispatched in a non-write-apoc build".into(),
2605                ));
2606            }
2607        } else {
2608            proc.resolve_rows(ctx.store)?
2609        };
2610        for proc_row in &rows {
2611            if !is_write && !proc.row_matches(proc_row, &args) {
2612                continue;
2613            }
2614            let mut merged = if self.standalone {
2615                Row::new()
2616            } else {
2617                input_row.clone()
2618            };
2619            for (src, alias) in projection {
2620                let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
2621                merged.insert(alias.clone(), v);
2622            }
2623            out.push(merged);
2624        }
2625        Ok(())
2626    }
2627}
2628
2629/// Cast an int to a float when the declared type is `FLOAT`. Other
2630/// declared types leave the value as-is (accept() has already
2631/// gated on kind) so the comparison in `row_matches` sees a
2632/// consistent shape.
2633fn coerce_arg(v: Value, ty: crate::procedures::ProcType) -> Value {
2634    use crate::procedures::ProcType;
2635    if matches!(ty, ProcType::Float) {
2636        if let Value::Property(Property::Int64(n)) = v {
2637            return Value::Property(Property::Float64(n as f64));
2638        }
2639    }
2640    v
2641}
2642
2643impl Operator for ProcedureCallOp {
2644    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2645        loop {
2646            if self.buffered_idx < self.buffered.len() {
2647                let row = self.buffered[self.buffered_idx].clone();
2648                self.buffered_idx += 1;
2649                return Ok(Some(row));
2650            }
2651            self.buffered.clear();
2652            self.buffered_idx = 0;
2653
2654            let proc = match ctx.procedures.get(&self.qualified_name) {
2655                Some(p) => p,
2656                None => {
2657                    return Err(Error::Procedure(format!(
2658                        "procedure '{}' not found",
2659                        self.qualified_name.join(".")
2660                    )));
2661                }
2662            };
2663            let projection = self.resolve_projection(proc)?;
2664
2665            let input_row = match &mut self.input {
2666                Some(inp) => match inp.next(ctx)? {
2667                    Some(r) => r,
2668                    None => return Ok(None),
2669                },
2670                None => {
2671                    if self.done {
2672                        return Ok(None);
2673                    }
2674                    self.done = true;
2675                    Row::new()
2676                }
2677            };
2678
2679            let mut produced = Vec::new();
2680            self.invoke_once(ctx, &input_row, proc, &projection, &mut produced)?;
2681            if produced.is_empty() {
2682                if self.input.is_some() {
2683                    continue;
2684                }
2685                return Ok(None);
2686            }
2687            self.buffered = produced;
2688        }
2689    }
2690}
2691
2692/// Pull a property map out of a value. Supports node / edge
2693/// (uses their live property map) and map values (parameter or
2694/// map literal). Null propagates as an empty map — `SET x = null`
2695/// is spec'd as a property clear / no-op and SET = <null-binding>
2696/// (from an unmatched OPTIONAL MATCH) matches that shape.
2697fn extract_property_map(v: &Value) -> Result<Vec<(String, Property)>> {
2698    match v {
2699        Value::Node(n) => Ok(n.properties.clone().into_iter().collect()),
2700        Value::Edge(e) => Ok(e.properties.clone().into_iter().collect()),
2701        Value::Map(pairs) => pairs
2702            .iter()
2703            .map(|(k, vv)| Ok((k.clone(), value_to_property(vv.clone())?)))
2704            .collect(),
2705        Value::Property(Property::Map(entries)) => Ok(entries
2706            .iter()
2707            .map(|(k, p)| (k.clone(), p.clone()))
2708            .collect()),
2709        Value::Null | Value::Property(Property::Null) => Ok(Vec::new()),
2710        _ => Err(Error::InvalidSetValue),
2711    }
2712}
2713
2714fn value_to_property(v: Value) -> Result<Property> {
2715    match v {
2716        Value::Property(Property::Map(_)) => Err(Error::InvalidSetValue),
2717        Value::Property(p) => Ok(p),
2718        Value::Null => Ok(Property::Null),
2719        Value::List(items) => {
2720            let props: Vec<Property> = items
2721                .into_iter()
2722                .map(value_to_property)
2723                .collect::<Result<_>>()?;
2724            Ok(Property::List(props))
2725        }
2726        // Graph-aware `Value::Map` and graph elements can't be
2727        // stored as node / edge property values; SET will reject
2728        // them.
2729        Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
2730            Err(Error::InvalidSetValue)
2731        }
2732    }
2733}
2734
2735struct NodeScanAllOp {
2736    var: String,
2737    ids: Option<Vec<NodeId>>,
2738    cursor: usize,
2739}
2740
2741impl NodeScanAllOp {
2742    fn new(var: String) -> Self {
2743        Self {
2744            var,
2745            ids: None,
2746            cursor: 0,
2747        }
2748    }
2749}
2750
2751impl Operator for NodeScanAllOp {
2752    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2753        if self.ids.is_none() {
2754            self.ids = Some(ctx.store.all_node_ids()?);
2755        }
2756        let ids = self.ids.as_ref().unwrap();
2757        while self.cursor < ids.len() {
2758            let id = ids[self.cursor];
2759            self.cursor += 1;
2760            if let Some(node) = ctx.store.get_node(id)? {
2761                let mut row = Row::new();
2762                row.insert(self.var.clone(), Value::Node(node));
2763                return Ok(Some(row));
2764            }
2765        }
2766        Ok(None)
2767    }
2768}
2769
2770struct NodeScanByLabelsOp {
2771    var: String,
2772    labels: Vec<String>,
2773    ids: Option<Vec<NodeId>>,
2774    cursor: usize,
2775}
2776
2777impl NodeScanByLabelsOp {
2778    fn new(var: String, labels: Vec<String>) -> Self {
2779        Self {
2780            var,
2781            labels,
2782            ids: None,
2783            cursor: 0,
2784        }
2785    }
2786}
2787
2788impl Operator for NodeScanByLabelsOp {
2789    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2790        if self.ids.is_none() {
2791            // Use the first label for the index scan, filter the rest per-node.
2792            let primary = self
2793                .labels
2794                .first()
2795                .expect("NodeScanByLabels must have at least one label");
2796            self.ids = Some(ctx.store.nodes_by_label(primary)?);
2797        }
2798        let ids = self.ids.as_ref().unwrap();
2799        while self.cursor < ids.len() {
2800            let id = ids[self.cursor];
2801            self.cursor += 1;
2802            if let Some(node) = ctx.store.get_node(id)? {
2803                if has_all_labels(&node, &self.labels) {
2804                    let mut row = Row::new();
2805                    row.insert(self.var.clone(), Value::Node(node));
2806                    return Ok(Some(row));
2807                }
2808            }
2809        }
2810        Ok(None)
2811    }
2812}
2813
2814fn has_all_labels(node: &Node, labels: &[String]) -> bool {
2815    labels.iter().all(|l| node.labels.contains(l))
2816}
2817
2818/// Equality-lookup operator backed by a property index. Evaluates
2819/// the value expression lazily on the first `next()` — crucially
2820/// so parameters resolve against the per-query `ExecCtx::params`
2821/// map, not against a literal baked in at plan-construction time.
2822///
2823/// Unindexable value types (Float, List, Map, Null, or a Node/Edge
2824/// that slipped through) surface as `Error::InvalidSetValue`. The
2825/// planner only emits this op for indexes that do exist, so the
2826/// reader call should find a populated CF unless a concurrent DROP
2827/// raced us (in which case the result is just empty, which matches
2828/// how Neo4j's planner handles the race).
2829struct IndexSeekOp {
2830    var: String,
2831    label: String,
2832    properties: Vec<String>,
2833    value_exprs: Vec<Expr>,
2834    results: Option<Vec<NodeId>>,
2835    cursor: usize,
2836}
2837
2838impl IndexSeekOp {
2839    fn new(var: String, label: String, properties: Vec<String>, value_exprs: Vec<Expr>) -> Self {
2840        assert_eq!(
2841            properties.len(),
2842            value_exprs.len(),
2843            "IndexSeekOp: properties and values must have equal length"
2844        );
2845        Self {
2846            var,
2847            label,
2848            properties,
2849            value_exprs,
2850            results: None,
2851            cursor: 0,
2852        }
2853    }
2854}
2855
2856impl Operator for IndexSeekOp {
2857    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2858        if self.results.is_none() {
2859            let empty = Row::new();
2860            let mut values: Vec<Property> = Vec::with_capacity(self.value_exprs.len());
2861            for expr in &self.value_exprs {
2862                let value = eval_expr(expr, &ctx.eval_ctx(&empty))?;
2863                let property = match value {
2864                    Value::Property(p) => p,
2865                    Value::Null => Property::Null,
2866                    Value::Node(_)
2867                    | Value::Edge(_)
2868                    | Value::List(_)
2869                    | Value::Map(_)
2870                    | Value::Path { .. } => {
2871                        return Err(Error::InvalidSetValue);
2872                    }
2873                };
2874                values.push(property);
2875            }
2876            let ids = ctx
2877                .store
2878                .nodes_by_properties(&self.label, &self.properties, &values)?;
2879            self.results = Some(ids);
2880        }
2881        let ids = self.results.as_ref().unwrap();
2882        while self.cursor < ids.len() {
2883            let id = ids[self.cursor];
2884            self.cursor += 1;
2885            if let Some(node) = ctx.store.get_node(id)? {
2886                let mut row = Row::new();
2887                row.insert(self.var.clone(), Value::Node(node));
2888                return Ok(Some(row));
2889            }
2890        }
2891        Ok(None)
2892    }
2893}
2894
2895/// Physical operator for [`LogicalPlan::PointIndexSeek`]. Evaluates
2896/// its bounds once on first `next()` and drives
2897/// `reader.nodes_in_bbox(...)`.
2898///
2899/// For [`PointSeekBounds::Corners`] the operator passes the corners
2900/// through as-is and short-circuits on SRID mismatch / null / non-Point
2901/// corners — `point.withinbbox` returns null in those cases, which
2902/// excludes the row from a filter, so "no rows" is the same
2903/// observable outcome.
2904///
2905/// For [`PointSeekBounds::Radius`] the operator derives the enclosing
2906/// bbox from the center's SRID: Cartesian gets a `(cx ± r, cy ± r)`
2907/// square; WGS-84 converts `r` metres to lat/lon spans using the
2908/// `cos(lat)` factor for longitude. The enclosing bbox is always a
2909/// *superset* of the circle, so the planner keeps the original
2910/// distance predicate as a residual `Filter` above the seek — the
2911/// corners of the square that fall outside the circle are culled
2912/// there.
2913struct PointIndexSeekOp {
2914    var: String,
2915    label: String,
2916    property: String,
2917    bounds: PointSeekBounds,
2918    results: Option<Vec<NodeId>>,
2919    cursor: usize,
2920}
2921
2922impl PointIndexSeekOp {
2923    fn new(var: String, label: String, property: String, bounds: PointSeekBounds) -> Self {
2924        Self {
2925            var,
2926            label,
2927            property,
2928            bounds,
2929            results: None,
2930            cursor: 0,
2931        }
2932    }
2933}
2934
2935impl Operator for PointIndexSeekOp {
2936    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2937        if self.results.is_none() {
2938            let empty = Row::new();
2939            let ectx = ctx.eval_ctx(&empty);
2940            let ids = match &self.bounds {
2941                PointSeekBounds::Corners { lo, hi } => {
2942                    let lo_pt = extract_point(&eval_expr(lo, &ectx)?);
2943                    let hi_pt = extract_point(&eval_expr(hi, &ectx)?);
2944                    match (lo_pt, hi_pt) {
2945                        (Some(lo), Some(hi)) if lo.srid == hi.srid => ctx.store.nodes_in_bbox(
2946                            &self.label,
2947                            &self.property,
2948                            lo.srid,
2949                            lo.x,
2950                            lo.y,
2951                            hi.x,
2952                            hi.y,
2953                        )?,
2954                        _ => Vec::new(),
2955                    }
2956                }
2957                PointSeekBounds::Radius { center, radius } => {
2958                    let center_pt = extract_point(&eval_expr(center, &ectx)?);
2959                    let radius_val = extract_f64(&eval_expr(radius, &ectx)?);
2960                    match (center_pt, radius_val) {
2961                        (Some(c), Some(r)) if r.is_finite() && r >= 0.0 => {
2962                            let (xlo, ylo, xhi, yhi) = enclosing_bbox(&c, r);
2963                            ctx.store.nodes_in_bbox(
2964                                &self.label,
2965                                &self.property,
2966                                c.srid,
2967                                xlo,
2968                                ylo,
2969                                xhi,
2970                                yhi,
2971                            )?
2972                        }
2973                        // Null / non-point center or null / negative / NaN radius → no rows.
2974                        _ => Vec::new(),
2975                    }
2976                }
2977            };
2978            self.results = Some(ids);
2979        }
2980        let ids = self.results.as_ref().unwrap();
2981        while self.cursor < ids.len() {
2982            let id = ids[self.cursor];
2983            self.cursor += 1;
2984            if let Some(node) = ctx.store.get_node(id)? {
2985                let mut row = Row::new();
2986                row.insert(self.var.clone(), Value::Node(node));
2987                return Ok(Some(row));
2988            }
2989        }
2990        Ok(None)
2991    }
2992}
2993
2994fn extract_point(v: &Value) -> Option<meshdb_core::Point> {
2995    match v {
2996        Value::Property(Property::Point(p)) => Some(*p),
2997        _ => None,
2998    }
2999}
3000
3001fn extract_f64(v: &Value) -> Option<f64> {
3002    match v {
3003        Value::Property(Property::Float64(f)) => Some(*f),
3004        Value::Property(Property::Int64(i)) => Some(*i as f64),
3005        _ => None,
3006    }
3007}
3008
3009/// Enclosing axis-aligned bbox for a circle of radius `r` around
3010/// `center`. Always a superset of the circle, so callers must still
3011/// apply a precision filter on distance. Cartesian SRIDs use a
3012/// straight `center ± r` square in the CRS's own units. Geographic
3013/// SRIDs (WGS-84, SRID 4326 / 4979) convert `r` metres to lat/lon
3014/// spans via the standard flat-earth approximation with a `cos(lat)`
3015/// longitude-span correction; the factor is clamped away from zero
3016/// so near-pole circles don't collapse into a zero-width bbox (the
3017/// pathological case falls back to the full longitude range).
3018fn enclosing_bbox(center: &meshdb_core::Point, r: f64) -> (f64, f64, f64, f64) {
3019    if center.is_geographic() {
3020        // One degree of latitude ≈ 111_320 metres on a spherical
3021        // Earth. One degree of longitude shrinks by cos(latitude).
3022        const METRES_PER_DEG: f64 = 111_320.0;
3023        let dlat = r / METRES_PER_DEG;
3024        let cos_lat = center.y.to_radians().cos().abs();
3025        // `cos(lat)` goes to 0 at the poles; clamp so a polar
3026        // circle maps to a wide-but-finite longitude span rather
3027        // than a divide-by-zero.
3028        let cos_lat_floor = cos_lat.max(1.0e-6);
3029        let dlon = r / (METRES_PER_DEG * cos_lat_floor);
3030        (
3031            center.x - dlon,
3032            center.y - dlat,
3033            center.x + dlon,
3034            center.y + dlat,
3035        )
3036    } else {
3037        (center.x - r, center.y - r, center.x + r, center.y + r)
3038    }
3039}
3040
3041/// Relationship-scope analogue of [`IndexSeekOp`]. Seeks edges
3042/// through the `(edge_type, property)` index, hydrates each one,
3043/// and emits a row per matching edge binding `edge_var`, `src_var`,
3044/// `dst_var`. For `Direction::Both` each edge emits two rows — one
3045/// per orientation — so an undirected pattern surfaces both
3046/// endpoint assignments. `residual_properties` carries any
3047/// non-indexed pattern-property equalities that still need
3048/// per-edge filtering.
3049struct EdgeSeekOp {
3050    edge_var: String,
3051    src_var: String,
3052    dst_var: String,
3053    edge_type: String,
3054    property: String,
3055    value_expr: Expr,
3056    direction: Direction,
3057    residual_properties: Vec<(String, Expr)>,
3058    /// Pre-materialized output rows. Built lazily on the first
3059    /// `next()` call so the seek + endpoint fetches run once.
3060    results: Option<Vec<Row>>,
3061    cursor: usize,
3062}
3063
3064impl EdgeSeekOp {
3065    #[allow(clippy::too_many_arguments)]
3066    fn new(
3067        edge_var: String,
3068        src_var: String,
3069        dst_var: String,
3070        edge_type: String,
3071        property: String,
3072        value_expr: Expr,
3073        direction: Direction,
3074        residual_properties: Vec<(String, Expr)>,
3075    ) -> Self {
3076        Self {
3077            edge_var,
3078            src_var,
3079            dst_var,
3080            edge_type,
3081            property,
3082            value_expr,
3083            direction,
3084            residual_properties,
3085            results: None,
3086            cursor: 0,
3087        }
3088    }
3089}
3090
3091impl Operator for EdgeSeekOp {
3092    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3093        if self.results.is_none() {
3094            let empty = Row::new();
3095            let seek_value = eval_expr(&self.value_expr, &ctx.eval_ctx(&empty))?;
3096            let property = match seek_value {
3097                Value::Property(p) => p,
3098                Value::Null => Property::Null,
3099                Value::Node(_)
3100                | Value::Edge(_)
3101                | Value::List(_)
3102                | Value::Map(_)
3103                | Value::Path { .. } => {
3104                    return Err(Error::InvalidSetValue);
3105                }
3106            };
3107            let ids = ctx
3108                .store
3109                .edges_by_property(&self.edge_type, &self.property, &property)?;
3110            let mut rows: Vec<Row> = Vec::with_capacity(ids.len());
3111            for id in ids {
3112                let Some(edge) = ctx.store.get_edge(id)? else {
3113                    continue;
3114                };
3115                // Residual pattern-property equality filters — same
3116                // shape as `EdgeExpandOp`'s inline edge-properties
3117                // check. Uses an empty row for expr evaluation since
3118                // an EdgeSeek has no upstream input and the seek's
3119                // own bindings aren't visible to its own filters.
3120                let mut residuals_ok = true;
3121                for (key, expr) in &self.residual_properties {
3122                    let wanted = eval_expr(expr, &ctx.eval_ctx(&empty))?;
3123                    let Some(stored) = edge.properties.get(key) else {
3124                        residuals_ok = false;
3125                        break;
3126                    };
3127                    if !values_equal(&Value::Property(stored.clone()), &wanted) {
3128                        residuals_ok = false;
3129                        break;
3130                    }
3131                }
3132                if !residuals_ok {
3133                    continue;
3134                }
3135                // Hydrate both endpoints so downstream operators
3136                // can read labels / properties off the bound vars.
3137                // A deleted endpoint drops the whole row (same as
3138                // `IndexSeekOp` when `get_node` returns None).
3139                let Some(src_node) = ctx.store.get_node(edge.source)? else {
3140                    continue;
3141                };
3142                let Some(dst_node) = ctx.store.get_node(edge.target)? else {
3143                    continue;
3144                };
3145                // Emit output rows per direction. Outgoing / Incoming
3146                // bind a single orientation; Both yields two rows —
3147                // the `-[r]-` pattern in openCypher matches edges in
3148                // either orientation and binds endpoints accordingly.
3149                match self.direction {
3150                    Direction::Outgoing => {
3151                        rows.push(self.make_row(&edge, &src_node, &dst_node));
3152                    }
3153                    Direction::Incoming => {
3154                        rows.push(self.make_row(&edge, &dst_node, &src_node));
3155                    }
3156                    Direction::Both => {
3157                        rows.push(self.make_row(&edge, &src_node, &dst_node));
3158                        // Self-loops still only surface once — the
3159                        // second orientation is the same row.
3160                        if edge.source != edge.target {
3161                            rows.push(self.make_row(&edge, &dst_node, &src_node));
3162                        }
3163                    }
3164                }
3165            }
3166            self.results = Some(rows);
3167        }
3168        let rows = self.results.as_ref().unwrap();
3169        if self.cursor < rows.len() {
3170            let row = rows[self.cursor].clone();
3171            self.cursor += 1;
3172            return Ok(Some(row));
3173        }
3174        Ok(None)
3175    }
3176}
3177
3178impl EdgeSeekOp {
3179    fn make_row(&self, edge: &Edge, src: &Node, dst: &Node) -> Row {
3180        let mut row = Row::new();
3181        row.insert(self.edge_var.clone(), Value::Edge(edge.clone()));
3182        row.insert(self.src_var.clone(), Value::Node(src.clone()));
3183        row.insert(self.dst_var.clone(), Value::Node(dst.clone()));
3184        row
3185    }
3186}
3187
/// Relationship-scope analogue of [`PointIndexSeekOp`]. Drives
/// `reader.edges_in_bbox(...)` off the bounds, then hydrates each
/// matching edge + its endpoints and emits rows binding
/// `edge_var`, `src_var`, `dst_var` — same shape as [`EdgeSeekOp`].
/// For `Direction::Both`, each edge emits two rows (one per
/// orientation), matching the `-[r]-` pattern semantics; a
/// self-loop emits only one row.
///
/// The seek runs once, on the first `next()` call; the produced
/// rows are materialized in `results` and then streamed.
struct EdgePointIndexSeekOp {
    /// Row key bound to the matched relationship.
    edge_var: String,
    /// Row key bound to the pattern's left-hand endpoint (the stored
    /// source for `Outgoing`, the stored target for `Incoming`).
    src_var: String,
    /// Row key bound to the pattern's right-hand endpoint.
    dst_var: String,
    /// Relationship type whose point index is probed.
    edge_type: String,
    /// Point-valued property on `edge_type` that the index covers.
    property: String,
    /// Pattern direction; decides how endpoints map onto `src_var` /
    /// `dst_var` and whether each edge is emitted in both orientations.
    direction: Direction,
    /// Search window (two corners, or center + radius). The bound
    /// expressions are evaluated against an empty row, so row
    /// variables are not in scope for them.
    bounds: PointSeekBounds,
    /// Materialized output; `None` until the first `next()`.
    results: Option<Vec<Row>>,
    /// Index of the next row in `results` to emit.
    cursor: usize,
}
3205
3206impl EdgePointIndexSeekOp {
3207    #[allow(clippy::too_many_arguments)]
3208    fn new(
3209        edge_var: String,
3210        src_var: String,
3211        dst_var: String,
3212        edge_type: String,
3213        property: String,
3214        direction: Direction,
3215        bounds: PointSeekBounds,
3216    ) -> Self {
3217        Self {
3218            edge_var,
3219            src_var,
3220            dst_var,
3221            edge_type,
3222            property,
3223            direction,
3224            bounds,
3225            results: None,
3226            cursor: 0,
3227        }
3228    }
3229
3230    fn make_row(&self, edge: &Edge, src: &Node, dst: &Node) -> Row {
3231        let mut row = Row::new();
3232        row.insert(self.edge_var.clone(), Value::Edge(edge.clone()));
3233        row.insert(self.src_var.clone(), Value::Node(src.clone()));
3234        row.insert(self.dst_var.clone(), Value::Node(dst.clone()));
3235        row
3236    }
3237}
3238
3239impl Operator for EdgePointIndexSeekOp {
3240    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3241        if self.results.is_none() {
3242            let empty = Row::new();
3243            let ectx = ctx.eval_ctx(&empty);
3244            let ids = match &self.bounds {
3245                PointSeekBounds::Corners { lo, hi } => {
3246                    let lo_pt = extract_point(&eval_expr(lo, &ectx)?);
3247                    let hi_pt = extract_point(&eval_expr(hi, &ectx)?);
3248                    match (lo_pt, hi_pt) {
3249                        (Some(lo), Some(hi)) if lo.srid == hi.srid => ctx.store.edges_in_bbox(
3250                            &self.edge_type,
3251                            &self.property,
3252                            lo.srid,
3253                            lo.x,
3254                            lo.y,
3255                            hi.x,
3256                            hi.y,
3257                        )?,
3258                        _ => Vec::new(),
3259                    }
3260                }
3261                PointSeekBounds::Radius { center, radius } => {
3262                    let center_pt = extract_point(&eval_expr(center, &ectx)?);
3263                    let radius_val = extract_f64(&eval_expr(radius, &ectx)?);
3264                    match (center_pt, radius_val) {
3265                        (Some(c), Some(r)) if r.is_finite() && r >= 0.0 => {
3266                            let (xlo, ylo, xhi, yhi) = enclosing_bbox(&c, r);
3267                            ctx.store.edges_in_bbox(
3268                                &self.edge_type,
3269                                &self.property,
3270                                c.srid,
3271                                xlo,
3272                                ylo,
3273                                xhi,
3274                                yhi,
3275                            )?
3276                        }
3277                        _ => Vec::new(),
3278                    }
3279                }
3280            };
3281
3282            let mut rows: Vec<Row> = Vec::with_capacity(ids.len());
3283            for id in ids {
3284                let Some(edge) = ctx.store.get_edge(id)? else {
3285                    continue;
3286                };
3287                let Some(src_node) = ctx.store.get_node(edge.source)? else {
3288                    continue;
3289                };
3290                let Some(dst_node) = ctx.store.get_node(edge.target)? else {
3291                    continue;
3292                };
3293                match self.direction {
3294                    Direction::Outgoing => rows.push(self.make_row(&edge, &src_node, &dst_node)),
3295                    Direction::Incoming => rows.push(self.make_row(&edge, &dst_node, &src_node)),
3296                    Direction::Both => {
3297                        rows.push(self.make_row(&edge, &src_node, &dst_node));
3298                        if edge.source != edge.target {
3299                            rows.push(self.make_row(&edge, &dst_node, &src_node));
3300                        }
3301                    }
3302                }
3303            }
3304            self.results = Some(rows);
3305        }
3306        let rows = self.results.as_ref().unwrap();
3307        if self.cursor < rows.len() {
3308            let row = rows[self.cursor].clone();
3309            self.cursor += 1;
3310            return Ok(Some(row));
3311        }
3312        Ok(None)
3313    }
3314}
3315
3316fn matches_pattern_props(node: &Node, props: &[(String, Property)]) -> bool {
3317    props.iter().all(|(k, v)| {
3318        node.properties
3319            .get(k)
3320            .map(|stored| stored == v)
3321            .unwrap_or(false)
3322    })
3323}
3324
/// Find-or-create executor for node MERGE (`MERGE (n:Label {k: v})`).
///
/// Standalone (`input` is `None`): the merge runs once, on the first
/// `next()`, against an empty row, and each resulting node is emitted
/// as `{var → node}`. Input-driven: all input rows are drained first,
/// the merge runs once per input row, and each input row is joined
/// with that row's merged nodes; the joined rows are buffered and
/// streamed.
struct MergeNodeOp {
    /// Row key the merged node is bound to.
    var: String,
    /// Pattern labels. A matching node must carry all of them; a
    /// created node gets all of them.
    labels: Vec<String>,
    /// Pattern property expressions as they came from the planner. These
    /// stay as `Expr` because evaluation needs `ExecCtx::params`, which
    /// isn't available until `next()` is called.
    properties: Vec<(String, Expr)>,
    /// `ON CREATE SET ...` assignments — applied only when the
    /// merge took the create branch. Evaluated against the upstream
    /// row with `var → Node` overlaid, so the value expressions can
    /// reference both the just-created node and other bindings.
    on_create: Vec<SetAssignment>,
    /// `ON MATCH SET ...` assignments — applied to every matched
    /// node when the merge took the match branch. Same row shape
    /// as `on_create`.
    on_match: Vec<SetAssignment>,
    /// Optional upstream operator. `None` means this is a
    /// top-level producer (`MERGE (n) RETURN n`) and emits
    /// rows with a fresh empty base. `Some` means this is a
    /// mid-chain clause (`MATCH (a) MERGE (b) RETURN a, b`)
    /// and each emitted row is a cross-join between an input
    /// row and a merge-result node.
    input: Option<Box<dyn Operator>>,
    /// Cached merge result for the standalone (`input.is_none()`)
    /// case only. Populated on the first `next()` call by running
    /// the scan + maybe-create logic once, then streamed via
    /// `cursor`. The input-driven case does not use this field: it
    /// merges once per input row and buffers its output in
    /// `input_buffered`.
    ///
    /// NOTE(review): because the input-driven path merges per row,
    /// a node created for an earlier row has to be visible to a
    /// later row's store scan. If the writer buffers writes without
    /// read-your-writes visibility, duplicates could be created —
    /// confirm against the writer implementations.
    merged_nodes: Vec<Node>,
    /// Whether `merged_nodes` has been populated (standalone case
    /// only). The merge logic runs lazily on the first `next()` so
    /// `ExecCtx::params` is available.
    merge_done: bool,
    /// Index of the next entry of `merged_nodes` to emit.
    cursor: usize,
    /// Eager-evaluated output for the input-driven case: drain
    /// the entire input on first call, run merge for each row
    /// (writing to the store as we go), and accumulate the
    /// (input_row, merged_node) cross-products. Lazy per-row
    /// merging interleaved writes with downstream scan caching
    /// for `WITH *`-then-MATCH patterns. Only populated when
    /// `input.is_some()`; the standalone case (`input.is_none()`)
    /// is already eager via `merge_done`.
    input_buffered: Option<Vec<Row>>,
    /// Index of the next entry of `input_buffered` to emit.
    input_cursor: usize,
}
3371
3372impl MergeNodeOp {
3373    fn new(
3374        input: Option<Box<dyn Operator>>,
3375        var: String,
3376        labels: Vec<String>,
3377        properties: Vec<(String, Expr)>,
3378        on_create: Vec<SetAssignment>,
3379        on_match: Vec<SetAssignment>,
3380    ) -> Self {
3381        Self {
3382            var,
3383            labels,
3384            properties,
3385            on_create,
3386            on_match,
3387            input,
3388            merged_nodes: Vec::new(),
3389            merge_done: false,
3390            cursor: 0,
3391            input_buffered: None,
3392            input_cursor: 0,
3393        }
3394    }
3395
3396    /// Run the MERGE logic exactly once: scan the store for
3397    /// existing matches, apply ON MATCH SET to each; or create
3398    /// a fresh node and apply ON CREATE SET; persist everything
3399    /// via `ctx.writer`; stash the resulting nodes in
3400    /// `self.merged_nodes`. Idempotent — subsequent calls are
3401    /// no-ops once `self.merge_done` is set.
3402    /// Resolve the pattern properties against `base`, scan the
3403    /// store, and either match existing nodes or create a fresh
3404    /// one. Returns the resulting node set — can be called
3405    /// multiple times with different `base` rows for
3406    /// input-driven merges.
3407    fn run_merge_for(&mut self, ctx: &ExecCtx, base: &Row) -> Result<Vec<Node>> {
3408        let resolved_props: Vec<(String, Property)> = self
3409            .properties
3410            .iter()
3411            .map(|(k, expr)| {
3412                let v = eval_expr(expr, &ctx.eval_ctx(base))?;
3413                Ok((k.clone(), value_to_property(v)?))
3414            })
3415            .collect::<Result<Vec<_>>>()?;
3416
3417        let candidate_ids: Vec<NodeId> = if let Some(primary) = self.labels.first() {
3418            ctx.store.nodes_by_label(primary)?
3419        } else {
3420            ctx.store.all_node_ids()?
3421        };
3422        let mut merged_nodes: Vec<Node> = Vec::new();
3423        for id in candidate_ids {
3424            if let Some(node) = ctx.store.get_node(id)? {
3425                if has_all_labels(&node, &self.labels)
3426                    && matches_pattern_props(&node, &resolved_props)
3427                {
3428                    merged_nodes.push(node);
3429                }
3430            }
3431        }
3432
3433        if merged_nodes.is_empty() {
3434            let mut node = Node::new();
3435            for label in &self.labels {
3436                node.labels.push(label.clone());
3437            }
3438            for (k, prop) in resolved_props {
3439                node.properties.insert(k, prop);
3440            }
3441            apply_merge_actions(&mut node, &self.on_create, &self.var, ctx, base)?;
3442            ctx.writer.put_node(&node)?;
3443            merged_nodes.push(node);
3444        } else if !self.on_match.is_empty() {
3445            for node in merged_nodes.iter_mut() {
3446                apply_merge_actions(node, &self.on_match, &self.var, ctx, base)?;
3447                ctx.writer.put_node(node)?;
3448            }
3449        }
3450        Ok(merged_nodes)
3451    }
3452}
3453
3454impl Operator for MergeNodeOp {
3455    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3456        // Top-level producer: no upstream context, so the pattern
3457        // properties can only reference literals / parameters.
3458        // Run the merge once against an empty row, then emit
3459        // each result.
3460        if self.input.is_none() {
3461            if !self.merge_done {
3462                let empty = Row::new();
3463                let nodes = self.run_merge_for(ctx, &empty)?;
3464                self.merged_nodes = nodes;
3465                self.merge_done = true;
3466            }
3467            if self.cursor < self.merged_nodes.len() {
3468                let node = self.merged_nodes[self.cursor].clone();
3469                self.cursor += 1;
3470                let mut row = Row::new();
3471                row.insert(self.var.clone(), Value::Node(node));
3472                return Ok(Some(row));
3473            }
3474            return Ok(None);
3475        }
3476
3477        // Input-driven case: evaluate pattern properties *per
3478        // input row* so references like `MERGE (:City {name:
3479        // person.bornIn})` resolve against the bound `person`.
3480        // Eager evaluation — drain all input + run all merges +
3481        // cross-join with input rows before yielding any. Lazy
3482        // per-row merging interleaved writes with downstream
3483        // scan caching, which broke `WITH *`-then-MATCH chains.
3484        if self.input_buffered.is_none() {
3485            let mut input_rows: Vec<Row> = Vec::new();
3486            while let Some(row) = self.input.as_mut().unwrap().next(ctx)? {
3487                input_rows.push(row);
3488            }
3489            let mut output: Vec<Row> = Vec::new();
3490            for input_row in input_rows {
3491                let nodes = self.run_merge_for(ctx, &input_row)?;
3492                for node in nodes {
3493                    let mut out = input_row.clone();
3494                    out.insert(self.var.clone(), Value::Node(node));
3495                    output.push(out);
3496                }
3497            }
3498            self.input_buffered = Some(output);
3499            self.input_cursor = 0;
3500        }
3501        let rows = self.input_buffered.as_ref().unwrap();
3502        if self.input_cursor < rows.len() {
3503            let row = rows[self.input_cursor].clone();
3504            self.input_cursor += 1;
3505            return Ok(Some(row));
3506        }
3507        Ok(None)
3508    }
3509}
3510
/// Find-or-create executor for edge MERGE
/// (`MERGE (a)-[r:KNOWS]->(b)`).
///
/// For every row pulled from `input`, looks up the `src_var`
/// and `dst_var` bindings. Both must be `Value::Node`; anything
/// else is reported as `Error::UnboundVariable`, and the planner
/// is expected to guarantee they came from a prior MATCH or
/// MERGE. It then evaluates the inline property filter and scans
/// `src`'s outgoing edges, plus its incoming edges when
/// `undirected`. Then either:
///
/// - Every edge of type `edge_type` connecting `src` and `dst`
///   that satisfies the inline property filter is matched. When
///   `on_match` is non-empty, it is applied to each matched edge
///   and the edge is re-persisted. One output row is emitted per
///   matched edge. Or:
/// - When nothing matches, a fresh `Edge::new(edge_type, src, dst)`
///   is created carrying the inline properties, `on_create` is
///   applied, and the edge is persisted via `ctx.writer.put_edge`.
///
/// Either way, the resulting edge is bound into `edge_var` in
/// each output row. Restrictions: single hop, both endpoints
/// already bound, explicit relationship type. A created edge
/// always points `src → dst`, also for undirected patterns.
struct MergeEdgeOp {
    input: Box<dyn Operator>,
    edge_var: String,
    src_var: String,
    dst_var: String,
    edge_type: String,
    undirected: bool,
    /// Inline edge property filter from the MERGE pattern
    /// (`[r:T {k: v}]`). Matched edges must satisfy every entry;
    /// the create branch stamps them onto the new edge.
    properties: Vec<(String, Expr)>,
    on_create: Vec<SetAssignment>,
    on_match: Vec<SetAssignment>,
    /// All output rows from the eager merge pass, queued for
    /// streaming. Populated on the first `next()` call: drain
    /// input, run merge for each row (writing to the store as
    /// we go), accumulate every (input_row × matched_edge)
    /// cross-product. Eager evaluation is required for
    /// `WITH *`-then-MATCH chains — see `next` impl for why.
    pending: std::collections::VecDeque<Row>,
    /// Set after the eager merge pass populates `pending` so
    /// subsequent `next()` calls don't re-drain the (already
    /// exhausted) input.
    drained: bool,
}
3554
3555impl MergeEdgeOp {
3556    #[allow(clippy::too_many_arguments)]
3557    fn new(
3558        input: Box<dyn Operator>,
3559        edge_var: String,
3560        src_var: String,
3561        dst_var: String,
3562        edge_type: String,
3563        undirected: bool,
3564        properties: Vec<(String, Expr)>,
3565        on_create: Vec<SetAssignment>,
3566        on_match: Vec<SetAssignment>,
3567    ) -> Self {
3568        Self {
3569            input,
3570            edge_var,
3571            src_var,
3572            dst_var,
3573            edge_type,
3574            undirected,
3575            properties,
3576            on_create,
3577            on_match,
3578            pending: std::collections::VecDeque::new(),
3579            drained: false,
3580        }
3581    }
3582}
3583
3584impl MergeEdgeOp {
3585    /// Per-input-row merge step: scan for matching edges, take
3586    /// either the match or create branch, push resulting output
3587    /// rows onto `out`. Factored out so the eager `next()` can
3588    /// run this for every drained input row before yielding.
3589    fn merge_for(&self, ctx: &ExecCtx, row: Row, out: &mut Vec<Row>) -> Result<()> {
3590        // Resolve src/dst. Both must be Value::Node — the
3591        // planner enforces that the variables came from an
3592        // earlier producer, so anything else is a bug or a
3593        // later-added feature that didn't update the check.
3594        let src_node = match row.get(&self.src_var) {
3595            Some(Value::Node(n)) => n.clone(),
3596            _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3597        };
3598        let dst_node = match row.get(&self.dst_var) {
3599            Some(Value::Node(n)) => n.clone(),
3600            _ => return Err(Error::UnboundVariable(self.dst_var.clone())),
3601        };
3602
3603        // Evaluate the inline edge property filter once per
3604        // input row. These are AST expressions so they can
3605        // reference outer bindings (`MERGE (a)-[r:T {k: a.v}]->(b)`).
3606        let required_props: Vec<(String, Property)> = self
3607            .properties
3608            .iter()
3609            .map(|(k, expr)| {
3610                let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
3611                Ok((k.clone(), value_to_property(v)?))
3612            })
3613            .collect::<Result<Vec<_>>>()?;
3614        let edge_matches = |edge: &Edge| -> bool {
3615            required_props.iter().all(|(k, want)| {
3616                edge.properties
3617                    .get(k)
3618                    .map(|have| have == want)
3619                    .unwrap_or(false)
3620            })
3621        };
3622
3623        // Collect every edge of type `edge_type` from src to
3624        // dst (and, for undirected patterns, dst to src) that
3625        // also satisfies the inline property filter.
3626        let mut matched: Vec<Edge> = Vec::new();
3627        for (edge_id, neighbor_id) in ctx.store.outgoing(src_node.id)? {
3628            if neighbor_id != dst_node.id {
3629                continue;
3630            }
3631            if let Some(edge) = ctx.store.get_edge(edge_id)? {
3632                if edge.edge_type == self.edge_type && edge_matches(&edge) {
3633                    matched.push(edge);
3634                }
3635            }
3636        }
3637        if self.undirected {
3638            for (edge_id, neighbor_id) in ctx.store.incoming(src_node.id)? {
3639                if neighbor_id != dst_node.id {
3640                    continue;
3641                }
3642                if let Some(edge) = ctx.store.get_edge(edge_id)? {
3643                    if edge.edge_type == self.edge_type && edge_matches(&edge) {
3644                        matched.push(edge);
3645                    }
3646                }
3647            }
3648        }
3649
3650        if matched.is_empty() {
3651            let mut new_edge = Edge::new(&self.edge_type, src_node.id, dst_node.id);
3652            for (k, p) in &required_props {
3653                new_edge.properties.insert(k.clone(), p.clone());
3654            }
3655            let mut row_out = row.clone();
3656            apply_merge_edge_actions(
3657                &mut new_edge,
3658                &self.on_create,
3659                &self.edge_var,
3660                ctx,
3661                &mut row_out,
3662            )?;
3663            ctx.writer.put_edge(&new_edge)?;
3664            row_out.insert(self.edge_var.clone(), Value::Edge(new_edge));
3665            out.push(row_out);
3666        } else {
3667            for mut existing in matched {
3668                let mut row_out = row.clone();
3669                if !self.on_match.is_empty() {
3670                    apply_merge_edge_actions(
3671                        &mut existing,
3672                        &self.on_match,
3673                        &self.edge_var,
3674                        ctx,
3675                        &mut row_out,
3676                    )?;
3677                    ctx.writer.put_edge(&existing)?;
3678                }
3679                row_out.insert(self.edge_var.clone(), Value::Edge(existing));
3680                out.push(row_out);
3681            }
3682        }
3683        Ok(())
3684    }
3685}
3686
3687impl Operator for MergeEdgeOp {
3688    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3689        // Eager evaluation — drain all input + run all merges
3690        // (writing to the store) before yielding any row. Lazy
3691        // per-input-row merging interleaved writes with
3692        // downstream scan caching for `WITH *`-then-MATCH chains.
3693        if self.pending.is_empty() && !self.drained {
3694            let mut input_rows: Vec<Row> = Vec::new();
3695            while let Some(row) = self.input.next(ctx)? {
3696                input_rows.push(row);
3697            }
3698            let mut out: Vec<Row> = Vec::new();
3699            for row in input_rows {
3700                self.merge_for(ctx, row, &mut out)?;
3701            }
3702            self.pending.extend(out);
3703            self.drained = true;
3704        }
3705        Ok(self.pending.pop_front())
3706    }
3707}
3708
3709/// Edge-side counterpart of [`apply_merge_actions`]. Evaluates
3710/// each `SetAssignment` against the outer `row` (augmented with
3711/// the current edge binding) and mutates either the edge itself
3712/// or a non-edge target node from the outer row. MERGE's ON
3713/// CREATE / ON MATCH clauses are scoped against the whole input
3714/// row, so `MERGE (a)-[:T]->(b) ON CREATE SET b.k = 1` has to
3715/// reach outside the edge-local binding. Non-edge mutations
3716/// are persisted via `ctx.writer.put_node` so the change is
3717/// visible to later clauses.
3718fn apply_merge_edge_actions(
3719    edge: &mut Edge,
3720    actions: &[SetAssignment],
3721    var: &str,
3722    exec_ctx: &ExecCtx,
3723    outer: &mut Row,
3724) -> Result<()> {
3725    if actions.is_empty() {
3726        return Ok(());
3727    }
3728    // Edge binding is live in `outer` while we evaluate — RHS can
3729    // reference both the edge and any sibling node binding.
3730    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3731    for action in actions {
3732        match action {
3733            SetAssignment::Property {
3734                var: target,
3735                key,
3736                value,
3737            } => {
3738                let sub_ctx = exec_ctx.eval_ctx(outer);
3739                let evaluated = eval_expr(value, &sub_ctx)?;
3740                let prop = value_to_property(evaluated)?;
3741                if target == var {
3742                    if matches!(prop, Property::Null) {
3743                        edge.properties.remove(key);
3744                    } else {
3745                        edge.properties.insert(key.clone(), prop);
3746                    }
3747                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3748                } else {
3749                    apply_set_prop_to_outer(outer, exec_ctx, target, key, prop)?;
3750                }
3751            }
3752            SetAssignment::Merge {
3753                var: target,
3754                properties,
3755            } => {
3756                let sub_ctx = exec_ctx.eval_ctx(outer);
3757                let resolved: Vec<(String, Property)> = properties
3758                    .iter()
3759                    .map(|(k, expr)| {
3760                        let v = eval_expr(expr, &sub_ctx)?;
3761                        Ok((k.clone(), value_to_property(v)?))
3762                    })
3763                    .collect::<Result<Vec<_>>>()?;
3764                if target == var {
3765                    for (k, p) in resolved {
3766                        edge.properties.insert(k, p);
3767                    }
3768                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3769                } else {
3770                    apply_set_map_to_outer(outer, exec_ctx, target, resolved, false)?;
3771                }
3772            }
3773            SetAssignment::Replace {
3774                var: target,
3775                properties,
3776            } => {
3777                let sub_ctx = exec_ctx.eval_ctx(outer);
3778                let resolved: Vec<(String, Property)> = properties
3779                    .iter()
3780                    .map(|(k, expr)| {
3781                        let v = eval_expr(expr, &sub_ctx)?;
3782                        Ok((k.clone(), value_to_property(v)?))
3783                    })
3784                    .collect::<Result<Vec<_>>>()?;
3785                if target == var {
3786                    edge.properties.clear();
3787                    for (k, p) in resolved {
3788                        edge.properties.insert(k, p);
3789                    }
3790                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3791                } else {
3792                    apply_set_map_to_outer(outer, exec_ctx, target, resolved, true)?;
3793                }
3794            }
3795            SetAssignment::Labels {
3796                var: target,
3797                labels,
3798            } => {
3799                if target == var {
3800                    // Edges don't carry labels.
3801                    return Err(Error::UnboundVariable(target.clone()));
3802                }
3803                apply_set_labels_to_outer(outer, exec_ctx, target, labels)?;
3804            }
3805            SetAssignment::ReplaceFromExpr {
3806                var: target,
3807                source,
3808                replace,
3809            } => {
3810                let sub_ctx = exec_ctx.eval_ctx(outer);
3811                let v = eval_expr(source, &sub_ctx)?;
3812                let props = extract_property_map(&v)?;
3813                if target == var {
3814                    if *replace {
3815                        edge.properties.clear();
3816                    }
3817                    for (k, p) in props {
3818                        edge.properties.insert(k, p);
3819                    }
3820                    outer.insert(var.to_string(), Value::Edge(edge.clone()));
3821                } else {
3822                    apply_set_map_to_outer(outer, exec_ctx, target, props, *replace)?;
3823                }
3824            }
3825        }
3826    }
3827    Ok(())
3828}
3829
3830/// Apply a single `SET target.key = prop` to a node or edge bound
3831/// in the outer row. Used by MERGE's ON CREATE / ON MATCH when
3832/// the target isn't the merge edge itself (the common case being
3833/// `MERGE (a)-[:R]->(b) ON CREATE SET b.k = v`).
3834fn apply_set_prop_to_outer(
3835    outer: &mut Row,
3836    exec_ctx: &ExecCtx,
3837    target: &str,
3838    key: &str,
3839    prop: Property,
3840) -> Result<()> {
3841    match outer.get_mut(target) {
3842        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
3843            // SET on a null target (typically an unmatched OPTIONAL
3844            // MATCH binding) is a silent no-op in openCypher.
3845            return Ok(());
3846        }
3847        Some(Value::Node(n)) => {
3848            if matches!(prop, Property::Null) {
3849                n.properties.remove(key);
3850            } else {
3851                n.properties.insert(key.to_string(), prop);
3852            }
3853            exec_ctx.writer.put_node(n)?;
3854        }
3855        Some(Value::Edge(e)) => {
3856            if matches!(prop, Property::Null) {
3857                e.properties.remove(key);
3858            } else {
3859                e.properties.insert(key.to_string(), prop);
3860            }
3861            exec_ctx.writer.put_edge(e)?;
3862        }
3863        _ => return Err(Error::UnboundVariable(target.to_string())),
3864    }
3865    Ok(())
3866}
3867
3868/// Apply a property-map assignment (`SET target = {..}` when
3869/// `replace`, or `SET target += {..}` when not) to a node or
3870/// edge bound in the outer row.
3871fn apply_set_map_to_outer(
3872    outer: &mut Row,
3873    exec_ctx: &ExecCtx,
3874    target: &str,
3875    props: Vec<(String, Property)>,
3876    replace: bool,
3877) -> Result<()> {
3878    match outer.get_mut(target) {
3879        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
3880        Some(Value::Node(n)) => {
3881            if replace {
3882                n.properties.clear();
3883            }
3884            for (k, p) in props {
3885                if replace || !matches!(p, Property::Null) {
3886                    n.properties.insert(k, p);
3887                } else {
3888                    n.properties.remove(&k);
3889                }
3890            }
3891            exec_ctx.writer.put_node(n)?;
3892            Ok(())
3893        }
3894        Some(Value::Edge(e)) => {
3895            if replace {
3896                e.properties.clear();
3897            }
3898            for (k, p) in props {
3899                if replace || !matches!(p, Property::Null) {
3900                    e.properties.insert(k, p);
3901                } else {
3902                    e.properties.remove(&k);
3903                }
3904            }
3905            exec_ctx.writer.put_edge(e)?;
3906            Ok(())
3907        }
3908        _ => Err(Error::UnboundVariable(target.to_string())),
3909    }
3910}
3911
3912/// Apply a labels assignment to a node bound in the outer row.
3913fn apply_set_labels_to_outer(
3914    outer: &mut Row,
3915    exec_ctx: &ExecCtx,
3916    target: &str,
3917    labels: &[String],
3918) -> Result<()> {
3919    match outer.get_mut(target) {
3920        Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
3921        Some(Value::Node(n)) => {
3922            for label in labels {
3923                if !n.labels.contains(label) {
3924                    n.labels.push(label.clone());
3925                }
3926            }
3927            exec_ctx.writer.put_node(n)?;
3928            Ok(())
3929        }
3930        _ => Err(Error::UnboundVariable(target.to_string())),
3931    }
3932}
3933
/// Apply MERGE-conditional SET assignments (`ON CREATE` or
/// `ON MATCH`) to `node` in place. Mirrors the `SetPropertyOp`
3935/// dispatch but specialized to a single bound variable so we
3936/// don't have to materialize a full row dispatcher.
3937///
3938/// Value expressions are evaluated against a temporary row
3939/// `{var → Node(node.clone())}` so the RHS can reference the
3940/// node's existing properties — `MERGE (n) ON MATCH SET n.hits = n.hits + 1`
3941/// works the same as if the SET ran in a SetPropertyOp pipeline.
3942fn apply_merge_actions(
3943    node: &mut Node,
3944    actions: &[SetAssignment],
3945    var: &str,
3946    exec_ctx: &ExecCtx,
3947    base_row: &Row,
3948) -> Result<()> {
3949    if actions.is_empty() {
3950        return Ok(());
3951    }
3952    // Start from the upstream row so `ON CREATE SET n.prop = other.field`
3953    // can resolve `other` — then overlay the merged node under `var`.
3954    let mut row = base_row.clone();
3955    row.insert(var.to_string(), Value::Node(node.clone()));
3956    for action in actions {
3957        let sub_ctx = exec_ctx.eval_ctx(&row);
3958        match action {
3959            SetAssignment::Property {
3960                var: target,
3961                key,
3962                value,
3963            } => {
3964                if target != var {
3965                    return Err(Error::UnboundVariable(target.clone()));
3966                }
3967                let evaluated = eval_expr(value, &sub_ctx)?;
3968                let prop = value_to_property(evaluated)?;
3969                node.properties.insert(key.clone(), prop);
3970                row.insert(var.to_string(), Value::Node(node.clone()));
3971            }
3972            SetAssignment::Labels {
3973                var: target,
3974                labels,
3975            } => {
3976                if target != var {
3977                    return Err(Error::UnboundVariable(target.clone()));
3978                }
3979                for label in labels {
3980                    if !node.labels.contains(label) {
3981                        node.labels.push(label.clone());
3982                    }
3983                }
3984                row.insert(var.to_string(), Value::Node(node.clone()));
3985            }
3986            SetAssignment::Replace {
3987                var: target,
3988                properties,
3989            } => {
3990                if target != var {
3991                    return Err(Error::UnboundVariable(target.clone()));
3992                }
3993                let resolved: Vec<(String, Property)> = properties
3994                    .iter()
3995                    .map(|(k, expr)| {
3996                        let v = eval_expr(expr, &sub_ctx)?;
3997                        Ok((k.clone(), value_to_property(v)?))
3998                    })
3999                    .collect::<Result<Vec<_>>>()?;
4000                node.properties.clear();
4001                for (k, p) in resolved {
4002                    node.properties.insert(k, p);
4003                }
4004                row.insert(var.to_string(), Value::Node(node.clone()));
4005            }
4006            SetAssignment::Merge {
4007                var: target,
4008                properties,
4009            } => {
4010                if target != var {
4011                    return Err(Error::UnboundVariable(target.clone()));
4012                }
4013                let resolved: Vec<(String, Property)> = properties
4014                    .iter()
4015                    .map(|(k, expr)| {
4016                        let v = eval_expr(expr, &sub_ctx)?;
4017                        Ok((k.clone(), value_to_property(v)?))
4018                    })
4019                    .collect::<Result<Vec<_>>>()?;
4020                for (k, p) in resolved {
4021                    node.properties.insert(k, p);
4022                }
4023                row.insert(var.to_string(), Value::Node(node.clone()));
4024            }
4025            SetAssignment::ReplaceFromExpr {
4026                var: target,
4027                source,
4028                replace,
4029            } => {
4030                if target != var {
4031                    return Err(Error::UnboundVariable(target.clone()));
4032                }
4033                let v = eval_expr(source, &sub_ctx)?;
4034                let props = extract_property_map(&v)?;
4035                if *replace {
4036                    node.properties.clear();
4037                }
4038                for (k, p) in props {
4039                    node.properties.insert(k, p);
4040                }
4041                row.insert(var.to_string(), Value::Node(node.clone()));
4042            }
4043        }
4044    }
4045    Ok(())
4046}
4047
/// Single-hop expansion. For every input row, walks the adjacency of
/// the node bound at `src_var` and emits one output row per surviving
/// `(edge, neighbor)` pair: the input row extended with `edge_var`
/// (when named) and `dst_var`. Input rows whose source is null produce
/// no output.
struct EdgeExpandOp {
    /// Upstream operator supplying the rows to expand.
    input: Box<dyn Operator>,
    /// Row variable holding the source node.
    src_var: String,
    /// Variable the traversed edge is bound to; `None` when the
    /// relationship is anonymous.
    edge_var: Option<String>,
    /// Variable the neighbor node is bound to.
    dst_var: String,
    /// Labels the neighbor must carry (every one of them).
    dst_labels: Vec<String>,
    /// Inline `{key: expr}` filter on the traversed edge. Each `expr`
    /// is evaluated against the current row and compared with the
    /// edge's property via `values_equal`; a missing key fails.
    edge_properties: Vec<(String, Expr)>,
    /// Accepted relationship types; an empty list accepts any type.
    edge_types: Vec<String>,
    /// Which adjacency lists to walk. `Both` visits a self-loop once.
    direction: Direction,
    /// When set, only the specific edge whose id matches the
    /// row-bound value counts as a match. Used by fresh-scan
    /// MATCH patterns that reuse an edge variable from a prior
    /// clause so the new hop stays an existence check instead of
    /// rebinding and clobbering the outer edge.
    /// The binding is resolved with `lookup_binding`; a missing or
    /// non-edge binding matches nothing.
    edge_constraint_var: Option<String>,
    /// Input row whose adjacency is currently buffered in `pending`.
    current_row: Option<Row>,
    /// `(edge id, neighbor id)` candidates for `current_row`, in the
    /// order the store returned them.
    pending: Vec<(EdgeId, NodeId)>,
    /// Index of the next unconsumed entry in `pending`.
    pending_idx: usize,
}
4067
4068impl EdgeExpandOp {
4069    #[allow(clippy::too_many_arguments)]
4070    fn new(
4071        input: Box<dyn Operator>,
4072        src_var: String,
4073        edge_var: Option<String>,
4074        dst_var: String,
4075        dst_labels: Vec<String>,
4076        edge_properties: Vec<(String, Expr)>,
4077        edge_types: Vec<String>,
4078        direction: Direction,
4079        edge_constraint_var: Option<String>,
4080    ) -> Self {
4081        Self {
4082            input,
4083            src_var,
4084            edge_var,
4085            dst_var,
4086            dst_labels,
4087            edge_properties,
4088            edge_types,
4089            direction,
4090            edge_constraint_var,
4091            current_row: None,
4092            pending: Vec::new(),
4093            pending_idx: 0,
4094        }
4095    }
4096}
4097
4098impl Operator for EdgeExpandOp {
4099    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4100        loop {
4101            while self.pending_idx < self.pending.len() {
4102                let (edge_id, neighbor_id) = self.pending[self.pending_idx];
4103                self.pending_idx += 1;
4104
4105                let edge = match ctx.store.get_edge(edge_id)? {
4106                    Some(e) => e,
4107                    None => continue,
4108                };
4109                if !self.edge_types.is_empty()
4110                    && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4111                {
4112                    continue;
4113                }
4114                // Pre-bound edge constraint: only accept the edge
4115                // whose id matches the row-bound value. Falls
4116                // through to outer scopes so a fresh right-side
4117                // scan of a CartesianProduct can still see the
4118                // bound edge from the left side. Non-edge / null
4119                // bindings trigger no matches at all — the
4120                // expansion yields nothing for that input row.
4121                if let Some(constraint_var) = &self.edge_constraint_var {
4122                    let base = self
4123                        .current_row
4124                        .as_ref()
4125                        .expect("pending edges without source row");
4126                    let expected = match ctx.lookup_binding(base, constraint_var) {
4127                        Some(Value::Edge(e)) => Some(e.id),
4128                        _ => None,
4129                    };
4130                    match expected {
4131                        Some(id) if id != edge.id => continue,
4132                        None => continue,
4133                        _ => {}
4134                    }
4135                }
4136                // Inline edge property filter: every (key, value)
4137                // must match the traversed edge's property of the
4138                // same name via `=` equality. Missing keys fail the
4139                // check (matching Neo4j).
4140                if !self.edge_properties.is_empty() {
4141                    let base = self
4142                        .current_row
4143                        .as_ref()
4144                        .expect("pending edges without source row");
4145                    let ectx = ctx.eval_ctx(base);
4146                    let mut ok = true;
4147                    for (key, expr) in &self.edge_properties {
4148                        let expected = eval_expr(expr, &ectx)?;
4149                        let actual = match edge.properties.get(key) {
4150                            Some(v) => Value::Property(v.clone()),
4151                            None => {
4152                                ok = false;
4153                                break;
4154                            }
4155                        };
4156                        if !values_equal(&actual, &expected) {
4157                            ok = false;
4158                            break;
4159                        }
4160                    }
4161                    if !ok {
4162                        continue;
4163                    }
4164                }
4165
4166                let neighbor = match ctx.store.get_node(neighbor_id)? {
4167                    Some(n) => n,
4168                    None => continue,
4169                };
4170                if !has_all_labels(&neighbor, &self.dst_labels) {
4171                    continue;
4172                }
4173
4174                let base = self
4175                    .current_row
4176                    .as_ref()
4177                    .expect("pending edges without source row");
4178                let mut out = base.clone();
4179                if let Some(ev) = &self.edge_var {
4180                    out.insert(ev.clone(), Value::Edge(edge));
4181                }
4182                out.insert(self.dst_var.clone(), Value::Node(neighbor));
4183                return Ok(Some(out));
4184            }
4185
4186            match self.input.next(ctx)? {
4187                None => return Ok(None),
4188                Some(row) => {
4189                    let src_id = match row.get(&self.src_var) {
4190                        Some(Value::Node(n)) => n.id,
4191                        // A null source (e.g. from OPTIONAL MATCH
4192                        // that matched nothing) drops the input
4193                        // row — `MATCH (a)-->(b)` against a null
4194                        // `a` is just empty, not an error.
4195                        Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
4196                            continue
4197                        }
4198                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4199                    };
4200                    self.pending = match self.direction {
4201                        Direction::Outgoing => ctx.store.outgoing(src_id)?,
4202                        Direction::Incoming => ctx.store.incoming(src_id)?,
4203                        Direction::Both => {
4204                            // For undirected traversal, a self-loop
4205                            // appears once as outgoing and once as
4206                            // incoming. The Cypher spec visits each
4207                            // physical edge once, so dedupe by
4208                            // edge_id before emitting.
4209                            let mut all = ctx.store.outgoing(src_id)?;
4210                            let mut seen: std::collections::HashSet<EdgeId> =
4211                                all.iter().map(|(e, _)| *e).collect();
4212                            for (e, n) in ctx.store.incoming(src_id)? {
4213                                if seen.insert(e) {
4214                                    all.push((e, n));
4215                                }
4216                            }
4217                            all
4218                        }
4219                    };
4220                    self.pending_idx = 0;
4221                    self.current_row = Some(row);
4222                }
4223            }
4224        }
4225    }
4226}
4227
/// Left-join variant of [`EdgeExpandOp`]. For each input row,
/// expands the adjacency in the configured direction and filters the
/// candidates by `edge_types`, the optional edge / destination
/// constraints, `dst_labels` and `dst_properties`. If **any** neighbor
/// survives the filters, emits rows exactly like `EdgeExpandOp`.
///
/// If **zero** neighbors survive, or the source binding is null, emits
/// one row that carries the input row's bindings plus `edge_var` /
/// `dst_var` set to `Value::Null`, preserving the input row in the
/// output stream. This is the left-outer-join semantics OPTIONAL MATCH
/// needs. A variable that is itself the target of a constraint keeps
/// its pre-bound value in that fallback row instead of being nulled.
///
/// Tracks per-input-row whether any output was produced so the
/// fallback Null row is only emitted after the pending buffer
/// drains without yielding anything. The `yielded_for_current`
/// flag is reset whenever a new input row is pulled.
struct OptionalEdgeExpandOp {
    /// Upstream operator supplying the rows to expand.
    input: Box<dyn Operator>,
    /// Row variable holding the source node.
    src_var: String,
    /// Variable the traversed edge is bound to (Null in the fallback
    /// row); `None` when the relationship is anonymous.
    edge_var: Option<String>,
    /// Variable the neighbor node is bound to (Null in the fallback row).
    dst_var: String,
    /// Labels the neighbor must carry (every one of them).
    dst_labels: Vec<String>,
    /// Inline `{key: expr}` filter on the neighbor node. Each `expr` is
    /// evaluated against the current row; a property the neighbor lacks
    /// is compared as `Value::Null`.
    dst_properties: Vec<(String, Expr)>,
    /// Accepted relationship types; an empty list accepts any type.
    edge_types: Vec<String>,
    /// Which adjacency lists to walk. `Both` visits a self-loop once.
    direction: Direction,
    /// When set, edges whose target id differs from the node
    /// bound at this variable in the current row are skipped
    /// inside the expansion loop. A row whose outgoing edges all
    /// fail the constraint then triggers the same null-fallback
    /// as having no edges at all.
    /// Unlike `edge_constraint_var`, this is read with a plain `get` on
    /// the current row; a missing or non-node binding rejects every
    /// candidate.
    dst_constraint_var: Option<String>,
    /// When set, the expansion only considers the single edge
    /// whose id matches the edge already bound at this row
    /// variable — used when the OPTIONAL MATCH pattern reuses
    /// an edge variable from a prior clause.
    /// Resolved via `lookup_binding`; a missing or non-edge binding
    /// rejects every candidate.
    edge_constraint_var: Option<String>,
    /// Input row whose adjacency is currently buffered; taken when the
    /// buffer drains.
    current_row: Option<Row>,
    /// `(edge id, neighbor id)` candidates for `current_row`.
    pending: Vec<(EdgeId, NodeId)>,
    /// Index of the next unconsumed entry in `pending`.
    pending_idx: usize,
    /// Set once a match or the fallback row has been emitted for
    /// `current_row`.
    yielded_for_current: bool,
}
4267
4268impl OptionalEdgeExpandOp {
4269    #[allow(clippy::too_many_arguments)]
4270    fn new(
4271        input: Box<dyn Operator>,
4272        src_var: String,
4273        edge_var: Option<String>,
4274        dst_var: String,
4275        dst_labels: Vec<String>,
4276        dst_properties: Vec<(String, Expr)>,
4277        edge_types: Vec<String>,
4278        direction: Direction,
4279        dst_constraint_var: Option<String>,
4280        edge_constraint_var: Option<String>,
4281    ) -> Self {
4282        Self {
4283            input,
4284            src_var,
4285            edge_var,
4286            dst_var,
4287            dst_labels,
4288            dst_properties,
4289            edge_types,
4290            direction,
4291            dst_constraint_var,
4292            edge_constraint_var,
4293            current_row: None,
4294            pending: Vec::new(),
4295            pending_idx: 0,
4296            yielded_for_current: false,
4297        }
4298    }
4299}
4300
4301impl Operator for OptionalEdgeExpandOp {
4302    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4303        loop {
4304            while self.pending_idx < self.pending.len() {
4305                let (edge_id, neighbor_id) = self.pending[self.pending_idx];
4306                self.pending_idx += 1;
4307
4308                let edge = match ctx.store.get_edge(edge_id)? {
4309                    Some(e) => e,
4310                    None => continue,
4311                };
4312                if !self.edge_types.is_empty()
4313                    && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4314                {
4315                    continue;
4316                }
4317                // Pre-bound edge constraint: only the specific
4318                // edge whose id matches the row-bound value
4319                // counts as a match. Falls through to outer
4320                // scopes so the constraint works for fresh
4321                // scans on the right side of a CartesianProduct.
4322                if let Some(constraint_var) = &self.edge_constraint_var {
4323                    let base = self
4324                        .current_row
4325                        .as_ref()
4326                        .expect("pending without source row");
4327                    let expected = match ctx.lookup_binding(base, constraint_var) {
4328                        Some(Value::Edge(e)) => Some(e.id),
4329                        _ => None,
4330                    };
4331                    match expected {
4332                        Some(id) if id != edge.id => continue,
4333                        None => continue,
4334                        _ => {}
4335                    }
4336                }
4337
4338                let neighbor = match ctx.store.get_node(neighbor_id)? {
4339                    Some(n) => n,
4340                    None => continue,
4341                };
4342                if !has_all_labels(&neighbor, &self.dst_labels) {
4343                    continue;
4344                }
4345                // Bound-endpoint constraint: when the declared
4346                // target is already bound in the row, only edges
4347                // that lead to that exact node count as a match.
4348                // Edges failing the constraint are silently
4349                // skipped — if every candidate fails, the
4350                // per-row left-join fallback below still fires.
4351                if let Some(constraint_var) = &self.dst_constraint_var {
4352                    let base = self
4353                        .current_row
4354                        .as_ref()
4355                        .expect("pending without source row");
4356                    let bound_id = match base.get(constraint_var) {
4357                        Some(Value::Node(n)) => Some(n.id),
4358                        Some(Value::Null)
4359                        | Some(Value::Property(meshdb_core::Property::Null))
4360                        | None => None,
4361                        _ => None,
4362                    };
4363                    match bound_id {
4364                        Some(id) if id != neighbor.id => continue,
4365                        None => continue,
4366                        _ => {}
4367                    }
4368                }
4369                if !self.dst_properties.is_empty() {
4370                    let base = self
4371                        .current_row
4372                        .as_ref()
4373                        .expect("pending without source row");
4374                    let ectx = ctx.eval_ctx(base);
4375                    let mut props_ok = true;
4376                    for (key, expr) in &self.dst_properties {
4377                        let expected = eval_expr(expr, &ectx)?;
4378                        let actual = neighbor
4379                            .properties
4380                            .get(key)
4381                            .cloned()
4382                            .map(Value::Property)
4383                            .unwrap_or(Value::Null);
4384                        if !values_equal(&expected, &actual) {
4385                            props_ok = false;
4386                            break;
4387                        }
4388                    }
4389                    if !props_ok {
4390                        continue;
4391                    }
4392                }
4393
4394                let base = self
4395                    .current_row
4396                    .as_ref()
4397                    .expect("pending edges without source row");
4398                let mut out = base.clone();
4399                if let Some(ev) = &self.edge_var {
4400                    out.insert(ev.clone(), Value::Edge(edge));
4401                }
4402                out.insert(self.dst_var.clone(), Value::Node(neighbor));
4403                self.yielded_for_current = true;
4404                return Ok(Some(out));
4405            }
4406
4407            // Pending drained for the current row. If nothing was
4408            // yielded, emit the left-join fallback: preserve the
4409            // input row with the optional variables set to Null.
4410            //
4411            // Exception: when an edge / dst constraint names a
4412            // variable that was pre-bound in the input row, the
4413            // fallback must NOT clobber that variable — the
4414            // constraint turns the expansion into an existence
4415            // check and the outer value should survive through.
4416            if let Some(base) = self.current_row.take() {
4417                if !self.yielded_for_current {
4418                    let mut out = base;
4419                    if let Some(ev) = &self.edge_var {
4420                        let preserve = self
4421                            .edge_constraint_var
4422                            .as_ref()
4423                            .map(|c| c == ev)
4424                            .unwrap_or(false);
4425                        if !preserve {
4426                            out.insert(ev.clone(), Value::Null);
4427                        }
4428                    }
4429                    let preserve_dst = self
4430                        .dst_constraint_var
4431                        .as_ref()
4432                        .map(|c| c == &self.dst_var)
4433                        .unwrap_or(false);
4434                    if !preserve_dst {
4435                        out.insert(self.dst_var.clone(), Value::Null);
4436                    }
4437                    self.yielded_for_current = true;
4438                    return Ok(Some(out));
4439                }
4440            }
4441
4442            match self.input.next(ctx)? {
4443                None => return Ok(None),
4444                Some(row) => {
4445                    let src_id = match row.get(&self.src_var) {
4446                        Some(Value::Node(n)) => n.id,
4447                        // src_var is Null (because a prior
4448                        // OPTIONAL MATCH chained before this one
4449                        // Null-bound it). Skip adjacency entirely
4450                        // and fall through to the fallback Null
4451                        // row so downstream clauses see the
4452                        // preserved input.
4453                        Some(Value::Null) => {
4454                            self.pending = Vec::new();
4455                            self.pending_idx = 0;
4456                            self.yielded_for_current = false;
4457                            self.current_row = Some(row);
4458                            continue;
4459                        }
4460                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4461                    };
4462                    self.pending = match self.direction {
4463                        Direction::Outgoing => ctx.store.outgoing(src_id)?,
4464                        Direction::Incoming => ctx.store.incoming(src_id)?,
4465                        Direction::Both => {
4466                            // For undirected traversal, a self-loop
4467                            // appears once as outgoing and once as
4468                            // incoming. The Cypher spec visits each
4469                            // physical edge once, so dedupe by
4470                            // edge_id before emitting.
4471                            let mut all = ctx.store.outgoing(src_id)?;
4472                            let mut seen: std::collections::HashSet<EdgeId> =
4473                                all.iter().map(|(e, _)| *e).collect();
4474                            for (e, n) in ctx.store.incoming(src_id)? {
4475                                if seen.insert(e) {
4476                                    all.push((e, n));
4477                                }
4478                            }
4479                            all
4480                        }
4481                    };
4482                    self.pending_idx = 0;
4483                    self.yielded_for_current = false;
4484                    self.current_row = Some(row);
4485                }
4486            }
4487        }
4488    }
4489}
4490
/// Variable-length expansion (`(a)-[*min..max]-(b)`). For each input
/// row, enumerates walks from the node bound at `src_var` by
/// depth-first search, subject to the type, property, label and
/// relationship-uniqueness rules described on the fields below.
struct VarLengthExpandOp {
    /// Upstream operator supplying the rows to expand.
    input: Box<dyn Operator>,
    /// Row variable holding the walk's start node.
    src_var: String,
    /// Optional variable for the walked relationships.
    /// NOTE(review): binding shape is decided in `next` — presumably the
    /// walked edge list; confirm.
    edge_var: Option<String>,
    /// Variable for the walk's terminal node.
    dst_var: String,
    /// Labels the terminal node must carry (every one of them).
    /// Intermediate nodes are not filtered.
    dst_labels: Vec<String>,
    /// Accepted relationship types for every hop; an empty list
    /// accepts any type.
    edge_types: Vec<String>,
    /// Per-edge property filter — every edge along the walked
    /// path must have these `(key, value)` pairs. Mirrors the
    /// inline filter on `EdgeExpandOp`; applied during DFS so
    /// failing edges prune the branch instead of generating
    /// wrong-length results.
    edge_properties: Vec<(String, Expr)>,
    /// Which adjacency lists each hop walks.
    direction: Direction,
    /// Minimum walk length in edges (inclusive).
    min_hops: u64,
    /// Maximum walk length in edges (inclusive); the DFS stops
    /// extending a prefix once it reaches this depth.
    max_hops: u64,
    /// Optional path variable. NOTE(review): presumably bound to a path
    /// built from the node / edge sequences — confirm against `next`.
    path_var: Option<String>,
    /// Per-row left-join mode: when `true`, an input row that
    /// produces no matching paths still emits one row with the
    /// expansion's output vars bound to Null. Set for
    /// `OPTIONAL MATCH (a)-[*]->(b)` so the outer row survives
    /// even when the path search is empty.
    optional: bool,
    /// When set, paths whose terminal node id differs from the
    /// node bound at this variable in the current row are
    /// filtered out before counting as a match. Combined with
    /// `optional`, an input row whose candidate paths all miss
    /// the bound target triggers the null-fallback instead of
    /// silently dropping.
    dst_constraint_var: Option<String>,
    /// Replay mode: read the walked edge sequence from the list
    /// already bound at this row variable instead of doing DFS.
    /// Used by openCypher's "walk a pre-bound edge list" form.
    bound_edge_list_var: Option<String>,
    /// Row variables whose bound edges (or edge lists) must not
    /// appear in the walked path — enforces openCypher's
    /// relationship-uniqueness rule across hops within the same
    /// MATCH pattern. Each entry resolves against the current row
    /// (falling through to outer-scope rows) at run time; the
    /// union of every referenced edge id becomes the DFS
    /// exclusion set.
    excluded_edge_vars: Vec<String>,
    /// Input row whose walks are currently buffered.
    current_row: Option<Row>,
    /// Buffered walks for `current_row` (presumably the output of
    /// `enumerate` / `replay_edge_list`): the edge sequence of each walk.
    pending_paths: Vec<Vec<Edge>>,
    /// Node sequence of each buffered walk. `enumerate` produces these
    /// starting with the start node, so each is one element longer than
    /// the matching edge sequence.
    pending_node_paths: Vec<Vec<NodeId>>,
    /// Terminal node id of each buffered walk.
    pending_targets: Vec<NodeId>,
    /// Presumably the index of the next buffered walk to emit — TODO
    /// confirm against `next`.
    pending_idx: usize,
}
4539
4540impl VarLengthExpandOp {
4541    #[allow(clippy::too_many_arguments)]
4542    fn new(
4543        input: Box<dyn Operator>,
4544        src_var: String,
4545        edge_var: Option<String>,
4546        dst_var: String,
4547        dst_labels: Vec<String>,
4548        edge_types: Vec<String>,
4549        edge_properties: Vec<(String, Expr)>,
4550        direction: Direction,
4551        min_hops: u64,
4552        max_hops: u64,
4553        path_var: Option<String>,
4554        optional: bool,
4555        dst_constraint_var: Option<String>,
4556        bound_edge_list_var: Option<String>,
4557        excluded_edge_vars: Vec<String>,
4558    ) -> Self {
4559        Self {
4560            input,
4561            src_var,
4562            edge_var,
4563            dst_var,
4564            dst_labels,
4565            edge_types,
4566            edge_properties,
4567            direction,
4568            min_hops,
4569            max_hops,
4570            path_var,
4571            optional,
4572            dst_constraint_var,
4573            bound_edge_list_var,
4574            excluded_edge_vars,
4575            current_row: None,
4576            pending_paths: Vec::new(),
4577            pending_node_paths: Vec::new(),
4578            pending_targets: Vec::new(),
4579            pending_idx: 0,
4580        }
4581    }
4582
4583    fn enumerate(
4584        &self,
4585        ctx: &ExecCtx,
4586        start: NodeId,
4587        input_row: &Row,
4588    ) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
4589        let mut paths: Vec<Vec<Edge>> = Vec::new();
4590        let mut node_paths: Vec<Vec<NodeId>> = Vec::new();
4591        let mut targets: Vec<NodeId> = Vec::new();
4592        let mut edge_buf: Vec<Edge> = Vec::new();
4593        let mut node_buf: Vec<NodeId> = vec![start];
4594        // Seed `used` with edges from outer-scope bindings the
4595        // pattern flags as exclusions (e.g. the other hops'
4596        // edge / edge-list vars). A fresh walk can't revisit an
4597        // edge that's already bound as a relationship variable
4598        // elsewhere in the MATCH — that's openCypher's
4599        // relationship-uniqueness rule.
4600        let mut used: HashSet<EdgeId> = HashSet::new();
4601        for var in &self.excluded_edge_vars {
4602            match ctx.lookup_binding(input_row, var) {
4603                Some(Value::Edge(e)) => {
4604                    used.insert(e.id);
4605                }
4606                Some(Value::List(items)) => {
4607                    for item in items {
4608                        if let Value::Edge(e) = item {
4609                            used.insert(e.id);
4610                        }
4611                    }
4612                }
4613                _ => {}
4614            }
4615        }
4616        // Evaluate edge-property expected values once per input row
4617        // — they may reference row bindings or `$`-parameters, but
4618        // don't vary per walked edge.
4619        let expected_edge_props: Vec<(String, Value)> = if self.edge_properties.is_empty() {
4620            Vec::new()
4621        } else {
4622            let ectx = ctx.eval_ctx(input_row);
4623            self.edge_properties
4624                .iter()
4625                .map(|(k, expr)| eval_expr(expr, &ectx).map(|v| (k.clone(), v)))
4626                .collect::<Result<Vec<_>>>()?
4627        };
4628        self.dfs(
4629            ctx,
4630            start,
4631            &expected_edge_props,
4632            &mut edge_buf,
4633            &mut node_buf,
4634            &mut used,
4635            &mut paths,
4636            &mut node_paths,
4637            &mut targets,
4638        )?;
4639        Ok((paths, node_paths, targets))
4640    }
4641
4642    #[allow(clippy::too_many_arguments)]
4643    fn dfs(
4644        &self,
4645        ctx: &ExecCtx,
4646        current_node: NodeId,
4647        expected_edge_props: &[(String, Value)],
4648        edge_buf: &mut Vec<Edge>,
4649        node_buf: &mut Vec<NodeId>,
4650        used: &mut HashSet<EdgeId>,
4651        out_paths: &mut Vec<Vec<Edge>>,
4652        out_node_paths: &mut Vec<Vec<NodeId>>,
4653        out_targets: &mut Vec<NodeId>,
4654    ) -> Result<()> {
4655        let depth = edge_buf.len() as u64;
4656
4657        if depth >= self.min_hops && depth <= self.max_hops {
4658            let terminal_ok = match ctx.store.get_node(current_node)? {
4659                Some(node) => has_all_labels(&node, &self.dst_labels),
4660                None => false,
4661            };
4662            if terminal_ok {
4663                out_paths.push(edge_buf.clone());
4664                out_node_paths.push(node_buf.clone());
4665                out_targets.push(current_node);
4666            }
4667        }
4668
4669        if depth >= self.max_hops {
4670            return Ok(());
4671        }
4672
4673        let neighbors = match self.direction {
4674            Direction::Outgoing => ctx.store.outgoing(current_node)?,
4675            Direction::Incoming => ctx.store.incoming(current_node)?,
4676            Direction::Both => {
4677                let mut all = ctx.store.outgoing(current_node)?;
4678                all.extend(ctx.store.incoming(current_node)?);
4679                all
4680            }
4681        };
4682
4683        for (eid, neighbor_id) in neighbors {
4684            if used.contains(&eid) {
4685                continue;
4686            }
4687            let edge = match ctx.store.get_edge(eid)? {
4688                Some(e) => e,
4689                None => continue,
4690            };
4691            if !self.edge_types.is_empty() && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4692            {
4693                continue;
4694            }
4695            // Inline edge property filter: skip edges whose
4696            // properties don't equal the expected values computed
4697            // per input row. A missing property fails the check,
4698            // matching `EdgeExpandOp`.
4699            if !expected_edge_props.is_empty() {
4700                let mut ok = true;
4701                for (key, expected) in expected_edge_props {
4702                    let actual = match edge.properties.get(key) {
4703                        Some(v) => Value::Property(v.clone()),
4704                        None => {
4705                            ok = false;
4706                            break;
4707                        }
4708                    };
4709                    if !values_equal(&actual, expected) {
4710                        ok = false;
4711                        break;
4712                    }
4713                }
4714                if !ok {
4715                    continue;
4716                }
4717            }
4718            used.insert(eid);
4719            edge_buf.push(edge);
4720            node_buf.push(neighbor_id);
4721            self.dfs(
4722                ctx,
4723                neighbor_id,
4724                expected_edge_props,
4725                edge_buf,
4726                node_buf,
4727                used,
4728                out_paths,
4729                out_node_paths,
4730                out_targets,
4731            )?;
4732            edge_buf.pop();
4733            node_buf.pop();
4734            used.remove(&eid);
4735        }
4736
4737        Ok(())
4738    }
4739}
4740
4741/// Replay the edge sequence stored at `list_var` in `row` as a
4742/// var-length walk starting from `src_id`. Returns `(paths,
4743/// node_paths, targets)` in the same shape `enumerate` produces
4744/// — either one entry when the list reads as a valid connected
4745/// walk in `direction`, or empty when it doesn't.
4746///
4747/// Validation per step:
4748/// * the list element is a `Value::Edge`,
4749/// * the edge's optional type filter matches `edge_types`,
4750/// * the edge actually touches the current node and proceeds to
4751///   the other endpoint in the requested direction (undirected
4752///   accepts either).
4753///
4754/// A null / missing / non-list value, a null source, or any
4755/// failed step all produce an empty result — which, when the
4756/// caller is in optional mode, triggers the left-join null
4757/// fallback.
4758fn replay_edge_list(
4759    ctx: &ExecCtx,
4760    row: &Row,
4761    list_var: &str,
4762    src_id: Option<NodeId>,
4763    direction: Direction,
4764    edge_types: &[String],
4765) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
4766    let start = match src_id {
4767        Some(id) => id,
4768        None => return Ok((Vec::new(), Vec::new(), Vec::new())),
4769    };
4770    let list = match ctx.lookup_binding(row, list_var) {
4771        Some(Value::List(items)) => items.clone(),
4772        Some(Value::Property(meshdb_core::Property::List(items))) => items
4773            .iter()
4774            .cloned()
4775            .map(Value::Property)
4776            .collect::<Vec<_>>(),
4777        _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
4778    };
4779    let mut edge_buf: Vec<Edge> = Vec::with_capacity(list.len());
4780    let mut node_buf: Vec<NodeId> = Vec::with_capacity(list.len() + 1);
4781    node_buf.push(start);
4782    let mut current = start;
4783    for item in list {
4784        let edge = match item {
4785            Value::Edge(e) => e,
4786            _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
4787        };
4788        if !edge_types.is_empty() && !edge_types.iter().any(|t| t == &edge.edge_type) {
4789            return Ok((Vec::new(), Vec::new(), Vec::new()));
4790        }
4791        let next_node = match direction {
4792            Direction::Outgoing => {
4793                if edge.source != current {
4794                    return Ok((Vec::new(), Vec::new(), Vec::new()));
4795                }
4796                edge.target
4797            }
4798            Direction::Incoming => {
4799                if edge.target != current {
4800                    return Ok((Vec::new(), Vec::new(), Vec::new()));
4801                }
4802                edge.source
4803            }
4804            Direction::Both => {
4805                if edge.source == current {
4806                    edge.target
4807                } else if edge.target == current {
4808                    edge.source
4809                } else {
4810                    return Ok((Vec::new(), Vec::new(), Vec::new()));
4811                }
4812            }
4813        };
4814        // The stored edge shape should still exist in the graph;
4815        // a missing node mid-walk means the snapshot shifted and
4816        // the list is no longer valid.
4817        if ctx.store.get_node(next_node)?.is_none() {
4818            return Ok((Vec::new(), Vec::new(), Vec::new()));
4819        }
4820        edge_buf.push(edge);
4821        node_buf.push(next_node);
4822        current = next_node;
4823    }
4824    Ok((vec![edge_buf], vec![node_buf], vec![current]))
4825}
4826
4827impl Operator for VarLengthExpandOp {
4828    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4829        loop {
4830            while self.pending_idx < self.pending_paths.len() {
4831                let i = self.pending_idx;
4832                self.pending_idx += 1;
4833
4834                let target_id = self.pending_targets[i];
4835                let target = match ctx.store.get_node(target_id)? {
4836                    Some(n) => n,
4837                    None => continue,
4838                };
4839
4840                let base = self
4841                    .current_row
4842                    .as_ref()
4843                    .expect("pending without source row");
4844                let mut out = base.clone();
4845                out.insert(self.dst_var.clone(), Value::Node(target.clone()));
4846                if let Some(ev) = &self.edge_var {
4847                    let edges: Vec<Value> = self.pending_paths[i]
4848                        .iter()
4849                        .cloned()
4850                        .map(Value::Edge)
4851                        .collect();
4852                    out.insert(ev.clone(), Value::List(edges));
4853                }
4854                if let Some(pv) = &self.path_var {
4855                    let mut nodes = Vec::with_capacity(self.pending_node_paths[i].len());
4856                    for nid in &self.pending_node_paths[i] {
4857                        match ctx.store.get_node(*nid)? {
4858                            Some(n) => nodes.push(n),
4859                            None => continue,
4860                        }
4861                    }
4862                    let edges = self.pending_paths[i].clone();
4863                    out.insert(pv.clone(), Value::Path { nodes, edges });
4864                }
4865                return Ok(Some(out));
4866            }
4867
4868            match self.input.next(ctx)? {
4869                None => return Ok(None),
4870                Some(row) => {
4871                    let src_id = match row.get(&self.src_var) {
4872                        Some(Value::Node(n)) => Some(n.id),
4873                        // Null source → no paths. Same
4874                        // null-propagating semantics as
4875                        // `EdgeExpandOp`. In optional mode we
4876                        // still emit the left-join fallback;
4877                        // otherwise we just skip.
4878                        Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
4879                            None
4880                        }
4881                        _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4882                    };
4883                    // Replay path: when the hop names an already-
4884                    // bound edge list (`MATCH (a)-[rs*]->(b)` with
4885                    // `rs = [r1, r2]` from an earlier WITH), skip
4886                    // DFS entirely and verify the listed edges
4887                    // form a connected walk from `src_id` in the
4888                    // required direction. Produces at most one
4889                    // path — the exact list that was supplied.
4890                    let (mut paths, mut node_paths, mut targets) =
4891                        if let Some(list_var) = &self.bound_edge_list_var {
4892                            replay_edge_list(
4893                                ctx,
4894                                &row,
4895                                list_var,
4896                                src_id,
4897                                self.direction,
4898                                &self.edge_types,
4899                            )?
4900                        } else {
4901                            match src_id {
4902                                Some(id) => self.enumerate(ctx, id, &row)?,
4903                                None => (Vec::new(), Vec::new(), Vec::new()),
4904                            }
4905                        };
4906                    // Bound-endpoint constraint: drop paths whose
4907                    // terminal node doesn't match the node already
4908                    // bound at the constraint var. When combined
4909                    // with `optional`, an input whose paths are
4910                    // all filtered out still triggers the
4911                    // null-fallback below.
4912                    if let Some(constraint_var) = &self.dst_constraint_var {
4913                        let target_id = match row.get(constraint_var) {
4914                            Some(Value::Node(n)) => Some(n.id),
4915                            _ => None,
4916                        };
4917                        match target_id {
4918                            Some(id) => {
4919                                let mut kept_paths = Vec::new();
4920                                let mut kept_node_paths = Vec::new();
4921                                let mut kept_targets = Vec::new();
4922                                for ((p, np), t) in paths
4923                                    .drain(..)
4924                                    .zip(node_paths.drain(..))
4925                                    .zip(targets.drain(..))
4926                                {
4927                                    if t == id {
4928                                        kept_paths.push(p);
4929                                        kept_node_paths.push(np);
4930                                        kept_targets.push(t);
4931                                    }
4932                                }
4933                                paths = kept_paths;
4934                                node_paths = kept_node_paths;
4935                                targets = kept_targets;
4936                            }
4937                            None => {
4938                                paths.clear();
4939                                node_paths.clear();
4940                                targets.clear();
4941                            }
4942                        }
4943                    }
4944                    if paths.is_empty() && self.optional {
4945                        // Left-join fallback: emit one row with
4946                        // the expansion's output vars set to Null
4947                        // so the outer OPTIONAL MATCH preserves
4948                        // this input row.
4949                        let mut out = row;
4950                        if let Some(ev) = &self.edge_var {
4951                            out.insert(ev.clone(), Value::Null);
4952                        }
4953                        out.insert(self.dst_var.clone(), Value::Null);
4954                        if let Some(pv) = &self.path_var {
4955                            out.insert(pv.clone(), Value::Null);
4956                        }
4957                        return Ok(Some(out));
4958                    }
4959                    self.pending_paths = paths;
4960                    self.pending_node_paths = node_paths;
4961                    self.pending_targets = targets;
4962                    self.pending_idx = 0;
4963                    self.current_row = Some(row);
4964                }
4965            }
4966        }
4967    }
4968}
4969
4970struct FilterOp {
4971    input: Box<dyn Operator>,
4972    predicate: Expr,
4973}
4974
4975impl FilterOp {
4976    fn new(input: Box<dyn Operator>, predicate: Expr) -> Self {
4977        Self { input, predicate }
4978    }
4979}
4980
4981impl Operator for FilterOp {
4982    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4983        while let Some(row) = self.input.next(ctx)? {
4984            let v = match eval_expr(&self.predicate, &ctx.eval_ctx(&row)) {
4985                Ok(v) => v,
4986                // Type mismatches and non-boolean errors in filter predicates
4987                // are treated as false (row filtered out), not hard errors
4988                Err(Error::TypeMismatch) | Err(Error::NotBoolean) => Value::Null,
4989                Err(e) => return Err(e),
4990            };
4991            if to_bool(&v).unwrap_or(false) {
4992                return Ok(Some(row));
4993            }
4994        }
4995        Ok(None)
4996    }
4997}
4998
4999/// Pass-through operator for `RETURN *` / `WITH *`. Forwards every
5000/// row from the input unchanged.
5001struct IdentityOp {
5002    input: Box<dyn Operator>,
5003}
5004
5005impl IdentityOp {
5006    fn new(input: Box<dyn Operator>) -> Self {
5007        Self { input }
5008    }
5009}
5010
5011impl Operator for IdentityOp {
5012    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5013        self.input.next(ctx)
5014    }
5015}
5016
5017/// Passes input rows through; if the input produces zero rows,
5018/// emits exactly one row with `null_vars` bound to Value::Null.
5019/// Implements standalone OPTIONAL MATCH semantics (e.g.
5020/// `OPTIONAL MATCH (n) RETURN n` on an empty graph yields one
5021/// row with n=null rather than the empty result set).
5022struct CoalesceNullRowOp {
5023    input: Box<dyn Operator>,
5024    null_vars: Vec<String>,
5025    produced_any: bool,
5026    done: bool,
5027}
5028
5029impl CoalesceNullRowOp {
5030    fn new(input: Box<dyn Operator>, null_vars: Vec<String>) -> Self {
5031        Self {
5032            input,
5033            null_vars,
5034            produced_any: false,
5035            done: false,
5036        }
5037    }
5038}
5039
5040impl Operator for CoalesceNullRowOp {
5041    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5042        if self.done {
5043            return Ok(None);
5044        }
5045        match self.input.next(ctx)? {
5046            Some(row) => {
5047                self.produced_any = true;
5048                Ok(Some(row))
5049            }
5050            None => {
5051                self.done = true;
5052                if self.produced_any {
5053                    Ok(None)
5054                } else {
5055                    let mut row = Row::new();
5056                    for v in &self.null_vars {
5057                        row.insert(v.clone(), Value::Null);
5058                    }
5059                    Ok(Some(row))
5060                }
5061            }
5062        }
5063    }
5064}
5065
5066struct ProjectOp {
5067    input: Box<dyn Operator>,
5068    items: Vec<ReturnItem>,
5069}
5070
5071impl ProjectOp {
5072    fn new(input: Box<dyn Operator>, items: Vec<ReturnItem>) -> Self {
5073        Self { input, items }
5074    }
5075}
5076
5077impl Operator for ProjectOp {
5078    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5079        match self.input.next(ctx)? {
5080            Some(row) => {
5081                let mut out = Row::new();
5082                for (i, item) in self.items.iter().enumerate() {
5083                    let name = item.alias.clone().unwrap_or_else(|| {
5084                        item.raw_text
5085                            .clone()
5086                            .unwrap_or_else(|| default_name(&item.expr, i))
5087                    });
5088                    let value = eval_expr(&item.expr, &ctx.eval_ctx(&row))?;
5089                    out.insert(name, value);
5090                }
5091                Ok(Some(out))
5092            }
5093            None => Ok(None),
5094        }
5095    }
5096}
5097
5098fn default_name(expr: &Expr, idx: usize) -> String {
5099    render_expr_name(expr).unwrap_or_else(|| format!("col{}", idx))
5100}
5101
5102fn render_expr_name(expr: &Expr) -> Option<String> {
5103    Some(match expr {
5104        Expr::Identifier(s) => s.clone(),
5105        Expr::Property { var, key } => format!("{var}.{key}"),
5106        Expr::PropertyAccess { base, key } => {
5107            // Match the source syntax's parenthesisation of a
5108            // bracketed base: `(list[1]).k` round-trips, while a
5109            // plain identifier base stays bare (`a.b`).
5110            if matches!(
5111                base.as_ref(),
5112                Expr::IndexAccess { .. } | Expr::SliceAccess { .. }
5113            ) {
5114                format!("({}).{key}", render_expr_name(base)?)
5115            } else {
5116                format!("{}.{key}", render_expr_name(base)?)
5117            }
5118        }
5119        Expr::Parameter(name) => format!("${name}"),
5120        Expr::Literal(Literal::String(s)) => format!("'{s}'"),
5121        Expr::Literal(Literal::Integer(i)) => i.to_string(),
5122        Expr::Literal(Literal::Float(f)) => f.to_string(),
5123        Expr::Literal(Literal::Boolean(b)) => b.to_string(),
5124        Expr::Literal(Literal::Null) => "NULL".into(),
5125        Expr::Call { name, args } => {
5126            let arg_str = match args {
5127                CallArgs::Star => "*".into(),
5128                CallArgs::Exprs(es) | CallArgs::DistinctExprs(es) => {
5129                    let prefix = if matches!(args, CallArgs::DistinctExprs(_)) {
5130                        "DISTINCT "
5131                    } else {
5132                        ""
5133                    };
5134                    let inner: Vec<String> = es.iter().filter_map(render_expr_name).collect();
5135                    if inner.len() != es.len() {
5136                        return None;
5137                    }
5138                    format!("{prefix}{}", inner.join(", "))
5139                }
5140            };
5141            format!("{name}({arg_str})")
5142        }
5143        Expr::BinaryOp { op, left, right } => {
5144            let op_str = match op {
5145                BinaryOp::Add => " + ",
5146                BinaryOp::Sub => " - ",
5147                BinaryOp::Mul => " * ",
5148                BinaryOp::Div => " / ",
5149                BinaryOp::Mod => " % ",
5150                BinaryOp::Pow => " ^ ",
5151            };
5152            format!(
5153                "{}{op_str}{}",
5154                render_expr_name(left)?,
5155                render_expr_name(right)?
5156            )
5157        }
5158        Expr::UnaryOp { op, operand } => {
5159            let op_str = match op {
5160                UnaryOp::Neg => "-",
5161            };
5162            format!("{op_str}{}", render_expr_name(operand)?)
5163        }
5164        Expr::Not(inner) => format!("NOT {}", render_expr_name(inner)?),
5165        Expr::IsNull { negated, inner } => {
5166            if *negated {
5167                format!("{} IS NOT NULL", render_expr_name(inner)?)
5168            } else {
5169                format!("{} IS NULL", render_expr_name(inner)?)
5170            }
5171        }
5172        Expr::Compare { op, left, right } => {
5173            let op_str = match op {
5174                CompareOp::Eq => " = ",
5175                CompareOp::Ne => " <> ",
5176                CompareOp::Lt => " < ",
5177                CompareOp::Le => " <= ",
5178                CompareOp::Gt => " > ",
5179                CompareOp::Ge => " >= ",
5180                CompareOp::StartsWith => " STARTS WITH ",
5181                CompareOp::EndsWith => " ENDS WITH ",
5182                CompareOp::Contains => " CONTAINS ",
5183                CompareOp::RegexMatch => " =~ ",
5184            };
5185            format!(
5186                "{}{op_str}{}",
5187                render_expr_name(left)?,
5188                render_expr_name(right)?
5189            )
5190        }
5191        Expr::List(items) => {
5192            let inner: Vec<String> = items.iter().filter_map(render_expr_name).collect();
5193            if inner.len() != items.len() {
5194                return None;
5195            }
5196            format!("[{}]", inner.join(", "))
5197        }
5198        Expr::Map(entries) => {
5199            let inner: Vec<String> = entries
5200                .iter()
5201                .map(|(k, v)| render_expr_name(v).map(|vn| format!("{k}: {vn}")))
5202                .collect::<Option<Vec<_>>>()?;
5203            format!("{{{}}}", inner.join(", "))
5204        }
5205        Expr::IndexAccess { base, index } => {
5206            format!("{}[{}]", render_expr_name(base)?, render_expr_name(index)?)
5207        }
5208        Expr::InList { element, list } => {
5209            format!(
5210                "{} IN {}",
5211                render_expr_name(element)?,
5212                render_expr_name(list)?
5213            )
5214        }
5215        Expr::HasLabels { expr, labels } => {
5216            let mut s = format!("({}", render_expr_name(expr)?);
5217            for l in labels {
5218                s.push(':');
5219                s.push_str(l);
5220            }
5221            s.push(')');
5222            s
5223        }
5224        _ => return None,
5225    })
5226}
5227
5228struct DistinctOp {
5229    input: Box<dyn Operator>,
5230    seen: HashSet<String>,
5231}
5232
5233impl DistinctOp {
5234    fn new(input: Box<dyn Operator>) -> Self {
5235        Self {
5236            input,
5237            seen: HashSet::new(),
5238        }
5239    }
5240}
5241
5242impl Operator for DistinctOp {
5243    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5244        while let Some(row) = self.input.next(ctx)? {
5245            let key = row_key(&row);
5246            if self.seen.insert(key) {
5247                return Ok(Some(row));
5248            }
5249        }
5250        Ok(None)
5251    }
5252}
5253
5254/// Assemble a `Value::Path` from a row's already-bound node and
5255/// edge variables. Emitted by `plan_pattern` when the source
5256/// pattern carries `path_var`. The operator pulls `node_vars[i]`
5257/// and `edge_vars[i]` out of every row, walks them in order, and
5258/// inserts the result into the row under `path_var` before
5259/// forwarding downstream.
5260///
5261/// Rows where any referenced variable is missing or not the
5262/// expected Node/Edge shape get a `Value::Null` at `path_var` —
5263/// matching how `OPTIONAL MATCH` flows null bindings through the
5264/// downstream projection without hard-erroring. Missing bindings
5265/// shouldn't normally happen (the planner fills in synthetic
5266/// names via `ensure_path_bindings`), but the null fallback
5267/// means an unexpected upstream shape degrades gracefully
5268/// instead of crashing the whole query.
5269struct BindPathOp {
5270    input: Box<dyn Operator>,
5271    path_var: String,
5272    node_vars: Vec<String>,
5273    edge_vars: Vec<String>,
5274}
5275
5276impl BindPathOp {
5277    fn new(
5278        input: Box<dyn Operator>,
5279        path_var: String,
5280        node_vars: Vec<String>,
5281        edge_vars: Vec<String>,
5282    ) -> Self {
5283        Self {
5284            input,
5285            path_var,
5286            node_vars,
5287            edge_vars,
5288        }
5289    }
5290}
5291
5292impl Operator for BindPathOp {
5293    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5294        let Some(mut row) = self.input.next(ctx)? else {
5295            return Ok(None);
5296        };
5297        // Extract the ordered node + edge sequence from the row.
5298        // Any missing or wrong-shaped entry collapses the whole
5299        // path binding to null and falls through.
5300        let mut nodes: Vec<meshdb_core::Node> = Vec::new();
5301        let mut edges: Vec<meshdb_core::Edge> = Vec::new();
5302        let mut abort = false;
5303        // Interleave node/edge vars. For each hop i:
5304        //   node_vars[i] = start/intermediate node
5305        //   edge_vars[i] = edge (or sub-path for var-length)
5306        //   node_vars[i+1] = target node
5307        // For var-length hops, edge_vars[i] may contain a
5308        // Value::Path — splice its interior into the running path.
5309        if let Some(Value::Node(n)) = row.get(&self.node_vars[0]) {
5310            nodes.push(n.clone());
5311        } else {
5312            abort = true;
5313        }
5314        if !abort {
5315            for (i, ev) in self.edge_vars.iter().enumerate() {
5316                match row.get(ev) {
5317                    Some(Value::Edge(e)) => {
5318                        edges.push(e.clone());
5319                        match row.get(&self.node_vars[i + 1]) {
5320                            Some(Value::Node(n)) => nodes.push(n.clone()),
5321                            _ => {
5322                                abort = true;
5323                                break;
5324                            }
5325                        }
5326                    }
5327                    Some(Value::Path {
5328                        nodes: sub_nodes,
5329                        edges: sub_edges,
5330                    }) => {
5331                        // Splice the sub-path. The sub-path's first
5332                        // node is the same as nodes.last() (already
5333                        // pushed), so skip it. All sub-edges go in.
5334                        // Sub-path interior nodes go in. The sub-path's
5335                        // last node is the target for this hop.
5336                        edges.extend(sub_edges.iter().cloned());
5337                        if sub_nodes.len() > 1 {
5338                            nodes.extend(sub_nodes[1..].iter().cloned());
5339                        }
5340                    }
5341                    _ => {
5342                        abort = true;
5343                        break;
5344                    }
5345                }
5346            }
5347        }
5348        if abort {
5349            row.insert(self.path_var.clone(), Value::Null);
5350        } else {
5351            row.insert(self.path_var.clone(), Value::Path { nodes, edges });
5352        }
5353        Ok(Some(row))
5354    }
5355}
5356
5357/// `MATCH p = shortestPath((a)-[:R*..N]->(b))`. For each input
5358/// row (which must already bind both `src_var` and `dst_var`
5359/// to `Value::Node`), runs a breadth-first search from the
5360/// source node toward the target, filtering edges by
5361/// `edge_type` and walking up to `max_hops` steps. Emits one
5362/// row per successful search with `path_var` set to a
5363/// `Value::Path` carrying the traversed node/edge sequence;
5364/// rows where BFS finds no path are dropped entirely (matching
5365/// Cypher's `MATCH` semantics for an unsatisfiable pattern).
5366///
5367/// The BFS uses classic parent-pointer reconstruction: each
5368/// visited node's `(parent_node_id, edge_id)` pair is stored
5369/// in a hashmap, and once the target is reached we walk back
5370/// to the source, reversing the accumulated edges to produce
5371/// a forward-order path. Cycle detection is a side-effect of
5372/// the visited set — the first time BFS sees a node is
5373/// necessarily via a shortest path, so later visits are
5374/// ignored.
5375struct ShortestPathOp {
5376    input: Box<dyn Operator>,
5377    src_var: String,
5378    dst_var: String,
5379    path_var: String,
5380    edge_types: Vec<String>,
5381    direction: meshdb_cypher::Direction,
5382    max_hops: u64,
5383    kind: meshdb_cypher::ShortestKind,
5384    /// Buffered `(base_row, path)` pairs from the current
5385    /// input row, used only by `AllShortest` mode where a
5386    /// single input can expand into multiple shortest paths
5387    /// of the same length. Each call to `next` drains one
5388    /// entry before pulling a fresh input row; in `Shortest`
5389    /// mode the buffer is always empty or has one element.
5390    pending: std::collections::VecDeque<(Row, Value)>,
5391}
5392
5393impl ShortestPathOp {
5394    #[allow(clippy::too_many_arguments)]
5395    fn new(
5396        input: Box<dyn Operator>,
5397        src_var: String,
5398        dst_var: String,
5399        path_var: String,
5400        edge_types: Vec<String>,
5401        direction: meshdb_cypher::Direction,
5402        max_hops: u64,
5403        kind: meshdb_cypher::ShortestKind,
5404    ) -> Self {
5405        Self {
5406            input,
5407            src_var,
5408            dst_var,
5409            path_var,
5410            edge_types,
5411            direction,
5412            max_hops,
5413            kind,
5414            pending: std::collections::VecDeque::new(),
5415        }
5416    }
5417}
5418
5419impl Operator for ShortestPathOp {
5420    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5421        loop {
5422            // Drain buffered paths from a prior input row
5423            // before pulling the next one. Only reachable in
5424            // `AllShortest` mode; `Shortest` mode's buffer
5425            // never accumulates more than one entry.
5426            if let Some((mut row, path)) = self.pending.pop_front() {
5427                row.insert(self.path_var.clone(), path);
5428                return Ok(Some(row));
5429            }
5430            let Some(row) = self.input.next(ctx)? else {
5431                return Ok(None);
5432            };
5433            let src = match row.get(&self.src_var) {
5434                Some(Value::Node(n)) => n.clone(),
5435                _ => continue,
5436            };
5437            let dst = match row.get(&self.dst_var) {
5438                Some(Value::Node(n)) => n.clone(),
5439                _ => continue,
5440            };
5441            let paths = bfs_shortest_paths(
5442                &src,
5443                &dst,
5444                &self.edge_types,
5445                self.direction,
5446                self.max_hops,
5447                self.kind,
5448                ctx.store,
5449            )?;
5450            if paths.is_empty() {
5451                // No path of length ≤ max_hops — drop the row.
5452                continue;
5453            }
5454            for path in paths {
5455                self.pending.push_back((row.clone(), path));
5456            }
5457        }
5458    }
5459}
5460
5461/// Layered breadth-first search from `src` toward `dst`,
5462/// constrained by edge type, direction, and max hop count.
5463/// Returns every path of minimum length when `kind` is
5464/// `AllShortest`, or just the first discovered shortest path
5465/// when `kind` is `Shortest`. An empty vector means no path
5466/// of length ≤ `max_hops` exists.
5467///
5468/// `src == dst` is a zero-hop special case that always
5469/// returns a singleton path regardless of `max_hops`.
5470///
5471/// The algorithm builds a parent DAG as it expands each
5472/// level: every node that was first reached at depth `d+1`
5473/// records every `(parent, edge)` pair from frontier[d] that
5474/// reaches it, not just the first one. This lets the
5475/// reconstruction walk enumerate all source-to-target paths
5476/// of the discovered shortest length. For `Shortest` mode
5477/// the reconstruction short-circuits on the first complete
5478/// path.
5479fn bfs_shortest_paths(
5480    src: &Node,
5481    dst: &Node,
5482    edge_types: &[String],
5483    direction: meshdb_cypher::Direction,
5484    max_hops: u64,
5485    kind: meshdb_cypher::ShortestKind,
5486    reader: &dyn crate::reader::GraphReader,
5487) -> Result<Vec<Value>> {
5488    use meshdb_cypher::Direction;
5489
5490    if src.id == dst.id {
5491        return Ok(vec![Value::Path {
5492            nodes: vec![src.clone()],
5493            edges: vec![],
5494        }]);
5495    }
5496
5497    // `dist` tracks the shortest distance from `src` to each
5498    // reached node. `parents` maps each reached node id to
5499    // every `(parent_id, edge_id)` pair that reached it at
5500    // that shortest distance — a node may have multiple
5501    // parents in the parent DAG for `AllShortest`.
5502    let mut dist: HashMap<NodeId, u64> = HashMap::new();
5503    dist.insert(src.id, 0);
5504    let mut parents: HashMap<NodeId, Vec<(NodeId, EdgeId)>> = HashMap::new();
5505
5506    let mut frontier: Vec<NodeId> = vec![src.id];
5507    let mut depth: u64 = 0;
5508    let mut found = false;
5509
5510    while !frontier.is_empty() && depth < max_hops && !found {
5511        let mut next_frontier: Vec<NodeId> = Vec::new();
5512        for node_id in &frontier {
5513            let neighbors = match direction {
5514                Direction::Outgoing => reader.outgoing(*node_id)?,
5515                Direction::Incoming => reader.incoming(*node_id)?,
5516                Direction::Both => {
5517                    let mut out = reader.outgoing(*node_id)?;
5518                    out.extend(reader.incoming(*node_id)?);
5519                    out
5520                }
5521            };
5522            for (edge_id, neighbor_id) in neighbors {
5523                // Edge-type filter. Only fetch the edge record
5524                // when a type constraint is present.
5525                if !edge_types.is_empty() {
5526                    let edge = match reader.get_edge(edge_id)? {
5527                        Some(e) => e,
5528                        None => continue,
5529                    };
5530                    if !edge_types.iter().any(|t| t == &edge.edge_type) {
5531                        continue;
5532                    }
5533                }
5534                match dist.get(&neighbor_id) {
5535                    Some(&d) if d == depth + 1 => {
5536                        // Alternate parent at the same
5537                        // shortest-path level — record the
5538                        // additional edge so the reconstruction
5539                        // can enumerate it, but don't re-add to
5540                        // the next frontier.
5541                        parents
5542                            .entry(neighbor_id)
5543                            .or_default()
5544                            .push((*node_id, edge_id));
5545                    }
5546                    Some(_) => {
5547                        // Already reached at a strictly shorter
5548                        // depth; this edge isn't on a shortest
5549                        // path.
5550                    }
5551                    None => {
5552                        dist.insert(neighbor_id, depth + 1);
5553                        parents
5554                            .entry(neighbor_id)
5555                            .or_default()
5556                            .push((*node_id, edge_id));
5557                        if neighbor_id == dst.id {
5558                            found = true;
5559                        } else {
5560                            next_frontier.push(neighbor_id);
5561                        }
5562                    }
5563                }
5564            }
5565        }
5566        depth += 1;
5567        if !found {
5568            frontier = next_frontier;
5569        }
5570    }
5571
5572    if !found {
5573        return Ok(Vec::new());
5574    }
5575
5576    // Enumerate all shortest paths from the parent DAG.
5577    // `Shortest` mode short-circuits after the first complete
5578    // walk; `AllShortest` collects every distinct path.
5579    let mut out: Vec<Value> = Vec::new();
5580    let mut nodes_rev: Vec<Node> = Vec::new();
5581    let mut edges_rev: Vec<Edge> = Vec::new();
5582    let only_first = matches!(kind, meshdb_cypher::ShortestKind::Shortest);
5583    collect_shortest_paths(
5584        src,
5585        dst,
5586        &parents,
5587        reader,
5588        &mut nodes_rev,
5589        &mut edges_rev,
5590        &mut out,
5591        only_first,
5592    )?;
5593    Ok(out)
5594}
5595
5596/// Depth-first walk through the parent DAG built by
5597/// `bfs_shortest_paths`, collecting every source-to-target
5598/// path of the BFS-determined shortest length. The recursion
5599/// carries two scratch stacks (`nodes_rev`, `edges_rev`)
5600/// representing the partially-reconstructed path in reverse
5601/// order; on reaching `src` we copy the reversed accumulators
5602/// into a forward-order `Value::Path` and push it into `out`.
5603///
5604/// `only_first = true` short-circuits after the first
5605/// complete path, giving `Shortest` mode the optimal-case
5606/// early exit.
5607#[allow(clippy::too_many_arguments)]
5608fn collect_shortest_paths(
5609    src: &Node,
5610    current: &Node,
5611    parents: &HashMap<NodeId, Vec<(NodeId, EdgeId)>>,
5612    reader: &dyn crate::reader::GraphReader,
5613    nodes_rev: &mut Vec<Node>,
5614    edges_rev: &mut Vec<Edge>,
5615    out: &mut Vec<Value>,
5616    only_first: bool,
5617) -> Result<()> {
5618    if current.id == src.id {
5619        // Complete walk: `nodes_rev` holds dst, ..., (last
5620        // node before src) in reverse; prepend src and
5621        // reverse to produce the forward-order node list.
5622        // Edges are similarly reversed.
5623        let mut nodes: Vec<Node> = Vec::with_capacity(nodes_rev.len() + 1);
5624        nodes.push(src.clone());
5625        nodes.extend(nodes_rev.iter().rev().cloned());
5626        let edges: Vec<Edge> = edges_rev.iter().rev().cloned().collect();
5627        out.push(Value::Path { nodes, edges });
5628        return Ok(());
5629    }
5630    let Some(parent_edges) = parents.get(&current.id) else {
5631        // Unreachable in normal flow — BFS only inserts dst
5632        // into `parents` when it found at least one incoming
5633        // edge — but handled defensively.
5634        return Ok(());
5635    };
5636    for (parent_id, edge_id) in parent_edges {
5637        if only_first && !out.is_empty() {
5638            return Ok(());
5639        }
5640        let edge = reader
5641            .get_edge(*edge_id)?
5642            .expect("BFS inserted this edge id; it must still exist");
5643        let parent_node = reader
5644            .get_node(*parent_id)?
5645            .expect("BFS visited this node id; it must still exist");
5646        nodes_rev.push(current.clone());
5647        edges_rev.push(edge);
5648        collect_shortest_paths(
5649            src,
5650            &parent_node,
5651            parents,
5652            reader,
5653            nodes_rev,
5654            edges_rev,
5655            out,
5656            only_first,
5657        )?;
5658        nodes_rev.pop();
5659        edges_rev.pop();
5660    }
5661    Ok(())
5662}
5663
5664/// `UNION` / `UNION ALL`. Drains each branch in order, streaming
5665/// its rows through. For plain `UNION` (`all = false`) we
5666/// deduplicate across the combined stream using the same
5667/// `row_key` the `DistinctOp` uses, so the semantics match
5668/// Neo4j's "set union" shape regardless of whether duplicates
5669/// sit inside a single branch or straddle branches. For
5670/// `UNION ALL` the `seen` set stays `None` and every produced
5671/// row is forwarded as-is.
5672struct UnionOp {
5673    branches: Vec<Box<dyn Operator>>,
5674    current: usize,
5675    seen: Option<HashSet<String>>,
5676}
5677
5678impl UnionOp {
5679    fn new(branches: Vec<Box<dyn Operator>>, all: bool) -> Self {
5680        Self {
5681            branches,
5682            current: 0,
5683            seen: if all { None } else { Some(HashSet::new()) },
5684        }
5685    }
5686}
5687
5688impl Operator for UnionOp {
5689    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5690        while self.current < self.branches.len() {
5691            match self.branches[self.current].next(ctx)? {
5692                Some(row) => {
5693                    if let Some(seen) = self.seen.as_mut() {
5694                        let key = row_key(&row);
5695                        if !seen.insert(key) {
5696                            continue;
5697                        }
5698                    }
5699                    return Ok(Some(row));
5700                }
5701                None => {
5702                    self.current += 1;
5703                }
5704            }
5705        }
5706        Ok(None)
5707    }
5708}
5709
5710struct OrderByOp {
5711    input: Box<dyn Operator>,
5712    sort_items: Vec<SortItem>,
5713    sorted: Option<Vec<Row>>,
5714    cursor: usize,
5715}
5716
5717impl OrderByOp {
5718    fn new(input: Box<dyn Operator>, sort_items: Vec<SortItem>) -> Self {
5719        Self {
5720            input,
5721            sort_items,
5722            sorted: None,
5723            cursor: 0,
5724        }
5725    }
5726}
5727
5728impl Operator for OrderByOp {
5729    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5730        if self.sorted.is_none() {
5731            let mut rows: Vec<Row> = Vec::new();
5732            while let Some(row) = self.input.next(ctx)? {
5733                rows.push(row);
5734            }
5735            let mut keyed: Vec<(Vec<Value>, Row)> = Vec::with_capacity(rows.len());
5736            for row in rows {
5737                let mut keys = Vec::with_capacity(self.sort_items.len());
5738                for item in &self.sort_items {
5739                    keys.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
5740                }
5741                keyed.push((keys, row));
5742            }
5743            let descs: Vec<bool> = self.sort_items.iter().map(|s| s.descending).collect();
5744            keyed.sort_by(|a, b| {
5745                for (i, (va, vb)) in a.0.iter().zip(b.0.iter()).enumerate() {
5746                    let ord = compare_values(va, vb);
5747                    let ord = if descs[i] { ord.reverse() } else { ord };
5748                    if ord != Ordering::Equal {
5749                        return ord;
5750                    }
5751                }
5752                Ordering::Equal
5753            });
5754            self.sorted = Some(keyed.into_iter().map(|(_, r)| r).collect());
5755        }
5756        let rows = self.sorted.as_ref().unwrap();
5757        if self.cursor < rows.len() {
5758            let row = rows[self.cursor].clone();
5759            self.cursor += 1;
5760            Ok(Some(row))
5761        } else {
5762            Ok(None)
5763        }
5764    }
5765}
5766
5767struct AggregateOp {
5768    input: Box<dyn Operator>,
5769    group_keys: Vec<ReturnItem>,
5770    aggregates: Vec<AggregateSpec>,
5771    results: Option<Vec<Row>>,
5772    cursor: usize,
5773}
5774
5775impl AggregateOp {
5776    fn new(
5777        input: Box<dyn Operator>,
5778        group_keys: Vec<ReturnItem>,
5779        aggregates: Vec<AggregateSpec>,
5780    ) -> Self {
5781        Self {
5782            input,
5783            group_keys,
5784            aggregates,
5785            results: None,
5786            cursor: 0,
5787        }
5788    }
5789
5790    fn compute(&mut self, ctx: &ExecCtx) -> Result<()> {
5791        let mut groups: HashMap<String, GroupState> = HashMap::new();
5792        let mut order: Vec<String> = Vec::new();
5793
5794        // If there are no input rows AND no group keys, we still emit one row
5795        // (e.g. `MATCH (n:Missing) RETURN count(*)` must yield one row with 0).
5796        let mut saw_any = false;
5797
5798        while let Some(row) = self.input.next(ctx)? {
5799            saw_any = true;
5800            let mut key_values = Vec::with_capacity(self.group_keys.len());
5801            for item in &self.group_keys {
5802                key_values.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
5803            }
5804            let mut hash_key = String::new();
5805            for v in &key_values {
5806                hash_key.push_str(&value_key(v));
5807                hash_key.push('|');
5808            }
5809            let entry = groups.entry(hash_key.clone()).or_insert_with(|| {
5810                order.push(hash_key.clone());
5811                GroupState {
5812                    key_values: key_values.clone(),
5813                    agg_states: self
5814                        .aggregates
5815                        .iter()
5816                        .map(|a| AggState::initial(a.function))
5817                        .collect(),
5818                    distinct_seen: self.aggregates.iter().map(|_| None).collect(),
5819                }
5820            });
5821            for (i, spec) in self.aggregates.iter().enumerate() {
5822                if let AggregateArg::DistinctExpr(expr) = &spec.arg {
5823                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
5824                    if matches!(v, Value::Null) {
5825                        continue;
5826                    }
5827                    let key = value_key(&v);
5828                    let seen = entry.distinct_seen[i].get_or_insert_with(HashSet::new);
5829                    if !seen.insert(key) {
5830                        continue;
5831                    }
5832                }
5833                // Resolve the extra-arg constant *before* update so
5834                // `apoc.agg.nth` sees its target index on row 0;
5835                // percentile aggregators just collect into a Vec and
5836                // don't read the extra arg during update, so the
5837                // ordering is safe for them too.
5838                if let Some(extra_expr) = &spec.extra_arg {
5839                    let need_resolve_percentile = matches!(
5840                        &entry.agg_states[i],
5841                        AggState::PercentileDisc {
5842                            percentile: None,
5843                            ..
5844                        } | AggState::PercentileCont {
5845                            percentile: None,
5846                            ..
5847                        }
5848                    );
5849                    let need_resolve_nth =
5850                        matches!(&entry.agg_states[i], AggState::ApocNth { target: None, .. });
5851                    if need_resolve_percentile {
5852                        let pv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
5853                        let p = match pv {
5854                            Value::Property(Property::Float64(f)) => f,
5855                            Value::Property(Property::Int64(i)) => i as f64,
5856                            _ => 0.0,
5857                        };
5858                        // openCypher requires the percentile to
5859                        // lie in [0.0, 1.0]; anything else is an
5860                        // `ArgumentError: NumberOutOfRange`.
5861                        if !(0.0..=1.0).contains(&p) || p.is_nan() {
5862                            return Err(Error::Procedure(format!("percentile out of range: {p}")));
5863                        }
5864                        match &mut entry.agg_states[i] {
5865                            AggState::PercentileDisc { percentile, .. }
5866                            | AggState::PercentileCont { percentile, .. } => {
5867                                *percentile = Some(p);
5868                            }
5869                            _ => {}
5870                        }
5871                    }
5872                    if need_resolve_nth {
5873                        let nv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
5874                        let n = match nv {
5875                            Value::Property(Property::Int64(i)) => i,
5876                            _ => {
5877                                return Err(Error::Procedure(
5878                                    "apoc.agg.nth expects an integer index".into(),
5879                                ))
5880                            }
5881                        };
5882                        if n < 0 {
5883                            return Err(Error::Procedure(format!(
5884                                "apoc.agg.nth index out of range: {n}"
5885                            )));
5886                        }
5887                        if let AggState::ApocNth { target, .. } = &mut entry.agg_states[i] {
5888                            *target = Some(n);
5889                        }
5890                    }
5891                }
5892                entry.agg_states[i].update(&spec.arg, &ctx.eval_ctx(&row))?;
5893            }
5894        }
5895
5896        let mut out = Vec::new();
5897        if !saw_any && self.group_keys.is_empty() && !self.aggregates.is_empty() {
5898            // Empty group, single aggregate row
5899            let mut row = Row::new();
5900            for spec in &self.aggregates {
5901                row.insert(
5902                    spec.alias.clone(),
5903                    AggState::initial(spec.function).finalize(),
5904                );
5905            }
5906            out.push(row);
5907        } else {
5908            for key in order {
5909                let state = groups.remove(&key).unwrap();
5910                let mut row = Row::new();
5911                for (i, item) in self.group_keys.iter().enumerate() {
5912                    let name = item
5913                        .alias
5914                        .clone()
5915                        .unwrap_or_else(|| default_name(&item.expr, i));
5916                    row.insert(name, state.key_values[i].clone());
5917                }
5918                for (i, spec) in self.aggregates.iter().enumerate() {
5919                    row.insert(spec.alias.clone(), state.agg_states[i].finalize());
5920                }
5921                out.push(row);
5922            }
5923        }
5924        self.results = Some(out);
5925        Ok(())
5926    }
5927}
5928
5929impl Operator for AggregateOp {
5930    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5931        if self.results.is_none() {
5932            self.compute(ctx)?;
5933        }
5934        let rows = self.results.as_ref().unwrap();
5935        if self.cursor < rows.len() {
5936            let row = rows[self.cursor].clone();
5937            self.cursor += 1;
5938            Ok(Some(row))
5939        } else {
5940            Ok(None)
5941        }
5942    }
5943}
5944
/// Per-group accumulator used by `AggregateOp::compute`.
struct GroupState {
    /// Evaluated grouping-key values for this group, in `group_keys` order.
    key_values: Vec<Value>,
    /// One running aggregate state per entry of `AggregateOp::aggregates`.
    agg_states: Vec<AggState>,
    /// For each aggregate, the set of `value_key`s already fed to it by a
    /// `DISTINCT` argument. Stays `None` until the first distinct value
    /// arrives, and always `None` for non-DISTINCT aggregates.
    distinct_seen: Vec<Option<HashSet<String>>>,
}
5950
/// Running state of one aggregate function for one group.
///
/// Created by `AggState::initial`, fed one input row at a time through
/// `AggState::update`, and turned into the output value by
/// `AggState::finalize`.
enum AggState {
    /// `count(*)` / `count(expr)`: number of rows (for `*`) or of
    /// non-null values seen.
    Count(i64),
    /// `sum(expr)`: integer and float inputs are accumulated in separate
    /// parts. `is_float` records whether any float was seen, which makes
    /// the final result a float.
    Sum {
        int_part: i64,
        float_part: f64,
        is_float: bool,
    },
    /// `avg(expr)`: running total (as f64) and number of numeric inputs.
    /// Finalizes to null when `count` is 0.
    Avg {
        total: f64,
        count: i64,
    },
    /// `min(expr)`: smallest value so far under `compare_values`; `None`
    /// until the first non-null input arrives.
    Min(Option<Value>),
    /// `max(expr)`: largest value so far under `compare_values`; `None`
    /// until the first non-null input arrives.
    Max(Option<Value>),
    /// `collect(expr)`: non-null inputs in arrival order.
    Collect(Vec<Value>),
    /// `stDev(expr)`: sample standard deviation (divides by `count - 1`).
    /// `sum`, `sum_sq` and `count` are running accumulators over the
    /// numeric inputs; `update` / `finalize` define how they are
    /// maintained and combined.
    StDev {
        sum: f64,
        sum_sq: f64,
        count: i64,
    },
    /// `stDevP(expr)`: population standard deviation (divides by
    /// `count`). Uses the same accumulators as `StDev`.
    StDevP {
        sum: f64,
        sum_sq: f64,
        count: i64,
    },
    /// `percentileDisc(expr, p)`: buffers the inputs in `items`.
    /// `percentile` is the constant `p`, latched from the aggregate's
    /// extra argument, and stays `None` until it is resolved.
    PercentileDisc {
        items: Vec<Value>,
        percentile: Option<f64>,
    },
    /// `percentileCont(expr, p)`: same buffering and `percentile` latch
    /// as `PercentileDisc`.
    PercentileCont {
        items: Vec<Value>,
        percentile: Option<f64>,
    },
    /// `apoc.agg.first` — latches the first non-null value seen.
    ApocFirst(Option<Value>),
    /// `apoc.agg.last` — overwrites the slot on every non-null value.
    ApocLast(Option<Value>),
    /// `apoc.agg.nth(expr, n)` — `target` is the constant `n`, resolved
    /// once from the spec's `extra_arg` on the first row (same shape
    /// as percentiles). `count` tracks non-null inputs; `slot` is set
    /// exactly when `count` reaches `target`.
    ApocNth {
        target: Option<i64>,
        count: i64,
        slot: Option<Value>,
    },
    /// `apoc.agg.median` — collects numeric values, sort + pick middle
    /// (or average of the two middles) at finalize.
    ApocMedian(Vec<f64>),
    /// `apoc.agg.product` — running product. Mirrors the Sum
    /// int-vs-float split so integer-only streams stay in Int64.
    ApocProduct {
        int_part: i64,
        float_part: f64,
        is_float: bool,
        seen: bool,
    },
}
6008
6009impl AggState {
6010    fn initial(func: AggregateFn) -> Self {
6011        match func {
6012            AggregateFn::Count => AggState::Count(0),
6013            AggregateFn::Sum => AggState::Sum {
6014                int_part: 0,
6015                float_part: 0.0,
6016                is_float: false,
6017            },
6018            AggregateFn::Avg => AggState::Avg {
6019                total: 0.0,
6020                count: 0,
6021            },
6022            AggregateFn::Min => AggState::Min(None),
6023            AggregateFn::Max => AggState::Max(None),
6024            AggregateFn::Collect => AggState::Collect(Vec::new()),
6025            AggregateFn::StDev => AggState::StDev {
6026                sum: 0.0,
6027                sum_sq: 0.0,
6028                count: 0,
6029            },
6030            AggregateFn::StDevP => AggState::StDevP {
6031                sum: 0.0,
6032                sum_sq: 0.0,
6033                count: 0,
6034            },
6035            AggregateFn::PercentileDisc => AggState::PercentileDisc {
6036                items: Vec::new(),
6037                percentile: None,
6038            },
6039            AggregateFn::PercentileCont => AggState::PercentileCont {
6040                items: Vec::new(),
6041                percentile: None,
6042            },
6043            AggregateFn::ApocFirst => AggState::ApocFirst(None),
6044            AggregateFn::ApocLast => AggState::ApocLast(None),
6045            AggregateFn::ApocNth => AggState::ApocNth {
6046                target: None,
6047                count: 0,
6048                slot: None,
6049            },
6050            AggregateFn::ApocMedian => AggState::ApocMedian(Vec::new()),
6051            AggregateFn::ApocProduct => AggState::ApocProduct {
6052                int_part: 1,
6053                float_part: 1.0,
6054                is_float: false,
6055                seen: false,
6056            },
6057        }
6058    }
6059
6060    fn update(&mut self, arg: &AggregateArg, ctx: &EvalCtx) -> Result<()> {
6061        match self {
6062            AggState::Count(c) => match arg {
6063                AggregateArg::Star => *c += 1,
6064                AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => {
6065                    if !matches!(eval_expr(e, ctx)?, Value::Null) {
6066                        *c += 1;
6067                    }
6068                }
6069            },
6070            AggState::Sum {
6071                int_part,
6072                float_part,
6073                is_float,
6074            } => {
6075                let v = expr_arg_value(arg, ctx)?;
6076                match v {
6077                    Value::Null => {}
6078                    Value::Property(Property::Int64(i)) => *int_part += i,
6079                    Value::Property(Property::Float64(f)) => {
6080                        *float_part += f;
6081                        *is_float = true;
6082                    }
6083                    _ => return Err(Error::AggregateTypeError),
6084                }
6085            }
6086            AggState::Avg { total, count } => {
6087                let v = expr_arg_value(arg, ctx)?;
6088                match v {
6089                    Value::Null => {}
6090                    Value::Property(Property::Int64(i)) => {
6091                        *total += i as f64;
6092                        *count += 1;
6093                    }
6094                    Value::Property(Property::Float64(f)) => {
6095                        *total += f;
6096                        *count += 1;
6097                    }
6098                    _ => return Err(Error::AggregateTypeError),
6099                }
6100            }
6101            AggState::Min(slot) => {
6102                // `min` / `max` ignore null inputs and operate
6103                // over any `Value` (including lists, nodes etc.),
6104                // not just scalar Properties. The ordering is
6105                // `compare_values`: within a single type the
6106                // natural order, across types the
6107                // `type_order_value` rank.
6108                let v = expr_arg_value(arg, ctx)?;
6109                if matches!(v, Value::Null | Value::Property(Property::Null)) {
6110                    // skip
6111                } else {
6112                    match slot {
6113                        None => *slot = Some(v),
6114                        Some(cur) => {
6115                            if compare_values(&v, cur) == Ordering::Less {
6116                                *cur = v;
6117                            }
6118                        }
6119                    }
6120                }
6121            }
6122            AggState::Max(slot) => {
6123                let v = expr_arg_value(arg, ctx)?;
6124                if matches!(v, Value::Null | Value::Property(Property::Null)) {
6125                    // skip
6126                } else {
6127                    match slot {
6128                        None => *slot = Some(v),
6129                        Some(cur) => {
6130                            if compare_values(&v, cur) == Ordering::Greater {
6131                                *cur = v;
6132                            }
6133                        }
6134                    }
6135                }
6136            }
6137            AggState::Collect(items) => {
6138                let v = expr_arg_value(arg, ctx)?;
6139                if !matches!(v, Value::Null) {
6140                    items.push(v);
6141                }
6142            }
6143            AggState::PercentileDisc { items, .. } | AggState::PercentileCont { items, .. } => {
6144                let v = expr_arg_value(arg, ctx)?;
6145                if !matches!(v, Value::Null) {
6146                    items.push(v);
6147                }
6148            }
6149            AggState::StDev { sum, sum_sq, count } | AggState::StDevP { sum, sum_sq, count } => {
6150                let v = expr_arg_value(arg, ctx)?;
6151                match v {
6152                    Value::Null => {}
6153                    Value::Property(Property::Int64(i)) => {
6154                        let f = i as f64;
6155                        *sum += f;
6156                        *sum_sq += f * f;
6157                        *count += 1;
6158                    }
6159                    Value::Property(Property::Float64(f)) => {
6160                        *sum += f;
6161                        *sum_sq += f * f;
6162                        *count += 1;
6163                    }
6164                    _ => return Err(Error::AggregateTypeError),
6165                }
6166            }
6167            AggState::ApocFirst(slot) => {
6168                if slot.is_some() {
6169                    return Ok(());
6170                }
6171                let v = expr_arg_value(arg, ctx)?;
6172                if !matches!(v, Value::Null | Value::Property(Property::Null)) {
6173                    *slot = Some(v);
6174                }
6175            }
6176            AggState::ApocLast(slot) => {
6177                let v = expr_arg_value(arg, ctx)?;
6178                if !matches!(v, Value::Null | Value::Property(Property::Null)) {
6179                    *slot = Some(v);
6180                }
6181            }
6182            AggState::ApocNth {
6183                target,
6184                count,
6185                slot,
6186            } => {
6187                if slot.is_some() {
6188                    return Ok(());
6189                }
6190                let v = expr_arg_value(arg, ctx)?;
6191                if matches!(v, Value::Null | Value::Property(Property::Null)) {
6192                    return Ok(());
6193                }
6194                if let Some(t) = *target {
6195                    if *count == t {
6196                        *slot = Some(v);
6197                    }
6198                    *count += 1;
6199                }
6200            }
6201            AggState::ApocMedian(items) => {
6202                let v = expr_arg_value(arg, ctx)?;
6203                match v {
6204                    Value::Null | Value::Property(Property::Null) => {}
6205                    Value::Property(Property::Int64(i)) => items.push(i as f64),
6206                    Value::Property(Property::Float64(f)) => items.push(f),
6207                    _ => return Err(Error::AggregateTypeError),
6208                }
6209            }
6210            AggState::ApocProduct {
6211                int_part,
6212                float_part,
6213                is_float,
6214                seen,
6215            } => {
6216                let v = expr_arg_value(arg, ctx)?;
6217                match v {
6218                    Value::Null | Value::Property(Property::Null) => {}
6219                    Value::Property(Property::Int64(i)) => {
6220                        *int_part = int_part.saturating_mul(i);
6221                        *seen = true;
6222                    }
6223                    Value::Property(Property::Float64(f)) => {
6224                        *float_part *= f;
6225                        *is_float = true;
6226                        *seen = true;
6227                    }
6228                    _ => return Err(Error::AggregateTypeError),
6229                }
6230            }
6231        }
6232        Ok(())
6233    }
6234
6235    fn finalize(&self) -> Value {
6236        match self {
6237            AggState::Count(c) => Value::Property(Property::Int64(*c)),
6238            AggState::Sum {
6239                int_part,
6240                float_part,
6241                is_float,
6242            } => {
6243                if *is_float {
6244                    Value::Property(Property::Float64(*float_part + *int_part as f64))
6245                } else {
6246                    Value::Property(Property::Int64(*int_part))
6247                }
6248            }
6249            AggState::Avg { total, count } => {
6250                if *count == 0 {
6251                    Value::Null
6252                } else {
6253                    Value::Property(Property::Float64(*total / *count as f64))
6254                }
6255            }
6256            AggState::Min(slot) | AggState::Max(slot) => match slot {
6257                Some(v) => v.clone(),
6258                None => Value::Null,
6259            },
6260            AggState::Collect(items) => Value::List(items.clone()),
6261            AggState::StDevP { sum, sum_sq, count } => {
6262                if *count == 0 {
6263                    Value::Property(Property::Float64(0.0))
6264                } else {
6265                    let n = *count as f64;
6266                    let variance = *sum_sq / n - (*sum / n).powi(2);
6267                    Value::Property(Property::Float64(variance.max(0.0).sqrt()))
6268                }
6269            }
6270            AggState::StDev { sum, sum_sq, count } => {
6271                if *count < 2 {
6272                    Value::Property(Property::Float64(0.0))
6273                } else {
6274                    let n = *count as f64;
6275                    let variance = (*sum_sq - *sum * *sum / n) / (n - 1.0);
6276                    Value::Property(Property::Float64(variance.max(0.0).sqrt()))
6277                }
6278            }
6279            AggState::PercentileDisc { items, percentile } => {
6280                percentile_disc(items, percentile.unwrap_or(0.0))
6281            }
6282            AggState::PercentileCont { items, percentile } => {
6283                percentile_cont(items, percentile.unwrap_or(0.0))
6284            }
6285            AggState::ApocFirst(slot) | AggState::ApocLast(slot) => match slot {
6286                Some(v) => v.clone(),
6287                None => Value::Null,
6288            },
6289            AggState::ApocNth { slot, .. } => match slot {
6290                Some(v) => v.clone(),
6291                None => Value::Null,
6292            },
6293            AggState::ApocMedian(items) => {
6294                if items.is_empty() {
6295                    return Value::Null;
6296                }
6297                let mut sorted = items.clone();
6298                sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
6299                let n = sorted.len();
6300                let median = if n % 2 == 1 {
6301                    sorted[n / 2]
6302                } else {
6303                    (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0
6304                };
6305                Value::Property(Property::Float64(median))
6306            }
6307            AggState::ApocProduct {
6308                int_part,
6309                float_part,
6310                is_float,
6311                seen,
6312            } => {
6313                // Empty stream: null, matching the convention for
6314                // avg/min/max (and unlike sum, which folds to 0).
6315                if !*seen {
6316                    return Value::Null;
6317                }
6318                if *is_float {
6319                    Value::Property(Property::Float64(*float_part * *int_part as f64))
6320                } else {
6321                    Value::Property(Property::Int64(*int_part))
6322                }
6323            }
6324        }
6325    }
6326}
6327
6328fn expr_arg_value(arg: &AggregateArg, ctx: &EvalCtx) -> Result<Value> {
6329    match arg {
6330        AggregateArg::Star => Err(Error::AggregateTypeError),
6331        AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => eval_expr(e, ctx),
6332    }
6333}
6334
6335/// Coerce a collected aggregate value into an f64 for percentile
6336/// math. Unhandled types fall back to NaN; the caller sorts NaN
6337/// out of the stream before computing the percentile.
6338fn value_to_f64(v: &Value) -> f64 {
6339    match v {
6340        Value::Property(Property::Int64(i)) => *i as f64,
6341        Value::Property(Property::Float64(f)) => *f,
6342        _ => f64::NAN,
6343    }
6344}
6345
6346/// `percentileDisc(expr, p)` — discrete percentile. Returns the
6347/// smallest value at or above the `p`-ranked position. Numbers only;
6348/// non-numeric values get sorted to the end and are effectively
6349/// ignored unless the percentile lands on one.
6350fn percentile_disc(items: &[Value], p: f64) -> Value {
6351    let mut nums: Vec<(f64, Value)> = items
6352        .iter()
6353        .map(|v| (value_to_f64(v), v.clone()))
6354        .filter(|(f, _)| !f.is_nan())
6355        .collect();
6356    if nums.is_empty() {
6357        return Value::Null;
6358    }
6359    nums.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
6360    let p = p.clamp(0.0, 1.0);
6361    let n = nums.len();
6362    // Neo4j spec: ceil(p * n) - 1, clamped at 0.
6363    let idx = ((p * n as f64).ceil() as isize - 1).max(0) as usize;
6364    nums[idx.min(n - 1)].1.clone()
6365}
6366
6367/// `percentileCont(expr, p)` — continuous percentile. Linearly
6368/// interpolates between the two ranks that bracket the fractional
6369/// position `p * (n - 1)`. Returns a Float64.
6370fn percentile_cont(items: &[Value], p: f64) -> Value {
6371    let mut nums: Vec<f64> = items
6372        .iter()
6373        .map(value_to_f64)
6374        .filter(|f| !f.is_nan())
6375        .collect();
6376    if nums.is_empty() {
6377        return Value::Null;
6378    }
6379    nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
6380    let p = p.clamp(0.0, 1.0);
6381    let n = nums.len();
6382    if n == 1 {
6383        return Value::Property(Property::Float64(nums[0]));
6384    }
6385    let pos = p * (n as f64 - 1.0);
6386    let lo = pos.floor() as usize;
6387    let hi = pos.ceil() as usize;
6388    let frac = pos - lo as f64;
6389    let v = nums[lo] + (nums[hi] - nums[lo]) * frac;
6390    Value::Property(Property::Float64(v))
6391}
6392
6393struct SkipOp {
6394    input: Box<dyn Operator>,
6395    count_expr: Expr,
6396    remaining: Option<i64>,
6397}
6398
6399impl SkipOp {
6400    fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
6401        Self {
6402            input,
6403            count_expr,
6404            remaining: None,
6405        }
6406    }
6407}
6408
6409impl Operator for SkipOp {
6410    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
6411        if self.remaining.is_none() {
6412            let empty = Row::new();
6413            let ectx = ctx.eval_ctx(&empty);
6414            let val = eval_expr(&self.count_expr, &ectx)?;
6415            self.remaining = Some(expr_to_count(val)?);
6416        }
6417        let rem = self.remaining.as_mut().unwrap();
6418        while *rem > 0 {
6419            if self.input.next(ctx)?.is_none() {
6420                return Ok(None);
6421            }
6422            *rem -= 1;
6423        }
6424        self.input.next(ctx)
6425    }
6426}
6427
6428struct LimitOp {
6429    input: Box<dyn Operator>,
6430    count_expr: Expr,
6431    remaining: Option<i64>,
6432    /// True when the input subtree carries a write operator
6433    /// (CREATE / SET / DELETE / etc.). After the limit is hit
6434    /// (including the `LIMIT 0` case where it's hit before the
6435    /// first pull), drain the input fully so eager side effects
6436    /// land — Cypher specifies writes happen regardless of how
6437    /// many rows downstream consumes.
6438    drain_on_complete: bool,
6439    /// Set after the post-limit drain runs so we don't re-drain
6440    /// on every subsequent `next()` call.
6441    drained: bool,
6442}
6443
6444impl LimitOp {
6445    fn new(input: Box<dyn Operator>, count_expr: Expr, drain_on_complete: bool) -> Self {
6446        Self {
6447            input,
6448            count_expr,
6449            remaining: None,
6450            drain_on_complete,
6451            drained: false,
6452        }
6453    }
6454
6455    /// Pull from input until exhausted, discarding rows. Used
6456    /// after the limit is satisfied to ensure write operators
6457    /// in the subtree run their full set of writes.
6458    fn drain_input(&mut self, ctx: &ExecCtx) -> Result<()> {
6459        if self.drained {
6460            return Ok(());
6461        }
6462        while self.input.next(ctx)?.is_some() {}
6463        self.drained = true;
6464        Ok(())
6465    }
6466}
6467
6468impl Operator for LimitOp {
6469    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
6470        if self.remaining.is_none() {
6471            let empty = Row::new();
6472            let ectx = ctx.eval_ctx(&empty);
6473            let val = eval_expr(&self.count_expr, &ectx)?;
6474            self.remaining = Some(expr_to_count(val)?);
6475        }
6476        let rem = self.remaining.as_mut().unwrap();
6477        if *rem <= 0 {
6478            if self.drain_on_complete {
6479                self.drain_input(ctx)?;
6480            }
6481            return Ok(None);
6482        }
6483        match self.input.next(ctx)? {
6484            Some(row) => {
6485                *rem -= 1;
6486                Ok(Some(row))
6487            }
6488            None => Ok(None),
6489        }
6490    }
6491}
6492
6493fn expr_to_count(val: Value) -> Result<i64> {
6494    match val {
6495        Value::Null | Value::Property(Property::Null) => Ok(0),
6496        Value::Property(Property::Int64(n)) if n >= 0 => Ok(n),
6497        // openCypher: SKIP/LIMIT require an integer. Float values
6498        // (including integer-valued floats) are rejected as
6499        // InvalidArgumentType — they'd only be valid after an
6500        // explicit cast, which the user must write themselves.
6501        _ => Err(Error::TypeMismatch),
6502    }
6503}
6504
6505/// Trivial source operator that yields a fixed list of rows in
6506/// order. Used by [`execute_with_in_tx_substitute`] to inject
6507/// the pre-materialised batched-execution outputs from a
6508/// `CALL { } IN TRANSACTIONS` into the regular operator pipeline
6509/// so wrapping clauses (Project, OrderBy, Limit, Aggregate, …)
6510/// can run untouched.
6511struct RowsLiteralOp {
6512    rows: Vec<Row>,
6513    cursor: usize,
6514}
6515
6516impl RowsLiteralOp {
6517    fn new(rows: Vec<Row>) -> Self {
6518        Self { rows, cursor: 0 }
6519    }
6520}
6521
6522impl Operator for RowsLiteralOp {
6523    fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
6524        if self.cursor < self.rows.len() {
6525            let row = self.rows[self.cursor].clone();
6526            self.cursor += 1;
6527            Ok(Some(row))
6528        } else {
6529            Ok(None)
6530        }
6531    }
6532}