Skip to main content

lora_executor/
executor.rs

1use crate::errors::{value_kind, ExecResult, ExecutorError};
2use crate::eval::{eval_expr, take_eval_error, EvalContext};
3use crate::value::{lora_value_to_property, LoraPath, LoraValue, Row};
4use crate::{project_rows, ExecuteOptions, QueryResult};
5
6use lora_analyzer::{
7    symbols::VarId, ResolvedExpr, ResolvedPattern, ResolvedPatternElement, ResolvedPatternPart,
8    ResolvedRemoveItem, ResolvedSetItem, ResolvedSortItem,
9};
10use lora_ast::{Direction, RangeLiteral};
11use lora_compiler::physical::*;
12use lora_compiler::CompiledQuery;
13use lora_store::{GraphStorage, GraphStorageMut, NodeId, Properties, PropertyValue};
14
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, BTreeSet};
17use tracing::{debug, error, trace};
18
19pub struct ExecutionContext<'a, S: GraphStorage + ?Sized> {
20    pub storage: &'a S,
21    pub params: std::collections::BTreeMap<String, LoraValue>,
22}
23
24pub struct Executor<'a, S: GraphStorage + ?Sized> {
25    ctx: ExecutionContext<'a, S>,
26}
27
28impl<'a, S: GraphStorage + ?Sized> Executor<'a, S> {
29    pub fn new(ctx: ExecutionContext<'a, S>) -> Self {
30        Self { ctx }
31    }
32
33    pub fn execute(
34        &self,
35        plan: &PhysicalPlan,
36        options: Option<ExecuteOptions>,
37    ) -> ExecResult<QueryResult> {
38        // Clear any error residue that a previous query on this thread may have
39        // left in the thread-local eval-error slot.
40        let _ = take_eval_error();
41
42        let rows = self.execute_node(plan, plan.root)?;
43        let rows = rows
44            .into_iter()
45            .map(|row| self.hydrate_row(row))
46            .collect::<Vec<_>>();
47        Ok(project_rows(rows, options.unwrap_or_default()))
48    }
49
50    fn hydrate_row(&self, row: Row) -> Row {
51        let mut out = Row::new();
52
53        for (var, name, value) in row.into_iter_named() {
54            out.insert_named(var, name, self.hydrate_value(value));
55        }
56
57        out
58    }
59
60    fn execute_node(&self, plan: &PhysicalPlan, node_id: PhysicalNodeId) -> ExecResult<Vec<Row>> {
61        trace!("read-only execute_node start: node_id={node_id:?}");
62
63        let result = match &plan.nodes[node_id] {
64            PhysicalOp::Argument(op) => self.exec_argument(op),
65            PhysicalOp::NodeScan(op) => self.exec_node_scan(plan, op),
66            PhysicalOp::NodeByLabelScan(op) => self.exec_node_by_label_scan(plan, op),
67            PhysicalOp::Expand(op) => self.exec_expand(plan, op),
68            PhysicalOp::Filter(op) => self.exec_filter(plan, op),
69            PhysicalOp::Projection(op) => self.exec_projection(plan, op),
70            PhysicalOp::Unwind(op) => self.exec_unwind(plan, op),
71            PhysicalOp::HashAggregation(op) => self.exec_hash_aggregation(plan, op),
72            PhysicalOp::Sort(op) => self.exec_sort(plan, op),
73            PhysicalOp::Limit(op) => self.exec_limit(plan, op),
74            PhysicalOp::OptionalMatch(op) => self.exec_optional_match(plan, op),
75            PhysicalOp::PathBuild(op) => self.exec_path_build(plan, op),
76            PhysicalOp::Create(_) => Err(ExecutorError::ReadOnlyCreate { node_id }),
77            PhysicalOp::Merge(_) => Err(ExecutorError::ReadOnlyMerge { node_id }),
78            PhysicalOp::Delete(_) => Err(ExecutorError::ReadOnlyDelete { node_id }),
79            PhysicalOp::Set(_) => Err(ExecutorError::ReadOnlySet { node_id }),
80            PhysicalOp::Remove(_) => Err(ExecutorError::ReadOnlyRemove { node_id }),
81        };
82
83        match &result {
84            Ok(rows) => trace!(
85                "read-only execute_node ok: node_id={node_id:?}, rows={}",
86                rows.len()
87            ),
88            Err(err) => error!("read-only execute_node failed: node_id={node_id:?}, error={err}"),
89        }
90
91        result
92    }
93
94    fn exec_argument(&self, _op: &ArgumentExec) -> ExecResult<Vec<Row>> {
95        Ok(vec![Row::new()])
96    }
97
98    fn exec_node_scan(&self, plan: &PhysicalPlan, op: &NodeScanExec) -> ExecResult<Vec<Row>> {
99        let base_rows = match op.input {
100            Some(input) => self.execute_node(plan, input)?,
101            None => vec![Row::new()],
102        };
103
104        let node_ids = self.ctx.storage.all_node_ids();
105        let mut out = Vec::new();
106
107        for row in base_rows {
108            if let Some(existing) = row.get(op.var) {
109                match existing {
110                    LoraValue::Node(existing_id) => {
111                        if self.ctx.storage.has_node(*existing_id) {
112                            out.push(row);
113                        }
114                    }
115                    other => {
116                        return Err(ExecutorError::ExpectedNodeForExpand {
117                            var: format!("{:?}", op.var),
118                            found: value_kind(other),
119                        });
120                    }
121                }
122                continue;
123            }
124
125            for &id in &node_ids {
126                let mut new_row = row.clone();
127                new_row.insert(op.var, LoraValue::Node(id));
128                out.push(new_row);
129            }
130        }
131
132        Ok(out)
133    }
134
135    fn exec_node_by_label_scan(
136        &self,
137        plan: &PhysicalPlan,
138        op: &NodeByLabelScanExec,
139    ) -> ExecResult<Vec<Row>> {
140        let base_rows = match op.input {
141            Some(input) => self.execute_node(plan, input)?,
142            None => vec![Row::new()],
143        };
144
145        let candidate_ids = scan_node_ids_for_label_groups(self.ctx.storage, &op.labels);
146        let mut out = Vec::new();
147
148        for row in base_rows {
149            if let Some(existing) = row.get(op.var) {
150                match existing {
151                    LoraValue::Node(existing_id) => {
152                        let labels_ok = self
153                            .ctx
154                            .storage
155                            .node_ref(*existing_id)
156                            .map(|n| node_matches_label_groups(&n.labels, &op.labels))
157                            .unwrap_or(false);
158                        if labels_ok {
159                            out.push(row);
160                        }
161                    }
162                    other => {
163                        return Err(ExecutorError::ExpectedNodeForExpand {
164                            var: format!("{:?}", op.var),
165                            found: value_kind(other),
166                        });
167                    }
168                }
169                continue;
170            }
171
172            for &id in &candidate_ids {
173                let labels_ok = self
174                    .ctx
175                    .storage
176                    .node_ref(id)
177                    .map(|n| node_matches_label_groups(&n.labels, &op.labels))
178                    .unwrap_or(false);
179                if !labels_ok {
180                    continue;
181                }
182                let mut new_row = row.clone();
183                new_row.insert(op.var, LoraValue::Node(id));
184                out.push(new_row);
185            }
186        }
187
188        Ok(out)
189    }
190
191    fn exec_expand(&self, plan: &PhysicalPlan, op: &ExpandExec) -> ExecResult<Vec<Row>> {
192        // Variable-length expansion: delegate to iterative expander.
193        if let Some(range) = &op.range {
194            return self.exec_expand_var_len(plan, op, range);
195        }
196
197        let input_rows = self.execute_node(plan, op.input)?;
198        let mut out = Vec::new();
199
200        for row in input_rows {
201            let src_node_id = match row.get(op.src) {
202                Some(LoraValue::Node(id)) => *id,
203                Some(other) => {
204                    return Err(ExecutorError::ExpectedNodeForExpand {
205                        var: format!("{:?}", op.src),
206                        found: value_kind(other),
207                    });
208                }
209                None => continue,
210            };
211
212            for (rel_id, dst_id) in
213                self.ctx
214                    .storage
215                    .expand_ids(src_node_id, op.direction, &op.types)
216            {
217                if let Some(expr) = op.rel_properties.as_ref() {
218                    let matches = match self.ctx.storage.relationship_ref(rel_id) {
219                        Some(rel) => {
220                            self.relationship_matches_properties(&rel.properties, Some(expr), &row)?
221                        }
222                        None => false,
223                    };
224                    if !matches {
225                        continue;
226                    }
227                }
228
229                if let Some(existing_dst) = row.get(op.dst) {
230                    match existing_dst {
231                        LoraValue::Node(existing_id) if *existing_id == dst_id => {}
232                        LoraValue::Node(_) => continue,
233                        other => {
234                            return Err(ExecutorError::ExpectedNodeForExpand {
235                                var: format!("{:?}", op.dst),
236                                found: value_kind(other),
237                            });
238                        }
239                    }
240                }
241
242                if let Some(rel_var) = op.rel {
243                    if let Some(existing_rel) = row.get(rel_var) {
244                        match existing_rel {
245                            LoraValue::Relationship(existing_id) if *existing_id == rel_id => {}
246                            LoraValue::Relationship(_) => continue,
247                            other => {
248                                return Err(ExecutorError::ExpectedRelationshipForExpand {
249                                    var: format!("{:?}", rel_var),
250                                    found: value_kind(other),
251                                });
252                            }
253                        }
254                    }
255                }
256
257                let mut new_row = row.clone();
258
259                if !new_row.contains_key(op.dst) {
260                    new_row.insert(op.dst, LoraValue::Node(dst_id));
261                }
262
263                if let Some(rel_var) = op.rel {
264                    if !new_row.contains_key(rel_var) {
265                        new_row.insert(rel_var, LoraValue::Relationship(rel_id));
266                    }
267                }
268
269                out.push(new_row);
270            }
271        }
272
273        Ok(out)
274    }
275
276    fn exec_expand_var_len(
277        &self,
278        plan: &PhysicalPlan,
279        op: &ExpandExec,
280        range: &RangeLiteral,
281    ) -> ExecResult<Vec<Row>> {
282        let input_rows = self.execute_node(plan, op.input)?;
283        let (min_hops, max_hops) = resolve_range(range);
284        let mut out = Vec::new();
285
286        for row in input_rows {
287            let src_node_id = match row.get(op.src) {
288                Some(LoraValue::Node(id)) => *id,
289                Some(other) => {
290                    return Err(ExecutorError::ExpectedNodeForExpand {
291                        var: format!("{:?}", op.src),
292                        found: value_kind(other),
293                    });
294                }
295                None => continue,
296            };
297
298            let expansions = variable_length_expand(
299                self.ctx.storage,
300                src_node_id,
301                op.direction,
302                &op.types,
303                min_hops,
304                max_hops,
305            );
306
307            for result in expansions {
308                let mut new_row = row.clone();
309                new_row.insert(op.dst, LoraValue::Node(result.dst_node_id));
310
311                // For variable-length patterns, bind the relationship variable
312                // to a list of relationship IDs traversed.
313                if let Some(rel_var) = op.rel {
314                    // Consume rel_ids — it's owned and no longer needed after this.
315                    let rel_list = LoraValue::List(
316                        result
317                            .rel_ids
318                            .into_iter()
319                            .map(LoraValue::Relationship)
320                            .collect(),
321                    );
322                    new_row.insert(rel_var, rel_list);
323                }
324
325                out.push(new_row);
326            }
327        }
328
329        Ok(out)
330    }
331
332    fn relationship_matches_properties(
333        &self,
334        actual: &Properties,
335        expected_expr: Option<&ResolvedExpr>,
336        row: &Row,
337    ) -> ExecResult<bool> {
338        let Some(expr) = expected_expr else {
339            return Ok(true);
340        };
341
342        let eval_ctx = EvalContext {
343            storage: self.ctx.storage,
344            params: &self.ctx.params,
345        };
346
347        let expected = eval_expr(expr, row, &eval_ctx);
348
349        let LoraValue::Map(expected_map) = expected else {
350            return Err(ExecutorError::ExpectedPropertyMap {
351                found: value_kind(&expected),
352            });
353        };
354
355        Ok(expected_map.iter().all(|(key, expected_value)| {
356            actual
357                .get(key)
358                .map(|actual_value| value_matches_property_value(expected_value, actual_value))
359                .unwrap_or(false)
360        }))
361    }
362
363    fn exec_filter(&self, plan: &PhysicalPlan, op: &FilterExec) -> ExecResult<Vec<Row>> {
364        let input_rows = self.execute_node(plan, op.input)?;
365        let eval_ctx = EvalContext {
366            storage: self.ctx.storage,
367            params: &self.ctx.params,
368        };
369
370        Ok(input_rows
371            .into_iter()
372            .filter(|row| eval_expr(&op.predicate, row, &eval_ctx).is_truthy())
373            .collect())
374    }
375
376    fn exec_projection(&self, plan: &PhysicalPlan, op: &ProjectionExec) -> ExecResult<Vec<Row>> {
377        let input_rows = self.execute_node(plan, op.input)?;
378        let eval_ctx = EvalContext {
379            storage: self.ctx.storage,
380            params: &self.ctx.params,
381        };
382
383        let mut out = Vec::with_capacity(input_rows.len());
384
385        for row in input_rows {
386            // include_existing=true means we carry all upstream columns — move
387            // the row into the projection instead of cloning every value.
388            if op.include_existing {
389                let mut projected = row;
390                for item in &op.items {
391                    let value = eval_expr(&item.expr, &projected, &eval_ctx);
392                    if let Some(err) = take_eval_error() {
393                        return Err(ExecutorError::RuntimeError(err));
394                    }
395                    projected.insert_named(item.output, item.name.clone(), value);
396                }
397                out.push(projected);
398            } else {
399                let mut projected = Row::new();
400                for item in &op.items {
401                    let value = eval_expr(&item.expr, &row, &eval_ctx);
402                    if let Some(err) = take_eval_error() {
403                        return Err(ExecutorError::RuntimeError(err));
404                    }
405                    projected.insert_named(item.output, item.name.clone(), value);
406                }
407                out.push(projected);
408            }
409        }
410
411        Ok(if op.distinct {
412            dedup_rows_by_vars(out)
413        } else {
414            out
415        })
416    }
417
418    fn hydrate_value(&self, value: LoraValue) -> LoraValue {
419        match value {
420            LoraValue::Node(id) => self.hydrate_node(id),
421            LoraValue::Relationship(id) => self.hydrate_relationship(id),
422            LoraValue::List(values) => {
423                LoraValue::List(values.into_iter().map(|v| self.hydrate_value(v)).collect())
424            }
425            LoraValue::Map(map) => LoraValue::Map(
426                map.into_iter()
427                    .map(|(k, v)| (k, self.hydrate_value(v)))
428                    .collect(),
429            ),
430            other => other,
431        }
432    }
433
434    fn hydrate_node(&self, id: u64) -> LoraValue {
435        match self.ctx.storage.node_ref(id) {
436            Some(node) => hydrate_node_record(node),
437            None => LoraValue::Null,
438        }
439    }
440
441    fn hydrate_relationship(&self, id: u64) -> LoraValue {
442        match self.ctx.storage.relationship_ref(id) {
443            Some(rel) => hydrate_relationship_record(rel),
444            None => LoraValue::Null,
445        }
446    }
447
448    fn exec_unwind(&self, plan: &PhysicalPlan, op: &UnwindExec) -> ExecResult<Vec<Row>> {
449        let input_rows = self.execute_node(plan, op.input)?;
450        let eval_ctx = EvalContext {
451            storage: self.ctx.storage,
452            params: &self.ctx.params,
453        };
454
455        let mut out = Vec::new();
456
457        for row in input_rows {
458            match eval_expr(&op.expr, &row, &eval_ctx) {
459                LoraValue::List(values) => {
460                    for value in values {
461                        let mut new_row = row.clone();
462                        new_row.insert(op.alias, value);
463                        out.push(new_row);
464                    }
465                }
466                LoraValue::Null => {}
467                other => {
468                    let mut new_row = row;
469                    new_row.insert(op.alias, other);
470                    out.push(new_row);
471                }
472            }
473        }
474
475        Ok(out)
476    }
477
478    fn exec_hash_aggregation(
479        &self,
480        plan: &PhysicalPlan,
481        op: &HashAggregationExec,
482    ) -> ExecResult<Vec<Row>> {
483        let input_rows = self.execute_node(plan, op.input)?;
484        let eval_ctx = EvalContext {
485            storage: self.ctx.storage,
486            params: &self.ctx.params,
487        };
488
489        let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
490
491        if op.group_by.is_empty() {
492            groups.insert(Vec::new(), input_rows);
493        } else {
494            for row in input_rows {
495                let key = op
496                    .group_by
497                    .iter()
498                    .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
499                    .collect::<Vec<_>>();
500
501                groups.entry(key).or_default().push(row);
502            }
503        }
504
505        let mut out = Vec::new();
506
507        for rows in groups.into_values() {
508            let mut result = Row::new();
509
510            if let Some(first) = rows.first() {
511                for proj in &op.group_by {
512                    let value = self.hydrate_value(eval_expr(&proj.expr, first, &eval_ctx));
513                    result.insert_named(proj.output, proj.name.clone(), value);
514                }
515            }
516
517            for proj in &op.aggregates {
518                let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
519                result.insert_named(proj.output, proj.name.clone(), value);
520            }
521
522            out.push(result);
523        }
524
525        Ok(out)
526    }
527
528    fn exec_sort(&self, plan: &PhysicalPlan, op: &SortExec) -> ExecResult<Vec<Row>> {
529        let mut rows = self.execute_node(plan, op.input)?;
530        let eval_ctx = EvalContext {
531            storage: self.ctx.storage,
532            params: &self.ctx.params,
533        };
534
535        rows.sort_by(|a, b| {
536            for item in &op.items {
537                let ord = compare_sort_item(item, a, b, &eval_ctx);
538                if ord != Ordering::Equal {
539                    return ord;
540                }
541            }
542            Ordering::Equal
543        });
544
545        Ok(rows)
546    }
547
548    fn exec_limit(&self, plan: &PhysicalPlan, op: &LimitExec) -> ExecResult<Vec<Row>> {
549        let mut rows = self.execute_node(plan, op.input)?;
550        let eval_ctx = EvalContext {
551            storage: self.ctx.storage,
552            params: &self.ctx.params,
553        };
554
555        let limit = op
556            .limit
557            .as_ref()
558            .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
559            .unwrap_or(rows.len() as i64)
560            .max(0) as usize;
561
562        let skip = op
563            .skip
564            .as_ref()
565            .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
566            .unwrap_or(0)
567            .max(0) as usize;
568
569        if skip >= rows.len() {
570            return Ok(Vec::new());
571        }
572
573        rows.drain(0..skip);
574        rows.truncate(limit);
575        Ok(rows)
576    }
577
578    fn exec_optional_match(
579        &self,
580        plan: &PhysicalPlan,
581        op: &OptionalMatchExec,
582    ) -> ExecResult<Vec<Row>> {
583        let input_rows = self.execute_node(plan, op.input)?;
584
585        // The inner plan is built to start from Argument (an empty row) and is
586        // read-only, so its output does not depend on the upstream input. Execute
587        // it once and reuse the result across every input row, instead of
588        // producing |input_rows| × |inner_rows| allocations.
589        let inner_rows = self.execute_node(plan, op.inner)?;
590
591        let mut out = Vec::new();
592
593        for input_row in input_rows {
594            let mut matched = false;
595
596            for inner_row in &inner_rows {
597                // Each variable already bound in input_row must match.
598                let compatible = input_row
599                    .iter()
600                    .all(|(var, val)| match inner_row.get(*var) {
601                        Some(inner_val) => inner_val == val,
602                        None => true,
603                    });
604                if !compatible {
605                    continue;
606                }
607
608                let mut merged = input_row.clone();
609                for (var, name, val) in inner_row.iter_named() {
610                    if !merged.contains_key(*var) {
611                        merged.insert_named(*var, name.into_owned(), val.clone());
612                    }
613                }
614                out.push(merged);
615                matched = true;
616            }
617
618            if !matched {
619                let mut null_row = input_row;
620                for &var_id in &op.new_vars {
621                    if !null_row.contains_key(var_id) {
622                        null_row.insert(var_id, LoraValue::Null);
623                    }
624                }
625                out.push(null_row);
626            }
627        }
628
629        Ok(out)
630    }
631
632    fn exec_path_build(&self, plan: &PhysicalPlan, op: &PathBuildExec) -> ExecResult<Vec<Row>> {
633        let input_rows = self.execute_node(plan, op.input)?;
634        let mut rows: Vec<Row> = input_rows
635            .into_iter()
636            .map(|mut row| {
637                let path = build_path_value(&row, &op.node_vars, &op.rel_vars, self.ctx.storage);
638                row.insert(op.output, path);
639                row
640            })
641            .collect();
642
643        if let Some(all) = op.shortest_path_all {
644            rows = filter_shortest_paths(rows, op.output, all);
645        }
646        Ok(rows)
647    }
648}
649
650fn properties_to_value_map(props: &Properties) -> LoraValue {
651    let mut map = BTreeMap::new();
652    for (k, v) in props.iter() {
653        map.insert(k.clone(), LoraValue::from(v));
654    }
655    LoraValue::Map(map)
656}
657
658pub struct MutableExecutionContext<'a, S: GraphStorageMut + ?Sized> {
659    pub storage: &'a mut S,
660    pub params: std::collections::BTreeMap<String, LoraValue>,
661}
662
663pub struct MutableExecutor<'a, S: GraphStorageMut + ?Sized> {
664    ctx: MutableExecutionContext<'a, S>,
665}
666
667impl<'a, S: GraphStorageMut + ?Sized> MutableExecutor<'a, S> {
668    pub fn new(ctx: MutableExecutionContext<'a, S>) -> Self {
669        Self { ctx }
670    }
671
672    pub fn execute(
673        &mut self,
674        plan: &PhysicalPlan,
675        options: Option<ExecuteOptions>,
676    ) -> ExecResult<QueryResult> {
677        // Clear any error residue that a previous query on this thread may have
678        // left in the thread-local eval-error slot.
679        let _ = take_eval_error();
680
681        let rows = self.execute_node(plan, plan.root)?;
682        let rows = rows
683            .into_iter()
684            .map(|row| self.hydrate_row(row))
685            .collect::<Vec<_>>();
686        Ok(project_rows(rows, options.unwrap_or_default()))
687    }
688
689    /// Execute a compiled query that may include UNION branches.
690    pub fn execute_compiled(
691        &mut self,
692        compiled: &CompiledQuery,
693        options: Option<ExecuteOptions>,
694    ) -> ExecResult<QueryResult> {
695        if compiled.unions.is_empty() {
696            return self.execute(&compiled.physical, options);
697        }
698
699        let _ = take_eval_error();
700
701        // Execute the head branch.
702        let mut all_rows = self.execute_and_hydrate(&compiled.physical)?;
703
704        // Execute each UNION branch and combine.
705        // Track whether any branch uses plain UNION (dedup needed).
706        let mut needs_dedup = false;
707
708        for branch in &compiled.unions {
709            let branch_rows = self.execute_and_hydrate(&branch.physical)?;
710            all_rows.extend(branch_rows);
711
712            if !branch.all {
713                needs_dedup = true;
714            }
715        }
716
717        if needs_dedup {
718            all_rows = dedup_rows(all_rows);
719        }
720
721        Ok(project_rows(all_rows, options.unwrap_or_default()))
722    }
723
724    fn execute_and_hydrate(&mut self, plan: &PhysicalPlan) -> ExecResult<Vec<Row>> {
725        let rows = self.execute_node(plan, plan.root)?;
726        Ok(rows.into_iter().map(|row| self.hydrate_row(row)).collect())
727    }
728
729    fn hydrate_row(&self, row: Row) -> Row {
730        let mut out = Row::new();
731
732        for (var, name, value) in row.into_iter_named() {
733            out.insert_named(var, name, self.hydrate_value(value));
734        }
735
736        out
737    }
738
739    fn execute_node(
740        &mut self,
741        plan: &PhysicalPlan,
742        node_id: PhysicalNodeId,
743    ) -> ExecResult<Vec<Row>> {
744        trace!("mutable execute_node start: node_id={node_id:?}");
745
746        let result = match &plan.nodes[node_id] {
747            PhysicalOp::Argument(op) => self.exec_argument(op),
748            PhysicalOp::NodeScan(op) => self.exec_node_scan(plan, op),
749            PhysicalOp::NodeByLabelScan(op) => self.exec_node_by_label_scan(plan, op),
750            PhysicalOp::Expand(op) => self.exec_expand(plan, op),
751            PhysicalOp::Filter(op) => self.exec_filter(plan, op),
752            PhysicalOp::Projection(op) => self.exec_projection(plan, op),
753            PhysicalOp::Unwind(op) => self.exec_unwind(plan, op),
754            PhysicalOp::HashAggregation(op) => self.exec_hash_aggregation(plan, op),
755            PhysicalOp::Sort(op) => self.exec_sort(plan, op),
756            PhysicalOp::Limit(op) => self.exec_limit(plan, op),
757            PhysicalOp::Create(op) => self.exec_create(plan, op),
758            PhysicalOp::Merge(op) => self.exec_merge(plan, op),
759            PhysicalOp::Delete(op) => self.exec_delete(plan, op),
760            PhysicalOp::Set(op) => self.exec_set(plan, op),
761            PhysicalOp::Remove(op) => self.exec_remove(plan, op),
762            PhysicalOp::OptionalMatch(op) => self.exec_optional_match(plan, op),
763            PhysicalOp::PathBuild(op) => self.exec_path_build(plan, op),
764        };
765
766        match &result {
767            Ok(rows) => trace!(
768                "mutable execute_node ok: node_id={node_id:?}, rows={}",
769                rows.len()
770            ),
771            Err(err) => error!("mutable execute_node failed: node_id={node_id:?}, error={err}"),
772        }
773
774        result
775    }
776
777    fn exec_argument(&self, _op: &ArgumentExec) -> ExecResult<Vec<Row>> {
778        Ok(vec![Row::new()])
779    }
780
781    fn exec_node_scan(&mut self, plan: &PhysicalPlan, op: &NodeScanExec) -> ExecResult<Vec<Row>> {
782        let base_rows = match op.input {
783            Some(input) => self.execute_node(plan, input)?,
784            None => vec![Row::new()],
785        };
786
787        let node_ids = self.ctx.storage.all_node_ids();
788        let mut out = Vec::new();
789
790        for row in base_rows {
791            if let Some(existing) = row.get(op.var) {
792                match existing {
793                    LoraValue::Node(existing_id) => {
794                        if self.ctx.storage.has_node(*existing_id) {
795                            out.push(row);
796                        }
797                    }
798                    other => {
799                        return Err(ExecutorError::ExpectedNodeForExpand {
800                            var: format!("{:?}", op.var),
801                            found: value_kind(other),
802                        });
803                    }
804                }
805                continue;
806            }
807
808            for &id in &node_ids {
809                let mut new_row = row.clone();
810                new_row.insert(op.var, LoraValue::Node(id));
811                out.push(new_row);
812            }
813        }
814
815        Ok(out)
816    }
817
818    fn exec_node_by_label_scan(
819        &mut self,
820        plan: &PhysicalPlan,
821        op: &NodeByLabelScanExec,
822    ) -> ExecResult<Vec<Row>> {
823        let base_rows = match op.input {
824            Some(input) => self.execute_node(plan, input)?,
825            None => vec![Row::new()],
826        };
827
828        let candidate_ids = scan_node_ids_for_label_groups(&*self.ctx.storage, &op.labels);
829        let mut out = Vec::new();
830
831        for row in base_rows {
832            if let Some(existing) = row.get(op.var) {
833                match existing {
834                    LoraValue::Node(existing_id) => {
835                        let labels_ok = self
836                            .ctx
837                            .storage
838                            .node_ref(*existing_id)
839                            .map(|n| node_matches_label_groups(&n.labels, &op.labels))
840                            .unwrap_or(false);
841                        if labels_ok {
842                            out.push(row);
843                        }
844                    }
845                    other => {
846                        return Err(ExecutorError::ExpectedNodeForExpand {
847                            var: format!("{:?}", op.var),
848                            found: value_kind(other),
849                        });
850                    }
851                }
852                continue;
853            }
854
855            for &id in &candidate_ids {
856                let labels_ok = self
857                    .ctx
858                    .storage
859                    .node_ref(id)
860                    .map(|n| node_matches_label_groups(&n.labels, &op.labels))
861                    .unwrap_or(false);
862                if !labels_ok {
863                    continue;
864                }
865                let mut new_row = row.clone();
866                new_row.insert(op.var, LoraValue::Node(id));
867                out.push(new_row);
868            }
869        }
870
871        Ok(out)
872    }
873
874    fn exec_expand(&mut self, plan: &PhysicalPlan, op: &ExpandExec) -> ExecResult<Vec<Row>> {
875        // Variable-length expansion: delegate to iterative expander.
876        if let Some(range) = &op.range {
877            return self.exec_expand_var_len(plan, op, range);
878        }
879
880        let input_rows = self.execute_node(plan, op.input)?;
881        let mut out = Vec::new();
882
883        for row in input_rows {
884            let src_node_id = match row.get(op.src) {
885                Some(LoraValue::Node(id)) => *id,
886                Some(other) => {
887                    return Err(ExecutorError::ExpectedNodeForExpand {
888                        var: format!("{:?}", op.src),
889                        found: value_kind(other),
890                    });
891                }
892                None => continue,
893            };
894
895            for (rel_id, dst_id) in
896                self.ctx
897                    .storage
898                    .expand_ids(src_node_id, op.direction, &op.types)
899            {
900                if let Some(expr) = op.rel_properties.as_ref() {
901                    let matches = match self.ctx.storage.relationship_ref(rel_id) {
902                        Some(rel) => {
903                            self.relationship_matches_properties(&rel.properties, Some(expr), &row)?
904                        }
905                        None => false,
906                    };
907                    if !matches {
908                        continue;
909                    }
910                }
911
912                if let Some(existing_dst) = row.get(op.dst) {
913                    match existing_dst {
914                        LoraValue::Node(existing_id) if *existing_id == dst_id => {}
915                        LoraValue::Node(_) => continue,
916                        other => {
917                            return Err(ExecutorError::ExpectedNodeForExpand {
918                                var: format!("{:?}", op.dst),
919                                found: value_kind(other),
920                            });
921                        }
922                    }
923                }
924
925                if let Some(rel_var) = op.rel {
926                    if let Some(existing_rel) = row.get(rel_var) {
927                        match existing_rel {
928                            LoraValue::Relationship(existing_id) if *existing_id == rel_id => {}
929                            LoraValue::Relationship(_) => continue,
930                            other => {
931                                return Err(ExecutorError::ExpectedRelationshipForExpand {
932                                    var: format!("{:?}", rel_var),
933                                    found: value_kind(other),
934                                });
935                            }
936                        }
937                    }
938                }
939
940                let mut new_row = row.clone();
941
942                if !new_row.contains_key(op.dst) {
943                    new_row.insert(op.dst, LoraValue::Node(dst_id));
944                }
945
946                if let Some(rel_var) = op.rel {
947                    if !new_row.contains_key(rel_var) {
948                        new_row.insert(rel_var, LoraValue::Relationship(rel_id));
949                    }
950                }
951
952                out.push(new_row);
953            }
954        }
955
956        Ok(out)
957    }
958
959    fn exec_expand_var_len(
960        &mut self,
961        plan: &PhysicalPlan,
962        op: &ExpandExec,
963        range: &RangeLiteral,
964    ) -> ExecResult<Vec<Row>> {
965        let input_rows = self.execute_node(plan, op.input)?;
966        let (min_hops, max_hops) = resolve_range(range);
967        let mut out = Vec::new();
968
969        for row in input_rows {
970            let src_node_id = match row.get(op.src) {
971                Some(LoraValue::Node(id)) => *id,
972                Some(other) => {
973                    return Err(ExecutorError::ExpectedNodeForExpand {
974                        var: format!("{:?}", op.src),
975                        found: value_kind(other),
976                    });
977                }
978                None => continue,
979            };
980
981            let expansions = variable_length_expand(
982                &*self.ctx.storage,
983                src_node_id,
984                op.direction,
985                &op.types,
986                min_hops,
987                max_hops,
988            );
989
990            for result in expansions {
991                let mut new_row = row.clone();
992                new_row.insert(op.dst, LoraValue::Node(result.dst_node_id));
993
994                if let Some(rel_var) = op.rel {
995                    // Consume rel_ids — it's owned and no longer needed after this.
996                    let rel_list = LoraValue::List(
997                        result
998                            .rel_ids
999                            .into_iter()
1000                            .map(LoraValue::Relationship)
1001                            .collect(),
1002                    );
1003                    new_row.insert(rel_var, rel_list);
1004                }
1005
1006                out.push(new_row);
1007            }
1008        }
1009
1010        Ok(out)
1011    }
1012
1013    fn relationship_matches_properties(
1014        &self,
1015        actual: &Properties,
1016        expected_expr: Option<&ResolvedExpr>,
1017        row: &Row,
1018    ) -> ExecResult<bool> {
1019        let Some(expr) = expected_expr else {
1020            return Ok(true);
1021        };
1022
1023        let eval_ctx = EvalContext {
1024            storage: &*self.ctx.storage,
1025            params: &self.ctx.params,
1026        };
1027
1028        let expected = eval_expr(expr, row, &eval_ctx);
1029
1030        let LoraValue::Map(expected_map) = expected else {
1031            return Err(ExecutorError::ExpectedPropertyMap {
1032                found: value_kind(&expected),
1033            });
1034        };
1035
1036        Ok(expected_map.iter().all(|(key, expected_value)| {
1037            actual
1038                .get(key)
1039                .map(|actual_value| value_matches_property_value(expected_value, actual_value))
1040                .unwrap_or(false)
1041        }))
1042    }
1043
1044    fn exec_filter(&mut self, plan: &PhysicalPlan, op: &FilterExec) -> ExecResult<Vec<Row>> {
1045        let input_rows = self.execute_node(plan, op.input)?;
1046        let eval_ctx = EvalContext {
1047            storage: &*self.ctx.storage,
1048            params: &self.ctx.params,
1049        };
1050
1051        Ok(input_rows
1052            .into_iter()
1053            .filter(|row| eval_expr(&op.predicate, row, &eval_ctx).is_truthy())
1054            .collect())
1055    }
1056
1057    fn exec_projection(
1058        &mut self,
1059        plan: &PhysicalPlan,
1060        op: &ProjectionExec,
1061    ) -> ExecResult<Vec<Row>> {
1062        let input_rows = self.execute_node(plan, op.input)?;
1063        let eval_ctx = EvalContext {
1064            storage: &*self.ctx.storage,
1065            params: &self.ctx.params,
1066        };
1067
1068        let mut out = Vec::with_capacity(input_rows.len());
1069
1070        for row in input_rows {
1071            if op.include_existing {
1072                let mut projected = row;
1073                for item in &op.items {
1074                    let value = eval_expr(&item.expr, &projected, &eval_ctx);
1075                    if let Some(err) = take_eval_error() {
1076                        return Err(ExecutorError::RuntimeError(err));
1077                    }
1078                    projected.insert_named(item.output, item.name.clone(), value);
1079                }
1080                out.push(projected);
1081            } else {
1082                let mut projected = Row::new();
1083                for item in &op.items {
1084                    let value = eval_expr(&item.expr, &row, &eval_ctx);
1085                    if let Some(err) = take_eval_error() {
1086                        return Err(ExecutorError::RuntimeError(err));
1087                    }
1088                    projected.insert_named(item.output, item.name.clone(), value);
1089                }
1090                out.push(projected);
1091            }
1092        }
1093
1094        Ok(if op.distinct {
1095            dedup_rows_by_vars(out)
1096        } else {
1097            out
1098        })
1099    }
1100
1101    fn hydrate_value(&self, value: LoraValue) -> LoraValue {
1102        match value {
1103            LoraValue::Node(id) => self.hydrate_node(id),
1104            LoraValue::Relationship(id) => self.hydrate_relationship(id),
1105            LoraValue::List(values) => {
1106                LoraValue::List(values.into_iter().map(|v| self.hydrate_value(v)).collect())
1107            }
1108            LoraValue::Map(map) => LoraValue::Map(
1109                map.into_iter()
1110                    .map(|(k, v)| (k, self.hydrate_value(v)))
1111                    .collect(),
1112            ),
1113            other => other,
1114        }
1115    }
1116
1117    fn hydrate_node(&self, id: u64) -> LoraValue {
1118        match self.ctx.storage.node_ref(id) {
1119            Some(node) => hydrate_node_record(node),
1120            None => LoraValue::Null,
1121        }
1122    }
1123
1124    fn hydrate_relationship(&self, id: u64) -> LoraValue {
1125        match self.ctx.storage.relationship_ref(id) {
1126            Some(rel) => hydrate_relationship_record(rel),
1127            None => LoraValue::Null,
1128        }
1129    }
1130
1131    fn exec_unwind(&mut self, plan: &PhysicalPlan, op: &UnwindExec) -> ExecResult<Vec<Row>> {
1132        let input_rows = self.execute_node(plan, op.input)?;
1133        let eval_ctx = EvalContext {
1134            storage: &*self.ctx.storage,
1135            params: &self.ctx.params,
1136        };
1137
1138        let mut out = Vec::new();
1139
1140        for row in input_rows {
1141            match eval_expr(&op.expr, &row, &eval_ctx) {
1142                LoraValue::List(values) => {
1143                    for value in values {
1144                        let mut new_row = row.clone();
1145                        new_row.insert(op.alias, value);
1146                        out.push(new_row);
1147                    }
1148                }
1149                LoraValue::Null => {}
1150                other => {
1151                    let mut new_row = row;
1152                    new_row.insert(op.alias, other);
1153                    out.push(new_row);
1154                }
1155            }
1156        }
1157
1158        Ok(out)
1159    }
1160
1161    fn exec_hash_aggregation(
1162        &mut self,
1163        plan: &PhysicalPlan,
1164        op: &HashAggregationExec,
1165    ) -> ExecResult<Vec<Row>> {
1166        let input_rows = self.execute_node(plan, op.input)?;
1167        let eval_ctx = EvalContext {
1168            storage: &*self.ctx.storage,
1169            params: &self.ctx.params,
1170        };
1171
1172        let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
1173
1174        if op.group_by.is_empty() {
1175            groups.insert(Vec::new(), input_rows);
1176        } else {
1177            for row in input_rows {
1178                let key = op
1179                    .group_by
1180                    .iter()
1181                    .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
1182                    .collect::<Vec<_>>();
1183
1184                groups.entry(key).or_default().push(row);
1185            }
1186        }
1187
1188        let mut out = Vec::new();
1189
1190        for rows in groups.into_values() {
1191            let mut result = Row::new();
1192
1193            if let Some(first) = rows.first() {
1194                for proj in &op.group_by {
1195                    let value = self.hydrate_value(eval_expr(&proj.expr, first, &eval_ctx));
1196                    result.insert_named(proj.output, proj.name.clone(), value);
1197                }
1198            }
1199
1200            for proj in &op.aggregates {
1201                let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
1202                result.insert_named(proj.output, proj.name.clone(), value);
1203            }
1204
1205            out.push(result);
1206        }
1207
1208        Ok(out)
1209    }
1210
1211    fn exec_sort(&mut self, plan: &PhysicalPlan, op: &SortExec) -> ExecResult<Vec<Row>> {
1212        let mut rows = self.execute_node(plan, op.input)?;
1213        let eval_ctx = EvalContext {
1214            storage: &*self.ctx.storage,
1215            params: &self.ctx.params,
1216        };
1217
1218        rows.sort_by(|a, b| {
1219            for item in &op.items {
1220                let ord = compare_sort_item(item, a, b, &eval_ctx);
1221                if ord != Ordering::Equal {
1222                    return ord;
1223                }
1224            }
1225            Ordering::Equal
1226        });
1227
1228        Ok(rows)
1229    }
1230
1231    fn exec_limit(&mut self, plan: &PhysicalPlan, op: &LimitExec) -> ExecResult<Vec<Row>> {
1232        let mut rows = self.execute_node(plan, op.input)?;
1233        let eval_ctx = EvalContext {
1234            storage: &*self.ctx.storage,
1235            params: &self.ctx.params,
1236        };
1237
1238        let limit = op
1239            .limit
1240            .as_ref()
1241            .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
1242            .unwrap_or(rows.len() as i64)
1243            .max(0) as usize;
1244
1245        let skip = op
1246            .skip
1247            .as_ref()
1248            .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
1249            .unwrap_or(0)
1250            .max(0) as usize;
1251
1252        if skip >= rows.len() {
1253            return Ok(Vec::new());
1254        }
1255
1256        rows.drain(0..skip);
1257        rows.truncate(limit);
1258        Ok(rows)
1259    }
1260
1261    fn exec_optional_match(
1262        &mut self,
1263        plan: &PhysicalPlan,
1264        op: &OptionalMatchExec,
1265    ) -> ExecResult<Vec<Row>> {
1266        let input_rows = self.execute_node(plan, op.input)?;
1267
1268        // Inner plan is read-only and input-independent; execute once and reuse.
1269        let inner_rows = self.execute_node(plan, op.inner)?;
1270
1271        let mut out = Vec::new();
1272
1273        for input_row in input_rows {
1274            let mut matched = false;
1275
1276            for inner_row in &inner_rows {
1277                let compatible = input_row
1278                    .iter()
1279                    .all(|(var, val)| match inner_row.get(*var) {
1280                        Some(inner_val) => inner_val == val,
1281                        None => true,
1282                    });
1283                if !compatible {
1284                    continue;
1285                }
1286
1287                let mut merged = input_row.clone();
1288                for (var, name, val) in inner_row.iter_named() {
1289                    if !merged.contains_key(*var) {
1290                        merged.insert_named(*var, name.into_owned(), val.clone());
1291                    }
1292                }
1293                out.push(merged);
1294                matched = true;
1295            }
1296
1297            if !matched {
1298                let mut null_row = input_row;
1299                for &var_id in &op.new_vars {
1300                    if !null_row.contains_key(var_id) {
1301                        null_row.insert(var_id, LoraValue::Null);
1302                    }
1303                }
1304                out.push(null_row);
1305            }
1306        }
1307
1308        Ok(out)
1309    }
1310
1311    fn exec_path_build(&mut self, plan: &PhysicalPlan, op: &PathBuildExec) -> ExecResult<Vec<Row>> {
1312        let input_rows = self.execute_node(plan, op.input)?;
1313        let mut rows: Vec<Row> = input_rows
1314            .into_iter()
1315            .map(|mut row| {
1316                let path = build_path_value(&row, &op.node_vars, &op.rel_vars, &*self.ctx.storage);
1317                row.insert(op.output, path);
1318                row
1319            })
1320            .collect();
1321
1322        if let Some(all) = op.shortest_path_all {
1323            rows = filter_shortest_paths(rows, op.output, all);
1324        }
1325        Ok(rows)
1326    }
1327
1328    fn exec_create(&mut self, plan: &PhysicalPlan, op: &CreateExec) -> ExecResult<Vec<Row>> {
1329        let input_rows = self.execute_node(plan, op.input)?;
1330        let mut out = Vec::with_capacity(input_rows.len());
1331
1332        for mut row in input_rows {
1333            self.apply_create_pattern(&mut row, &op.pattern)?;
1334            out.push(row);
1335        }
1336
1337        Ok(out)
1338    }
1339
1340    fn apply_remove_item(&mut self, row: &Row, item: &ResolvedRemoveItem) -> ExecResult<()> {
1341        match item {
1342            ResolvedRemoveItem::Labels { variable, labels } => match row.get(*variable) {
1343                Some(LoraValue::Node(node_id)) => {
1344                    let node_id = *node_id;
1345                    for label in labels {
1346                        self.ctx.storage.remove_node_label(node_id, label);
1347                    }
1348                    Ok(())
1349                }
1350                Some(other) => Err(ExecutorError::ExpectedNodeForRemoveLabels {
1351                    found: value_kind(other),
1352                }),
1353                None => Err(ExecutorError::UnboundVariableForRemove {
1354                    var: format!("{variable:?}"),
1355                }),
1356            },
1357
1358            ResolvedRemoveItem::Property { expr } => self.remove_property_from_expr(row, expr),
1359        }
1360    }
1361
1362    fn delete_value(&mut self, value: LoraValue, detach: bool) -> ExecResult<()> {
1363        match value {
1364            LoraValue::Null => Ok(()),
1365
1366            LoraValue::Node(node_id) => {
1367                if detach {
1368                    self.ctx.storage.detach_delete_node(node_id);
1369                    Ok(())
1370                } else {
1371                    let ok = self.ctx.storage.delete_node(node_id);
1372                    if ok {
1373                        Ok(())
1374                    } else {
1375                        Err(ExecutorError::DeleteNodeWithRelationships { node_id })
1376                    }
1377                }
1378            }
1379
1380            LoraValue::Relationship(rel_id) => {
1381                let ok = self.ctx.storage.delete_relationship(rel_id);
1382                if ok {
1383                    Ok(())
1384                } else {
1385                    Err(ExecutorError::DeleteRelationshipFailed { rel_id })
1386                }
1387            }
1388
1389            LoraValue::List(values) => {
1390                for v in values {
1391                    self.delete_value(v, detach)?;
1392                }
1393                Ok(())
1394            }
1395
1396            other => Err(ExecutorError::InvalidDeleteTarget {
1397                found: value_kind(&other),
1398            }),
1399        }
1400    }
1401
1402    fn exec_merge(&mut self, plan: &PhysicalPlan, op: &MergeExec) -> ExecResult<Vec<Row>> {
1403        let input_rows = self.execute_node(plan, op.input)?;
1404        let mut out = Vec::with_capacity(input_rows.len());
1405
1406        for mut row in input_rows {
1407            // First check if the pattern variable is already bound in the row.
1408            let already_bound = self.pattern_part_is_bound(&row, &op.pattern_part);
1409
1410            let matched = if already_bound {
1411                true
1412            } else {
1413                // Try to find an existing match in the graph.
1414                self.try_match_merge_pattern(&mut row, &op.pattern_part)?
1415            };
1416
1417            if !matched {
1418                self.apply_create_pattern_part(&mut row, &op.pattern_part)?;
1419            }
1420
1421            for action in &op.actions {
1422                if action.on_match == matched {
1423                    for item in &action.set.items {
1424                        self.apply_set_item(&row, item)?;
1425                    }
1426                }
1427            }
1428
1429            out.push(row);
1430        }
1431
1432        Ok(out)
1433    }
1434
1435    /// Try to find an existing node/pattern in the graph matching the MERGE
1436    /// pattern. If found, bind the variable in the row and return true.
1437    fn try_match_merge_pattern(
1438        &self,
1439        row: &mut Row,
1440        part: &ResolvedPatternPart,
1441    ) -> ExecResult<bool> {
1442        match &part.element {
1443            ResolvedPatternElement::Node {
1444                var,
1445                labels,
1446                properties,
1447            } => {
1448                // ID-only candidate discovery; borrow the record during
1449                // label/property filtering to avoid cloning non-matches.
1450                let candidate_ids = if labels.is_empty() {
1451                    self.ctx.storage.all_node_ids()
1452                } else {
1453                    scan_node_ids_for_label_groups(&*self.ctx.storage, labels)
1454                };
1455
1456                // Filter by properties if specified
1457                let eval_ctx = EvalContext {
1458                    storage: &*self.ctx.storage,
1459                    params: &self.ctx.params,
1460                };
1461                let expected_props = properties.as_ref().map(|e| eval_expr(e, row, &eval_ctx));
1462
1463                for id in candidate_ids {
1464                    let Some(node) = self.ctx.storage.node_ref(id) else {
1465                        continue;
1466                    };
1467                    if !node_matches_label_groups(&node.labels, labels) {
1468                        continue;
1469                    }
1470                    if let Some(LoraValue::Map(expected)) = &expected_props {
1471                        let all_match = expected.iter().all(|(key, expected_value)| {
1472                            node.properties
1473                                .get(key)
1474                                .map(|actual| value_matches_property_value(expected_value, actual))
1475                                .unwrap_or(false)
1476                        });
1477                        if !all_match {
1478                            continue;
1479                        }
1480                    }
1481
1482                    // Found a match — bind the variable
1483                    if let Some(var_id) = var {
1484                        row.insert(*var_id, LoraValue::Node(node.id));
1485                    }
1486                    return Ok(true);
1487                }
1488
1489                Ok(false)
1490            }
1491
1492            ResolvedPatternElement::ShortestPath { .. } => {
1493                // ShortestPath is not valid in MERGE context
1494                Ok(false)
1495            }
1496
1497            ResolvedPatternElement::NodeChain { head, chain } => {
1498                // Resolve the head node — it should be already bound in the row.
1499                let head_node_id = if let Some(var_id) = head.var {
1500                    if let Some(LoraValue::Node(id)) = row.get(var_id) {
1501                        *id
1502                    } else {
1503                        // Try to match head node as a standalone node pattern.
1504                        let node_matched = self.try_match_merge_pattern(
1505                            row,
1506                            &ResolvedPatternPart {
1507                                binding: None,
1508                                element: ResolvedPatternElement::Node {
1509                                    var: head.var,
1510                                    labels: head.labels.clone(),
1511                                    properties: head.properties.clone(),
1512                                },
1513                            },
1514                        )?;
1515                        if !node_matched {
1516                            return Ok(false);
1517                        }
1518                        match row.get(var_id) {
1519                            Some(LoraValue::Node(id)) => *id,
1520                            _ => return Ok(false),
1521                        }
1522                    }
1523                } else {
1524                    return Ok(false);
1525                };
1526
1527                let mut current_node_id = head_node_id;
1528
1529                for step in chain {
1530                    let eval_ctx = EvalContext {
1531                        storage: &*self.ctx.storage,
1532                        params: &self.ctx.params,
1533                    };
1534
1535                    let _ = step.rel.types.first();
1536                    let direction = step.rel.direction;
1537
1538                    // ID-only traversal; look up records by reference only for
1539                    // candidates that pass the label/property filters.
1540                    let edges =
1541                        self.ctx
1542                            .storage
1543                            .expand_ids(current_node_id, direction, &step.rel.types);
1544
1545                    // Try to find a matching edge + target node
1546                    let mut found = false;
1547                    for (rel_id, node_id) in edges {
1548                        let Some(node_rec) = self.ctx.storage.node_ref(node_id) else {
1549                            continue;
1550                        };
1551                        let Some(rel_rec) = self.ctx.storage.relationship_ref(rel_id) else {
1552                            continue;
1553                        };
1554
1555                        // Check target node labels
1556                        if !node_matches_label_groups(&node_rec.labels, &step.node.labels) {
1557                            continue;
1558                        }
1559
1560                        // Check target node properties
1561                        if let Some(props_expr) = &step.node.properties {
1562                            let expected = eval_expr(props_expr, row, &eval_ctx);
1563                            if let LoraValue::Map(expected_map) = &expected {
1564                                let all_match = expected_map.iter().all(|(key, expected_val)| {
1565                                    node_rec
1566                                        .properties
1567                                        .get(key)
1568                                        .map(|actual| {
1569                                            value_matches_property_value(expected_val, actual)
1570                                        })
1571                                        .unwrap_or(false)
1572                                });
1573                                if !all_match {
1574                                    continue;
1575                                }
1576                            }
1577                        }
1578
1579                        // Check relationship properties
1580                        if let Some(rel_props_expr) = &step.rel.properties {
1581                            let expected = eval_expr(rel_props_expr, row, &eval_ctx);
1582                            if let LoraValue::Map(expected_map) = &expected {
1583                                let all_match = expected_map.iter().all(|(key, expected_val)| {
1584                                    rel_rec
1585                                        .properties
1586                                        .get(key)
1587                                        .map(|actual| {
1588                                            value_matches_property_value(expected_val, actual)
1589                                        })
1590                                        .unwrap_or(false)
1591                                });
1592                                if !all_match {
1593                                    continue;
1594                                }
1595                            }
1596                        }
1597
1598                        // Match found — bind variables
1599                        if let Some(rel_var) = step.rel.var {
1600                            row.insert(rel_var, LoraValue::Relationship(rel_id));
1601                        }
1602                        if let Some(node_var) = step.node.var {
1603                            row.insert(node_var, LoraValue::Node(node_id));
1604                        }
1605                        current_node_id = node_id;
1606                        found = true;
1607                        break;
1608                    }
1609
1610                    if !found {
1611                        return Ok(false);
1612                    }
1613                }
1614
1615                Ok(true)
1616            }
1617        }
1618    }
1619
1620    fn exec_delete(&mut self, plan: &PhysicalPlan, op: &DeleteExec) -> ExecResult<Vec<Row>> {
1621        let input_rows = self.execute_node(plan, op.input)?;
1622
1623        for row in &input_rows {
1624            for expr in &op.expressions {
1625                let value = {
1626                    let eval_ctx = EvalContext {
1627                        storage: &*self.ctx.storage,
1628                        params: &self.ctx.params,
1629                    };
1630                    eval_expr(expr, row, &eval_ctx)
1631                };
1632
1633                self.delete_value(value, op.detach)?;
1634            }
1635        }
1636
1637        Ok(input_rows)
1638    }
1639
1640    fn exec_set(&mut self, plan: &PhysicalPlan, op: &SetExec) -> ExecResult<Vec<Row>> {
1641        let input_rows = self.execute_node(plan, op.input)?;
1642
1643        for row in &input_rows {
1644            for item in &op.items {
1645                self.apply_set_item(row, item)?;
1646            }
1647        }
1648
1649        Ok(input_rows)
1650    }
1651
1652    fn exec_remove(&mut self, plan: &PhysicalPlan, op: &RemoveExec) -> ExecResult<Vec<Row>> {
1653        let input_rows = self.execute_node(plan, op.input)?;
1654
1655        for row in &input_rows {
1656            for item in &op.items {
1657                self.apply_remove_item(row, item)?;
1658            }
1659        }
1660
1661        Ok(input_rows)
1662    }
1663
1664    fn apply_set_item(&mut self, row: &Row, item: &ResolvedSetItem) -> ExecResult<()> {
1665        match item {
1666            ResolvedSetItem::SetProperty { target, value } => {
1667                let new_value = {
1668                    let eval_ctx = EvalContext {
1669                        storage: &*self.ctx.storage,
1670                        params: &self.ctx.params,
1671                    };
1672                    eval_expr(value, row, &eval_ctx)
1673                };
1674
1675                self.set_property_from_expr(row, target, new_value)
1676            }
1677
1678            ResolvedSetItem::SetVariable { variable, value } => {
1679                // Only need the entity's id — peek at the binding by reference.
1680                let entity_ref =
1681                    row.get(*variable)
1682                        .ok_or(ExecutorError::UnboundVariableForSet {
1683                            var: format!("{variable:?}"),
1684                        })?;
1685                let entity_target = entity_target_from_value(entity_ref)?;
1686
1687                let new_value = {
1688                    let eval_ctx = EvalContext {
1689                        storage: &*self.ctx.storage,
1690                        params: &self.ctx.params,
1691                    };
1692                    eval_expr(value, row, &eval_ctx)
1693                };
1694
1695                self.overwrite_entity_target(entity_target, new_value)
1696            }
1697
1698            ResolvedSetItem::MutateVariable { variable, value } => {
1699                let entity_ref =
1700                    row.get(*variable)
1701                        .ok_or(ExecutorError::UnboundVariableForSet {
1702                            var: format!("{variable:?}"),
1703                        })?;
1704                let entity_target = entity_target_from_value(entity_ref)?;
1705
1706                let patch = {
1707                    let eval_ctx = EvalContext {
1708                        storage: &*self.ctx.storage,
1709                        params: &self.ctx.params,
1710                    };
1711                    eval_expr(value, row, &eval_ctx)
1712                };
1713
1714                self.mutate_entity_target(entity_target, patch)
1715            }
1716
1717            ResolvedSetItem::SetLabels { variable, labels } => match row.get(*variable) {
1718                Some(LoraValue::Node(node_id)) => {
1719                    let node_id = *node_id;
1720                    for label in labels {
1721                        self.ctx.storage.add_node_label(node_id, label);
1722                    }
1723                    Ok(())
1724                }
1725                Some(other) => Err(ExecutorError::ExpectedNodeForSetLabels {
1726                    found: value_kind(other),
1727                }),
1728                None => Err(ExecutorError::UnboundVariableForSet {
1729                    var: format!("{variable:?}"),
1730                }),
1731            },
1732        }
1733    }
1734
1735    fn set_property_from_expr(
1736        &mut self,
1737        row: &Row,
1738        target_expr: &ResolvedExpr,
1739        new_value: LoraValue,
1740    ) -> ExecResult<()> {
1741        let ResolvedExpr::Property { expr, property } = target_expr else {
1742            return Err(ExecutorError::UnsupportedSetTarget);
1743        };
1744
1745        let owner = {
1746            let eval_ctx = EvalContext {
1747                storage: &*self.ctx.storage,
1748                params: &self.ctx.params,
1749            };
1750            eval_expr(expr, row, &eval_ctx)
1751        };
1752
1753        match owner {
1754            LoraValue::Node(node_id) => {
1755                let prop = lora_value_to_property(new_value)
1756                    .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1757                self.ctx
1758                    .storage
1759                    .set_node_property(node_id, property.clone(), prop);
1760                Ok(())
1761            }
1762            LoraValue::Relationship(rel_id) => {
1763                let prop = lora_value_to_property(new_value)
1764                    .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1765                self.ctx
1766                    .storage
1767                    .set_relationship_property(rel_id, property.clone(), prop);
1768                Ok(())
1769            }
1770            other => Err(ExecutorError::InvalidSetTarget {
1771                found: value_kind(&other),
1772            }),
1773        }
1774    }
1775
1776    fn remove_property_from_expr(&mut self, row: &Row, expr: &ResolvedExpr) -> ExecResult<()> {
1777        let ResolvedExpr::Property {
1778            expr: owner_expr,
1779            property,
1780        } = expr
1781        else {
1782            return Err(ExecutorError::UnsupportedRemoveTarget);
1783        };
1784
1785        let owner = {
1786            let eval_ctx = EvalContext {
1787                storage: &*self.ctx.storage,
1788                params: &self.ctx.params,
1789            };
1790            eval_expr(owner_expr, row, &eval_ctx)
1791        };
1792
1793        match owner {
1794            LoraValue::Node(node_id) => {
1795                self.ctx.storage.remove_node_property(node_id, property);
1796                Ok(())
1797            }
1798            LoraValue::Relationship(rel_id) => {
1799                self.ctx
1800                    .storage
1801                    .remove_relationship_property(rel_id, property);
1802                Ok(())
1803            }
1804            other => Err(ExecutorError::InvalidRemoveTarget {
1805                found: value_kind(&other),
1806            }),
1807        }
1808    }
1809
1810    fn overwrite_entity_target(
1811        &mut self,
1812        target: EntityTarget,
1813        new_value: LoraValue,
1814    ) -> ExecResult<()> {
1815        let LoraValue::Map(map) = new_value else {
1816            return Err(ExecutorError::ExpectedPropertyMap {
1817                found: value_kind(&new_value),
1818            });
1819        };
1820
1821        let mut props: Properties = Properties::new();
1822        for (k, v) in map {
1823            let prop = lora_value_to_property(v)
1824                .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1825            props.insert(k, prop);
1826        }
1827
1828        match target {
1829            EntityTarget::Node(node_id) => {
1830                self.ctx.storage.replace_node_properties(node_id, props);
1831            }
1832            EntityTarget::Relationship(rel_id) => {
1833                self.ctx
1834                    .storage
1835                    .replace_relationship_properties(rel_id, props);
1836            }
1837        }
1838        Ok(())
1839    }
1840
1841    fn mutate_entity_target(
1842        &mut self,
1843        target: EntityTarget,
1844        patch_value: LoraValue,
1845    ) -> ExecResult<()> {
1846        let LoraValue::Map(map) = patch_value else {
1847            return Err(ExecutorError::ExpectedPropertyMap {
1848                found: value_kind(&patch_value),
1849            });
1850        };
1851
1852        match target {
1853            EntityTarget::Node(node_id) => {
1854                for (k, v) in map {
1855                    let prop = lora_value_to_property(v)
1856                        .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1857                    self.ctx.storage.set_node_property(node_id, k, prop);
1858                }
1859            }
1860            EntityTarget::Relationship(rel_id) => {
1861                for (k, v) in map {
1862                    let prop = lora_value_to_property(v)
1863                        .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1864                    self.ctx.storage.set_relationship_property(rel_id, k, prop);
1865                }
1866            }
1867        }
1868        Ok(())
1869    }
1870
1871    fn apply_create_pattern(&mut self, row: &mut Row, pattern: &ResolvedPattern) -> ExecResult<()> {
1872        for part in &pattern.parts {
1873            self.apply_create_pattern_part(row, part)?;
1874        }
1875        Ok(())
1876    }
1877
1878    fn apply_create_pattern_part(
1879        &mut self,
1880        row: &mut Row,
1881        part: &ResolvedPatternPart,
1882    ) -> ExecResult<()> {
1883        if part.binding.is_some() {
1884            trace!("create pattern part has path binding; path materialization not implemented");
1885        }
1886
1887        let _ = self.apply_create_pattern_element(row, &part.element)?;
1888        Ok(())
1889    }
1890
1891    fn apply_create_pattern_element(
1892        &mut self,
1893        row: &mut Row,
1894        element: &ResolvedPatternElement,
1895    ) -> ExecResult<Option<LoraValue>> {
1896        match element {
1897            ResolvedPatternElement::Node {
1898                var,
1899                labels,
1900                properties,
1901            } => {
1902                let node_id =
1903                    self.materialize_node_pattern(row, *var, labels, properties.as_ref())?;
1904                Ok(Some(LoraValue::Node(node_id)))
1905            }
1906
1907            ResolvedPatternElement::NodeChain { head, chain } => {
1908                let mut current_node_id = self.materialize_node_pattern(
1909                    row,
1910                    head.var,
1911                    &head.labels,
1912                    head.properties.as_ref(),
1913                )?;
1914
1915                for link in chain {
1916                    let next_node_id = self.materialize_node_pattern(
1917                        row,
1918                        link.node.var,
1919                        &link.node.labels,
1920                        link.node.properties.as_ref(),
1921                    )?;
1922
1923                    let _ = self.materialize_relationship_pattern(
1924                        row,
1925                        current_node_id,
1926                        next_node_id,
1927                        &link.rel,
1928                    )?;
1929
1930                    current_node_id = next_node_id;
1931                }
1932
1933                Ok(Some(LoraValue::Node(current_node_id)))
1934            }
1935
1936            ResolvedPatternElement::ShortestPath { .. } => {
1937                // ShortestPath is not valid in CREATE context
1938                Ok(None)
1939            }
1940        }
1941    }
1942
1943    fn pattern_part_is_bound(&self, row: &Row, part: &ResolvedPatternPart) -> bool {
1944        match &part.element {
1945            ResolvedPatternElement::Node { var, .. } => var.and_then(|v| row.get(v)).is_some(),
1946
1947            ResolvedPatternElement::ShortestPath { .. } => false,
1948
1949            ResolvedPatternElement::NodeChain { head, chain } => {
1950                let head_ok = head.var.and_then(|v| row.get(v)).is_some();
1951
1952                let chain_ok = chain.iter().all(|link| {
1953                    let node_ok = link.node.var.and_then(|v| row.get(v)).is_some();
1954                    // For MERGE, anonymous relationships cannot be considered
1955                    // "bound" because we have no variable to check. The merge
1956                    // must search the graph to see if the relationship exists.
1957                    let rel_ok = match link.rel.var {
1958                        Some(v) => row.get(v).is_some(),
1959                        None => false,
1960                    };
1961                    node_ok && rel_ok
1962                });
1963
1964                head_ok && chain_ok
1965            }
1966        }
1967    }
1968
1969    fn materialize_node_pattern(
1970        &mut self,
1971        row: &mut Row,
1972        var: Option<lora_analyzer::symbols::VarId>,
1973        labels: &[Vec<String>],
1974        properties: Option<&ResolvedExpr>,
1975    ) -> ExecResult<u64> {
1976        if let Some(var_id) = var {
1977            if let Some(LoraValue::Node(id)) = row.get(var_id) {
1978                return Ok(*id);
1979            }
1980        }
1981
1982        let properties = match properties {
1983            Some(expr) => eval_properties_expr(expr, row, &*self.ctx.storage, &self.ctx.params)?,
1984            None => Properties::new(),
1985        };
1986
1987        let flat_labels = flatten_label_groups(labels);
1988        debug!("creating node with labels={flat_labels:?}");
1989        let created = self.ctx.storage.create_node(flat_labels, properties);
1990
1991        if let Some(var_id) = var {
1992            row.insert(var_id, LoraValue::Node(created.id));
1993        }
1994
1995        Ok(created.id)
1996    }
1997
1998    fn materialize_relationship_pattern(
1999        &mut self,
2000        row: &mut Row,
2001        left_node_id: u64,
2002        right_node_id: u64,
2003        rel: &lora_analyzer::ResolvedRel,
2004    ) -> ExecResult<u64> {
2005        if let Some(var_id) = rel.var {
2006            if let Some(LoraValue::Relationship(id)) = row.get(var_id) {
2007                let id = *id;
2008                if let Some((src, dst)) = self.ctx.storage.relationship_endpoints(id) {
2009                    let endpoints_match = match rel.direction {
2010                        Direction::Right | Direction::Undirected => {
2011                            src == left_node_id && dst == right_node_id
2012                        }
2013                        Direction::Left => src == right_node_id && dst == left_node_id,
2014                    };
2015
2016                    if endpoints_match {
2017                        return Ok(id);
2018                    }
2019                }
2020            }
2021        }
2022
2023        if rel.range.is_some() {
2024            return Err(ExecutorError::UnsupportedCreateRelationshipRange);
2025        }
2026
2027        let (src, dst) = match rel.direction {
2028            Direction::Right | Direction::Undirected => (left_node_id, right_node_id),
2029            Direction::Left => (right_node_id, left_node_id),
2030        };
2031
2032        let rel_type = rel
2033            .types
2034            .first()
2035            .ok_or(ExecutorError::MissingRelationshipType)?;
2036
2037        if rel_type.is_empty() {
2038            return Err(ExecutorError::MissingRelationshipType);
2039        }
2040
2041        let properties = match rel.properties.as_ref() {
2042            Some(expr) => eval_properties_expr(expr, row, &*self.ctx.storage, &self.ctx.params)?,
2043            None => Properties::new(),
2044        };
2045
2046        debug!("creating relationship: src={src}, dst={dst}, type={rel_type}");
2047
2048        let created = self
2049            .ctx
2050            .storage
2051            .create_relationship(src, dst, rel_type, properties)
2052            .ok_or_else(|| ExecutorError::RelationshipCreateFailed {
2053                src,
2054                dst,
2055                rel_type: rel_type.clone(),
2056            })?;
2057
2058        if let Some(var_id) = rel.var {
2059            row.insert(var_id, LoraValue::Relationship(created.id));
2060        }
2061
2062        Ok(created.id)
2063    }
2064}
2065
2066/// Lightweight target for SET property-mutation paths. Lets the SET logic
2067/// borrow the row entry (just pulling out the id) instead of cloning the
2068/// whole `LoraValue`.
2069#[derive(Clone, Copy)]
2070enum EntityTarget {
2071    Node(NodeId),
2072    Relationship(u64),
2073}
2074
2075fn entity_target_from_value(value: &LoraValue) -> ExecResult<EntityTarget> {
2076    match value {
2077        LoraValue::Node(id) => Ok(EntityTarget::Node(*id)),
2078        LoraValue::Relationship(id) => Ok(EntityTarget::Relationship(*id)),
2079        other => Err(ExecutorError::InvalidSetTarget {
2080            found: value_kind(other),
2081        }),
2082    }
2083}
2084
2085/// Dedup rows that share the same schema (same VarId set). Compares rows by
2086/// a Vec<GroupValueKey> keyed on VarId iteration order — avoids the per-row
2087/// column-name String clones of `dedup_rows`. Used by DISTINCT projection.
2088fn dedup_rows_by_vars(rows: Vec<Row>) -> Vec<Row> {
2089    let mut seen: BTreeSet<Vec<GroupValueKey>> = BTreeSet::new();
2090    let mut out = Vec::new();
2091
2092    for row in rows {
2093        let key: Vec<GroupValueKey> = row
2094            .iter()
2095            .map(|(_, val)| GroupValueKey::from_value(val))
2096            .collect();
2097        if seen.insert(key) {
2098            out.push(row);
2099        }
2100    }
2101
2102    out
2103}
2104
2105/// Dedup rows using named entries so rows with different VarIds but the same
2106/// column name + value are collapsed. Needed for UNION where each branch has
2107/// its own VarIds.
2108fn dedup_rows(rows: Vec<Row>) -> Vec<Row> {
2109    let mut seen: BTreeSet<Vec<(String, GroupValueKey)>> = BTreeSet::new();
2110    let mut out = Vec::new();
2111
2112    for row in rows {
2113        let key: Vec<(String, GroupValueKey)> = row
2114            .iter_named()
2115            .map(|(_, name, val)| (name.into_owned(), GroupValueKey::from_value(val)))
2116            .collect();
2117        if seen.insert(key) {
2118            out.push(row);
2119        }
2120    }
2121
2122    out
2123}
2124
2125fn eval_properties_expr<S: GraphStorage + ?Sized>(
2126    expr: &ResolvedExpr,
2127    row: &Row,
2128    storage: &S,
2129    params: &std::collections::BTreeMap<String, LoraValue>,
2130) -> ExecResult<Properties> {
2131    let eval_ctx = EvalContext { storage, params };
2132
2133    match eval_expr(expr, row, &eval_ctx) {
2134        LoraValue::Map(map) => {
2135            let mut out = Properties::new();
2136            for (k, v) in map {
2137                let prop = lora_value_to_property(v)
2138                    .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2139                out.insert(k, prop);
2140            }
2141            Ok(out)
2142        }
2143        other => Err(ExecutorError::ExpectedPropertyMap {
2144            found: value_kind(&other),
2145        }),
2146    }
2147}
2148
2149fn compute_aggregate_expr<S: GraphStorage + ?Sized>(
2150    expr: &ResolvedExpr,
2151    rows: &[Row],
2152    eval_ctx: &EvalContext<'_, S>,
2153) -> LoraValue {
2154    match expr {
2155        ResolvedExpr::Function {
2156            name,
2157            distinct,
2158            args,
2159        } => {
2160            let func = name.to_ascii_lowercase();
2161
2162            match func.as_str() {
2163                "count" => {
2164                    if args.is_empty() {
2165                        return LoraValue::Int(rows.len() as i64);
2166                    }
2167
2168                    let mut values = rows
2169                        .iter()
2170                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2171                        .filter(|v| !matches!(v, LoraValue::Null))
2172                        .collect::<Vec<_>>();
2173
2174                    if *distinct {
2175                        values = dedup_values(values);
2176                    }
2177
2178                    LoraValue::Int(values.len() as i64)
2179                }
2180
2181                "collect" => {
2182                    if args.is_empty() {
2183                        return LoraValue::List(Vec::new());
2184                    }
2185
2186                    let mut values = rows
2187                        .iter()
2188                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2189                        .collect::<Vec<_>>();
2190
2191                    if *distinct {
2192                        values = dedup_values(values);
2193                    }
2194
2195                    LoraValue::List(values)
2196                }
2197
2198                "sum" => {
2199                    if args.is_empty() {
2200                        return LoraValue::Null;
2201                    }
2202
2203                    let mut values = rows
2204                        .iter()
2205                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2206                        .collect::<Vec<_>>();
2207
2208                    if *distinct {
2209                        values = dedup_values(values);
2210                    }
2211
2212                    let nums = values
2213                        .into_iter()
2214                        .filter_map(as_f64_lossy)
2215                        .collect::<Vec<_>>();
2216
2217                    if nums.is_empty() {
2218                        LoraValue::Null
2219                    } else if nums.iter().all(|n| n.fract() == 0.0) {
2220                        LoraValue::Int(nums.iter().sum::<f64>() as i64)
2221                    } else {
2222                        LoraValue::Float(nums.iter().sum::<f64>())
2223                    }
2224                }
2225
2226                "avg" => {
2227                    if args.is_empty() {
2228                        return LoraValue::Null;
2229                    }
2230
2231                    let mut values = rows
2232                        .iter()
2233                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2234                        .collect::<Vec<_>>();
2235
2236                    if *distinct {
2237                        values = dedup_values(values);
2238                    }
2239
2240                    let nums = values
2241                        .into_iter()
2242                        .filter_map(as_f64_lossy)
2243                        .collect::<Vec<_>>();
2244
2245                    if nums.is_empty() {
2246                        LoraValue::Null
2247                    } else {
2248                        LoraValue::Float(nums.iter().sum::<f64>() / nums.len() as f64)
2249                    }
2250                }
2251
2252                "min" => {
2253                    if args.is_empty() {
2254                        return LoraValue::Null;
2255                    }
2256
2257                    let mut values = rows
2258                        .iter()
2259                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2260                        .filter(|v| !matches!(v, LoraValue::Null))
2261                        .collect::<Vec<_>>();
2262
2263                    if *distinct {
2264                        values = dedup_values(values);
2265                    }
2266
2267                    values
2268                        .into_iter()
2269                        .min_by(compare_values_total)
2270                        .unwrap_or(LoraValue::Null)
2271                }
2272
2273                "max" => {
2274                    if args.is_empty() {
2275                        return LoraValue::Null;
2276                    }
2277
2278                    let mut values = rows
2279                        .iter()
2280                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2281                        .filter(|v| !matches!(v, LoraValue::Null))
2282                        .collect::<Vec<_>>();
2283
2284                    if *distinct {
2285                        values = dedup_values(values);
2286                    }
2287
2288                    values
2289                        .into_iter()
2290                        .max_by(compare_values_total)
2291                        .unwrap_or(LoraValue::Null)
2292                }
2293
2294                "stdev" | "stdevp" => {
2295                    if args.is_empty() {
2296                        return LoraValue::Null;
2297                    }
2298
2299                    let nums: Vec<f64> = rows
2300                        .iter()
2301                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2302                        .filter_map(as_f64_lossy)
2303                        .collect();
2304
2305                    let is_population = func == "stdevp";
2306
2307                    if nums.is_empty() || (!is_population && nums.len() < 2) {
2308                        return LoraValue::Float(0.0);
2309                    }
2310
2311                    let mean = nums.iter().sum::<f64>() / nums.len() as f64;
2312                    let variance_sum: f64 = nums.iter().map(|x| (x - mean).powi(2)).sum();
2313                    let denom = if is_population {
2314                        nums.len() as f64
2315                    } else {
2316                        (nums.len() - 1) as f64
2317                    };
2318                    LoraValue::Float((variance_sum / denom).sqrt())
2319                }
2320
2321                "percentilecont" => {
2322                    if args.len() < 2 {
2323                        return LoraValue::Null;
2324                    }
2325
2326                    let percentile = eval_expr(&args[1], &rows[0], eval_ctx)
2327                        .as_f64()
2328                        .unwrap_or(0.5);
2329                    let mut nums: Vec<f64> = rows
2330                        .iter()
2331                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2332                        .filter_map(as_f64_lossy)
2333                        .collect();
2334
2335                    if nums.is_empty() {
2336                        return LoraValue::Null;
2337                    }
2338
2339                    nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
2340
2341                    let index = percentile * (nums.len() - 1) as f64;
2342                    let lower = index.floor() as usize;
2343                    let upper = index.ceil() as usize;
2344                    let fraction = index - lower as f64;
2345
2346                    if lower == upper || upper >= nums.len() {
2347                        LoraValue::Float(nums[lower])
2348                    } else {
2349                        LoraValue::Float(nums[lower] * (1.0 - fraction) + nums[upper] * fraction)
2350                    }
2351                }
2352
2353                "percentiledisc" => {
2354                    if args.len() < 2 {
2355                        return LoraValue::Null;
2356                    }
2357
2358                    let percentile = eval_expr(&args[1], &rows[0], eval_ctx)
2359                        .as_f64()
2360                        .unwrap_or(0.5);
2361                    let mut nums: Vec<f64> = rows
2362                        .iter()
2363                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2364                        .filter_map(as_f64_lossy)
2365                        .collect();
2366
2367                    if nums.is_empty() {
2368                        return LoraValue::Null;
2369                    }
2370
2371                    nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
2372
2373                    let index = (percentile * (nums.len() - 1) as f64).round() as usize;
2374                    let index = index.min(nums.len() - 1);
2375                    LoraValue::Float(nums[index])
2376                }
2377
2378                _ => rows
2379                    .first()
2380                    .map(|r| eval_expr(expr, r, eval_ctx))
2381                    .unwrap_or(LoraValue::Null),
2382            }
2383        }
2384
2385        _ => rows
2386            .first()
2387            .map(|r| eval_expr(expr, r, eval_ctx))
2388            .unwrap_or(LoraValue::Null),
2389    }
2390}
2391
2392fn compare_sort_item<S: GraphStorage + ?Sized>(
2393    item: &ResolvedSortItem,
2394    a: &Row,
2395    b: &Row,
2396    eval_ctx: &EvalContext<'_, S>,
2397) -> Ordering {
2398    let av = eval_expr(&item.expr, a, eval_ctx);
2399    let bv = eval_expr(&item.expr, b, eval_ctx);
2400
2401    let ascending = matches!(item.direction, lora_ast::SortDirection::Asc);
2402    compare_values_for_sort(&av, &bv, ascending)
2403}
2404
2405fn dedup_values(values: Vec<LoraValue>) -> Vec<LoraValue> {
2406    let mut seen: BTreeSet<GroupValueKey> = BTreeSet::new();
2407    let mut out = Vec::new();
2408
2409    for value in values {
2410        let key = GroupValueKey::from_value(&value);
2411        if seen.insert(key) {
2412            out.push(value);
2413        }
2414    }
2415
2416    out
2417}
2418
2419fn as_f64_lossy(v: LoraValue) -> Option<f64> {
2420    match v {
2421        LoraValue::Int(i) => Some(i as f64),
2422        LoraValue::Float(f) => Some(f),
2423        _ => None,
2424    }
2425}
2426
2427fn compare_values_for_sort(a: &LoraValue, b: &LoraValue, ascending: bool) -> Ordering {
2428    let ord = match (a, b) {
2429        (LoraValue::Null, LoraValue::Null) => Ordering::Equal,
2430        (LoraValue::Null, _) => Ordering::Greater,
2431        (_, LoraValue::Null) => Ordering::Less,
2432        _ => compare_values_total(a, b),
2433    };
2434
2435    if ascending {
2436        ord
2437    } else {
2438        ord.reverse()
2439    }
2440}
2441
2442fn compare_values_total(a: &LoraValue, b: &LoraValue) -> Ordering {
2443    use LoraValue::*;
2444
2445    match (a, b) {
2446        (Bool(x), Bool(y)) => x.cmp(y),
2447        (Int(x), Int(y)) => x.cmp(y),
2448        (Float(x), Float(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
2449        (Int(x), Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Ordering::Equal),
2450        (Float(x), Int(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Ordering::Equal),
2451        (String(x), String(y)) => x.cmp(y),
2452        (Node(x), Node(y)) => x.cmp(y),
2453        (Relationship(x), Relationship(y)) => x.cmp(y),
2454        (Date(x), Date(y)) => x.cmp(y),
2455        (DateTime(x), DateTime(y)) => x.cmp(y),
2456        (Duration(x), Duration(y)) => x.cmp(y),
2457        (Vector(x), Vector(y)) => x.to_key_string().cmp(&y.to_key_string()),
2458        _ => type_rank(a)
2459            .cmp(&type_rank(b))
2460            .then_with(|| format!("{a:?}").cmp(&format!("{b:?}"))),
2461    }
2462}
2463
2464pub fn value_matches_property_value(expected: &LoraValue, actual: &PropertyValue) -> bool {
2465    match (expected, actual) {
2466        (LoraValue::Null, PropertyValue::Null) => true,
2467        (LoraValue::Bool(a), PropertyValue::Bool(b)) => a == b,
2468        (LoraValue::Int(a), PropertyValue::Int(b)) => a == b,
2469        (LoraValue::Float(a), PropertyValue::Float(b)) => a == b,
2470        (LoraValue::Int(a), PropertyValue::Float(b)) => (*a as f64) == *b,
2471        (LoraValue::Float(a), PropertyValue::Int(b)) => *a == (*b as f64),
2472        (LoraValue::String(a), PropertyValue::String(b)) => a == b,
2473
2474        (LoraValue::List(xs), PropertyValue::List(ys)) => {
2475            xs.len() == ys.len()
2476                && xs
2477                    .iter()
2478                    .zip(ys.iter())
2479                    .all(|(x, y)| value_matches_property_value(x, y))
2480        }
2481
2482        (LoraValue::Map(xm), PropertyValue::Map(ym)) => xm.iter().all(|(k, xv)| {
2483            ym.get(k)
2484                .map(|yv| value_matches_property_value(xv, yv))
2485                .unwrap_or(false)
2486        }),
2487
2488        (LoraValue::Date(a), PropertyValue::Date(b)) => a == b,
2489        (LoraValue::DateTime(a), PropertyValue::DateTime(b)) => a == b,
2490        (LoraValue::LocalDateTime(a), PropertyValue::LocalDateTime(b)) => a == b,
2491        (LoraValue::Time(a), PropertyValue::Time(b)) => a == b,
2492        (LoraValue::LocalTime(a), PropertyValue::LocalTime(b)) => a == b,
2493        (LoraValue::Duration(a), PropertyValue::Duration(b)) => a == b,
2494        (LoraValue::Point(a), PropertyValue::Point(b)) => a == b,
2495        (LoraValue::Vector(a), PropertyValue::Vector(b)) => a == b,
2496
2497        _ => false,
2498    }
2499}
2500
2501/// Build a LoraPath from the node and relationship variables currently in a row.
2502///
2503/// For variable-length relationships (stored as a List of Relationship values),
2504/// intermediate nodes are reconstructed from the storage by walking the
2505/// relationship chain.
2506fn build_path_value<S: GraphStorage + ?Sized>(
2507    row: &Row,
2508    node_vars: &[VarId],
2509    rel_vars: &[VarId],
2510    storage: &S,
2511) -> LoraValue {
2512    let mut raw_nodes = Vec::new();
2513    let mut rels = Vec::new();
2514    let mut has_var_len = false;
2515
2516    for &nv in node_vars {
2517        match row.get(nv) {
2518            Some(LoraValue::Node(id)) => raw_nodes.push(*id),
2519            Some(LoraValue::List(items)) => {
2520                for item in items {
2521                    if let LoraValue::Node(id) = item {
2522                        raw_nodes.push(*id);
2523                    }
2524                }
2525            }
2526            _ => {}
2527        }
2528    }
2529
2530    for &rv in rel_vars {
2531        match row.get(rv) {
2532            Some(LoraValue::Relationship(id)) => rels.push(*id),
2533            Some(LoraValue::List(items)) => {
2534                has_var_len = true;
2535                for item in items {
2536                    if let LoraValue::Relationship(id) = item {
2537                        rels.push(*id);
2538                    }
2539                }
2540            }
2541            _ => {}
2542        }
2543    }
2544
2545    // For variable-length paths, reconstruct the full node sequence from the
2546    // relationship chain. raw_nodes typically only has [start, end] but the
2547    // path needs all intermediate nodes as well.
2548    let nodes = if has_var_len && !rels.is_empty() && raw_nodes.len() == 2 {
2549        let start = raw_nodes[0];
2550        let mut ordered = Vec::with_capacity(rels.len() + 1);
2551        ordered.push(start);
2552        let mut current = start;
2553        for &rel_id in &rels {
2554            if let Some((src, dst)) = storage.relationship_endpoints(rel_id) {
2555                let next = if src == current { dst } else { src };
2556                ordered.push(next);
2557                current = next;
2558            }
2559        }
2560        ordered
2561    } else {
2562        raw_nodes
2563    };
2564
2565    LoraValue::Path(LoraPath { nodes, rels })
2566}
2567
2568fn type_rank(v: &LoraValue) -> u8 {
2569    match v {
2570        LoraValue::Null => 0,
2571        LoraValue::Bool(_) => 1,
2572        LoraValue::Int(_) | LoraValue::Float(_) => 2,
2573        LoraValue::String(_) => 3,
2574        LoraValue::Date(_) => 4,
2575        LoraValue::DateTime(_) => 5,
2576        LoraValue::LocalDateTime(_) => 6,
2577        LoraValue::Time(_) => 7,
2578        LoraValue::LocalTime(_) => 8,
2579        LoraValue::Duration(_) => 9,
2580        LoraValue::Point(_) => 10,
2581        LoraValue::Vector(_) => 11,
2582        LoraValue::List(_) => 12,
2583        LoraValue::Map(_) => 13,
2584        LoraValue::Node(_) => 14,
2585        LoraValue::Relationship(_) => 15,
2586        LoraValue::Path(_) => 16,
2587    }
2588}
2589
/// Decide whether `node_labels` satisfies every label group in `groups`.
///
/// Within a group the labels are alternatives (OR): the node needs at least
/// one of them. Across groups the requirement is conjunctive (AND): every
/// group must be satisfied. Consequently an empty `groups` slice matches any
/// node, while an empty group can never be satisfied.
fn node_matches_label_groups(node_labels: &[String], groups: &[Vec<String>]) -> bool {
    for group in groups {
        let satisfied = group.iter().any(|wanted| node_labels.contains(wanted));
        if !satisfied {
            return false;
        }
    }
    true
}
2598
2599/// Scan the graph for candidate node IDs matching the label groups. Uses the
2600/// label index for the pick-first-label phase and avoids cloning NodeRecords.
2601fn scan_node_ids_for_label_groups<S: GraphStorage + ?Sized>(
2602    storage: &S,
2603    groups: &[Vec<String>],
2604) -> Vec<lora_store::NodeId> {
2605    if groups.len() == 1 && groups[0].len() == 1 {
2606        storage.node_ids_by_label(&groups[0][0])
2607    } else if groups.len() == 1 && groups[0].len() > 1 {
2608        let mut seen = std::collections::BTreeSet::new();
2609        let mut out = Vec::new();
2610        for label in &groups[0] {
2611            for id in storage.node_ids_by_label(label) {
2612                if seen.insert(id) {
2613                    out.push(id);
2614                }
2615            }
2616        }
2617        out
2618    } else {
2619        storage.node_ids_by_label(&groups[0][0])
2620    }
2621}
2622
2623fn hydrate_node_record(node: &lora_store::NodeRecord) -> LoraValue {
2624    let mut map = BTreeMap::new();
2625    map.insert("kind".to_string(), LoraValue::String("node".to_string()));
2626    map.insert("id".to_string(), LoraValue::Int(node.id as i64));
2627    map.insert(
2628        "labels".to_string(),
2629        LoraValue::List(
2630            node.labels
2631                .iter()
2632                .map(|s| LoraValue::String(s.clone()))
2633                .collect(),
2634        ),
2635    );
2636    map.insert(
2637        "properties".to_string(),
2638        properties_to_value_map(&node.properties),
2639    );
2640    LoraValue::Map(map)
2641}
2642
2643fn hydrate_relationship_record(rel: &lora_store::RelationshipRecord) -> LoraValue {
2644    let mut map = BTreeMap::new();
2645    map.insert(
2646        "kind".to_string(),
2647        LoraValue::String("relationship".to_string()),
2648    );
2649    map.insert("id".to_string(), LoraValue::Int(rel.id as i64));
2650    map.insert("startId".to_string(), LoraValue::Int(rel.src as i64));
2651    map.insert("endId".to_string(), LoraValue::Int(rel.dst as i64));
2652    map.insert("type".to_string(), LoraValue::String(rel.rel_type.clone()));
2653    map.insert(
2654        "properties".to_string(),
2655        properties_to_value_map(&rel.properties),
2656    );
2657    LoraValue::Map(map)
2658}
2659
/// Concatenate all label groups into one flat list, preserving order.
///
/// Intended for CREATE/MERGE, where the OR-within-a-group meaning does not
/// apply and every listed label is created. Repeated labels are kept as-is.
fn flatten_label_groups(groups: &[Vec<String>]) -> Vec<String> {
    groups.concat()
}
2665
2666#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
2667enum GroupValueKey {
2668    Null,
2669    Bool(bool),
2670    Int(i64),
2671    Float(String),
2672    String(String),
2673    List(Vec<GroupValueKey>),
2674    Map(Vec<(String, GroupValueKey)>),
2675    Node(u64),
2676    Relationship(u64),
2677}
2678
2679impl GroupValueKey {
2680    fn from_value(v: &LoraValue) -> Self {
2681        match v {
2682            LoraValue::Null => Self::Null,
2683            LoraValue::Bool(x) => Self::Bool(*x),
2684            LoraValue::Int(x) => Self::Int(*x),
2685            LoraValue::Float(x) => Self::Float(x.to_string()),
2686            LoraValue::String(x) => Self::String(x.clone()),
2687            LoraValue::List(xs) => Self::List(xs.iter().map(Self::from_value).collect()),
2688            LoraValue::Map(m) => Self::Map(
2689                m.iter()
2690                    .map(|(k, v)| (k.clone(), Self::from_value(v)))
2691                    .collect(),
2692            ),
2693            LoraValue::Node(id) => Self::Node(*id),
2694            LoraValue::Relationship(id) => Self::Relationship(*id),
2695            LoraValue::Path(_) => Self::Null,
2696            // Temporal types: use their string representation as group key
2697            LoraValue::Date(d) => Self::String(d.to_string()),
2698            LoraValue::DateTime(dt) => Self::String(dt.to_string()),
2699            LoraValue::LocalDateTime(dt) => Self::String(dt.to_string()),
2700            LoraValue::Time(t) => Self::String(t.to_string()),
2701            LoraValue::LocalTime(t) => Self::String(t.to_string()),
2702            LoraValue::Duration(dur) => Self::String(dur.to_string()),
2703            LoraValue::Point(p) => Self::String(p.to_string()),
2704            LoraValue::Vector(v) => Self::String(format!("vector:{}", v.to_key_string())),
2705        }
2706    }
2707}
2708
2709/// Compute effective (min_hops, max_hops) from a `RangeLiteral`.
2710///
2711/// Lora semantics:
2712/// - `*`       → 1..∞   (start=None, end=None)
2713/// - `*2..5`   → 2..5   (start=Some(2), end=Some(5))
2714/// - `*..3`    → 1..3   (start=None, end=Some(3))
2715/// - `*2..`    → 2..∞   (start=Some(2), end=None)
2716/// - `*3`      → 3..3   (start=Some(3), end=None, no dots → exactly 3)
2717/// - `*0..1`   → 0..1
2718///
2719/// For unbounded upper, we cap at `MAX_VAR_LEN_HOPS` to prevent runaway.
2720const MAX_VAR_LEN_HOPS: u64 = 100;
2721
2722fn resolve_range(range: &RangeLiteral) -> (u64, u64) {
2723    let min_hops = range.start.unwrap_or(1);
2724    let max_hops = range.end.unwrap_or(MAX_VAR_LEN_HOPS);
2725    (min_hops, max_hops)
2726}
2727
2728/// An entry produced during BFS variable-length expansion.
2729struct VarLenResult {
2730    /// The destination node at the end of this path.
2731    dst_node_id: NodeId,
2732    /// The relationship IDs traversed (in order).
2733    rel_ids: Vec<u64>,
2734}
2735
2736/// Perform variable-length expansion from `start_node_id` following
2737/// relationships of the given `types` and `direction`, collecting all
2738/// reachable nodes at hop distances in `[min_hops, max_hops]`.
2739///
2740/// Uses BFS with relationship-uniqueness per path (each path does not
2741/// reuse the same relationship, but may revisit nodes).
2742fn variable_length_expand<S: GraphStorage + ?Sized>(
2743    storage: &S,
2744    start_node_id: NodeId,
2745    direction: Direction,
2746    types: &[String],
2747    min_hops: u64,
2748    max_hops: u64,
2749) -> Vec<VarLenResult> {
2750    let mut results = Vec::new();
2751
2752    // Each frontier entry: (current_node_id, relationships_used_so_far)
2753    let mut frontier: Vec<(NodeId, Vec<u64>)> = vec![(start_node_id, Vec::new())];
2754
2755    for depth in 1..=max_hops {
2756        // On the final hop we don't need to build next_frontier at all; every
2757        // path gets recorded and then the loop terminates. Avoids one full
2758        // pass of Vec clones on deep traversals.
2759        let is_last_hop = depth == max_hops;
2760        let mut next_frontier: Vec<(NodeId, Vec<u64>)> = Vec::new();
2761
2762        for (current_node, rels_used) in &frontier {
2763            // ID-only expand avoids cloning full records/properties for every
2764            // neighbour on every hop.
2765            for (rel_id, neighbor_id) in storage.expand_ids(*current_node, direction, types) {
2766                // Relationship-uniqueness: skip if this relationship was already
2767                // traversed on this particular path.
2768                if rels_used.contains(&rel_id) {
2769                    continue;
2770                }
2771
2772                if is_last_hop {
2773                    // Terminal hop: just record the result. Allocate rel_ids
2774                    // once (no duplicate clone) by extending a fresh copy.
2775                    if depth >= min_hops {
2776                        let mut rel_ids = Vec::with_capacity(rels_used.len() + 1);
2777                        rel_ids.extend_from_slice(rels_used);
2778                        rel_ids.push(rel_id);
2779                        results.push(VarLenResult {
2780                            dst_node_id: neighbor_id,
2781                            rel_ids,
2782                        });
2783                    }
2784                    continue;
2785                }
2786
2787                let mut new_rels = Vec::with_capacity(rels_used.len() + 1);
2788                new_rels.extend_from_slice(rels_used);
2789                new_rels.push(rel_id);
2790
2791                if depth >= min_hops {
2792                    results.push(VarLenResult {
2793                        dst_node_id: neighbor_id,
2794                        rel_ids: new_rels.clone(),
2795                    });
2796                }
2797
2798                next_frontier.push((neighbor_id, new_rels));
2799            }
2800        }
2801
2802        if is_last_hop || next_frontier.is_empty() {
2803            break;
2804        }
2805
2806        frontier = next_frontier;
2807    }
2808
2809    // Handle min_hops == 0: include the start node itself at depth 0.
2810    if min_hops == 0 {
2811        results.insert(
2812            0,
2813            VarLenResult {
2814                dst_node_id: start_node_id,
2815                rel_ids: Vec::new(),
2816            },
2817        );
2818    }
2819
2820    results
2821}
2822
2823/// Filter rows to keep only shortest paths.
2824/// `all` = false → keep one shortest path; `all` = true → keep all shortest.
2825fn filter_shortest_paths(rows: Vec<Row>, path_var: VarId, all: bool) -> Vec<Row> {
2826    if rows.is_empty() {
2827        return rows;
2828    }
2829
2830    // Compute path length for each row
2831    let lengths: Vec<usize> = rows
2832        .iter()
2833        .map(|row| match row.get(path_var) {
2834            Some(LoraValue::Path(p)) => p.rels.len(),
2835            _ => usize::MAX,
2836        })
2837        .collect();
2838
2839    let min_len = lengths.iter().copied().min().unwrap_or(usize::MAX);
2840
2841    let mut result: Vec<Row> = rows
2842        .into_iter()
2843        .zip(lengths.iter())
2844        .filter(|(_, len)| **len == min_len)
2845        .map(|(row, _)| row)
2846        .collect();
2847
2848    if !all && result.len() > 1 {
2849        result.truncate(1);
2850    }
2851
2852    result
2853}