Skip to main content

lora_executor/
executor.rs

1use crate::errors::{value_kind, ExecResult, ExecutorError};
2use crate::eval::{eval_expr, take_eval_error, EvalContext};
3use crate::value::{lora_value_to_property, LoraPath, LoraValue, Row};
4use crate::{project_rows, ExecuteOptions, QueryResult};
5
6use lora_analyzer::{
7    symbols::VarId, ResolvedExpr, ResolvedPattern, ResolvedPatternElement, ResolvedPatternPart,
8    ResolvedRemoveItem, ResolvedSetItem, ResolvedSortItem,
9};
10use lora_ast::{Direction, RangeLiteral};
11use lora_compiler::physical::*;
12use lora_compiler::CompiledQuery;
13use lora_store::{GraphStorage, GraphStorageMut, NodeId, Properties, PropertyValue};
14
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, BTreeSet};
17use std::time::Instant;
18use tracing::{debug, error, trace};
19
20pub struct ExecutionContext<'a, S: GraphStorage> {
21    pub storage: &'a S,
22    pub params: std::collections::BTreeMap<String, LoraValue>,
23}
24
25pub struct Executor<'a, S: GraphStorage> {
26    ctx: ExecutionContext<'a, S>,
27    deadline: Option<Instant>,
28}
29
30impl<'a, S: GraphStorage> Executor<'a, S> {
31    pub fn new(ctx: ExecutionContext<'a, S>) -> Self {
32        Self {
33            ctx,
34            deadline: None,
35        }
36    }
37
38    pub fn with_deadline(ctx: ExecutionContext<'a, S>, deadline: Option<Instant>) -> Self {
39        Self { ctx, deadline }
40    }
41
42    #[inline]
43    fn check_deadline(&self) -> ExecResult<()> {
44        if let Some(deadline) = self.deadline {
45            check_deadline_at(deadline)
46        } else {
47            Ok(())
48        }
49    }
50
51    #[inline]
52    fn check_loop_deadline(deadline: Option<Instant>) -> ExecResult<()> {
53        if let Some(deadline) = deadline {
54            check_deadline_at(deadline)
55        } else {
56            Ok(())
57        }
58    }
59}
60
61#[inline]
62fn check_deadline_at(deadline: Instant) -> ExecResult<()> {
63    if Instant::now() >= deadline {
64        Err(ExecutorError::QueryTimeout)
65    } else {
66        Ok(())
67    }
68}
69
70impl<'a, S: GraphStorage> Executor<'a, S> {
71    pub fn execute(
72        &self,
73        plan: &PhysicalPlan,
74        options: Option<ExecuteOptions>,
75    ) -> ExecResult<QueryResult> {
76        let rows = self.execute_rows(plan)?;
77        Ok(project_rows(rows, options.unwrap_or_default()))
78    }
79
80    pub fn execute_compiled(
81        &self,
82        compiled: &CompiledQuery,
83        options: Option<ExecuteOptions>,
84    ) -> ExecResult<QueryResult> {
85        let rows = self.execute_compiled_rows(compiled)?;
86        Ok(project_rows(rows, options.unwrap_or_default()))
87    }
88
89    pub fn execute_compiled_rows(&self, compiled: &CompiledQuery) -> ExecResult<Vec<Row>> {
90        self.check_deadline()?;
91        if compiled.unions.is_empty() {
92            return self.execute_rows(&compiled.physical);
93        }
94
95        let _ = take_eval_error();
96
97        let mut all_rows = self.execute_rows(&compiled.physical)?;
98        let mut needs_dedup = false;
99
100        for branch in &compiled.unions {
101            self.check_deadline()?;
102            let branch_rows = self.execute_rows(&branch.physical)?;
103            all_rows.extend(branch_rows);
104
105            if !branch.all {
106                needs_dedup = true;
107            }
108        }
109
110        if needs_dedup {
111            all_rows = dedup_rows(all_rows);
112        }
113
114        Ok(all_rows)
115    }
116
117    pub fn execute_rows(&self, plan: &PhysicalPlan) -> ExecResult<Vec<Row>> {
118        self.check_deadline()?;
119        // Clear any error residue that a previous query on this thread may have
120        // left in the thread-local eval-error slot.
121        let _ = take_eval_error();
122
123        let rows = self.execute_node(plan, plan.root)?;
124        Ok(rows
125            .into_iter()
126            .map(|row| self.hydrate_row(row))
127            .collect::<Vec<_>>())
128    }
129
130    fn hydrate_row(&self, row: Row) -> Row {
131        let mut out = Row::new();
132
133        for (var, name, value) in row.into_iter_named() {
134            out.insert_named(var, name, self.hydrate_value(value));
135        }
136
137        out
138    }
139
140    /// Buffered execution of an arbitrary subplan. Public to the
141    /// crate so the pull pipeline can fall back to materialized
142    /// execution for operators that have no streaming source yet.
143    pub(crate) fn execute_subtree(
144        &self,
145        plan: &PhysicalPlan,
146        node_id: PhysicalNodeId,
147    ) -> ExecResult<Vec<Row>> {
148        self.execute_node(plan, node_id)
149    }
150
151    fn execute_node(&self, plan: &PhysicalPlan, node_id: PhysicalNodeId) -> ExecResult<Vec<Row>> {
152        self.check_deadline()?;
153        trace!("read-only execute_node start: node_id={node_id:?}");
154
155        let result = match &plan.nodes[node_id] {
156            PhysicalOp::Argument(op) => self.exec_argument(op),
157            PhysicalOp::NodeScan(op) => self.exec_node_scan(plan, op),
158            PhysicalOp::NodeByLabelScan(op) => self.exec_node_by_label_scan(plan, op),
159            PhysicalOp::NodeByPropertyScan(op) => self.exec_node_by_property_scan(plan, op),
160            PhysicalOp::Expand(op) => self.exec_expand(plan, op),
161            PhysicalOp::Filter(op) => self.exec_filter(plan, op),
162            PhysicalOp::Projection(op) => self.exec_projection(plan, op),
163            PhysicalOp::Unwind(op) => self.exec_unwind(plan, op),
164            PhysicalOp::HashAggregation(op) => self.exec_hash_aggregation(plan, op),
165            PhysicalOp::Sort(op) => self.exec_sort(plan, op),
166            PhysicalOp::Limit(op) => self.exec_limit(plan, op),
167            PhysicalOp::OptionalMatch(op) => self.exec_optional_match(plan, op),
168            PhysicalOp::PathBuild(op) => self.exec_path_build(plan, op),
169            PhysicalOp::Create(_) => Err(ExecutorError::ReadOnlyCreate { node_id }),
170            PhysicalOp::Merge(_) => Err(ExecutorError::ReadOnlyMerge { node_id }),
171            PhysicalOp::Delete(_) => Err(ExecutorError::ReadOnlyDelete { node_id }),
172            PhysicalOp::Set(_) => Err(ExecutorError::ReadOnlySet { node_id }),
173            PhysicalOp::Remove(_) => Err(ExecutorError::ReadOnlyRemove { node_id }),
174        };
175
176        match &result {
177            Ok(rows) => trace!(
178                "read-only execute_node ok: node_id={node_id:?}, rows={}",
179                rows.len()
180            ),
181            Err(err) => error!("read-only execute_node failed: node_id={node_id:?}, error={err}"),
182        }
183
184        result
185    }
186
187    fn exec_argument(&self, _op: &ArgumentExec) -> ExecResult<Vec<Row>> {
188        Ok(vec![Row::new()])
189    }
190
191    fn exec_node_scan(&self, plan: &PhysicalPlan, op: &NodeScanExec) -> ExecResult<Vec<Row>> {
192        let base_rows = match op.input {
193            Some(input) => self.execute_node(plan, input)?,
194            None => vec![Row::new()],
195        };
196
197        let node_ids = self.ctx.storage.all_node_ids();
198        let mut out = Vec::new();
199
200        let deadline = self.deadline;
201        for row in base_rows {
202            Self::check_loop_deadline(deadline)?;
203            if let Some(existing) = row.get(op.var) {
204                match existing {
205                    LoraValue::Node(existing_id) => {
206                        if self.ctx.storage.has_node(*existing_id) {
207                            out.push(row);
208                        }
209                    }
210                    other => {
211                        return Err(ExecutorError::ExpectedNodeForExpand {
212                            var: format!("{:?}", op.var),
213                            found: value_kind(other),
214                        });
215                    }
216                }
217                continue;
218            }
219
220            for &id in &node_ids {
221                Self::check_loop_deadline(deadline)?;
222                let mut new_row = row.clone();
223                new_row.insert(op.var, LoraValue::Node(id));
224                out.push(new_row);
225            }
226        }
227
228        Ok(out)
229    }
230
231    fn exec_node_by_label_scan(
232        &self,
233        plan: &PhysicalPlan,
234        op: &NodeByLabelScanExec,
235    ) -> ExecResult<Vec<Row>> {
236        let base_rows = match op.input {
237            Some(input) => self.execute_node(plan, input)?,
238            None => vec![Row::new()],
239        };
240
241        let candidate_ids = scan_node_ids_for_label_groups(self.ctx.storage, &op.labels);
242        let candidates_prefiltered = label_group_candidates_prefiltered(&op.labels);
243        let mut out = Vec::new();
244
245        match self.deadline {
246            Some(deadline) => {
247                for row in base_rows {
248                    check_deadline_at(deadline)?;
249                    if let Some(existing) = row.get(op.var) {
250                        match existing {
251                            LoraValue::Node(existing_id) => {
252                                let labels_ok = self
253                                    .ctx
254                                    .storage
255                                    .with_node(*existing_id, |n| {
256                                        node_matches_label_groups(&n.labels, &op.labels)
257                                    })
258                                    .unwrap_or(false);
259                                if labels_ok {
260                                    out.push(row);
261                                }
262                            }
263                            other => {
264                                return Err(ExecutorError::ExpectedNodeForExpand {
265                                    var: format!("{:?}", op.var),
266                                    found: value_kind(other),
267                                });
268                            }
269                        }
270                        continue;
271                    }
272
273                    for &id in &candidate_ids {
274                        check_deadline_at(deadline)?;
275                        if !candidates_prefiltered {
276                            let labels_ok = self
277                                .ctx
278                                .storage
279                                .with_node(id, |n| node_matches_label_groups(&n.labels, &op.labels))
280                                .unwrap_or(false);
281                            if !labels_ok {
282                                continue;
283                            }
284                        }
285                        let mut new_row = row.clone();
286                        new_row.insert(op.var, LoraValue::Node(id));
287                        out.push(new_row);
288                    }
289                }
290            }
291            None => {
292                for row in base_rows {
293                    if let Some(existing) = row.get(op.var) {
294                        match existing {
295                            LoraValue::Node(existing_id) => {
296                                let labels_ok = self
297                                    .ctx
298                                    .storage
299                                    .with_node(*existing_id, |n| {
300                                        node_matches_label_groups(&n.labels, &op.labels)
301                                    })
302                                    .unwrap_or(false);
303                                if labels_ok {
304                                    out.push(row);
305                                }
306                            }
307                            other => {
308                                return Err(ExecutorError::ExpectedNodeForExpand {
309                                    var: format!("{:?}", op.var),
310                                    found: value_kind(other),
311                                });
312                            }
313                        }
314                        continue;
315                    }
316
317                    for &id in &candidate_ids {
318                        if !candidates_prefiltered {
319                            let labels_ok = self
320                                .ctx
321                                .storage
322                                .with_node(id, |n| node_matches_label_groups(&n.labels, &op.labels))
323                                .unwrap_or(false);
324                            if !labels_ok {
325                                continue;
326                            }
327                        }
328                        let mut new_row = row.clone();
329                        new_row.insert(op.var, LoraValue::Node(id));
330                        out.push(new_row);
331                    }
332                }
333            }
334        }
335
336        Ok(out)
337    }
338
339    fn exec_node_by_property_scan(
340        &self,
341        plan: &PhysicalPlan,
342        op: &NodeByPropertyScanExec,
343    ) -> ExecResult<Vec<Row>> {
344        let base_rows = match op.input {
345            Some(input) => self.execute_node(plan, input)?,
346            None => vec![Row::new()],
347        };
348
349        let eval_ctx = EvalContext {
350            storage: self.ctx.storage,
351            params: &self.ctx.params,
352        };
353        let mut out = Vec::new();
354
355        let deadline = self.deadline;
356        for row in base_rows {
357            Self::check_loop_deadline(deadline)?;
358            let expected = eval_expr(&op.value, &row, &eval_ctx);
359
360            if let Some(existing) = row.get(op.var) {
361                match existing {
362                    LoraValue::Node(existing_id) => {
363                        if node_matches_property_filter(
364                            self.ctx.storage,
365                            *existing_id,
366                            &op.labels,
367                            &op.key,
368                            &expected,
369                        ) {
370                            out.push(row);
371                        }
372                    }
373                    other => {
374                        return Err(ExecutorError::ExpectedNodeForExpand {
375                            var: format!("{:?}", op.var),
376                            found: value_kind(other),
377                        });
378                    }
379                }
380                continue;
381            }
382
383            let candidates =
384                indexed_node_property_candidates(self.ctx.storage, &op.labels, &op.key, &expected);
385            for id in candidates.ids {
386                Self::check_loop_deadline(deadline)?;
387                if !candidates.prefiltered
388                    && !node_matches_property_filter(
389                        self.ctx.storage,
390                        id,
391                        &op.labels,
392                        &op.key,
393                        &expected,
394                    )
395                {
396                    continue;
397                }
398                let mut new_row = row.clone();
399                new_row.insert(op.var, LoraValue::Node(id));
400                out.push(new_row);
401            }
402        }
403
404        Ok(out)
405    }
406
407    fn exec_expand(&self, plan: &PhysicalPlan, op: &ExpandExec) -> ExecResult<Vec<Row>> {
408        // Variable-length expansion: delegate to iterative expander.
409        if let Some(range) = &op.range {
410            return self.exec_expand_var_len(plan, op, range);
411        }
412
413        let input_rows = self.execute_node(plan, op.input)?;
414        let mut out = Vec::new();
415
416        for row in input_rows {
417            let src_node_id = match row.get(op.src) {
418                Some(LoraValue::Node(id)) => *id,
419                Some(other) => {
420                    return Err(ExecutorError::ExpectedNodeForExpand {
421                        var: format!("{:?}", op.src),
422                        found: value_kind(other),
423                    });
424                }
425                None => continue,
426            };
427
428            for (rel_id, dst_id) in
429                self.ctx
430                    .storage
431                    .expand_ids(src_node_id, op.direction, &op.types)
432            {
433                if let Some(expr) = op.rel_properties.as_ref() {
434                    let actual_props = self
435                        .ctx
436                        .storage
437                        .with_relationship(rel_id, |rel| rel.properties.clone());
438                    let matches = match actual_props {
439                        Some(props) => {
440                            self.relationship_matches_properties(&props, Some(expr), &row)?
441                        }
442                        None => false,
443                    };
444                    if !matches {
445                        continue;
446                    }
447                }
448
449                if let Some(existing_dst) = row.get(op.dst) {
450                    match existing_dst {
451                        LoraValue::Node(existing_id) if *existing_id == dst_id => {}
452                        LoraValue::Node(_) => continue,
453                        other => {
454                            return Err(ExecutorError::ExpectedNodeForExpand {
455                                var: format!("{:?}", op.dst),
456                                found: value_kind(other),
457                            });
458                        }
459                    }
460                }
461
462                if let Some(rel_var) = op.rel {
463                    if let Some(existing_rel) = row.get(rel_var) {
464                        match existing_rel {
465                            LoraValue::Relationship(existing_id) if *existing_id == rel_id => {}
466                            LoraValue::Relationship(_) => continue,
467                            other => {
468                                return Err(ExecutorError::ExpectedRelationshipForExpand {
469                                    var: format!("{:?}", rel_var),
470                                    found: value_kind(other),
471                                });
472                            }
473                        }
474                    }
475                }
476
477                let mut new_row = row.clone();
478
479                if !new_row.contains_key(op.dst) {
480                    new_row.insert(op.dst, LoraValue::Node(dst_id));
481                }
482
483                if let Some(rel_var) = op.rel {
484                    if !new_row.contains_key(rel_var) {
485                        new_row.insert(rel_var, LoraValue::Relationship(rel_id));
486                    }
487                }
488
489                out.push(new_row);
490            }
491        }
492
493        Ok(out)
494    }
495
496    fn exec_expand_var_len(
497        &self,
498        plan: &PhysicalPlan,
499        op: &ExpandExec,
500        range: &RangeLiteral,
501    ) -> ExecResult<Vec<Row>> {
502        let input_rows = self.execute_node(plan, op.input)?;
503        let (min_hops, max_hops) = resolve_range(range);
504        let mut out = Vec::new();
505
506        for row in input_rows {
507            let src_node_id = match row.get(op.src) {
508                Some(LoraValue::Node(id)) => *id,
509                Some(other) => {
510                    return Err(ExecutorError::ExpectedNodeForExpand {
511                        var: format!("{:?}", op.src),
512                        found: value_kind(other),
513                    });
514                }
515                None => continue,
516            };
517
518            let expansions = variable_length_expand(
519                self.ctx.storage,
520                src_node_id,
521                op.direction,
522                &op.types,
523                min_hops,
524                max_hops,
525            );
526
527            for result in expansions {
528                let mut new_row = row.clone();
529                new_row.insert(op.dst, LoraValue::Node(result.dst_node_id));
530
531                // For variable-length patterns, bind the relationship variable
532                // to a list of relationship IDs traversed.
533                if let Some(rel_var) = op.rel {
534                    // Consume rel_ids — it's owned and no longer needed after this.
535                    let rel_list = LoraValue::List(
536                        result
537                            .rel_ids
538                            .into_iter()
539                            .map(LoraValue::Relationship)
540                            .collect(),
541                    );
542                    new_row.insert(rel_var, rel_list);
543                }
544
545                out.push(new_row);
546            }
547        }
548
549        Ok(out)
550    }
551
552    fn relationship_matches_properties(
553        &self,
554        actual: &Properties,
555        expected_expr: Option<&ResolvedExpr>,
556        row: &Row,
557    ) -> ExecResult<bool> {
558        let Some(expr) = expected_expr else {
559            return Ok(true);
560        };
561
562        let eval_ctx = EvalContext {
563            storage: self.ctx.storage,
564            params: &self.ctx.params,
565        };
566
567        let expected = eval_expr(expr, row, &eval_ctx);
568
569        let LoraValue::Map(expected_map) = expected else {
570            return Err(ExecutorError::ExpectedPropertyMap {
571                found: value_kind(&expected),
572            });
573        };
574
575        Ok(expected_map.iter().all(|(key, expected_value)| {
576            actual
577                .get(key)
578                .map(|actual_value| value_matches_property_value(expected_value, actual_value))
579                .unwrap_or(false)
580        }))
581    }
582
583    fn exec_filter(&self, plan: &PhysicalPlan, op: &FilterExec) -> ExecResult<Vec<Row>> {
584        let input_rows = self.execute_node(plan, op.input)?;
585        let eval_ctx = EvalContext {
586            storage: self.ctx.storage,
587            params: &self.ctx.params,
588        };
589
590        Ok(input_rows
591            .into_iter()
592            .filter(|row| eval_expr(&op.predicate, row, &eval_ctx).is_truthy())
593            .collect())
594    }
595
596    fn exec_projection(&self, plan: &PhysicalPlan, op: &ProjectionExec) -> ExecResult<Vec<Row>> {
597        let input_rows = self.execute_node(plan, op.input)?;
598        let eval_ctx = EvalContext {
599            storage: self.ctx.storage,
600            params: &self.ctx.params,
601        };
602
603        let mut out = Vec::with_capacity(input_rows.len());
604
605        for row in input_rows {
606            // include_existing=true means we carry all upstream columns — move
607            // the row into the projection instead of cloning every value.
608            if op.include_existing {
609                let mut projected = row;
610                for item in &op.items {
611                    let value = eval_expr(&item.expr, &projected, &eval_ctx);
612                    if let Some(err) = take_eval_error() {
613                        return Err(ExecutorError::RuntimeError(err));
614                    }
615                    projected.insert_named(item.output, item.name.clone(), value);
616                }
617                out.push(projected);
618            } else {
619                let mut projected = Row::new();
620                for item in &op.items {
621                    let value = eval_expr(&item.expr, &row, &eval_ctx);
622                    if let Some(err) = take_eval_error() {
623                        return Err(ExecutorError::RuntimeError(err));
624                    }
625                    projected.insert_named(item.output, item.name.clone(), value);
626                }
627                out.push(projected);
628            }
629        }
630
631        Ok(if op.distinct {
632            dedup_rows_by_vars(out)
633        } else {
634            out
635        })
636    }
637
638    fn hydrate_value(&self, value: LoraValue) -> LoraValue {
639        match value {
640            LoraValue::Node(id) => self.hydrate_node(id),
641            LoraValue::Relationship(id) => self.hydrate_relationship(id),
642            LoraValue::List(values) => {
643                LoraValue::List(values.into_iter().map(|v| self.hydrate_value(v)).collect())
644            }
645            LoraValue::Map(map) => LoraValue::Map(
646                map.into_iter()
647                    .map(|(k, v)| (k, self.hydrate_value(v)))
648                    .collect(),
649            ),
650            other => other,
651        }
652    }
653
654    fn hydrate_node(&self, id: u64) -> LoraValue {
655        self.ctx
656            .storage
657            .with_node(id, hydrate_node_record)
658            .unwrap_or(LoraValue::Null)
659    }
660
661    fn hydrate_relationship(&self, id: u64) -> LoraValue {
662        self.ctx
663            .storage
664            .with_relationship(id, hydrate_relationship_record)
665            .unwrap_or(LoraValue::Null)
666    }
667
668    fn exec_unwind(&self, plan: &PhysicalPlan, op: &UnwindExec) -> ExecResult<Vec<Row>> {
669        let input_rows = self.execute_node(plan, op.input)?;
670        let eval_ctx = EvalContext {
671            storage: self.ctx.storage,
672            params: &self.ctx.params,
673        };
674
675        let mut out = Vec::new();
676
677        for row in input_rows {
678            match eval_expr(&op.expr, &row, &eval_ctx) {
679                LoraValue::List(values) => {
680                    for value in values {
681                        let mut new_row = row.clone();
682                        new_row.insert(op.alias, value);
683                        out.push(new_row);
684                    }
685                }
686                LoraValue::Null => {}
687                other => {
688                    let mut new_row = row;
689                    new_row.insert(op.alias, other);
690                    out.push(new_row);
691                }
692            }
693        }
694
695        Ok(out)
696    }
697
698    fn exec_hash_aggregation(
699        &self,
700        plan: &PhysicalPlan,
701        op: &HashAggregationExec,
702    ) -> ExecResult<Vec<Row>> {
703        let input_rows = self.execute_node(plan, op.input)?;
704        let eval_ctx = EvalContext {
705            storage: self.ctx.storage,
706            params: &self.ctx.params,
707        };
708
709        let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
710
711        if op.group_by.is_empty() {
712            groups.insert(Vec::new(), input_rows);
713        } else {
714            for row in input_rows {
715                let key = op
716                    .group_by
717                    .iter()
718                    .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
719                    .collect::<Vec<_>>();
720
721                groups.entry(key).or_default().push(row);
722            }
723        }
724
725        let mut out = Vec::new();
726
727        for rows in groups.into_values() {
728            let mut result = Row::new();
729
730            if let Some(first) = rows.first() {
731                for proj in &op.group_by {
732                    let value = self.hydrate_value(eval_expr(&proj.expr, first, &eval_ctx));
733                    result.insert_named(proj.output, proj.name.clone(), value);
734                }
735            }
736
737            for proj in &op.aggregates {
738                let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
739                result.insert_named(proj.output, proj.name.clone(), value);
740            }
741
742            out.push(result);
743        }
744
745        Ok(out)
746    }
747
748    fn exec_sort(&self, plan: &PhysicalPlan, op: &SortExec) -> ExecResult<Vec<Row>> {
749        let mut rows = self.execute_node(plan, op.input)?;
750        let eval_ctx = EvalContext {
751            storage: self.ctx.storage,
752            params: &self.ctx.params,
753        };
754
755        rows.sort_by(|a, b| {
756            for item in &op.items {
757                let ord = compare_sort_item(item, a, b, &eval_ctx);
758                if ord != Ordering::Equal {
759                    return ord;
760                }
761            }
762            Ordering::Equal
763        });
764
765        Ok(rows)
766    }
767
768    fn exec_limit(&self, plan: &PhysicalPlan, op: &LimitExec) -> ExecResult<Vec<Row>> {
769        let mut rows = self.execute_node(plan, op.input)?;
770        let eval_ctx = EvalContext {
771            storage: self.ctx.storage,
772            params: &self.ctx.params,
773        };
774
775        let limit = op
776            .limit
777            .as_ref()
778            .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
779            .unwrap_or(rows.len() as i64)
780            .max(0) as usize;
781
782        let skip = op
783            .skip
784            .as_ref()
785            .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
786            .unwrap_or(0)
787            .max(0) as usize;
788
789        if skip >= rows.len() {
790            return Ok(Vec::new());
791        }
792
793        rows.drain(0..skip);
794        rows.truncate(limit);
795        Ok(rows)
796    }
797
798    fn exec_optional_match(
799        &self,
800        plan: &PhysicalPlan,
801        op: &OptionalMatchExec,
802    ) -> ExecResult<Vec<Row>> {
803        let input_rows = self.execute_node(plan, op.input)?;
804
805        // The inner plan is built to start from Argument (an empty row) and is
806        // read-only, so its output does not depend on the upstream input. Execute
807        // it once and reuse the result across every input row, instead of
808        // producing |input_rows| × |inner_rows| allocations.
809        let inner_rows = self.execute_node(plan, op.inner)?;
810
811        let mut out = Vec::new();
812
813        for input_row in input_rows {
814            let mut matched = false;
815
816            for inner_row in &inner_rows {
817                // Each variable already bound in input_row must match.
818                let compatible = input_row
819                    .iter()
820                    .all(|(var, val)| match inner_row.get(*var) {
821                        Some(inner_val) => inner_val == val,
822                        None => true,
823                    });
824                if !compatible {
825                    continue;
826                }
827
828                let mut merged = input_row.clone();
829                for (var, name, val) in inner_row.iter_named() {
830                    if !merged.contains_key(*var) {
831                        merged.insert_named(*var, name.into_owned(), val.clone());
832                    }
833                }
834                out.push(merged);
835                matched = true;
836            }
837
838            if !matched {
839                let mut null_row = input_row;
840                for &var_id in &op.new_vars {
841                    if !null_row.contains_key(var_id) {
842                        null_row.insert(var_id, LoraValue::Null);
843                    }
844                }
845                out.push(null_row);
846            }
847        }
848
849        Ok(out)
850    }
851
852    fn exec_path_build(&self, plan: &PhysicalPlan, op: &PathBuildExec) -> ExecResult<Vec<Row>> {
853        let input_rows = self.execute_node(plan, op.input)?;
854        let mut rows: Vec<Row> = input_rows
855            .into_iter()
856            .map(|mut row| {
857                let path = build_path_value(&row, &op.node_vars, &op.rel_vars, self.ctx.storage);
858                row.insert(op.output, path);
859                row
860            })
861            .collect();
862
863        if let Some(all) = op.shortest_path_all {
864            rows = filter_shortest_paths(rows, op.output, all);
865        }
866        Ok(rows)
867    }
868}
869
870fn properties_to_value_map(props: &Properties) -> LoraValue {
871    let mut map = BTreeMap::new();
872    for (k, v) in props.iter() {
873        map.insert(k.clone(), LoraValue::from(v));
874    }
875    LoraValue::Map(map)
876}
877
878pub struct MutableExecutionContext<'a, S: GraphStorageMut> {
879    pub storage: &'a mut S,
880    pub params: std::collections::BTreeMap<String, LoraValue>,
881}
882
883pub struct MutableExecutor<'a, S: GraphStorageMut> {
884    ctx: MutableExecutionContext<'a, S>,
885    deadline: Option<Instant>,
886}
887
888impl<'a, S: GraphStorageMut> MutableExecutor<'a, S> {
889    pub fn new(ctx: MutableExecutionContext<'a, S>) -> Self {
890        Self {
891            ctx,
892            deadline: None,
893        }
894    }
895
896    pub fn with_deadline(ctx: MutableExecutionContext<'a, S>, deadline: Option<Instant>) -> Self {
897        Self { ctx, deadline }
898    }
899
900    #[inline]
901    fn check_deadline(&self) -> ExecResult<()> {
902        if let Some(deadline) = self.deadline {
903            check_deadline_at(deadline)
904        } else {
905            Ok(())
906        }
907    }
908
909    #[inline]
910    fn check_loop_deadline(deadline: Option<Instant>) -> ExecResult<()> {
911        if let Some(deadline) = deadline {
912            check_deadline_at(deadline)
913        } else {
914            Ok(())
915        }
916    }
917
918    pub fn execute(
919        &mut self,
920        plan: &PhysicalPlan,
921        options: Option<ExecuteOptions>,
922    ) -> ExecResult<QueryResult> {
923        let rows = self.execute_rows(plan)?;
924        Ok(project_rows(rows, options.unwrap_or_default()))
925    }
926
927    pub fn execute_rows(&mut self, plan: &PhysicalPlan) -> ExecResult<Vec<Row>> {
928        self.check_deadline()?;
929        // Clear any error residue that a previous query on this thread may have
930        // left in the thread-local eval-error slot.
931        let _ = take_eval_error();
932
933        let rows = self.execute_node(plan, plan.root)?;
934        Ok(rows
935            .into_iter()
936            .map(|row| self.hydrate_row(row))
937            .collect::<Vec<_>>())
938    }
939
940    /// Execute a compiled query that may include UNION branches.
941    pub fn execute_compiled(
942        &mut self,
943        compiled: &CompiledQuery,
944        options: Option<ExecuteOptions>,
945    ) -> ExecResult<QueryResult> {
946        let rows = self.execute_compiled_rows(compiled)?;
947        Ok(project_rows(rows, options.unwrap_or_default()))
948    }
949
950    pub fn execute_compiled_rows(&mut self, compiled: &CompiledQuery) -> ExecResult<Vec<Row>> {
951        self.check_deadline()?;
952        if compiled.unions.is_empty() {
953            return self.execute_rows(&compiled.physical);
954        }
955
956        let _ = take_eval_error();
957
958        // Execute the head branch.
959        let mut all_rows = self.execute_and_hydrate(&compiled.physical)?;
960
961        // Execute each UNION branch and combine.
962        // Track whether any branch uses plain UNION (dedup needed).
963        let mut needs_dedup = false;
964
965        for branch in &compiled.unions {
966            self.check_deadline()?;
967            let branch_rows = self.execute_and_hydrate(&branch.physical)?;
968            all_rows.extend(branch_rows);
969
970            if !branch.all {
971                needs_dedup = true;
972            }
973        }
974
975        if needs_dedup {
976            all_rows = dedup_rows(all_rows);
977        }
978
979        Ok(all_rows)
980    }
981
982    fn execute_and_hydrate(&mut self, plan: &PhysicalPlan) -> ExecResult<Vec<Row>> {
983        self.check_deadline()?;
984        let rows = self.execute_node(plan, plan.root)?;
985        Ok(rows.into_iter().map(|row| self.hydrate_row(row)).collect())
986    }
987
988    pub(crate) fn hydrate_row(&self, row: Row) -> Row {
989        let mut out = Row::new();
990
991        for (var, name, value) in row.into_iter_named() {
992            out.insert_named(var, name, self.hydrate_value(value));
993        }
994
995        out
996    }
997
998    fn execute_node(
999        &mut self,
1000        plan: &PhysicalPlan,
1001        node_id: PhysicalNodeId,
1002    ) -> ExecResult<Vec<Row>> {
1003        self.check_deadline()?;
1004        trace!("mutable execute_node start: node_id={node_id:?}");
1005
1006        let result = match &plan.nodes[node_id] {
1007            PhysicalOp::Argument(op) => self.exec_argument(op),
1008            PhysicalOp::NodeScan(op) => self.exec_node_scan(plan, op),
1009            PhysicalOp::NodeByLabelScan(op) => self.exec_node_by_label_scan(plan, op),
1010            PhysicalOp::NodeByPropertyScan(op) => self.exec_node_by_property_scan(plan, op),
1011            PhysicalOp::Expand(op) => self.exec_expand(plan, op),
1012            PhysicalOp::Filter(op) => self.exec_filter(plan, op),
1013            PhysicalOp::Projection(op) => self.exec_projection(plan, op),
1014            PhysicalOp::Unwind(op) => self.exec_unwind(plan, op),
1015            PhysicalOp::HashAggregation(op) => self.exec_hash_aggregation(plan, op),
1016            PhysicalOp::Sort(op) => self.exec_sort(plan, op),
1017            PhysicalOp::Limit(op) => self.exec_limit(plan, op),
1018            PhysicalOp::Create(op) => self.exec_create(plan, op),
1019            PhysicalOp::Merge(op) => self.exec_merge(plan, op),
1020            PhysicalOp::Delete(op) => self.exec_delete(plan, op),
1021            PhysicalOp::Set(op) => self.exec_set(plan, op),
1022            PhysicalOp::Remove(op) => self.exec_remove(plan, op),
1023            PhysicalOp::OptionalMatch(op) => self.exec_optional_match(plan, op),
1024            PhysicalOp::PathBuild(op) => self.exec_path_build(plan, op),
1025        };
1026
1027        match &result {
1028            Ok(rows) => trace!(
1029                "mutable execute_node ok: node_id={node_id:?}, rows={}",
1030                rows.len()
1031            ),
1032            Err(err) => error!("mutable execute_node failed: node_id={node_id:?}, error={err}"),
1033        }
1034
1035        result
1036    }
1037
1038    fn exec_argument(&self, _op: &ArgumentExec) -> ExecResult<Vec<Row>> {
1039        Ok(vec![Row::new()])
1040    }
1041
1042    fn exec_node_scan(&mut self, plan: &PhysicalPlan, op: &NodeScanExec) -> ExecResult<Vec<Row>> {
1043        let base_rows = match op.input {
1044            Some(input) => self.execute_node(plan, input)?,
1045            None => vec![Row::new()],
1046        };
1047
1048        let node_ids = self.ctx.storage.all_node_ids();
1049        let mut out = Vec::new();
1050
1051        let deadline = self.deadline;
1052        for row in base_rows {
1053            Self::check_loop_deadline(deadline)?;
1054            if let Some(existing) = row.get(op.var) {
1055                match existing {
1056                    LoraValue::Node(existing_id) => {
1057                        if self.ctx.storage.has_node(*existing_id) {
1058                            out.push(row);
1059                        }
1060                    }
1061                    other => {
1062                        return Err(ExecutorError::ExpectedNodeForExpand {
1063                            var: format!("{:?}", op.var),
1064                            found: value_kind(other),
1065                        });
1066                    }
1067                }
1068                continue;
1069            }
1070
1071            for &id in &node_ids {
1072                Self::check_loop_deadline(deadline)?;
1073                let mut new_row = row.clone();
1074                new_row.insert(op.var, LoraValue::Node(id));
1075                out.push(new_row);
1076            }
1077        }
1078
1079        Ok(out)
1080    }
1081
1082    fn exec_node_by_label_scan(
1083        &mut self,
1084        plan: &PhysicalPlan,
1085        op: &NodeByLabelScanExec,
1086    ) -> ExecResult<Vec<Row>> {
1087        let base_rows = match op.input {
1088            Some(input) => self.execute_node(plan, input)?,
1089            None => vec![Row::new()],
1090        };
1091
1092        let candidate_ids = scan_node_ids_for_label_groups(&*self.ctx.storage, &op.labels);
1093        let candidates_prefiltered = label_group_candidates_prefiltered(&op.labels);
1094        let mut out = Vec::new();
1095
1096        let deadline = self.deadline;
1097        for row in base_rows {
1098            Self::check_loop_deadline(deadline)?;
1099            if let Some(existing) = row.get(op.var) {
1100                match existing {
1101                    LoraValue::Node(existing_id) => {
1102                        let labels_ok = self
1103                            .ctx
1104                            .storage
1105                            .with_node(*existing_id, |n| {
1106                                node_matches_label_groups(&n.labels, &op.labels)
1107                            })
1108                            .unwrap_or(false);
1109                        if labels_ok {
1110                            out.push(row);
1111                        }
1112                    }
1113                    other => {
1114                        return Err(ExecutorError::ExpectedNodeForExpand {
1115                            var: format!("{:?}", op.var),
1116                            found: value_kind(other),
1117                        });
1118                    }
1119                }
1120                continue;
1121            }
1122
1123            for &id in &candidate_ids {
1124                Self::check_loop_deadline(deadline)?;
1125                if !candidates_prefiltered {
1126                    let labels_ok = self
1127                        .ctx
1128                        .storage
1129                        .with_node(id, |n| node_matches_label_groups(&n.labels, &op.labels))
1130                        .unwrap_or(false);
1131                    if !labels_ok {
1132                        continue;
1133                    }
1134                }
1135                let mut new_row = row.clone();
1136                new_row.insert(op.var, LoraValue::Node(id));
1137                out.push(new_row);
1138            }
1139        }
1140
1141        Ok(out)
1142    }
1143
1144    fn exec_node_by_property_scan(
1145        &mut self,
1146        plan: &PhysicalPlan,
1147        op: &NodeByPropertyScanExec,
1148    ) -> ExecResult<Vec<Row>> {
1149        let base_rows = match op.input {
1150            Some(input) => self.execute_node(plan, input)?,
1151            None => vec![Row::new()],
1152        };
1153
1154        let mut out = Vec::new();
1155
1156        let deadline = self.deadline;
1157        for row in base_rows {
1158            Self::check_loop_deadline(deadline)?;
1159            let expected = {
1160                let eval_ctx = EvalContext {
1161                    storage: &*self.ctx.storage,
1162                    params: &self.ctx.params,
1163                };
1164                eval_expr(&op.value, &row, &eval_ctx)
1165            };
1166
1167            if let Some(existing) = row.get(op.var) {
1168                match existing {
1169                    LoraValue::Node(existing_id) => {
1170                        if node_matches_property_filter(
1171                            &*self.ctx.storage,
1172                            *existing_id,
1173                            &op.labels,
1174                            &op.key,
1175                            &expected,
1176                        ) {
1177                            out.push(row);
1178                        }
1179                    }
1180                    other => {
1181                        return Err(ExecutorError::ExpectedNodeForExpand {
1182                            var: format!("{:?}", op.var),
1183                            found: value_kind(other),
1184                        });
1185                    }
1186                }
1187                continue;
1188            }
1189
1190            let candidates = indexed_node_property_candidates(
1191                &*self.ctx.storage,
1192                &op.labels,
1193                &op.key,
1194                &expected,
1195            );
1196            for id in candidates.ids {
1197                Self::check_loop_deadline(deadline)?;
1198                if !candidates.prefiltered
1199                    && !node_matches_property_filter(
1200                        &*self.ctx.storage,
1201                        id,
1202                        &op.labels,
1203                        &op.key,
1204                        &expected,
1205                    )
1206                {
1207                    continue;
1208                }
1209                let mut new_row = row.clone();
1210                new_row.insert(op.var, LoraValue::Node(id));
1211                out.push(new_row);
1212            }
1213        }
1214
1215        Ok(out)
1216    }
1217
1218    fn exec_expand(&mut self, plan: &PhysicalPlan, op: &ExpandExec) -> ExecResult<Vec<Row>> {
1219        // Variable-length expansion: delegate to iterative expander.
1220        if let Some(range) = &op.range {
1221            return self.exec_expand_var_len(plan, op, range);
1222        }
1223
1224        let input_rows = self.execute_node(plan, op.input)?;
1225        let mut out = Vec::new();
1226
1227        for row in input_rows {
1228            let src_node_id = match row.get(op.src) {
1229                Some(LoraValue::Node(id)) => *id,
1230                Some(other) => {
1231                    return Err(ExecutorError::ExpectedNodeForExpand {
1232                        var: format!("{:?}", op.src),
1233                        found: value_kind(other),
1234                    });
1235                }
1236                None => continue,
1237            };
1238
1239            for (rel_id, dst_id) in
1240                self.ctx
1241                    .storage
1242                    .expand_ids(src_node_id, op.direction, &op.types)
1243            {
1244                if let Some(expr) = op.rel_properties.as_ref() {
1245                    let actual_props = self
1246                        .ctx
1247                        .storage
1248                        .with_relationship(rel_id, |rel| rel.properties.clone());
1249                    let matches = match actual_props {
1250                        Some(props) => {
1251                            self.relationship_matches_properties(&props, Some(expr), &row)?
1252                        }
1253                        None => false,
1254                    };
1255                    if !matches {
1256                        continue;
1257                    }
1258                }
1259
1260                if let Some(existing_dst) = row.get(op.dst) {
1261                    match existing_dst {
1262                        LoraValue::Node(existing_id) if *existing_id == dst_id => {}
1263                        LoraValue::Node(_) => continue,
1264                        other => {
1265                            return Err(ExecutorError::ExpectedNodeForExpand {
1266                                var: format!("{:?}", op.dst),
1267                                found: value_kind(other),
1268                            });
1269                        }
1270                    }
1271                }
1272
1273                if let Some(rel_var) = op.rel {
1274                    if let Some(existing_rel) = row.get(rel_var) {
1275                        match existing_rel {
1276                            LoraValue::Relationship(existing_id) if *existing_id == rel_id => {}
1277                            LoraValue::Relationship(_) => continue,
1278                            other => {
1279                                return Err(ExecutorError::ExpectedRelationshipForExpand {
1280                                    var: format!("{:?}", rel_var),
1281                                    found: value_kind(other),
1282                                });
1283                            }
1284                        }
1285                    }
1286                }
1287
1288                let mut new_row = row.clone();
1289
1290                if !new_row.contains_key(op.dst) {
1291                    new_row.insert(op.dst, LoraValue::Node(dst_id));
1292                }
1293
1294                if let Some(rel_var) = op.rel {
1295                    if !new_row.contains_key(rel_var) {
1296                        new_row.insert(rel_var, LoraValue::Relationship(rel_id));
1297                    }
1298                }
1299
1300                out.push(new_row);
1301            }
1302        }
1303
1304        Ok(out)
1305    }
1306
1307    fn exec_expand_var_len(
1308        &mut self,
1309        plan: &PhysicalPlan,
1310        op: &ExpandExec,
1311        range: &RangeLiteral,
1312    ) -> ExecResult<Vec<Row>> {
1313        let input_rows = self.execute_node(plan, op.input)?;
1314        let (min_hops, max_hops) = resolve_range(range);
1315        let mut out = Vec::new();
1316
1317        for row in input_rows {
1318            let src_node_id = match row.get(op.src) {
1319                Some(LoraValue::Node(id)) => *id,
1320                Some(other) => {
1321                    return Err(ExecutorError::ExpectedNodeForExpand {
1322                        var: format!("{:?}", op.src),
1323                        found: value_kind(other),
1324                    });
1325                }
1326                None => continue,
1327            };
1328
1329            let expansions = variable_length_expand(
1330                &*self.ctx.storage,
1331                src_node_id,
1332                op.direction,
1333                &op.types,
1334                min_hops,
1335                max_hops,
1336            );
1337
1338            for result in expansions {
1339                let mut new_row = row.clone();
1340                new_row.insert(op.dst, LoraValue::Node(result.dst_node_id));
1341
1342                if let Some(rel_var) = op.rel {
1343                    // Consume rel_ids — it's owned and no longer needed after this.
1344                    let rel_list = LoraValue::List(
1345                        result
1346                            .rel_ids
1347                            .into_iter()
1348                            .map(LoraValue::Relationship)
1349                            .collect(),
1350                    );
1351                    new_row.insert(rel_var, rel_list);
1352                }
1353
1354                out.push(new_row);
1355            }
1356        }
1357
1358        Ok(out)
1359    }
1360
1361    fn relationship_matches_properties(
1362        &self,
1363        actual: &Properties,
1364        expected_expr: Option<&ResolvedExpr>,
1365        row: &Row,
1366    ) -> ExecResult<bool> {
1367        let Some(expr) = expected_expr else {
1368            return Ok(true);
1369        };
1370
1371        let eval_ctx = EvalContext {
1372            storage: &*self.ctx.storage,
1373            params: &self.ctx.params,
1374        };
1375
1376        let expected = eval_expr(expr, row, &eval_ctx);
1377
1378        let LoraValue::Map(expected_map) = expected else {
1379            return Err(ExecutorError::ExpectedPropertyMap {
1380                found: value_kind(&expected),
1381            });
1382        };
1383
1384        Ok(expected_map.iter().all(|(key, expected_value)| {
1385            actual
1386                .get(key)
1387                .map(|actual_value| value_matches_property_value(expected_value, actual_value))
1388                .unwrap_or(false)
1389        }))
1390    }
1391
1392    fn exec_filter(&mut self, plan: &PhysicalPlan, op: &FilterExec) -> ExecResult<Vec<Row>> {
1393        let input_rows = self.execute_node(plan, op.input)?;
1394        let eval_ctx = EvalContext {
1395            storage: &*self.ctx.storage,
1396            params: &self.ctx.params,
1397        };
1398
1399        Ok(input_rows
1400            .into_iter()
1401            .filter(|row| eval_expr(&op.predicate, row, &eval_ctx).is_truthy())
1402            .collect())
1403    }
1404
1405    fn exec_projection(
1406        &mut self,
1407        plan: &PhysicalPlan,
1408        op: &ProjectionExec,
1409    ) -> ExecResult<Vec<Row>> {
1410        let input_rows = self.execute_node(plan, op.input)?;
1411        let eval_ctx = EvalContext {
1412            storage: &*self.ctx.storage,
1413            params: &self.ctx.params,
1414        };
1415
1416        let mut out = Vec::with_capacity(input_rows.len());
1417
1418        for row in input_rows {
1419            if op.include_existing {
1420                let mut projected = row;
1421                for item in &op.items {
1422                    let value = eval_expr(&item.expr, &projected, &eval_ctx);
1423                    if let Some(err) = take_eval_error() {
1424                        return Err(ExecutorError::RuntimeError(err));
1425                    }
1426                    projected.insert_named(item.output, item.name.clone(), value);
1427                }
1428                out.push(projected);
1429            } else {
1430                let mut projected = Row::new();
1431                for item in &op.items {
1432                    let value = eval_expr(&item.expr, &row, &eval_ctx);
1433                    if let Some(err) = take_eval_error() {
1434                        return Err(ExecutorError::RuntimeError(err));
1435                    }
1436                    projected.insert_named(item.output, item.name.clone(), value);
1437                }
1438                out.push(projected);
1439            }
1440        }
1441
1442        Ok(if op.distinct {
1443            dedup_rows_by_vars(out)
1444        } else {
1445            out
1446        })
1447    }
1448
1449    fn hydrate_value(&self, value: LoraValue) -> LoraValue {
1450        match value {
1451            LoraValue::Node(id) => self.hydrate_node(id),
1452            LoraValue::Relationship(id) => self.hydrate_relationship(id),
1453            LoraValue::List(values) => {
1454                LoraValue::List(values.into_iter().map(|v| self.hydrate_value(v)).collect())
1455            }
1456            LoraValue::Map(map) => LoraValue::Map(
1457                map.into_iter()
1458                    .map(|(k, v)| (k, self.hydrate_value(v)))
1459                    .collect(),
1460            ),
1461            other => other,
1462        }
1463    }
1464
1465    fn hydrate_node(&self, id: u64) -> LoraValue {
1466        self.ctx
1467            .storage
1468            .with_node(id, hydrate_node_record)
1469            .unwrap_or(LoraValue::Null)
1470    }
1471
1472    fn hydrate_relationship(&self, id: u64) -> LoraValue {
1473        self.ctx
1474            .storage
1475            .with_relationship(id, hydrate_relationship_record)
1476            .unwrap_or(LoraValue::Null)
1477    }
1478
1479    fn exec_unwind(&mut self, plan: &PhysicalPlan, op: &UnwindExec) -> ExecResult<Vec<Row>> {
1480        let input_rows = self.execute_node(plan, op.input)?;
1481        let eval_ctx = EvalContext {
1482            storage: &*self.ctx.storage,
1483            params: &self.ctx.params,
1484        };
1485
1486        let mut out = Vec::new();
1487
1488        for row in input_rows {
1489            match eval_expr(&op.expr, &row, &eval_ctx) {
1490                LoraValue::List(values) => {
1491                    for value in values {
1492                        let mut new_row = row.clone();
1493                        new_row.insert(op.alias, value);
1494                        out.push(new_row);
1495                    }
1496                }
1497                LoraValue::Null => {}
1498                other => {
1499                    let mut new_row = row;
1500                    new_row.insert(op.alias, other);
1501                    out.push(new_row);
1502                }
1503            }
1504        }
1505
1506        Ok(out)
1507    }
1508
1509    fn exec_hash_aggregation(
1510        &mut self,
1511        plan: &PhysicalPlan,
1512        op: &HashAggregationExec,
1513    ) -> ExecResult<Vec<Row>> {
1514        let input_rows = self.execute_node(plan, op.input)?;
1515        let eval_ctx = EvalContext {
1516            storage: &*self.ctx.storage,
1517            params: &self.ctx.params,
1518        };
1519
1520        let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
1521
1522        if op.group_by.is_empty() {
1523            groups.insert(Vec::new(), input_rows);
1524        } else {
1525            for row in input_rows {
1526                let key = op
1527                    .group_by
1528                    .iter()
1529                    .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
1530                    .collect::<Vec<_>>();
1531
1532                groups.entry(key).or_default().push(row);
1533            }
1534        }
1535
1536        let mut out = Vec::new();
1537
1538        for rows in groups.into_values() {
1539            let mut result = Row::new();
1540
1541            if let Some(first) = rows.first() {
1542                for proj in &op.group_by {
1543                    let value = self.hydrate_value(eval_expr(&proj.expr, first, &eval_ctx));
1544                    result.insert_named(proj.output, proj.name.clone(), value);
1545                }
1546            }
1547
1548            for proj in &op.aggregates {
1549                let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
1550                result.insert_named(proj.output, proj.name.clone(), value);
1551            }
1552
1553            out.push(result);
1554        }
1555
1556        Ok(out)
1557    }
1558
1559    fn exec_sort(&mut self, plan: &PhysicalPlan, op: &SortExec) -> ExecResult<Vec<Row>> {
1560        let mut rows = self.execute_node(plan, op.input)?;
1561        let eval_ctx = EvalContext {
1562            storage: &*self.ctx.storage,
1563            params: &self.ctx.params,
1564        };
1565
1566        rows.sort_by(|a, b| {
1567            for item in &op.items {
1568                let ord = compare_sort_item(item, a, b, &eval_ctx);
1569                if ord != Ordering::Equal {
1570                    return ord;
1571                }
1572            }
1573            Ordering::Equal
1574        });
1575
1576        Ok(rows)
1577    }
1578
1579    fn exec_limit(&mut self, plan: &PhysicalPlan, op: &LimitExec) -> ExecResult<Vec<Row>> {
1580        let mut rows = self.execute_node(plan, op.input)?;
1581        let eval_ctx = EvalContext {
1582            storage: &*self.ctx.storage,
1583            params: &self.ctx.params,
1584        };
1585
1586        let limit = op
1587            .limit
1588            .as_ref()
1589            .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
1590            .unwrap_or(rows.len() as i64)
1591            .max(0) as usize;
1592
1593        let skip = op
1594            .skip
1595            .as_ref()
1596            .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
1597            .unwrap_or(0)
1598            .max(0) as usize;
1599
1600        if skip >= rows.len() {
1601            return Ok(Vec::new());
1602        }
1603
1604        rows.drain(0..skip);
1605        rows.truncate(limit);
1606        Ok(rows)
1607    }
1608
1609    fn exec_optional_match(
1610        &mut self,
1611        plan: &PhysicalPlan,
1612        op: &OptionalMatchExec,
1613    ) -> ExecResult<Vec<Row>> {
1614        let input_rows = self.execute_node(plan, op.input)?;
1615
1616        // Inner plan is read-only and input-independent; execute once and reuse.
1617        let inner_rows = self.execute_node(plan, op.inner)?;
1618
1619        let mut out = Vec::new();
1620
1621        for input_row in input_rows {
1622            let mut matched = false;
1623
1624            for inner_row in &inner_rows {
1625                let compatible = input_row
1626                    .iter()
1627                    .all(|(var, val)| match inner_row.get(*var) {
1628                        Some(inner_val) => inner_val == val,
1629                        None => true,
1630                    });
1631                if !compatible {
1632                    continue;
1633                }
1634
1635                let mut merged = input_row.clone();
1636                for (var, name, val) in inner_row.iter_named() {
1637                    if !merged.contains_key(*var) {
1638                        merged.insert_named(*var, name.into_owned(), val.clone());
1639                    }
1640                }
1641                out.push(merged);
1642                matched = true;
1643            }
1644
1645            if !matched {
1646                let mut null_row = input_row;
1647                for &var_id in &op.new_vars {
1648                    if !null_row.contains_key(var_id) {
1649                        null_row.insert(var_id, LoraValue::Null);
1650                    }
1651                }
1652                out.push(null_row);
1653            }
1654        }
1655
1656        Ok(out)
1657    }
1658
1659    fn exec_path_build(&mut self, plan: &PhysicalPlan, op: &PathBuildExec) -> ExecResult<Vec<Row>> {
1660        let input_rows = self.execute_node(plan, op.input)?;
1661        let mut rows: Vec<Row> = input_rows
1662            .into_iter()
1663            .map(|mut row| {
1664                let path = build_path_value(&row, &op.node_vars, &op.rel_vars, &*self.ctx.storage);
1665                row.insert(op.output, path);
1666                row
1667            })
1668            .collect();
1669
1670        if let Some(all) = op.shortest_path_all {
1671            rows = filter_shortest_paths(rows, op.output, all);
1672        }
1673        Ok(rows)
1674    }
1675
1676    fn exec_create(&mut self, plan: &PhysicalPlan, op: &CreateExec) -> ExecResult<Vec<Row>> {
1677        // Fast path: if the input subtree is fully streamable (no
1678        // nested writes, no blocking operators), pull rows one at a
1679        // time and apply the create pattern per row, instead of
1680        // materializing the whole input. The output Vec still
1681        // accumulates — auto-commit-side output streaming is M1.b.
1682        if crate::pull::subtree_is_fully_streaming(plan, op.input) {
1683            return self.exec_create_streaming_input(plan, op);
1684        }
1685
1686        let input_rows = self.execute_node(plan, op.input)?;
1687        let mut out = Vec::with_capacity(input_rows.len());
1688
1689        for mut row in input_rows {
1690            self.apply_create_pattern(&mut row, &op.pattern)?;
1691            out.push(row);
1692        }
1693
1694        Ok(out)
1695    }
1696
1697    /// Generic streaming-input loop for write operators whose input
1698    /// subtree is fully streamable. Opens a pull-based read cursor
1699    /// over the input subtree, calls `apply` per row, and accumulates
1700    /// the resulting rows.
1701    ///
1702    /// # Safety
1703    ///
1704    /// The upstream [`crate::pull::RowSource`] needs `&S` while it
1705    /// lives; the per-row `apply` callback needs `&mut S` (via
1706    /// `&mut self`). The existing read-side `RowSource` impls
1707    /// materialize their iteration state into owned `Vec`s at
1708    /// construction time (see `NodeScanSource::cur_ids`,
1709    /// `ExpandSource::cur_edges`, etc. in `pull.rs`), so no live
1710    /// `&S` borrow into storage persists across `next_row` calls.
1711    /// We exploit that by deriving the read borrow from a raw
1712    /// pointer — Rust then doesn't see the shared/mutable conflict
1713    /// at compile time, and the dynamic access pattern is
1714    /// non-aliasing: read-only inside `next_row`, then mutable
1715    /// inside `apply`, never both at the same instant.
1716    fn streaming_apply<F>(
1717        &mut self,
1718        plan: &PhysicalPlan,
1719        input: PhysicalNodeId,
1720        mut apply: F,
1721    ) -> ExecResult<Vec<Row>>
1722    where
1723        F: FnMut(&mut Self, &mut Row) -> ExecResult<()>,
1724    {
1725        use std::sync::Arc;
1726
1727        let storage_ptr: *mut S = self.ctx.storage as *mut S;
1728        let params = Arc::new(self.ctx.params.clone());
1729
1730        // SAFETY: see method-level comment.
1731        let storage_ref: &S = unsafe { &*storage_ptr };
1732        let mut upstream = crate::pull::build_streaming(plan, input, storage_ref, params)?;
1733
1734        let mut out = Vec::new();
1735        while let Some(mut row) = upstream.next_row()? {
1736            apply(self, &mut row)?;
1737            out.push(row);
1738        }
1739
1740        Ok(out)
1741    }
1742
1743    /// Streaming-input variant of [`Self::exec_create`]. Delegates
1744    /// to [`Self::streaming_apply`].
1745    fn exec_create_streaming_input(
1746        &mut self,
1747        plan: &PhysicalPlan,
1748        op: &CreateExec,
1749    ) -> ExecResult<Vec<Row>> {
1750        self.streaming_apply(plan, op.input, |this, row| {
1751            this.apply_create_pattern(row, &op.pattern)
1752        })
1753    }
1754
1755    fn apply_remove_item(&mut self, row: &Row, item: &ResolvedRemoveItem) -> ExecResult<()> {
1756        match item {
1757            ResolvedRemoveItem::Labels { variable, labels } => match row.get(*variable) {
1758                Some(LoraValue::Node(node_id)) => {
1759                    let node_id = *node_id;
1760                    for label in labels {
1761                        self.ctx.storage.remove_node_label(node_id, label);
1762                    }
1763                    Ok(())
1764                }
1765                Some(other) => Err(ExecutorError::ExpectedNodeForRemoveLabels {
1766                    found: value_kind(other),
1767                }),
1768                None => Err(ExecutorError::UnboundVariableForRemove {
1769                    var: format!("{variable:?}"),
1770                }),
1771            },
1772
1773            ResolvedRemoveItem::Property { expr } => self.remove_property_from_expr(row, expr),
1774        }
1775    }
1776
1777    fn delete_value(&mut self, value: LoraValue, detach: bool) -> ExecResult<()> {
1778        match value {
1779            LoraValue::Null => Ok(()),
1780
1781            LoraValue::Node(node_id) => {
1782                if detach {
1783                    self.ctx.storage.detach_delete_node(node_id);
1784                    Ok(())
1785                } else {
1786                    let ok = self.ctx.storage.delete_node(node_id);
1787                    if ok {
1788                        Ok(())
1789                    } else {
1790                        Err(ExecutorError::DeleteNodeWithRelationships { node_id })
1791                    }
1792                }
1793            }
1794
1795            LoraValue::Relationship(rel_id) => {
1796                let ok = self.ctx.storage.delete_relationship(rel_id);
1797                if ok {
1798                    Ok(())
1799                } else {
1800                    Err(ExecutorError::DeleteRelationshipFailed { rel_id })
1801                }
1802            }
1803
1804            LoraValue::List(values) => {
1805                for v in values {
1806                    self.delete_value(v, detach)?;
1807                }
1808                Ok(())
1809            }
1810
1811            other => Err(ExecutorError::InvalidDeleteTarget {
1812                found: value_kind(&other),
1813            }),
1814        }
1815    }
1816
1817    fn exec_merge(&mut self, plan: &PhysicalPlan, op: &MergeExec) -> ExecResult<Vec<Row>> {
1818        // Streaming-input fast path when the input subtree is fully
1819        // streamable. Per-row work (probe → optionally create →
1820        // ON MATCH / ON CREATE actions) is identical to the
1821        // materialized branch below.
1822        if crate::pull::subtree_is_fully_streaming(plan, op.input) {
1823            return self.streaming_apply(plan, op.input, |this, row| {
1824                let already_bound = this.pattern_part_is_bound(row, &op.pattern_part);
1825                let matched = if already_bound {
1826                    true
1827                } else {
1828                    this.try_match_merge_pattern(row, &op.pattern_part)?
1829                };
1830                if !matched {
1831                    this.apply_create_pattern_part(row, &op.pattern_part)?;
1832                }
1833                for action in &op.actions {
1834                    if action.on_match == matched {
1835                        for item in &action.set.items {
1836                            this.apply_set_item(row, item)?;
1837                        }
1838                    }
1839                }
1840                Ok(())
1841            });
1842        }
1843
1844        let input_rows = self.execute_node(plan, op.input)?;
1845        let mut out = Vec::with_capacity(input_rows.len());
1846
1847        for mut row in input_rows {
1848            // First check if the pattern variable is already bound in the row.
1849            let already_bound = self.pattern_part_is_bound(&row, &op.pattern_part);
1850
1851            let matched = if already_bound {
1852                true
1853            } else {
1854                // Try to find an existing match in the graph.
1855                self.try_match_merge_pattern(&mut row, &op.pattern_part)?
1856            };
1857
1858            if !matched {
1859                self.apply_create_pattern_part(&mut row, &op.pattern_part)?;
1860            }
1861
1862            for action in &op.actions {
1863                if action.on_match == matched {
1864                    for item in &action.set.items {
1865                        self.apply_set_item(&row, item)?;
1866                    }
1867                }
1868            }
1869
1870            out.push(row);
1871        }
1872
1873        Ok(out)
1874    }
1875
1876    /// Try to find an existing node/pattern in the graph matching the MERGE
1877    /// pattern. If found, bind the variable in the row and return true.
1878    fn try_match_merge_pattern(
1879        &self,
1880        row: &mut Row,
1881        part: &ResolvedPatternPart,
1882    ) -> ExecResult<bool> {
1883        match &part.element {
1884            ResolvedPatternElement::Node {
1885                var,
1886                labels,
1887                properties,
1888            } => {
1889                // ID-only candidate discovery; borrow the record during
1890                // label/property filtering to avoid cloning non-matches.
1891                let candidate_ids = if labels.is_empty() {
1892                    self.ctx.storage.all_node_ids()
1893                } else {
1894                    scan_node_ids_for_label_groups(&*self.ctx.storage, labels)
1895                };
1896
1897                // Filter by properties if specified
1898                let eval_ctx = EvalContext {
1899                    storage: &*self.ctx.storage,
1900                    params: &self.ctx.params,
1901                };
1902                let expected_props = properties.as_ref().map(|e| eval_expr(e, row, &eval_ctx));
1903
1904                for id in candidate_ids {
1905                    let matched = self
1906                        .ctx
1907                        .storage
1908                        .with_node(id, |node| {
1909                            if !node_matches_label_groups(&node.labels, labels) {
1910                                return false;
1911                            }
1912                            if let Some(LoraValue::Map(expected)) = &expected_props {
1913                                let all_match = expected.iter().all(|(key, expected_value)| {
1914                                    node.properties
1915                                        .get(key)
1916                                        .map(|actual| {
1917                                            value_matches_property_value(expected_value, actual)
1918                                        })
1919                                        .unwrap_or(false)
1920                                });
1921                                if !all_match {
1922                                    return false;
1923                                }
1924                            }
1925                            true
1926                        })
1927                        .unwrap_or(false);
1928
1929                    if !matched {
1930                        continue;
1931                    }
1932
1933                    // Found a match — bind the variable
1934                    if let Some(var_id) = var {
1935                        row.insert(*var_id, LoraValue::Node(id));
1936                    }
1937                    return Ok(true);
1938                }
1939
1940                Ok(false)
1941            }
1942
1943            ResolvedPatternElement::ShortestPath { .. } => {
1944                // ShortestPath is not valid in MERGE context
1945                Ok(false)
1946            }
1947
1948            ResolvedPatternElement::NodeChain { head, chain } => {
1949                // Resolve the head node — it should be already bound in the row.
1950                let head_node_id = if let Some(var_id) = head.var {
1951                    if let Some(LoraValue::Node(id)) = row.get(var_id) {
1952                        *id
1953                    } else {
1954                        // Try to match head node as a standalone node pattern.
1955                        let node_matched = self.try_match_merge_pattern(
1956                            row,
1957                            &ResolvedPatternPart {
1958                                binding: None,
1959                                element: ResolvedPatternElement::Node {
1960                                    var: head.var,
1961                                    labels: head.labels.clone(),
1962                                    properties: head.properties.clone(),
1963                                },
1964                            },
1965                        )?;
1966                        if !node_matched {
1967                            return Ok(false);
1968                        }
1969                        match row.get(var_id) {
1970                            Some(LoraValue::Node(id)) => *id,
1971                            _ => return Ok(false),
1972                        }
1973                    }
1974                } else {
1975                    return Ok(false);
1976                };
1977
1978                let mut current_node_id = head_node_id;
1979
1980                for step in chain {
1981                    let eval_ctx = EvalContext {
1982                        storage: &*self.ctx.storage,
1983                        params: &self.ctx.params,
1984                    };
1985
1986                    let _ = step.rel.types.first();
1987                    let direction = step.rel.direction;
1988
1989                    // ID-only traversal; look up records by reference only for
1990                    // candidates that pass the label/property filters.
1991                    let edges =
1992                        self.ctx
1993                            .storage
1994                            .expand_ids(current_node_id, direction, &step.rel.types);
1995
1996                    // Try to find a matching edge + target node
1997                    let mut found = false;
1998                    for (rel_id, node_id) in edges {
1999                        // Check target node labels and (optional) properties.
2000                        let node_ok = self
2001                            .ctx
2002                            .storage
2003                            .with_node(node_id, |node_rec| {
2004                                if !node_matches_label_groups(&node_rec.labels, &step.node.labels) {
2005                                    return false;
2006                                }
2007                                if let Some(props_expr) = &step.node.properties {
2008                                    let expected = eval_expr(props_expr, row, &eval_ctx);
2009                                    if let LoraValue::Map(expected_map) = &expected {
2010                                        let all_match =
2011                                            expected_map.iter().all(|(key, expected_val)| {
2012                                                node_rec
2013                                                    .properties
2014                                                    .get(key)
2015                                                    .map(|actual| {
2016                                                        value_matches_property_value(
2017                                                            expected_val,
2018                                                            actual,
2019                                                        )
2020                                                    })
2021                                                    .unwrap_or(false)
2022                                            });
2023                                        if !all_match {
2024                                            return false;
2025                                        }
2026                                    }
2027                                }
2028                                true
2029                            })
2030                            .unwrap_or(false);
2031                        if !node_ok {
2032                            continue;
2033                        }
2034
2035                        // Check relationship properties.
2036                        let rel_ok = self
2037                            .ctx
2038                            .storage
2039                            .with_relationship(rel_id, |rel_rec| {
2040                                if let Some(rel_props_expr) = &step.rel.properties {
2041                                    let expected = eval_expr(rel_props_expr, row, &eval_ctx);
2042                                    if let LoraValue::Map(expected_map) = &expected {
2043                                        let all_match =
2044                                            expected_map.iter().all(|(key, expected_val)| {
2045                                                rel_rec
2046                                                    .properties
2047                                                    .get(key)
2048                                                    .map(|actual| {
2049                                                        value_matches_property_value(
2050                                                            expected_val,
2051                                                            actual,
2052                                                        )
2053                                                    })
2054                                                    .unwrap_or(false)
2055                                            });
2056                                        if !all_match {
2057                                            return false;
2058                                        }
2059                                    }
2060                                }
2061                                true
2062                            })
2063                            .unwrap_or(false);
2064                        if !rel_ok {
2065                            continue;
2066                        }
2067
2068                        // Match found — bind variables
2069                        if let Some(rel_var) = step.rel.var {
2070                            row.insert(rel_var, LoraValue::Relationship(rel_id));
2071                        }
2072                        if let Some(node_var) = step.node.var {
2073                            row.insert(node_var, LoraValue::Node(node_id));
2074                        }
2075                        current_node_id = node_id;
2076                        found = true;
2077                        break;
2078                    }
2079
2080                    if !found {
2081                        return Ok(false);
2082                    }
2083                }
2084
2085                Ok(true)
2086            }
2087        }
2088    }
2089
2090    fn exec_delete(&mut self, plan: &PhysicalPlan, op: &DeleteExec) -> ExecResult<Vec<Row>> {
2091        if crate::pull::subtree_is_fully_streaming(plan, op.input) {
2092            let detach = op.detach;
2093            return self.streaming_apply(plan, op.input, |this, row| {
2094                for expr in &op.expressions {
2095                    let value = {
2096                        let eval_ctx = EvalContext {
2097                            storage: &*this.ctx.storage,
2098                            params: &this.ctx.params,
2099                        };
2100                        eval_expr(expr, row, &eval_ctx)
2101                    };
2102                    this.delete_value(value, detach)?;
2103                }
2104                Ok(())
2105            });
2106        }
2107
2108        let input_rows = self.execute_node(plan, op.input)?;
2109
2110        for row in &input_rows {
2111            for expr in &op.expressions {
2112                let value = {
2113                    let eval_ctx = EvalContext {
2114                        storage: &*self.ctx.storage,
2115                        params: &self.ctx.params,
2116                    };
2117                    eval_expr(expr, row, &eval_ctx)
2118                };
2119
2120                self.delete_value(value, op.detach)?;
2121            }
2122        }
2123
2124        Ok(input_rows)
2125    }
2126
2127    fn exec_set(&mut self, plan: &PhysicalPlan, op: &SetExec) -> ExecResult<Vec<Row>> {
2128        if crate::pull::subtree_is_fully_streaming(plan, op.input) {
2129            return self.streaming_apply(plan, op.input, |this, row| {
2130                for item in &op.items {
2131                    this.apply_set_item(row, item)?;
2132                }
2133                Ok(())
2134            });
2135        }
2136
2137        let input_rows = self.execute_node(plan, op.input)?;
2138
2139        for row in &input_rows {
2140            for item in &op.items {
2141                self.apply_set_item(row, item)?;
2142            }
2143        }
2144
2145        Ok(input_rows)
2146    }
2147
2148    fn exec_remove(&mut self, plan: &PhysicalPlan, op: &RemoveExec) -> ExecResult<Vec<Row>> {
2149        if crate::pull::subtree_is_fully_streaming(plan, op.input) {
2150            return self.streaming_apply(plan, op.input, |this, row| {
2151                for item in &op.items {
2152                    this.apply_remove_item(row, item)?;
2153                }
2154                Ok(())
2155            });
2156        }
2157
2158        let input_rows = self.execute_node(plan, op.input)?;
2159
2160        for row in &input_rows {
2161            for item in &op.items {
2162                self.apply_remove_item(row, item)?;
2163            }
2164        }
2165
2166        Ok(input_rows)
2167    }
2168
2169    fn apply_set_item(&mut self, row: &Row, item: &ResolvedSetItem) -> ExecResult<()> {
2170        match item {
2171            ResolvedSetItem::SetProperty { target, value } => {
2172                let new_value = {
2173                    let eval_ctx = EvalContext {
2174                        storage: &*self.ctx.storage,
2175                        params: &self.ctx.params,
2176                    };
2177                    eval_expr(value, row, &eval_ctx)
2178                };
2179
2180                self.set_property_from_expr(row, target, new_value)
2181            }
2182
2183            ResolvedSetItem::SetVariable { variable, value } => {
2184                // Only need the entity's id — peek at the binding by reference.
2185                let entity_ref =
2186                    row.get(*variable)
2187                        .ok_or(ExecutorError::UnboundVariableForSet {
2188                            var: format!("{variable:?}"),
2189                        })?;
2190                let entity_target = entity_target_from_value(entity_ref)?;
2191
2192                let new_value = {
2193                    let eval_ctx = EvalContext {
2194                        storage: &*self.ctx.storage,
2195                        params: &self.ctx.params,
2196                    };
2197                    eval_expr(value, row, &eval_ctx)
2198                };
2199
2200                self.overwrite_entity_target(entity_target, new_value)
2201            }
2202
2203            ResolvedSetItem::MutateVariable { variable, value } => {
2204                let entity_ref =
2205                    row.get(*variable)
2206                        .ok_or(ExecutorError::UnboundVariableForSet {
2207                            var: format!("{variable:?}"),
2208                        })?;
2209                let entity_target = entity_target_from_value(entity_ref)?;
2210
2211                let patch = {
2212                    let eval_ctx = EvalContext {
2213                        storage: &*self.ctx.storage,
2214                        params: &self.ctx.params,
2215                    };
2216                    eval_expr(value, row, &eval_ctx)
2217                };
2218
2219                self.mutate_entity_target(entity_target, patch)
2220            }
2221
2222            ResolvedSetItem::SetLabels { variable, labels } => match row.get(*variable) {
2223                Some(LoraValue::Node(node_id)) => {
2224                    let node_id = *node_id;
2225                    for label in labels {
2226                        self.ctx.storage.add_node_label(node_id, label);
2227                    }
2228                    Ok(())
2229                }
2230                Some(other) => Err(ExecutorError::ExpectedNodeForSetLabels {
2231                    found: value_kind(other),
2232                }),
2233                None => Err(ExecutorError::UnboundVariableForSet {
2234                    var: format!("{variable:?}"),
2235                }),
2236            },
2237        }
2238    }
2239
2240    fn set_property_from_expr(
2241        &mut self,
2242        row: &Row,
2243        target_expr: &ResolvedExpr,
2244        new_value: LoraValue,
2245    ) -> ExecResult<()> {
2246        let ResolvedExpr::Property { expr, property } = target_expr else {
2247            return Err(ExecutorError::UnsupportedSetTarget);
2248        };
2249
2250        let owner = {
2251            let eval_ctx = EvalContext {
2252                storage: &*self.ctx.storage,
2253                params: &self.ctx.params,
2254            };
2255            eval_expr(expr, row, &eval_ctx)
2256        };
2257
2258        match owner {
2259            LoraValue::Node(node_id) => {
2260                let prop = lora_value_to_property(new_value)
2261                    .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2262                self.ctx
2263                    .storage
2264                    .set_node_property(node_id, property.clone(), prop);
2265                Ok(())
2266            }
2267            LoraValue::Relationship(rel_id) => {
2268                let prop = lora_value_to_property(new_value)
2269                    .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2270                self.ctx
2271                    .storage
2272                    .set_relationship_property(rel_id, property.clone(), prop);
2273                Ok(())
2274            }
2275            other => Err(ExecutorError::InvalidSetTarget {
2276                found: value_kind(&other),
2277            }),
2278        }
2279    }
2280
2281    fn remove_property_from_expr(&mut self, row: &Row, expr: &ResolvedExpr) -> ExecResult<()> {
2282        let ResolvedExpr::Property {
2283            expr: owner_expr,
2284            property,
2285        } = expr
2286        else {
2287            return Err(ExecutorError::UnsupportedRemoveTarget);
2288        };
2289
2290        let owner = {
2291            let eval_ctx = EvalContext {
2292                storage: &*self.ctx.storage,
2293                params: &self.ctx.params,
2294            };
2295            eval_expr(owner_expr, row, &eval_ctx)
2296        };
2297
2298        match owner {
2299            LoraValue::Node(node_id) => {
2300                self.ctx.storage.remove_node_property(node_id, property);
2301                Ok(())
2302            }
2303            LoraValue::Relationship(rel_id) => {
2304                self.ctx
2305                    .storage
2306                    .remove_relationship_property(rel_id, property);
2307                Ok(())
2308            }
2309            other => Err(ExecutorError::InvalidRemoveTarget {
2310                found: value_kind(&other),
2311            }),
2312        }
2313    }
2314
2315    fn overwrite_entity_target(
2316        &mut self,
2317        target: EntityTarget,
2318        new_value: LoraValue,
2319    ) -> ExecResult<()> {
2320        let LoraValue::Map(map) = new_value else {
2321            return Err(ExecutorError::ExpectedPropertyMap {
2322                found: value_kind(&new_value),
2323            });
2324        };
2325
2326        let mut props: Properties = Properties::new();
2327        for (k, v) in map {
2328            let prop = lora_value_to_property(v)
2329                .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2330            props.insert(k, prop);
2331        }
2332
2333        match target {
2334            EntityTarget::Node(node_id) => {
2335                self.ctx.storage.replace_node_properties(node_id, props);
2336            }
2337            EntityTarget::Relationship(rel_id) => {
2338                self.ctx
2339                    .storage
2340                    .replace_relationship_properties(rel_id, props);
2341            }
2342        }
2343        Ok(())
2344    }
2345
2346    fn mutate_entity_target(
2347        &mut self,
2348        target: EntityTarget,
2349        patch_value: LoraValue,
2350    ) -> ExecResult<()> {
2351        let LoraValue::Map(map) = patch_value else {
2352            return Err(ExecutorError::ExpectedPropertyMap {
2353                found: value_kind(&patch_value),
2354            });
2355        };
2356
2357        match target {
2358            EntityTarget::Node(node_id) => {
2359                for (k, v) in map {
2360                    let prop = lora_value_to_property(v)
2361                        .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2362                    self.ctx.storage.set_node_property(node_id, k, prop);
2363                }
2364            }
2365            EntityTarget::Relationship(rel_id) => {
2366                for (k, v) in map {
2367                    let prop = lora_value_to_property(v)
2368                        .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2369                    self.ctx.storage.set_relationship_property(rel_id, k, prop);
2370                }
2371            }
2372        }
2373        Ok(())
2374    }
2375
2376    pub(crate) fn apply_create_pattern(
2377        &mut self,
2378        row: &mut Row,
2379        pattern: &ResolvedPattern,
2380    ) -> ExecResult<()> {
2381        for part in &pattern.parts {
2382            self.apply_create_pattern_part(row, part)?;
2383        }
2384        Ok(())
2385    }
2386
2387    /// Apply a single per-row write for any of the streamable write
2388    /// operators (Create / Set / Delete / Remove / Merge). Used by
2389    /// the [`crate::pull::StreamingWriteCursor`] auto-commit fast
2390    /// path: the cursor pulls one input row from a read upstream,
2391    /// hands it here for the side effect, and emits the row back.
2392    pub(crate) fn apply_write_op(&mut self, op: &PhysicalOp, row: &mut Row) -> ExecResult<()> {
2393        match op {
2394            PhysicalOp::Create(c) => self.apply_create_pattern(row, &c.pattern),
2395            PhysicalOp::Set(s) => {
2396                for item in &s.items {
2397                    self.apply_set_item(row, item)?;
2398                }
2399                Ok(())
2400            }
2401            PhysicalOp::Delete(d) => {
2402                let detach = d.detach;
2403                for expr in &d.expressions {
2404                    let value = {
2405                        let eval_ctx = EvalContext {
2406                            storage: &*self.ctx.storage,
2407                            params: &self.ctx.params,
2408                        };
2409                        eval_expr(expr, row, &eval_ctx)
2410                    };
2411                    self.delete_value(value, detach)?;
2412                }
2413                Ok(())
2414            }
2415            PhysicalOp::Remove(r) => {
2416                for item in &r.items {
2417                    self.apply_remove_item(row, item)?;
2418                }
2419                Ok(())
2420            }
2421            PhysicalOp::Merge(m) => {
2422                let already_bound = self.pattern_part_is_bound(row, &m.pattern_part);
2423                let matched = if already_bound {
2424                    true
2425                } else {
2426                    self.try_match_merge_pattern(row, &m.pattern_part)?
2427                };
2428                if !matched {
2429                    self.apply_create_pattern_part(row, &m.pattern_part)?;
2430                }
2431                for action in &m.actions {
2432                    if action.on_match == matched {
2433                        for item in &action.set.items {
2434                            self.apply_set_item(row, item)?;
2435                        }
2436                    }
2437                }
2438                Ok(())
2439            }
2440            other => Err(ExecutorError::RuntimeError(format!(
2441                "apply_write_op called on non-write op: {other:?}"
2442            ))),
2443        }
2444    }
2445
2446    fn apply_create_pattern_part(
2447        &mut self,
2448        row: &mut Row,
2449        part: &ResolvedPatternPart,
2450    ) -> ExecResult<()> {
2451        if part.binding.is_some() {
2452            trace!("create pattern part has path binding; path materialization not implemented");
2453        }
2454
2455        let _ = self.apply_create_pattern_element(row, &part.element)?;
2456        Ok(())
2457    }
2458
2459    fn apply_create_pattern_element(
2460        &mut self,
2461        row: &mut Row,
2462        element: &ResolvedPatternElement,
2463    ) -> ExecResult<Option<LoraValue>> {
2464        match element {
2465            ResolvedPatternElement::Node {
2466                var,
2467                labels,
2468                properties,
2469            } => {
2470                let node_id =
2471                    self.materialize_node_pattern(row, *var, labels, properties.as_ref())?;
2472                Ok(Some(LoraValue::Node(node_id)))
2473            }
2474
2475            ResolvedPatternElement::NodeChain { head, chain } => {
2476                let mut current_node_id = self.materialize_node_pattern(
2477                    row,
2478                    head.var,
2479                    &head.labels,
2480                    head.properties.as_ref(),
2481                )?;
2482
2483                for link in chain {
2484                    let next_node_id = self.materialize_node_pattern(
2485                        row,
2486                        link.node.var,
2487                        &link.node.labels,
2488                        link.node.properties.as_ref(),
2489                    )?;
2490
2491                    let _ = self.materialize_relationship_pattern(
2492                        row,
2493                        current_node_id,
2494                        next_node_id,
2495                        &link.rel,
2496                    )?;
2497
2498                    current_node_id = next_node_id;
2499                }
2500
2501                Ok(Some(LoraValue::Node(current_node_id)))
2502            }
2503
2504            ResolvedPatternElement::ShortestPath { .. } => {
2505                // ShortestPath is not valid in CREATE context
2506                Ok(None)
2507            }
2508        }
2509    }
2510
2511    fn pattern_part_is_bound(&self, row: &Row, part: &ResolvedPatternPart) -> bool {
2512        match &part.element {
2513            ResolvedPatternElement::Node { var, .. } => var.and_then(|v| row.get(v)).is_some(),
2514
2515            ResolvedPatternElement::ShortestPath { .. } => false,
2516
2517            ResolvedPatternElement::NodeChain { head, chain } => {
2518                let head_ok = head.var.and_then(|v| row.get(v)).is_some();
2519
2520                let chain_ok = chain.iter().all(|link| {
2521                    let node_ok = link.node.var.and_then(|v| row.get(v)).is_some();
2522                    // For MERGE, anonymous relationships cannot be considered
2523                    // "bound" because we have no variable to check. The merge
2524                    // must search the graph to see if the relationship exists.
2525                    let rel_ok = match link.rel.var {
2526                        Some(v) => row.get(v).is_some(),
2527                        None => false,
2528                    };
2529                    node_ok && rel_ok
2530                });
2531
2532                head_ok && chain_ok
2533            }
2534        }
2535    }
2536
2537    fn materialize_node_pattern(
2538        &mut self,
2539        row: &mut Row,
2540        var: Option<lora_analyzer::symbols::VarId>,
2541        labels: &[Vec<String>],
2542        properties: Option<&ResolvedExpr>,
2543    ) -> ExecResult<u64> {
2544        if let Some(var_id) = var {
2545            if let Some(LoraValue::Node(id)) = row.get(var_id) {
2546                return Ok(*id);
2547            }
2548        }
2549
2550        let properties = match properties {
2551            Some(expr) => eval_properties_expr(expr, row, &*self.ctx.storage, &self.ctx.params)?,
2552            None => Properties::new(),
2553        };
2554
2555        let flat_labels = flatten_label_groups(labels);
2556        debug!("creating node with labels={flat_labels:?}");
2557        let created = self.ctx.storage.create_node(flat_labels, properties);
2558
2559        if let Some(var_id) = var {
2560            row.insert(var_id, LoraValue::Node(created.id));
2561        }
2562
2563        Ok(created.id)
2564    }
2565
2566    fn materialize_relationship_pattern(
2567        &mut self,
2568        row: &mut Row,
2569        left_node_id: u64,
2570        right_node_id: u64,
2571        rel: &lora_analyzer::ResolvedRel,
2572    ) -> ExecResult<u64> {
2573        if let Some(var_id) = rel.var {
2574            if let Some(LoraValue::Relationship(id)) = row.get(var_id) {
2575                let id = *id;
2576                if let Some((src, dst)) = self.ctx.storage.relationship_endpoints(id) {
2577                    let endpoints_match = match rel.direction {
2578                        Direction::Right | Direction::Undirected => {
2579                            src == left_node_id && dst == right_node_id
2580                        }
2581                        Direction::Left => src == right_node_id && dst == left_node_id,
2582                    };
2583
2584                    if endpoints_match {
2585                        return Ok(id);
2586                    }
2587                }
2588            }
2589        }
2590
2591        if rel.range.is_some() {
2592            return Err(ExecutorError::UnsupportedCreateRelationshipRange);
2593        }
2594
2595        let (src, dst) = match rel.direction {
2596            Direction::Right | Direction::Undirected => (left_node_id, right_node_id),
2597            Direction::Left => (right_node_id, left_node_id),
2598        };
2599
2600        let rel_type = rel
2601            .types
2602            .first()
2603            .ok_or(ExecutorError::MissingRelationshipType)?;
2604
2605        if rel_type.is_empty() {
2606            return Err(ExecutorError::MissingRelationshipType);
2607        }
2608
2609        let properties = match rel.properties.as_ref() {
2610            Some(expr) => eval_properties_expr(expr, row, &*self.ctx.storage, &self.ctx.params)?,
2611            None => Properties::new(),
2612        };
2613
2614        debug!("creating relationship: src={src}, dst={dst}, type={rel_type}");
2615
2616        let created = self
2617            .ctx
2618            .storage
2619            .create_relationship(src, dst, rel_type, properties)
2620            .ok_or_else(|| ExecutorError::RelationshipCreateFailed {
2621                src,
2622                dst,
2623                rel_type: rel_type.clone(),
2624            })?;
2625
2626        if let Some(var_id) = rel.var {
2627            row.insert(var_id, LoraValue::Relationship(created.id));
2628        }
2629
2630        Ok(created.id)
2631    }
2632}
2633
2634/// Lightweight target for SET property-mutation paths. Lets the SET logic
2635/// borrow the row entry (just pulling out the id) instead of cloning the
2636/// whole `LoraValue`.
2637#[derive(Clone, Copy)]
2638enum EntityTarget {
2639    Node(NodeId),
2640    Relationship(u64),
2641}
2642
2643fn entity_target_from_value(value: &LoraValue) -> ExecResult<EntityTarget> {
2644    match value {
2645        LoraValue::Node(id) => Ok(EntityTarget::Node(*id)),
2646        LoraValue::Relationship(id) => Ok(EntityTarget::Relationship(*id)),
2647        other => Err(ExecutorError::InvalidSetTarget {
2648            found: value_kind(other),
2649        }),
2650    }
2651}
2652
2653/// Dedup rows that share the same schema (same VarId set). Compares rows by
2654/// a Vec<GroupValueKey> keyed on VarId iteration order — avoids the per-row
2655/// column-name String clones of `dedup_rows`. Used by DISTINCT projection.
2656pub(crate) fn dedup_rows_by_vars(rows: Vec<Row>) -> Vec<Row> {
2657    let mut seen: BTreeSet<Vec<GroupValueKey>> = BTreeSet::new();
2658    let mut out = Vec::new();
2659
2660    for row in rows {
2661        let key: Vec<GroupValueKey> = row
2662            .iter()
2663            .map(|(_, val)| GroupValueKey::from_value(val))
2664            .collect();
2665        if seen.insert(key) {
2666            out.push(row);
2667        }
2668    }
2669
2670    out
2671}
2672
2673/// Dedup rows using named entries so rows with different VarIds but the same
2674/// column name + value are collapsed. Needed for UNION where each branch has
2675/// its own VarIds.
2676pub(crate) fn dedup_rows(rows: Vec<Row>) -> Vec<Row> {
2677    let mut seen: BTreeSet<Vec<(String, GroupValueKey)>> = BTreeSet::new();
2678    let mut out = Vec::new();
2679
2680    for row in rows {
2681        let key: Vec<(String, GroupValueKey)> = row
2682            .iter_named()
2683            .map(|(_, name, val)| (name.into_owned(), GroupValueKey::from_value(val)))
2684            .collect();
2685        if seen.insert(key) {
2686            out.push(row);
2687        }
2688    }
2689
2690    out
2691}
2692
2693fn eval_properties_expr<S: GraphStorage>(
2694    expr: &ResolvedExpr,
2695    row: &Row,
2696    storage: &S,
2697    params: &std::collections::BTreeMap<String, LoraValue>,
2698) -> ExecResult<Properties> {
2699    let eval_ctx = EvalContext { storage, params };
2700
2701    match eval_expr(expr, row, &eval_ctx) {
2702        LoraValue::Map(map) => {
2703            let mut out = Properties::new();
2704            for (k, v) in map {
2705                let prop = lora_value_to_property(v)
2706                    .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2707                out.insert(k, prop);
2708            }
2709            Ok(out)
2710        }
2711        other => Err(ExecutorError::ExpectedPropertyMap {
2712            found: value_kind(&other),
2713        }),
2714    }
2715}
2716
2717pub(crate) fn compute_aggregate_expr<S: GraphStorage>(
2718    expr: &ResolvedExpr,
2719    rows: &[Row],
2720    eval_ctx: &EvalContext<'_, S>,
2721) -> LoraValue {
2722    match expr {
2723        ResolvedExpr::Function {
2724            name,
2725            distinct,
2726            args,
2727        } => {
2728            let func = name.to_ascii_lowercase();
2729
2730            match func.as_str() {
2731                "count" => {
2732                    if args.is_empty() {
2733                        return LoraValue::Int(rows.len() as i64);
2734                    }
2735
2736                    let mut values = rows
2737                        .iter()
2738                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2739                        .filter(|v| !matches!(v, LoraValue::Null))
2740                        .collect::<Vec<_>>();
2741
2742                    if *distinct {
2743                        values = dedup_values(values);
2744                    }
2745
2746                    LoraValue::Int(values.len() as i64)
2747                }
2748
2749                "collect" => {
2750                    if args.is_empty() {
2751                        return LoraValue::List(Vec::new());
2752                    }
2753
2754                    let mut values = rows
2755                        .iter()
2756                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2757                        .collect::<Vec<_>>();
2758
2759                    if *distinct {
2760                        values = dedup_values(values);
2761                    }
2762
2763                    LoraValue::List(values)
2764                }
2765
2766                "sum" => {
2767                    if args.is_empty() {
2768                        return LoraValue::Null;
2769                    }
2770
2771                    let mut values = rows
2772                        .iter()
2773                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2774                        .collect::<Vec<_>>();
2775
2776                    if *distinct {
2777                        values = dedup_values(values);
2778                    }
2779
2780                    let nums = values
2781                        .into_iter()
2782                        .filter_map(as_f64_lossy)
2783                        .collect::<Vec<_>>();
2784
2785                    if nums.is_empty() {
2786                        LoraValue::Null
2787                    } else if nums.iter().all(|n| n.fract() == 0.0) {
2788                        LoraValue::Int(nums.iter().sum::<f64>() as i64)
2789                    } else {
2790                        LoraValue::Float(nums.iter().sum::<f64>())
2791                    }
2792                }
2793
2794                "avg" => {
2795                    if args.is_empty() {
2796                        return LoraValue::Null;
2797                    }
2798
2799                    let mut values = rows
2800                        .iter()
2801                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2802                        .collect::<Vec<_>>();
2803
2804                    if *distinct {
2805                        values = dedup_values(values);
2806                    }
2807
2808                    let nums = values
2809                        .into_iter()
2810                        .filter_map(as_f64_lossy)
2811                        .collect::<Vec<_>>();
2812
2813                    if nums.is_empty() {
2814                        LoraValue::Null
2815                    } else {
2816                        LoraValue::Float(nums.iter().sum::<f64>() / nums.len() as f64)
2817                    }
2818                }
2819
2820                "min" => {
2821                    if args.is_empty() {
2822                        return LoraValue::Null;
2823                    }
2824
2825                    let mut values = rows
2826                        .iter()
2827                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2828                        .filter(|v| !matches!(v, LoraValue::Null))
2829                        .collect::<Vec<_>>();
2830
2831                    if *distinct {
2832                        values = dedup_values(values);
2833                    }
2834
2835                    values
2836                        .into_iter()
2837                        .min_by(compare_values_total)
2838                        .unwrap_or(LoraValue::Null)
2839                }
2840
2841                "max" => {
2842                    if args.is_empty() {
2843                        return LoraValue::Null;
2844                    }
2845
2846                    let mut values = rows
2847                        .iter()
2848                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2849                        .filter(|v| !matches!(v, LoraValue::Null))
2850                        .collect::<Vec<_>>();
2851
2852                    if *distinct {
2853                        values = dedup_values(values);
2854                    }
2855
2856                    values
2857                        .into_iter()
2858                        .max_by(compare_values_total)
2859                        .unwrap_or(LoraValue::Null)
2860                }
2861
2862                "stdev" | "stdevp" => {
2863                    if args.is_empty() {
2864                        return LoraValue::Null;
2865                    }
2866
2867                    let nums: Vec<f64> = rows
2868                        .iter()
2869                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2870                        .filter_map(as_f64_lossy)
2871                        .collect();
2872
2873                    let is_population = func == "stdevp";
2874
2875                    if nums.is_empty() || (!is_population && nums.len() < 2) {
2876                        return LoraValue::Float(0.0);
2877                    }
2878
2879                    let mean = nums.iter().sum::<f64>() / nums.len() as f64;
2880                    let variance_sum: f64 = nums.iter().map(|x| (x - mean).powi(2)).sum();
2881                    let denom = if is_population {
2882                        nums.len() as f64
2883                    } else {
2884                        (nums.len() - 1) as f64
2885                    };
2886                    LoraValue::Float((variance_sum / denom).sqrt())
2887                }
2888
2889                "percentilecont" => {
2890                    if args.len() < 2 {
2891                        return LoraValue::Null;
2892                    }
2893
2894                    let percentile = eval_expr(&args[1], &rows[0], eval_ctx)
2895                        .as_f64()
2896                        .unwrap_or(0.5);
2897                    let mut nums: Vec<f64> = rows
2898                        .iter()
2899                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2900                        .filter_map(as_f64_lossy)
2901                        .collect();
2902
2903                    if nums.is_empty() {
2904                        return LoraValue::Null;
2905                    }
2906
2907                    nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
2908
2909                    let index = percentile * (nums.len() - 1) as f64;
2910                    let lower = index.floor() as usize;
2911                    let upper = index.ceil() as usize;
2912                    let fraction = index - lower as f64;
2913
2914                    if lower == upper || upper >= nums.len() {
2915                        LoraValue::Float(nums[lower])
2916                    } else {
2917                        LoraValue::Float(nums[lower] * (1.0 - fraction) + nums[upper] * fraction)
2918                    }
2919                }
2920
2921                "percentiledisc" => {
2922                    if args.len() < 2 {
2923                        return LoraValue::Null;
2924                    }
2925
2926                    let percentile = eval_expr(&args[1], &rows[0], eval_ctx)
2927                        .as_f64()
2928                        .unwrap_or(0.5);
2929                    let mut nums: Vec<f64> = rows
2930                        .iter()
2931                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2932                        .filter_map(as_f64_lossy)
2933                        .collect();
2934
2935                    if nums.is_empty() {
2936                        return LoraValue::Null;
2937                    }
2938
2939                    nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
2940
2941                    let index = (percentile * (nums.len() - 1) as f64).round() as usize;
2942                    let index = index.min(nums.len() - 1);
2943                    LoraValue::Float(nums[index])
2944                }
2945
2946                _ => rows
2947                    .first()
2948                    .map(|r| eval_expr(expr, r, eval_ctx))
2949                    .unwrap_or(LoraValue::Null),
2950            }
2951        }
2952
2953        _ => rows
2954            .first()
2955            .map(|r| eval_expr(expr, r, eval_ctx))
2956            .unwrap_or(LoraValue::Null),
2957    }
2958}
2959
2960pub(crate) fn compare_sort_item<S: GraphStorage>(
2961    item: &ResolvedSortItem,
2962    a: &Row,
2963    b: &Row,
2964    eval_ctx: &EvalContext<'_, S>,
2965) -> Ordering {
2966    let av = eval_expr(&item.expr, a, eval_ctx);
2967    let bv = eval_expr(&item.expr, b, eval_ctx);
2968
2969    let ascending = matches!(item.direction, lora_ast::SortDirection::Asc);
2970    compare_values_for_sort(&av, &bv, ascending)
2971}
2972
2973fn dedup_values(values: Vec<LoraValue>) -> Vec<LoraValue> {
2974    let mut seen: BTreeSet<GroupValueKey> = BTreeSet::new();
2975    let mut out = Vec::new();
2976
2977    for value in values {
2978        let key = GroupValueKey::from_value(&value);
2979        if seen.insert(key) {
2980            out.push(value);
2981        }
2982    }
2983
2984    out
2985}
2986
2987fn as_f64_lossy(v: LoraValue) -> Option<f64> {
2988    match v {
2989        LoraValue::Int(i) => Some(i as f64),
2990        LoraValue::Float(f) => Some(f),
2991        _ => None,
2992    }
2993}
2994
2995fn compare_values_for_sort(a: &LoraValue, b: &LoraValue, ascending: bool) -> Ordering {
2996    let ord = match (a, b) {
2997        (LoraValue::Null, LoraValue::Null) => Ordering::Equal,
2998        (LoraValue::Null, _) => Ordering::Greater,
2999        (_, LoraValue::Null) => Ordering::Less,
3000        _ => compare_values_total(a, b),
3001    };
3002
3003    if ascending {
3004        ord
3005    } else {
3006        ord.reverse()
3007    }
3008}
3009
3010fn compare_values_total(a: &LoraValue, b: &LoraValue) -> Ordering {
3011    use LoraValue::*;
3012
3013    match (a, b) {
3014        (Bool(x), Bool(y)) => x.cmp(y),
3015        (Int(x), Int(y)) => x.cmp(y),
3016        (Float(x), Float(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
3017        (Int(x), Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Ordering::Equal),
3018        (Float(x), Int(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Ordering::Equal),
3019        (String(x), String(y)) => x.cmp(y),
3020        (Node(x), Node(y)) => x.cmp(y),
3021        (Relationship(x), Relationship(y)) => x.cmp(y),
3022        (Date(x), Date(y)) => x.cmp(y),
3023        (DateTime(x), DateTime(y)) => x.cmp(y),
3024        (Duration(x), Duration(y)) => x.cmp(y),
3025        (Vector(x), Vector(y)) => x.to_key_string().cmp(&y.to_key_string()),
3026        _ => type_rank(a)
3027            .cmp(&type_rank(b))
3028            .then_with(|| format!("{a:?}").cmp(&format!("{b:?}"))),
3029    }
3030}
3031
3032pub fn value_matches_property_value(expected: &LoraValue, actual: &PropertyValue) -> bool {
3033    match (expected, actual) {
3034        (LoraValue::Null, PropertyValue::Null) => true,
3035        (LoraValue::Bool(a), PropertyValue::Bool(b)) => a == b,
3036        (LoraValue::Int(a), PropertyValue::Int(b)) => a == b,
3037        (LoraValue::Float(a), PropertyValue::Float(b)) => a == b,
3038        (LoraValue::Int(a), PropertyValue::Float(b)) => (*a as f64) == *b,
3039        (LoraValue::Float(a), PropertyValue::Int(b)) => *a == (*b as f64),
3040        (LoraValue::String(a), PropertyValue::String(b)) => a == b,
3041
3042        (LoraValue::List(xs), PropertyValue::List(ys)) => {
3043            xs.len() == ys.len()
3044                && xs
3045                    .iter()
3046                    .zip(ys.iter())
3047                    .all(|(x, y)| value_matches_property_value(x, y))
3048        }
3049
3050        (LoraValue::Map(xm), PropertyValue::Map(ym)) => xm.iter().all(|(k, xv)| {
3051            ym.get(k)
3052                .map(|yv| value_matches_property_value(xv, yv))
3053                .unwrap_or(false)
3054        }),
3055
3056        (LoraValue::Date(a), PropertyValue::Date(b)) => a == b,
3057        (LoraValue::DateTime(a), PropertyValue::DateTime(b)) => a == b,
3058        (LoraValue::LocalDateTime(a), PropertyValue::LocalDateTime(b)) => a == b,
3059        (LoraValue::Time(a), PropertyValue::Time(b)) => a == b,
3060        (LoraValue::LocalTime(a), PropertyValue::LocalTime(b)) => a == b,
3061        (LoraValue::Duration(a), PropertyValue::Duration(b)) => a == b,
3062        (LoraValue::Point(a), PropertyValue::Point(b)) => a == b,
3063        (LoraValue::Vector(a), PropertyValue::Vector(b)) => a == b,
3064
3065        _ => false,
3066    }
3067}
3068
3069pub(crate) fn node_matches_property_filter<S: GraphStorage>(
3070    storage: &S,
3071    node_id: NodeId,
3072    labels: &[Vec<String>],
3073    key: &str,
3074    expected: &LoraValue,
3075) -> bool {
3076    storage
3077        .with_node(node_id, |node| {
3078            node_matches_label_groups(&node.labels, labels)
3079                && node
3080                    .properties
3081                    .get(key)
3082                    .map(|actual| value_matches_property_value(expected, actual))
3083                    .unwrap_or(false)
3084        })
3085        .unwrap_or(false)
3086}
3087
3088fn single_label_hint(labels: &[Vec<String>]) -> Option<&str> {
3089    if labels.len() == 1 && labels[0].len() == 1 {
3090        Some(labels[0][0].as_str())
3091    } else {
3092        None
3093    }
3094}
3095
3096fn property_lookup_values(expected: &LoraValue) -> Option<Vec<PropertyValue>> {
3097    let property = lora_value_to_property(expected.clone()).ok()?;
3098    let mut values = vec![property.clone()];
3099
3100    match property {
3101        PropertyValue::Int(i) => {
3102            values.push(PropertyValue::Float(i as f64));
3103        }
3104        PropertyValue::Float(f)
3105            if f.is_finite()
3106                && f.fract() == 0.0
3107                && f >= i64::MIN as f64
3108                && f <= i64::MAX as f64 =>
3109        {
3110            values.push(PropertyValue::Int(f as i64));
3111        }
3112        _ => {}
3113    }
3114
3115    Some(values)
3116}
3117
3118pub(crate) struct NodePropertyCandidates {
3119    pub(crate) ids: Vec<NodeId>,
3120    pub(crate) prefiltered: bool,
3121}
3122
3123pub(crate) fn indexed_node_property_candidates<S: GraphStorage>(
3124    storage: &S,
3125    labels: &[Vec<String>],
3126    key: &str,
3127    expected: &LoraValue,
3128) -> NodePropertyCandidates {
3129    let Some(values) = property_lookup_values(expected) else {
3130        return NodePropertyCandidates {
3131            ids: scan_node_ids_for_label_groups(storage, labels),
3132            prefiltered: false,
3133        };
3134    };
3135
3136    let label_hint = single_label_hint(labels);
3137    let mut seen = BTreeSet::new();
3138    let mut out = Vec::new();
3139    for value in values {
3140        for id in storage.find_node_ids_by_property(label_hint, key, &value) {
3141            if seen.insert(id) {
3142                out.push(id);
3143            }
3144        }
3145    }
3146    NodePropertyCandidates {
3147        ids: out,
3148        prefiltered: labels.is_empty() || label_hint.is_some(),
3149    }
3150}
3151
3152/// Build a LoraPath from the node and relationship variables currently in a row.
3153///
3154/// For variable-length relationships (stored as a List of Relationship values),
3155/// intermediate nodes are reconstructed from the storage by walking the
3156/// relationship chain.
3157pub(crate) fn build_path_value<S: GraphStorage>(
3158    row: &Row,
3159    node_vars: &[VarId],
3160    rel_vars: &[VarId],
3161    storage: &S,
3162) -> LoraValue {
3163    let mut raw_nodes = Vec::new();
3164    let mut rels = Vec::new();
3165    let mut has_var_len = false;
3166
3167    for &nv in node_vars {
3168        match row.get(nv) {
3169            Some(LoraValue::Node(id)) => raw_nodes.push(*id),
3170            Some(LoraValue::List(items)) => {
3171                for item in items {
3172                    if let LoraValue::Node(id) = item {
3173                        raw_nodes.push(*id);
3174                    }
3175                }
3176            }
3177            _ => {}
3178        }
3179    }
3180
3181    for &rv in rel_vars {
3182        match row.get(rv) {
3183            Some(LoraValue::Relationship(id)) => rels.push(*id),
3184            Some(LoraValue::List(items)) => {
3185                has_var_len = true;
3186                for item in items {
3187                    if let LoraValue::Relationship(id) = item {
3188                        rels.push(*id);
3189                    }
3190                }
3191            }
3192            _ => {}
3193        }
3194    }
3195
3196    // For variable-length paths, reconstruct the full node sequence from the
3197    // relationship chain. raw_nodes typically only has [start, end] but the
3198    // path needs all intermediate nodes as well.
3199    let nodes = if has_var_len && !rels.is_empty() && raw_nodes.len() == 2 {
3200        let start = raw_nodes[0];
3201        let mut ordered = Vec::with_capacity(rels.len() + 1);
3202        ordered.push(start);
3203        let mut current = start;
3204        for &rel_id in &rels {
3205            if let Some((src, dst)) = storage.relationship_endpoints(rel_id) {
3206                let next = if src == current { dst } else { src };
3207                ordered.push(next);
3208                current = next;
3209            }
3210        }
3211        ordered
3212    } else {
3213        raw_nodes
3214    };
3215
3216    LoraValue::Path(LoraPath { nodes, rels })
3217}
3218
3219fn type_rank(v: &LoraValue) -> u8 {
3220    match v {
3221        LoraValue::Null => 0,
3222        LoraValue::Bool(_) => 1,
3223        LoraValue::Int(_) | LoraValue::Float(_) => 2,
3224        LoraValue::String(_) => 3,
3225        LoraValue::Date(_) => 4,
3226        LoraValue::DateTime(_) => 5,
3227        LoraValue::LocalDateTime(_) => 6,
3228        LoraValue::Time(_) => 7,
3229        LoraValue::LocalTime(_) => 8,
3230        LoraValue::Duration(_) => 9,
3231        LoraValue::Point(_) => 10,
3232        LoraValue::Vector(_) => 11,
3233        LoraValue::List(_) => 12,
3234        LoraValue::Map(_) => 13,
3235        LoraValue::Node(_) => 14,
3236        LoraValue::Relationship(_) => 15,
3237        LoraValue::Path(_) => 16,
3238    }
3239}
3240
3241/// Check whether a node's labels satisfy all label groups.
3242/// Each group is a disjunction (OR): the node must have at least one label
3243/// from the group.  Groups are conjunctive (AND): all groups must be satisfied.
3244pub(crate) fn node_matches_label_groups(node_labels: &[String], groups: &[Vec<String>]) -> bool {
3245    groups
3246        .iter()
3247        .all(|group| group.iter().any(|l| node_labels.iter().any(|nl| nl == l)))
3248}
3249
3250/// Scan the graph for candidate node IDs matching the label groups. Uses the
3251/// label index for the pick-first-label phase and avoids cloning NodeRecords.
3252pub(crate) fn scan_node_ids_for_label_groups<S: GraphStorage>(
3253    storage: &S,
3254    groups: &[Vec<String>],
3255) -> Vec<lora_store::NodeId> {
3256    if groups.is_empty() {
3257        storage.all_node_ids()
3258    } else if groups.len() == 1 && groups[0].len() == 1 {
3259        storage.node_ids_by_label(&groups[0][0])
3260    } else if groups.len() == 1 && groups[0].len() > 1 {
3261        let mut seen = std::collections::BTreeSet::new();
3262        let mut out = Vec::new();
3263        for label in &groups[0] {
3264            for id in storage.node_ids_by_label(label) {
3265                if seen.insert(id) {
3266                    out.push(id);
3267                }
3268            }
3269        }
3270        out
3271    } else {
3272        storage.node_ids_by_label(&groups[0][0])
3273    }
3274}
3275
3276pub(crate) fn label_group_candidates_prefiltered(groups: &[Vec<String>]) -> bool {
3277    groups.len() <= 1
3278}
3279
3280pub(crate) fn hydrate_node_record(node: &lora_store::NodeRecord) -> LoraValue {
3281    let mut map = BTreeMap::new();
3282    map.insert("kind".to_string(), LoraValue::String("node".to_string()));
3283    map.insert("id".to_string(), LoraValue::Int(node.id as i64));
3284    map.insert(
3285        "labels".to_string(),
3286        LoraValue::List(
3287            node.labels
3288                .iter()
3289                .map(|s| LoraValue::String(s.clone()))
3290                .collect(),
3291        ),
3292    );
3293    map.insert(
3294        "properties".to_string(),
3295        properties_to_value_map(&node.properties),
3296    );
3297    LoraValue::Map(map)
3298}
3299
3300pub(crate) fn hydrate_relationship_record(rel: &lora_store::RelationshipRecord) -> LoraValue {
3301    let mut map = BTreeMap::new();
3302    map.insert(
3303        "kind".to_string(),
3304        LoraValue::String("relationship".to_string()),
3305    );
3306    map.insert("id".to_string(), LoraValue::Int(rel.id as i64));
3307    map.insert("startId".to_string(), LoraValue::Int(rel.src as i64));
3308    map.insert("endId".to_string(), LoraValue::Int(rel.dst as i64));
3309    map.insert("type".to_string(), LoraValue::String(rel.rel_type.clone()));
3310    map.insert(
3311        "properties".to_string(),
3312        properties_to_value_map(&rel.properties),
3313    );
3314    LoraValue::Map(map)
3315}
3316
3317/// Flatten label groups into a simple Vec<String> (for CREATE/MERGE where
3318/// disjunction doesn't apply — all labels are created).
3319fn flatten_label_groups(groups: &[Vec<String>]) -> Vec<String> {
3320    groups.iter().flat_map(|g| g.iter().cloned()).collect()
3321}
3322
3323#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3324pub(crate) enum GroupValueKey {
3325    Null,
3326    Bool(bool),
3327    Int(i64),
3328    Float(String),
3329    String(String),
3330    List(Vec<GroupValueKey>),
3331    Map(Vec<(String, GroupValueKey)>),
3332    Node(u64),
3333    Relationship(u64),
3334}
3335
3336impl GroupValueKey {
3337    pub(crate) fn from_value(v: &LoraValue) -> Self {
3338        match v {
3339            LoraValue::Null => Self::Null,
3340            LoraValue::Bool(x) => Self::Bool(*x),
3341            LoraValue::Int(x) => Self::Int(*x),
3342            LoraValue::Float(x) => Self::Float(x.to_string()),
3343            LoraValue::String(x) => Self::String(x.clone()),
3344            LoraValue::List(xs) => Self::List(xs.iter().map(Self::from_value).collect()),
3345            LoraValue::Map(m) => Self::Map(
3346                m.iter()
3347                    .map(|(k, v)| (k.clone(), Self::from_value(v)))
3348                    .collect(),
3349            ),
3350            LoraValue::Node(id) => Self::Node(*id),
3351            LoraValue::Relationship(id) => Self::Relationship(*id),
3352            LoraValue::Path(_) => Self::Null,
3353            // Temporal types: use their string representation as group key
3354            LoraValue::Date(d) => Self::String(d.to_string()),
3355            LoraValue::DateTime(dt) => Self::String(dt.to_string()),
3356            LoraValue::LocalDateTime(dt) => Self::String(dt.to_string()),
3357            LoraValue::Time(t) => Self::String(t.to_string()),
3358            LoraValue::LocalTime(t) => Self::String(t.to_string()),
3359            LoraValue::Duration(dur) => Self::String(dur.to_string()),
3360            LoraValue::Point(p) => Self::String(p.to_string()),
3361            LoraValue::Vector(v) => Self::String(format!("vector:{}", v.to_key_string())),
3362        }
3363    }
3364}
3365
3366/// Compute effective (min_hops, max_hops) from a `RangeLiteral`.
3367///
3368/// Lora semantics:
3369/// - `*`       → 1..∞   (start=None, end=None)
3370/// - `*2..5`   → 2..5   (start=Some(2), end=Some(5))
3371/// - `*..3`    → 1..3   (start=None, end=Some(3))
3372/// - `*2..`    → 2..∞   (start=Some(2), end=None)
3373/// - `*3`      → 3..3   (start=Some(3), end=None, no dots → exactly 3)
3374/// - `*0..1`   → 0..1
3375///
3376/// For unbounded upper, we cap at `MAX_VAR_LEN_HOPS` to prevent runaway.
3377const MAX_VAR_LEN_HOPS: u64 = 100;
3378
3379pub(crate) fn resolve_range(range: &RangeLiteral) -> (u64, u64) {
3380    let min_hops = range.start.unwrap_or(1);
3381    let max_hops = range.end.unwrap_or(MAX_VAR_LEN_HOPS);
3382    (min_hops, max_hops)
3383}
3384
3385/// An entry produced during BFS variable-length expansion.
3386pub(crate) struct VarLenResult {
3387    /// The destination node at the end of this path.
3388    pub(crate) dst_node_id: NodeId,
3389    /// The relationship IDs traversed (in order).
3390    pub(crate) rel_ids: Vec<u64>,
3391}
3392
3393/// Perform variable-length expansion from `start_node_id` following
3394/// relationships of the given `types` and `direction`, collecting all
3395/// reachable nodes at hop distances in `[min_hops, max_hops]`.
3396///
3397/// Uses BFS with relationship-uniqueness per path (each path does not
3398/// reuse the same relationship, but may revisit nodes).
3399pub(crate) fn variable_length_expand<S: GraphStorage>(
3400    storage: &S,
3401    start_node_id: NodeId,
3402    direction: Direction,
3403    types: &[String],
3404    min_hops: u64,
3405    max_hops: u64,
3406) -> Vec<VarLenResult> {
3407    let mut results = Vec::new();
3408
3409    // Each frontier entry: (current_node_id, relationships_used_so_far)
3410    let mut frontier: Vec<(NodeId, Vec<u64>)> = vec![(start_node_id, Vec::new())];
3411
3412    for depth in 1..=max_hops {
3413        // On the final hop we don't need to build next_frontier at all; every
3414        // path gets recorded and then the loop terminates. Avoids one full
3415        // pass of Vec clones on deep traversals.
3416        let is_last_hop = depth == max_hops;
3417        let mut next_frontier: Vec<(NodeId, Vec<u64>)> = Vec::new();
3418
3419        for (current_node, rels_used) in &frontier {
3420            // ID-only expand avoids cloning full records/properties for every
3421            // neighbour on every hop.
3422            for (rel_id, neighbor_id) in storage.expand_ids(*current_node, direction, types) {
3423                // Relationship-uniqueness: skip if this relationship was already
3424                // traversed on this particular path.
3425                if rels_used.contains(&rel_id) {
3426                    continue;
3427                }
3428
3429                if is_last_hop {
3430                    // Terminal hop: just record the result. Allocate rel_ids
3431                    // once (no duplicate clone) by extending a fresh copy.
3432                    if depth >= min_hops {
3433                        let mut rel_ids = Vec::with_capacity(rels_used.len() + 1);
3434                        rel_ids.extend_from_slice(rels_used);
3435                        rel_ids.push(rel_id);
3436                        results.push(VarLenResult {
3437                            dst_node_id: neighbor_id,
3438                            rel_ids,
3439                        });
3440                    }
3441                    continue;
3442                }
3443
3444                let mut new_rels = Vec::with_capacity(rels_used.len() + 1);
3445                new_rels.extend_from_slice(rels_used);
3446                new_rels.push(rel_id);
3447
3448                if depth >= min_hops {
3449                    results.push(VarLenResult {
3450                        dst_node_id: neighbor_id,
3451                        rel_ids: new_rels.clone(),
3452                    });
3453                }
3454
3455                next_frontier.push((neighbor_id, new_rels));
3456            }
3457        }
3458
3459        if is_last_hop || next_frontier.is_empty() {
3460            break;
3461        }
3462
3463        frontier = next_frontier;
3464    }
3465
3466    // Handle min_hops == 0: include the start node itself at depth 0.
3467    if min_hops == 0 {
3468        results.insert(
3469            0,
3470            VarLenResult {
3471                dst_node_id: start_node_id,
3472                rel_ids: Vec::new(),
3473            },
3474        );
3475    }
3476
3477    results
3478}
3479
3480/// Filter rows to keep only shortest paths.
3481/// `all` = false → keep one shortest path; `all` = true → keep all shortest.
3482pub(crate) fn filter_shortest_paths(rows: Vec<Row>, path_var: VarId, all: bool) -> Vec<Row> {
3483    if rows.is_empty() {
3484        return rows;
3485    }
3486
3487    // Compute path length for each row
3488    let lengths: Vec<usize> = rows
3489        .iter()
3490        .map(|row| match row.get(path_var) {
3491            Some(LoraValue::Path(p)) => p.rels.len(),
3492            _ => usize::MAX,
3493        })
3494        .collect();
3495
3496    let min_len = lengths.iter().copied().min().unwrap_or(usize::MAX);
3497
3498    let mut result: Vec<Row> = rows
3499        .into_iter()
3500        .zip(lengths.iter())
3501        .filter(|(_, len)| **len == min_len)
3502        .map(|(row, _)| row)
3503        .collect();
3504
3505    if !all && result.len() > 1 {
3506        result.truncate(1);
3507    }
3508
3509    result
3510}