Skip to main content

lora_executor/
executor.rs

1use crate::errors::{value_kind, ExecResult, ExecutorError};
2use crate::eval::{eval_expr, take_eval_error, EvalContext};
3use crate::value::{lora_value_to_property, LoraPath, LoraValue, Row};
4use crate::{project_rows, ExecuteOptions, QueryResult};
5
6use lora_analyzer::{
7    symbols::VarId, ResolvedExpr, ResolvedPattern, ResolvedPatternElement, ResolvedPatternPart,
8    ResolvedRemoveItem, ResolvedSetItem, ResolvedSortItem,
9};
10use lora_ast::{Direction, RangeLiteral};
11use lora_compiler::physical::*;
12use lora_compiler::CompiledQuery;
13use lora_store::{GraphStorage, GraphStorageMut, NodeId, Properties, PropertyValue};
14
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, BTreeSet};
17use tracing::{debug, error, trace};
18
19pub struct ExecutionContext<'a, S: GraphStorage> {
20    pub storage: &'a S,
21    pub params: std::collections::BTreeMap<String, LoraValue>,
22}
23
24pub struct Executor<'a, S: GraphStorage> {
25    ctx: ExecutionContext<'a, S>,
26}
27
28impl<'a, S: GraphStorage> Executor<'a, S> {
29    pub fn new(ctx: ExecutionContext<'a, S>) -> Self {
30        Self { ctx }
31    }
32
33    pub fn execute(
34        &self,
35        plan: &PhysicalPlan,
36        options: Option<ExecuteOptions>,
37    ) -> ExecResult<QueryResult> {
38        // Clear any error residue that a previous query on this thread may have
39        // left in the thread-local eval-error slot.
40        let _ = take_eval_error();
41
42        let rows = self.execute_node(plan, plan.root)?;
43        let rows = rows
44            .into_iter()
45            .map(|row| self.hydrate_row(row))
46            .collect::<Vec<_>>();
47        Ok(project_rows(rows, options.unwrap_or_default()))
48    }
49
50    fn hydrate_row(&self, row: Row) -> Row {
51        let mut out = Row::new();
52
53        for (var, name, value) in row.into_iter_named() {
54            out.insert_named(var, name, self.hydrate_value(value));
55        }
56
57        out
58    }
59
60    fn execute_node(&self, plan: &PhysicalPlan, node_id: PhysicalNodeId) -> ExecResult<Vec<Row>> {
61        trace!("read-only execute_node start: node_id={node_id:?}");
62
63        let result = match &plan.nodes[node_id] {
64            PhysicalOp::Argument(op) => self.exec_argument(op),
65            PhysicalOp::NodeScan(op) => self.exec_node_scan(plan, op),
66            PhysicalOp::NodeByLabelScan(op) => self.exec_node_by_label_scan(plan, op),
67            PhysicalOp::Expand(op) => self.exec_expand(plan, op),
68            PhysicalOp::Filter(op) => self.exec_filter(plan, op),
69            PhysicalOp::Projection(op) => self.exec_projection(plan, op),
70            PhysicalOp::Unwind(op) => self.exec_unwind(plan, op),
71            PhysicalOp::HashAggregation(op) => self.exec_hash_aggregation(plan, op),
72            PhysicalOp::Sort(op) => self.exec_sort(plan, op),
73            PhysicalOp::Limit(op) => self.exec_limit(plan, op),
74            PhysicalOp::OptionalMatch(op) => self.exec_optional_match(plan, op),
75            PhysicalOp::PathBuild(op) => self.exec_path_build(plan, op),
76            PhysicalOp::Create(_) => Err(ExecutorError::ReadOnlyCreate { node_id }),
77            PhysicalOp::Merge(_) => Err(ExecutorError::ReadOnlyMerge { node_id }),
78            PhysicalOp::Delete(_) => Err(ExecutorError::ReadOnlyDelete { node_id }),
79            PhysicalOp::Set(_) => Err(ExecutorError::ReadOnlySet { node_id }),
80            PhysicalOp::Remove(_) => Err(ExecutorError::ReadOnlyRemove { node_id }),
81        };
82
83        match &result {
84            Ok(rows) => trace!(
85                "read-only execute_node ok: node_id={node_id:?}, rows={}",
86                rows.len()
87            ),
88            Err(err) => error!("read-only execute_node failed: node_id={node_id:?}, error={err}"),
89        }
90
91        result
92    }
93
94    fn exec_argument(&self, _op: &ArgumentExec) -> ExecResult<Vec<Row>> {
95        Ok(vec![Row::new()])
96    }
97
98    fn exec_node_scan(&self, plan: &PhysicalPlan, op: &NodeScanExec) -> ExecResult<Vec<Row>> {
99        let base_rows = match op.input {
100            Some(input) => self.execute_node(plan, input)?,
101            None => vec![Row::new()],
102        };
103
104        let node_ids = self.ctx.storage.all_node_ids();
105        let mut out = Vec::new();
106
107        for row in base_rows {
108            if let Some(existing) = row.get(op.var) {
109                match existing {
110                    LoraValue::Node(existing_id) => {
111                        if self.ctx.storage.has_node(*existing_id) {
112                            out.push(row);
113                        }
114                    }
115                    other => {
116                        return Err(ExecutorError::ExpectedNodeForExpand {
117                            var: format!("{:?}", op.var),
118                            found: value_kind(other),
119                        });
120                    }
121                }
122                continue;
123            }
124
125            for &id in &node_ids {
126                let mut new_row = row.clone();
127                new_row.insert(op.var, LoraValue::Node(id));
128                out.push(new_row);
129            }
130        }
131
132        Ok(out)
133    }
134
135    fn exec_node_by_label_scan(
136        &self,
137        plan: &PhysicalPlan,
138        op: &NodeByLabelScanExec,
139    ) -> ExecResult<Vec<Row>> {
140        let base_rows = match op.input {
141            Some(input) => self.execute_node(plan, input)?,
142            None => vec![Row::new()],
143        };
144
145        let candidate_ids = scan_node_ids_for_label_groups(self.ctx.storage, &op.labels);
146        let mut out = Vec::new();
147
148        for row in base_rows {
149            if let Some(existing) = row.get(op.var) {
150                match existing {
151                    LoraValue::Node(existing_id) => {
152                        let labels_ok = self
153                            .ctx
154                            .storage
155                            .with_node(*existing_id, |n| {
156                                node_matches_label_groups(&n.labels, &op.labels)
157                            })
158                            .unwrap_or(false);
159                        if labels_ok {
160                            out.push(row);
161                        }
162                    }
163                    other => {
164                        return Err(ExecutorError::ExpectedNodeForExpand {
165                            var: format!("{:?}", op.var),
166                            found: value_kind(other),
167                        });
168                    }
169                }
170                continue;
171            }
172
173            for &id in &candidate_ids {
174                let labels_ok = self
175                    .ctx
176                    .storage
177                    .with_node(id, |n| node_matches_label_groups(&n.labels, &op.labels))
178                    .unwrap_or(false);
179                if !labels_ok {
180                    continue;
181                }
182                let mut new_row = row.clone();
183                new_row.insert(op.var, LoraValue::Node(id));
184                out.push(new_row);
185            }
186        }
187
188        Ok(out)
189    }
190
191    fn exec_expand(&self, plan: &PhysicalPlan, op: &ExpandExec) -> ExecResult<Vec<Row>> {
192        // Variable-length expansion: delegate to iterative expander.
193        if let Some(range) = &op.range {
194            return self.exec_expand_var_len(plan, op, range);
195        }
196
197        let input_rows = self.execute_node(plan, op.input)?;
198        let mut out = Vec::new();
199
200        for row in input_rows {
201            let src_node_id = match row.get(op.src) {
202                Some(LoraValue::Node(id)) => *id,
203                Some(other) => {
204                    return Err(ExecutorError::ExpectedNodeForExpand {
205                        var: format!("{:?}", op.src),
206                        found: value_kind(other),
207                    });
208                }
209                None => continue,
210            };
211
212            for (rel_id, dst_id) in
213                self.ctx
214                    .storage
215                    .expand_ids(src_node_id, op.direction, &op.types)
216            {
217                if let Some(expr) = op.rel_properties.as_ref() {
218                    let actual_props = self
219                        .ctx
220                        .storage
221                        .with_relationship(rel_id, |rel| rel.properties.clone());
222                    let matches = match actual_props {
223                        Some(props) => {
224                            self.relationship_matches_properties(&props, Some(expr), &row)?
225                        }
226                        None => false,
227                    };
228                    if !matches {
229                        continue;
230                    }
231                }
232
233                if let Some(existing_dst) = row.get(op.dst) {
234                    match existing_dst {
235                        LoraValue::Node(existing_id) if *existing_id == dst_id => {}
236                        LoraValue::Node(_) => continue,
237                        other => {
238                            return Err(ExecutorError::ExpectedNodeForExpand {
239                                var: format!("{:?}", op.dst),
240                                found: value_kind(other),
241                            });
242                        }
243                    }
244                }
245
246                if let Some(rel_var) = op.rel {
247                    if let Some(existing_rel) = row.get(rel_var) {
248                        match existing_rel {
249                            LoraValue::Relationship(existing_id) if *existing_id == rel_id => {}
250                            LoraValue::Relationship(_) => continue,
251                            other => {
252                                return Err(ExecutorError::ExpectedRelationshipForExpand {
253                                    var: format!("{:?}", rel_var),
254                                    found: value_kind(other),
255                                });
256                            }
257                        }
258                    }
259                }
260
261                let mut new_row = row.clone();
262
263                if !new_row.contains_key(op.dst) {
264                    new_row.insert(op.dst, LoraValue::Node(dst_id));
265                }
266
267                if let Some(rel_var) = op.rel {
268                    if !new_row.contains_key(rel_var) {
269                        new_row.insert(rel_var, LoraValue::Relationship(rel_id));
270                    }
271                }
272
273                out.push(new_row);
274            }
275        }
276
277        Ok(out)
278    }
279
280    fn exec_expand_var_len(
281        &self,
282        plan: &PhysicalPlan,
283        op: &ExpandExec,
284        range: &RangeLiteral,
285    ) -> ExecResult<Vec<Row>> {
286        let input_rows = self.execute_node(plan, op.input)?;
287        let (min_hops, max_hops) = resolve_range(range);
288        let mut out = Vec::new();
289
290        for row in input_rows {
291            let src_node_id = match row.get(op.src) {
292                Some(LoraValue::Node(id)) => *id,
293                Some(other) => {
294                    return Err(ExecutorError::ExpectedNodeForExpand {
295                        var: format!("{:?}", op.src),
296                        found: value_kind(other),
297                    });
298                }
299                None => continue,
300            };
301
302            let expansions = variable_length_expand(
303                self.ctx.storage,
304                src_node_id,
305                op.direction,
306                &op.types,
307                min_hops,
308                max_hops,
309            );
310
311            for result in expansions {
312                let mut new_row = row.clone();
313                new_row.insert(op.dst, LoraValue::Node(result.dst_node_id));
314
315                // For variable-length patterns, bind the relationship variable
316                // to a list of relationship IDs traversed.
317                if let Some(rel_var) = op.rel {
318                    // Consume rel_ids — it's owned and no longer needed after this.
319                    let rel_list = LoraValue::List(
320                        result
321                            .rel_ids
322                            .into_iter()
323                            .map(LoraValue::Relationship)
324                            .collect(),
325                    );
326                    new_row.insert(rel_var, rel_list);
327                }
328
329                out.push(new_row);
330            }
331        }
332
333        Ok(out)
334    }
335
336    fn relationship_matches_properties(
337        &self,
338        actual: &Properties,
339        expected_expr: Option<&ResolvedExpr>,
340        row: &Row,
341    ) -> ExecResult<bool> {
342        let Some(expr) = expected_expr else {
343            return Ok(true);
344        };
345
346        let eval_ctx = EvalContext {
347            storage: self.ctx.storage,
348            params: &self.ctx.params,
349        };
350
351        let expected = eval_expr(expr, row, &eval_ctx);
352
353        let LoraValue::Map(expected_map) = expected else {
354            return Err(ExecutorError::ExpectedPropertyMap {
355                found: value_kind(&expected),
356            });
357        };
358
359        Ok(expected_map.iter().all(|(key, expected_value)| {
360            actual
361                .get(key)
362                .map(|actual_value| value_matches_property_value(expected_value, actual_value))
363                .unwrap_or(false)
364        }))
365    }
366
367    fn exec_filter(&self, plan: &PhysicalPlan, op: &FilterExec) -> ExecResult<Vec<Row>> {
368        let input_rows = self.execute_node(plan, op.input)?;
369        let eval_ctx = EvalContext {
370            storage: self.ctx.storage,
371            params: &self.ctx.params,
372        };
373
374        Ok(input_rows
375            .into_iter()
376            .filter(|row| eval_expr(&op.predicate, row, &eval_ctx).is_truthy())
377            .collect())
378    }
379
380    fn exec_projection(&self, plan: &PhysicalPlan, op: &ProjectionExec) -> ExecResult<Vec<Row>> {
381        let input_rows = self.execute_node(plan, op.input)?;
382        let eval_ctx = EvalContext {
383            storage: self.ctx.storage,
384            params: &self.ctx.params,
385        };
386
387        let mut out = Vec::with_capacity(input_rows.len());
388
389        for row in input_rows {
390            // include_existing=true means we carry all upstream columns — move
391            // the row into the projection instead of cloning every value.
392            if op.include_existing {
393                let mut projected = row;
394                for item in &op.items {
395                    let value = eval_expr(&item.expr, &projected, &eval_ctx);
396                    if let Some(err) = take_eval_error() {
397                        return Err(ExecutorError::RuntimeError(err));
398                    }
399                    projected.insert_named(item.output, item.name.clone(), value);
400                }
401                out.push(projected);
402            } else {
403                let mut projected = Row::new();
404                for item in &op.items {
405                    let value = eval_expr(&item.expr, &row, &eval_ctx);
406                    if let Some(err) = take_eval_error() {
407                        return Err(ExecutorError::RuntimeError(err));
408                    }
409                    projected.insert_named(item.output, item.name.clone(), value);
410                }
411                out.push(projected);
412            }
413        }
414
415        Ok(if op.distinct {
416            dedup_rows_by_vars(out)
417        } else {
418            out
419        })
420    }
421
422    fn hydrate_value(&self, value: LoraValue) -> LoraValue {
423        match value {
424            LoraValue::Node(id) => self.hydrate_node(id),
425            LoraValue::Relationship(id) => self.hydrate_relationship(id),
426            LoraValue::List(values) => {
427                LoraValue::List(values.into_iter().map(|v| self.hydrate_value(v)).collect())
428            }
429            LoraValue::Map(map) => LoraValue::Map(
430                map.into_iter()
431                    .map(|(k, v)| (k, self.hydrate_value(v)))
432                    .collect(),
433            ),
434            other => other,
435        }
436    }
437
438    fn hydrate_node(&self, id: u64) -> LoraValue {
439        self.ctx
440            .storage
441            .with_node(id, hydrate_node_record)
442            .unwrap_or(LoraValue::Null)
443    }
444
445    fn hydrate_relationship(&self, id: u64) -> LoraValue {
446        self.ctx
447            .storage
448            .with_relationship(id, hydrate_relationship_record)
449            .unwrap_or(LoraValue::Null)
450    }
451
452    fn exec_unwind(&self, plan: &PhysicalPlan, op: &UnwindExec) -> ExecResult<Vec<Row>> {
453        let input_rows = self.execute_node(plan, op.input)?;
454        let eval_ctx = EvalContext {
455            storage: self.ctx.storage,
456            params: &self.ctx.params,
457        };
458
459        let mut out = Vec::new();
460
461        for row in input_rows {
462            match eval_expr(&op.expr, &row, &eval_ctx) {
463                LoraValue::List(values) => {
464                    for value in values {
465                        let mut new_row = row.clone();
466                        new_row.insert(op.alias, value);
467                        out.push(new_row);
468                    }
469                }
470                LoraValue::Null => {}
471                other => {
472                    let mut new_row = row;
473                    new_row.insert(op.alias, other);
474                    out.push(new_row);
475                }
476            }
477        }
478
479        Ok(out)
480    }
481
482    fn exec_hash_aggregation(
483        &self,
484        plan: &PhysicalPlan,
485        op: &HashAggregationExec,
486    ) -> ExecResult<Vec<Row>> {
487        let input_rows = self.execute_node(plan, op.input)?;
488        let eval_ctx = EvalContext {
489            storage: self.ctx.storage,
490            params: &self.ctx.params,
491        };
492
493        let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
494
495        if op.group_by.is_empty() {
496            groups.insert(Vec::new(), input_rows);
497        } else {
498            for row in input_rows {
499                let key = op
500                    .group_by
501                    .iter()
502                    .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
503                    .collect::<Vec<_>>();
504
505                groups.entry(key).or_default().push(row);
506            }
507        }
508
509        let mut out = Vec::new();
510
511        for rows in groups.into_values() {
512            let mut result = Row::new();
513
514            if let Some(first) = rows.first() {
515                for proj in &op.group_by {
516                    let value = self.hydrate_value(eval_expr(&proj.expr, first, &eval_ctx));
517                    result.insert_named(proj.output, proj.name.clone(), value);
518                }
519            }
520
521            for proj in &op.aggregates {
522                let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
523                result.insert_named(proj.output, proj.name.clone(), value);
524            }
525
526            out.push(result);
527        }
528
529        Ok(out)
530    }
531
532    fn exec_sort(&self, plan: &PhysicalPlan, op: &SortExec) -> ExecResult<Vec<Row>> {
533        let mut rows = self.execute_node(plan, op.input)?;
534        let eval_ctx = EvalContext {
535            storage: self.ctx.storage,
536            params: &self.ctx.params,
537        };
538
539        rows.sort_by(|a, b| {
540            for item in &op.items {
541                let ord = compare_sort_item(item, a, b, &eval_ctx);
542                if ord != Ordering::Equal {
543                    return ord;
544                }
545            }
546            Ordering::Equal
547        });
548
549        Ok(rows)
550    }
551
552    fn exec_limit(&self, plan: &PhysicalPlan, op: &LimitExec) -> ExecResult<Vec<Row>> {
553        let mut rows = self.execute_node(plan, op.input)?;
554        let eval_ctx = EvalContext {
555            storage: self.ctx.storage,
556            params: &self.ctx.params,
557        };
558
559        let limit = op
560            .limit
561            .as_ref()
562            .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
563            .unwrap_or(rows.len() as i64)
564            .max(0) as usize;
565
566        let skip = op
567            .skip
568            .as_ref()
569            .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
570            .unwrap_or(0)
571            .max(0) as usize;
572
573        if skip >= rows.len() {
574            return Ok(Vec::new());
575        }
576
577        rows.drain(0..skip);
578        rows.truncate(limit);
579        Ok(rows)
580    }
581
582    fn exec_optional_match(
583        &self,
584        plan: &PhysicalPlan,
585        op: &OptionalMatchExec,
586    ) -> ExecResult<Vec<Row>> {
587        let input_rows = self.execute_node(plan, op.input)?;
588
589        // The inner plan is built to start from Argument (an empty row) and is
590        // read-only, so its output does not depend on the upstream input. Execute
591        // it once and reuse the result across every input row, instead of
592        // producing |input_rows| × |inner_rows| allocations.
593        let inner_rows = self.execute_node(plan, op.inner)?;
594
595        let mut out = Vec::new();
596
597        for input_row in input_rows {
598            let mut matched = false;
599
600            for inner_row in &inner_rows {
601                // Each variable already bound in input_row must match.
602                let compatible = input_row
603                    .iter()
604                    .all(|(var, val)| match inner_row.get(*var) {
605                        Some(inner_val) => inner_val == val,
606                        None => true,
607                    });
608                if !compatible {
609                    continue;
610                }
611
612                let mut merged = input_row.clone();
613                for (var, name, val) in inner_row.iter_named() {
614                    if !merged.contains_key(*var) {
615                        merged.insert_named(*var, name.into_owned(), val.clone());
616                    }
617                }
618                out.push(merged);
619                matched = true;
620            }
621
622            if !matched {
623                let mut null_row = input_row;
624                for &var_id in &op.new_vars {
625                    if !null_row.contains_key(var_id) {
626                        null_row.insert(var_id, LoraValue::Null);
627                    }
628                }
629                out.push(null_row);
630            }
631        }
632
633        Ok(out)
634    }
635
636    fn exec_path_build(&self, plan: &PhysicalPlan, op: &PathBuildExec) -> ExecResult<Vec<Row>> {
637        let input_rows = self.execute_node(plan, op.input)?;
638        let mut rows: Vec<Row> = input_rows
639            .into_iter()
640            .map(|mut row| {
641                let path = build_path_value(&row, &op.node_vars, &op.rel_vars, self.ctx.storage);
642                row.insert(op.output, path);
643                row
644            })
645            .collect();
646
647        if let Some(all) = op.shortest_path_all {
648            rows = filter_shortest_paths(rows, op.output, all);
649        }
650        Ok(rows)
651    }
652}
653
654fn properties_to_value_map(props: &Properties) -> LoraValue {
655    let mut map = BTreeMap::new();
656    for (k, v) in props.iter() {
657        map.insert(k.clone(), LoraValue::from(v));
658    }
659    LoraValue::Map(map)
660}
661
662pub struct MutableExecutionContext<'a, S: GraphStorageMut> {
663    pub storage: &'a mut S,
664    pub params: std::collections::BTreeMap<String, LoraValue>,
665}
666
667pub struct MutableExecutor<'a, S: GraphStorageMut> {
668    ctx: MutableExecutionContext<'a, S>,
669}
670
671impl<'a, S: GraphStorageMut> MutableExecutor<'a, S> {
672    pub fn new(ctx: MutableExecutionContext<'a, S>) -> Self {
673        Self { ctx }
674    }
675
676    pub fn execute(
677        &mut self,
678        plan: &PhysicalPlan,
679        options: Option<ExecuteOptions>,
680    ) -> ExecResult<QueryResult> {
681        // Clear any error residue that a previous query on this thread may have
682        // left in the thread-local eval-error slot.
683        let _ = take_eval_error();
684
685        let rows = self.execute_node(plan, plan.root)?;
686        let rows = rows
687            .into_iter()
688            .map(|row| self.hydrate_row(row))
689            .collect::<Vec<_>>();
690        Ok(project_rows(rows, options.unwrap_or_default()))
691    }
692
693    /// Execute a compiled query that may include UNION branches.
694    pub fn execute_compiled(
695        &mut self,
696        compiled: &CompiledQuery,
697        options: Option<ExecuteOptions>,
698    ) -> ExecResult<QueryResult> {
699        if compiled.unions.is_empty() {
700            return self.execute(&compiled.physical, options);
701        }
702
703        let _ = take_eval_error();
704
705        // Execute the head branch.
706        let mut all_rows = self.execute_and_hydrate(&compiled.physical)?;
707
708        // Execute each UNION branch and combine.
709        // Track whether any branch uses plain UNION (dedup needed).
710        let mut needs_dedup = false;
711
712        for branch in &compiled.unions {
713            let branch_rows = self.execute_and_hydrate(&branch.physical)?;
714            all_rows.extend(branch_rows);
715
716            if !branch.all {
717                needs_dedup = true;
718            }
719        }
720
721        if needs_dedup {
722            all_rows = dedup_rows(all_rows);
723        }
724
725        Ok(project_rows(all_rows, options.unwrap_or_default()))
726    }
727
728    fn execute_and_hydrate(&mut self, plan: &PhysicalPlan) -> ExecResult<Vec<Row>> {
729        let rows = self.execute_node(plan, plan.root)?;
730        Ok(rows.into_iter().map(|row| self.hydrate_row(row)).collect())
731    }
732
733    fn hydrate_row(&self, row: Row) -> Row {
734        let mut out = Row::new();
735
736        for (var, name, value) in row.into_iter_named() {
737            out.insert_named(var, name, self.hydrate_value(value));
738        }
739
740        out
741    }
742
743    fn execute_node(
744        &mut self,
745        plan: &PhysicalPlan,
746        node_id: PhysicalNodeId,
747    ) -> ExecResult<Vec<Row>> {
748        trace!("mutable execute_node start: node_id={node_id:?}");
749
750        let result = match &plan.nodes[node_id] {
751            PhysicalOp::Argument(op) => self.exec_argument(op),
752            PhysicalOp::NodeScan(op) => self.exec_node_scan(plan, op),
753            PhysicalOp::NodeByLabelScan(op) => self.exec_node_by_label_scan(plan, op),
754            PhysicalOp::Expand(op) => self.exec_expand(plan, op),
755            PhysicalOp::Filter(op) => self.exec_filter(plan, op),
756            PhysicalOp::Projection(op) => self.exec_projection(plan, op),
757            PhysicalOp::Unwind(op) => self.exec_unwind(plan, op),
758            PhysicalOp::HashAggregation(op) => self.exec_hash_aggregation(plan, op),
759            PhysicalOp::Sort(op) => self.exec_sort(plan, op),
760            PhysicalOp::Limit(op) => self.exec_limit(plan, op),
761            PhysicalOp::Create(op) => self.exec_create(plan, op),
762            PhysicalOp::Merge(op) => self.exec_merge(plan, op),
763            PhysicalOp::Delete(op) => self.exec_delete(plan, op),
764            PhysicalOp::Set(op) => self.exec_set(plan, op),
765            PhysicalOp::Remove(op) => self.exec_remove(plan, op),
766            PhysicalOp::OptionalMatch(op) => self.exec_optional_match(plan, op),
767            PhysicalOp::PathBuild(op) => self.exec_path_build(plan, op),
768        };
769
770        match &result {
771            Ok(rows) => trace!(
772                "mutable execute_node ok: node_id={node_id:?}, rows={}",
773                rows.len()
774            ),
775            Err(err) => error!("mutable execute_node failed: node_id={node_id:?}, error={err}"),
776        }
777
778        result
779    }
780
781    fn exec_argument(&self, _op: &ArgumentExec) -> ExecResult<Vec<Row>> {
782        Ok(vec![Row::new()])
783    }
784
785    fn exec_node_scan(&mut self, plan: &PhysicalPlan, op: &NodeScanExec) -> ExecResult<Vec<Row>> {
786        let base_rows = match op.input {
787            Some(input) => self.execute_node(plan, input)?,
788            None => vec![Row::new()],
789        };
790
791        let node_ids = self.ctx.storage.all_node_ids();
792        let mut out = Vec::new();
793
794        for row in base_rows {
795            if let Some(existing) = row.get(op.var) {
796                match existing {
797                    LoraValue::Node(existing_id) => {
798                        if self.ctx.storage.has_node(*existing_id) {
799                            out.push(row);
800                        }
801                    }
802                    other => {
803                        return Err(ExecutorError::ExpectedNodeForExpand {
804                            var: format!("{:?}", op.var),
805                            found: value_kind(other),
806                        });
807                    }
808                }
809                continue;
810            }
811
812            for &id in &node_ids {
813                let mut new_row = row.clone();
814                new_row.insert(op.var, LoraValue::Node(id));
815                out.push(new_row);
816            }
817        }
818
819        Ok(out)
820    }
821
822    fn exec_node_by_label_scan(
823        &mut self,
824        plan: &PhysicalPlan,
825        op: &NodeByLabelScanExec,
826    ) -> ExecResult<Vec<Row>> {
827        let base_rows = match op.input {
828            Some(input) => self.execute_node(plan, input)?,
829            None => vec![Row::new()],
830        };
831
832        let candidate_ids = scan_node_ids_for_label_groups(&*self.ctx.storage, &op.labels);
833        let mut out = Vec::new();
834
835        for row in base_rows {
836            if let Some(existing) = row.get(op.var) {
837                match existing {
838                    LoraValue::Node(existing_id) => {
839                        let labels_ok = self
840                            .ctx
841                            .storage
842                            .with_node(*existing_id, |n| {
843                                node_matches_label_groups(&n.labels, &op.labels)
844                            })
845                            .unwrap_or(false);
846                        if labels_ok {
847                            out.push(row);
848                        }
849                    }
850                    other => {
851                        return Err(ExecutorError::ExpectedNodeForExpand {
852                            var: format!("{:?}", op.var),
853                            found: value_kind(other),
854                        });
855                    }
856                }
857                continue;
858            }
859
860            for &id in &candidate_ids {
861                let labels_ok = self
862                    .ctx
863                    .storage
864                    .with_node(id, |n| node_matches_label_groups(&n.labels, &op.labels))
865                    .unwrap_or(false);
866                if !labels_ok {
867                    continue;
868                }
869                let mut new_row = row.clone();
870                new_row.insert(op.var, LoraValue::Node(id));
871                out.push(new_row);
872            }
873        }
874
875        Ok(out)
876    }
877
878    fn exec_expand(&mut self, plan: &PhysicalPlan, op: &ExpandExec) -> ExecResult<Vec<Row>> {
879        // Variable-length expansion: delegate to iterative expander.
880        if let Some(range) = &op.range {
881            return self.exec_expand_var_len(plan, op, range);
882        }
883
884        let input_rows = self.execute_node(plan, op.input)?;
885        let mut out = Vec::new();
886
887        for row in input_rows {
888            let src_node_id = match row.get(op.src) {
889                Some(LoraValue::Node(id)) => *id,
890                Some(other) => {
891                    return Err(ExecutorError::ExpectedNodeForExpand {
892                        var: format!("{:?}", op.src),
893                        found: value_kind(other),
894                    });
895                }
896                None => continue,
897            };
898
899            for (rel_id, dst_id) in
900                self.ctx
901                    .storage
902                    .expand_ids(src_node_id, op.direction, &op.types)
903            {
904                if let Some(expr) = op.rel_properties.as_ref() {
905                    let actual_props = self
906                        .ctx
907                        .storage
908                        .with_relationship(rel_id, |rel| rel.properties.clone());
909                    let matches = match actual_props {
910                        Some(props) => {
911                            self.relationship_matches_properties(&props, Some(expr), &row)?
912                        }
913                        None => false,
914                    };
915                    if !matches {
916                        continue;
917                    }
918                }
919
920                if let Some(existing_dst) = row.get(op.dst) {
921                    match existing_dst {
922                        LoraValue::Node(existing_id) if *existing_id == dst_id => {}
923                        LoraValue::Node(_) => continue,
924                        other => {
925                            return Err(ExecutorError::ExpectedNodeForExpand {
926                                var: format!("{:?}", op.dst),
927                                found: value_kind(other),
928                            });
929                        }
930                    }
931                }
932
933                if let Some(rel_var) = op.rel {
934                    if let Some(existing_rel) = row.get(rel_var) {
935                        match existing_rel {
936                            LoraValue::Relationship(existing_id) if *existing_id == rel_id => {}
937                            LoraValue::Relationship(_) => continue,
938                            other => {
939                                return Err(ExecutorError::ExpectedRelationshipForExpand {
940                                    var: format!("{:?}", rel_var),
941                                    found: value_kind(other),
942                                });
943                            }
944                        }
945                    }
946                }
947
948                let mut new_row = row.clone();
949
950                if !new_row.contains_key(op.dst) {
951                    new_row.insert(op.dst, LoraValue::Node(dst_id));
952                }
953
954                if let Some(rel_var) = op.rel {
955                    if !new_row.contains_key(rel_var) {
956                        new_row.insert(rel_var, LoraValue::Relationship(rel_id));
957                    }
958                }
959
960                out.push(new_row);
961            }
962        }
963
964        Ok(out)
965    }
966
967    fn exec_expand_var_len(
968        &mut self,
969        plan: &PhysicalPlan,
970        op: &ExpandExec,
971        range: &RangeLiteral,
972    ) -> ExecResult<Vec<Row>> {
973        let input_rows = self.execute_node(plan, op.input)?;
974        let (min_hops, max_hops) = resolve_range(range);
975        let mut out = Vec::new();
976
977        for row in input_rows {
978            let src_node_id = match row.get(op.src) {
979                Some(LoraValue::Node(id)) => *id,
980                Some(other) => {
981                    return Err(ExecutorError::ExpectedNodeForExpand {
982                        var: format!("{:?}", op.src),
983                        found: value_kind(other),
984                    });
985                }
986                None => continue,
987            };
988
989            let expansions = variable_length_expand(
990                &*self.ctx.storage,
991                src_node_id,
992                op.direction,
993                &op.types,
994                min_hops,
995                max_hops,
996            );
997
998            for result in expansions {
999                let mut new_row = row.clone();
1000                new_row.insert(op.dst, LoraValue::Node(result.dst_node_id));
1001
1002                if let Some(rel_var) = op.rel {
1003                    // Consume rel_ids — it's owned and no longer needed after this.
1004                    let rel_list = LoraValue::List(
1005                        result
1006                            .rel_ids
1007                            .into_iter()
1008                            .map(LoraValue::Relationship)
1009                            .collect(),
1010                    );
1011                    new_row.insert(rel_var, rel_list);
1012                }
1013
1014                out.push(new_row);
1015            }
1016        }
1017
1018        Ok(out)
1019    }
1020
1021    fn relationship_matches_properties(
1022        &self,
1023        actual: &Properties,
1024        expected_expr: Option<&ResolvedExpr>,
1025        row: &Row,
1026    ) -> ExecResult<bool> {
1027        let Some(expr) = expected_expr else {
1028            return Ok(true);
1029        };
1030
1031        let eval_ctx = EvalContext {
1032            storage: &*self.ctx.storage,
1033            params: &self.ctx.params,
1034        };
1035
1036        let expected = eval_expr(expr, row, &eval_ctx);
1037
1038        let LoraValue::Map(expected_map) = expected else {
1039            return Err(ExecutorError::ExpectedPropertyMap {
1040                found: value_kind(&expected),
1041            });
1042        };
1043
1044        Ok(expected_map.iter().all(|(key, expected_value)| {
1045            actual
1046                .get(key)
1047                .map(|actual_value| value_matches_property_value(expected_value, actual_value))
1048                .unwrap_or(false)
1049        }))
1050    }
1051
1052    fn exec_filter(&mut self, plan: &PhysicalPlan, op: &FilterExec) -> ExecResult<Vec<Row>> {
1053        let input_rows = self.execute_node(plan, op.input)?;
1054        let eval_ctx = EvalContext {
1055            storage: &*self.ctx.storage,
1056            params: &self.ctx.params,
1057        };
1058
1059        Ok(input_rows
1060            .into_iter()
1061            .filter(|row| eval_expr(&op.predicate, row, &eval_ctx).is_truthy())
1062            .collect())
1063    }
1064
1065    fn exec_projection(
1066        &mut self,
1067        plan: &PhysicalPlan,
1068        op: &ProjectionExec,
1069    ) -> ExecResult<Vec<Row>> {
1070        let input_rows = self.execute_node(plan, op.input)?;
1071        let eval_ctx = EvalContext {
1072            storage: &*self.ctx.storage,
1073            params: &self.ctx.params,
1074        };
1075
1076        let mut out = Vec::with_capacity(input_rows.len());
1077
1078        for row in input_rows {
1079            if op.include_existing {
1080                let mut projected = row;
1081                for item in &op.items {
1082                    let value = eval_expr(&item.expr, &projected, &eval_ctx);
1083                    if let Some(err) = take_eval_error() {
1084                        return Err(ExecutorError::RuntimeError(err));
1085                    }
1086                    projected.insert_named(item.output, item.name.clone(), value);
1087                }
1088                out.push(projected);
1089            } else {
1090                let mut projected = Row::new();
1091                for item in &op.items {
1092                    let value = eval_expr(&item.expr, &row, &eval_ctx);
1093                    if let Some(err) = take_eval_error() {
1094                        return Err(ExecutorError::RuntimeError(err));
1095                    }
1096                    projected.insert_named(item.output, item.name.clone(), value);
1097                }
1098                out.push(projected);
1099            }
1100        }
1101
1102        Ok(if op.distinct {
1103            dedup_rows_by_vars(out)
1104        } else {
1105            out
1106        })
1107    }
1108
1109    fn hydrate_value(&self, value: LoraValue) -> LoraValue {
1110        match value {
1111            LoraValue::Node(id) => self.hydrate_node(id),
1112            LoraValue::Relationship(id) => self.hydrate_relationship(id),
1113            LoraValue::List(values) => {
1114                LoraValue::List(values.into_iter().map(|v| self.hydrate_value(v)).collect())
1115            }
1116            LoraValue::Map(map) => LoraValue::Map(
1117                map.into_iter()
1118                    .map(|(k, v)| (k, self.hydrate_value(v)))
1119                    .collect(),
1120            ),
1121            other => other,
1122        }
1123    }
1124
1125    fn hydrate_node(&self, id: u64) -> LoraValue {
1126        self.ctx
1127            .storage
1128            .with_node(id, hydrate_node_record)
1129            .unwrap_or(LoraValue::Null)
1130    }
1131
1132    fn hydrate_relationship(&self, id: u64) -> LoraValue {
1133        self.ctx
1134            .storage
1135            .with_relationship(id, hydrate_relationship_record)
1136            .unwrap_or(LoraValue::Null)
1137    }
1138
1139    fn exec_unwind(&mut self, plan: &PhysicalPlan, op: &UnwindExec) -> ExecResult<Vec<Row>> {
1140        let input_rows = self.execute_node(plan, op.input)?;
1141        let eval_ctx = EvalContext {
1142            storage: &*self.ctx.storage,
1143            params: &self.ctx.params,
1144        };
1145
1146        let mut out = Vec::new();
1147
1148        for row in input_rows {
1149            match eval_expr(&op.expr, &row, &eval_ctx) {
1150                LoraValue::List(values) => {
1151                    for value in values {
1152                        let mut new_row = row.clone();
1153                        new_row.insert(op.alias, value);
1154                        out.push(new_row);
1155                    }
1156                }
1157                LoraValue::Null => {}
1158                other => {
1159                    let mut new_row = row;
1160                    new_row.insert(op.alias, other);
1161                    out.push(new_row);
1162                }
1163            }
1164        }
1165
1166        Ok(out)
1167    }
1168
1169    fn exec_hash_aggregation(
1170        &mut self,
1171        plan: &PhysicalPlan,
1172        op: &HashAggregationExec,
1173    ) -> ExecResult<Vec<Row>> {
1174        let input_rows = self.execute_node(plan, op.input)?;
1175        let eval_ctx = EvalContext {
1176            storage: &*self.ctx.storage,
1177            params: &self.ctx.params,
1178        };
1179
1180        let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
1181
1182        if op.group_by.is_empty() {
1183            groups.insert(Vec::new(), input_rows);
1184        } else {
1185            for row in input_rows {
1186                let key = op
1187                    .group_by
1188                    .iter()
1189                    .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
1190                    .collect::<Vec<_>>();
1191
1192                groups.entry(key).or_default().push(row);
1193            }
1194        }
1195
1196        let mut out = Vec::new();
1197
1198        for rows in groups.into_values() {
1199            let mut result = Row::new();
1200
1201            if let Some(first) = rows.first() {
1202                for proj in &op.group_by {
1203                    let value = self.hydrate_value(eval_expr(&proj.expr, first, &eval_ctx));
1204                    result.insert_named(proj.output, proj.name.clone(), value);
1205                }
1206            }
1207
1208            for proj in &op.aggregates {
1209                let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
1210                result.insert_named(proj.output, proj.name.clone(), value);
1211            }
1212
1213            out.push(result);
1214        }
1215
1216        Ok(out)
1217    }
1218
1219    fn exec_sort(&mut self, plan: &PhysicalPlan, op: &SortExec) -> ExecResult<Vec<Row>> {
1220        let mut rows = self.execute_node(plan, op.input)?;
1221        let eval_ctx = EvalContext {
1222            storage: &*self.ctx.storage,
1223            params: &self.ctx.params,
1224        };
1225
1226        rows.sort_by(|a, b| {
1227            for item in &op.items {
1228                let ord = compare_sort_item(item, a, b, &eval_ctx);
1229                if ord != Ordering::Equal {
1230                    return ord;
1231                }
1232            }
1233            Ordering::Equal
1234        });
1235
1236        Ok(rows)
1237    }
1238
1239    fn exec_limit(&mut self, plan: &PhysicalPlan, op: &LimitExec) -> ExecResult<Vec<Row>> {
1240        let mut rows = self.execute_node(plan, op.input)?;
1241        let eval_ctx = EvalContext {
1242            storage: &*self.ctx.storage,
1243            params: &self.ctx.params,
1244        };
1245
1246        let limit = op
1247            .limit
1248            .as_ref()
1249            .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
1250            .unwrap_or(rows.len() as i64)
1251            .max(0) as usize;
1252
1253        let skip = op
1254            .skip
1255            .as_ref()
1256            .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
1257            .unwrap_or(0)
1258            .max(0) as usize;
1259
1260        if skip >= rows.len() {
1261            return Ok(Vec::new());
1262        }
1263
1264        rows.drain(0..skip);
1265        rows.truncate(limit);
1266        Ok(rows)
1267    }
1268
1269    fn exec_optional_match(
1270        &mut self,
1271        plan: &PhysicalPlan,
1272        op: &OptionalMatchExec,
1273    ) -> ExecResult<Vec<Row>> {
1274        let input_rows = self.execute_node(plan, op.input)?;
1275
1276        // Inner plan is read-only and input-independent; execute once and reuse.
1277        let inner_rows = self.execute_node(plan, op.inner)?;
1278
1279        let mut out = Vec::new();
1280
1281        for input_row in input_rows {
1282            let mut matched = false;
1283
1284            for inner_row in &inner_rows {
1285                let compatible = input_row
1286                    .iter()
1287                    .all(|(var, val)| match inner_row.get(*var) {
1288                        Some(inner_val) => inner_val == val,
1289                        None => true,
1290                    });
1291                if !compatible {
1292                    continue;
1293                }
1294
1295                let mut merged = input_row.clone();
1296                for (var, name, val) in inner_row.iter_named() {
1297                    if !merged.contains_key(*var) {
1298                        merged.insert_named(*var, name.into_owned(), val.clone());
1299                    }
1300                }
1301                out.push(merged);
1302                matched = true;
1303            }
1304
1305            if !matched {
1306                let mut null_row = input_row;
1307                for &var_id in &op.new_vars {
1308                    if !null_row.contains_key(var_id) {
1309                        null_row.insert(var_id, LoraValue::Null);
1310                    }
1311                }
1312                out.push(null_row);
1313            }
1314        }
1315
1316        Ok(out)
1317    }
1318
1319    fn exec_path_build(&mut self, plan: &PhysicalPlan, op: &PathBuildExec) -> ExecResult<Vec<Row>> {
1320        let input_rows = self.execute_node(plan, op.input)?;
1321        let mut rows: Vec<Row> = input_rows
1322            .into_iter()
1323            .map(|mut row| {
1324                let path = build_path_value(&row, &op.node_vars, &op.rel_vars, &*self.ctx.storage);
1325                row.insert(op.output, path);
1326                row
1327            })
1328            .collect();
1329
1330        if let Some(all) = op.shortest_path_all {
1331            rows = filter_shortest_paths(rows, op.output, all);
1332        }
1333        Ok(rows)
1334    }
1335
1336    fn exec_create(&mut self, plan: &PhysicalPlan, op: &CreateExec) -> ExecResult<Vec<Row>> {
1337        let input_rows = self.execute_node(plan, op.input)?;
1338        let mut out = Vec::with_capacity(input_rows.len());
1339
1340        for mut row in input_rows {
1341            self.apply_create_pattern(&mut row, &op.pattern)?;
1342            out.push(row);
1343        }
1344
1345        Ok(out)
1346    }
1347
1348    fn apply_remove_item(&mut self, row: &Row, item: &ResolvedRemoveItem) -> ExecResult<()> {
1349        match item {
1350            ResolvedRemoveItem::Labels { variable, labels } => match row.get(*variable) {
1351                Some(LoraValue::Node(node_id)) => {
1352                    let node_id = *node_id;
1353                    for label in labels {
1354                        self.ctx.storage.remove_node_label(node_id, label);
1355                    }
1356                    Ok(())
1357                }
1358                Some(other) => Err(ExecutorError::ExpectedNodeForRemoveLabels {
1359                    found: value_kind(other),
1360                }),
1361                None => Err(ExecutorError::UnboundVariableForRemove {
1362                    var: format!("{variable:?}"),
1363                }),
1364            },
1365
1366            ResolvedRemoveItem::Property { expr } => self.remove_property_from_expr(row, expr),
1367        }
1368    }
1369
1370    fn delete_value(&mut self, value: LoraValue, detach: bool) -> ExecResult<()> {
1371        match value {
1372            LoraValue::Null => Ok(()),
1373
1374            LoraValue::Node(node_id) => {
1375                if detach {
1376                    self.ctx.storage.detach_delete_node(node_id);
1377                    Ok(())
1378                } else {
1379                    let ok = self.ctx.storage.delete_node(node_id);
1380                    if ok {
1381                        Ok(())
1382                    } else {
1383                        Err(ExecutorError::DeleteNodeWithRelationships { node_id })
1384                    }
1385                }
1386            }
1387
1388            LoraValue::Relationship(rel_id) => {
1389                let ok = self.ctx.storage.delete_relationship(rel_id);
1390                if ok {
1391                    Ok(())
1392                } else {
1393                    Err(ExecutorError::DeleteRelationshipFailed { rel_id })
1394                }
1395            }
1396
1397            LoraValue::List(values) => {
1398                for v in values {
1399                    self.delete_value(v, detach)?;
1400                }
1401                Ok(())
1402            }
1403
1404            other => Err(ExecutorError::InvalidDeleteTarget {
1405                found: value_kind(&other),
1406            }),
1407        }
1408    }
1409
1410    fn exec_merge(&mut self, plan: &PhysicalPlan, op: &MergeExec) -> ExecResult<Vec<Row>> {
1411        let input_rows = self.execute_node(plan, op.input)?;
1412        let mut out = Vec::with_capacity(input_rows.len());
1413
1414        for mut row in input_rows {
1415            // First check if the pattern variable is already bound in the row.
1416            let already_bound = self.pattern_part_is_bound(&row, &op.pattern_part);
1417
1418            let matched = if already_bound {
1419                true
1420            } else {
1421                // Try to find an existing match in the graph.
1422                self.try_match_merge_pattern(&mut row, &op.pattern_part)?
1423            };
1424
1425            if !matched {
1426                self.apply_create_pattern_part(&mut row, &op.pattern_part)?;
1427            }
1428
1429            for action in &op.actions {
1430                if action.on_match == matched {
1431                    for item in &action.set.items {
1432                        self.apply_set_item(&row, item)?;
1433                    }
1434                }
1435            }
1436
1437            out.push(row);
1438        }
1439
1440        Ok(out)
1441    }
1442
1443    /// Try to find an existing node/pattern in the graph matching the MERGE
1444    /// pattern. If found, bind the variable in the row and return true.
1445    fn try_match_merge_pattern(
1446        &self,
1447        row: &mut Row,
1448        part: &ResolvedPatternPart,
1449    ) -> ExecResult<bool> {
1450        match &part.element {
1451            ResolvedPatternElement::Node {
1452                var,
1453                labels,
1454                properties,
1455            } => {
1456                // ID-only candidate discovery; borrow the record during
1457                // label/property filtering to avoid cloning non-matches.
1458                let candidate_ids = if labels.is_empty() {
1459                    self.ctx.storage.all_node_ids()
1460                } else {
1461                    scan_node_ids_for_label_groups(&*self.ctx.storage, labels)
1462                };
1463
1464                // Filter by properties if specified
1465                let eval_ctx = EvalContext {
1466                    storage: &*self.ctx.storage,
1467                    params: &self.ctx.params,
1468                };
1469                let expected_props = properties.as_ref().map(|e| eval_expr(e, row, &eval_ctx));
1470
1471                for id in candidate_ids {
1472                    let matched = self
1473                        .ctx
1474                        .storage
1475                        .with_node(id, |node| {
1476                            if !node_matches_label_groups(&node.labels, labels) {
1477                                return false;
1478                            }
1479                            if let Some(LoraValue::Map(expected)) = &expected_props {
1480                                let all_match = expected.iter().all(|(key, expected_value)| {
1481                                    node.properties
1482                                        .get(key)
1483                                        .map(|actual| {
1484                                            value_matches_property_value(expected_value, actual)
1485                                        })
1486                                        .unwrap_or(false)
1487                                });
1488                                if !all_match {
1489                                    return false;
1490                                }
1491                            }
1492                            true
1493                        })
1494                        .unwrap_or(false);
1495
1496                    if !matched {
1497                        continue;
1498                    }
1499
1500                    // Found a match — bind the variable
1501                    if let Some(var_id) = var {
1502                        row.insert(*var_id, LoraValue::Node(id));
1503                    }
1504                    return Ok(true);
1505                }
1506
1507                Ok(false)
1508            }
1509
1510            ResolvedPatternElement::ShortestPath { .. } => {
1511                // ShortestPath is not valid in MERGE context
1512                Ok(false)
1513            }
1514
1515            ResolvedPatternElement::NodeChain { head, chain } => {
1516                // Resolve the head node — it should be already bound in the row.
1517                let head_node_id = if let Some(var_id) = head.var {
1518                    if let Some(LoraValue::Node(id)) = row.get(var_id) {
1519                        *id
1520                    } else {
1521                        // Try to match head node as a standalone node pattern.
1522                        let node_matched = self.try_match_merge_pattern(
1523                            row,
1524                            &ResolvedPatternPart {
1525                                binding: None,
1526                                element: ResolvedPatternElement::Node {
1527                                    var: head.var,
1528                                    labels: head.labels.clone(),
1529                                    properties: head.properties.clone(),
1530                                },
1531                            },
1532                        )?;
1533                        if !node_matched {
1534                            return Ok(false);
1535                        }
1536                        match row.get(var_id) {
1537                            Some(LoraValue::Node(id)) => *id,
1538                            _ => return Ok(false),
1539                        }
1540                    }
1541                } else {
1542                    return Ok(false);
1543                };
1544
1545                let mut current_node_id = head_node_id;
1546
1547                for step in chain {
1548                    let eval_ctx = EvalContext {
1549                        storage: &*self.ctx.storage,
1550                        params: &self.ctx.params,
1551                    };
1552
1553                    let _ = step.rel.types.first();
1554                    let direction = step.rel.direction;
1555
1556                    // ID-only traversal; look up records by reference only for
1557                    // candidates that pass the label/property filters.
1558                    let edges =
1559                        self.ctx
1560                            .storage
1561                            .expand_ids(current_node_id, direction, &step.rel.types);
1562
1563                    // Try to find a matching edge + target node
1564                    let mut found = false;
1565                    for (rel_id, node_id) in edges {
1566                        // Check target node labels and (optional) properties.
1567                        let node_ok = self
1568                            .ctx
1569                            .storage
1570                            .with_node(node_id, |node_rec| {
1571                                if !node_matches_label_groups(&node_rec.labels, &step.node.labels) {
1572                                    return false;
1573                                }
1574                                if let Some(props_expr) = &step.node.properties {
1575                                    let expected = eval_expr(props_expr, row, &eval_ctx);
1576                                    if let LoraValue::Map(expected_map) = &expected {
1577                                        let all_match =
1578                                            expected_map.iter().all(|(key, expected_val)| {
1579                                                node_rec
1580                                                    .properties
1581                                                    .get(key)
1582                                                    .map(|actual| {
1583                                                        value_matches_property_value(
1584                                                            expected_val,
1585                                                            actual,
1586                                                        )
1587                                                    })
1588                                                    .unwrap_or(false)
1589                                            });
1590                                        if !all_match {
1591                                            return false;
1592                                        }
1593                                    }
1594                                }
1595                                true
1596                            })
1597                            .unwrap_or(false);
1598                        if !node_ok {
1599                            continue;
1600                        }
1601
1602                        // Check relationship properties.
1603                        let rel_ok = self
1604                            .ctx
1605                            .storage
1606                            .with_relationship(rel_id, |rel_rec| {
1607                                if let Some(rel_props_expr) = &step.rel.properties {
1608                                    let expected = eval_expr(rel_props_expr, row, &eval_ctx);
1609                                    if let LoraValue::Map(expected_map) = &expected {
1610                                        let all_match =
1611                                            expected_map.iter().all(|(key, expected_val)| {
1612                                                rel_rec
1613                                                    .properties
1614                                                    .get(key)
1615                                                    .map(|actual| {
1616                                                        value_matches_property_value(
1617                                                            expected_val,
1618                                                            actual,
1619                                                        )
1620                                                    })
1621                                                    .unwrap_or(false)
1622                                            });
1623                                        if !all_match {
1624                                            return false;
1625                                        }
1626                                    }
1627                                }
1628                                true
1629                            })
1630                            .unwrap_or(false);
1631                        if !rel_ok {
1632                            continue;
1633                        }
1634
1635                        // Match found — bind variables
1636                        if let Some(rel_var) = step.rel.var {
1637                            row.insert(rel_var, LoraValue::Relationship(rel_id));
1638                        }
1639                        if let Some(node_var) = step.node.var {
1640                            row.insert(node_var, LoraValue::Node(node_id));
1641                        }
1642                        current_node_id = node_id;
1643                        found = true;
1644                        break;
1645                    }
1646
1647                    if !found {
1648                        return Ok(false);
1649                    }
1650                }
1651
1652                Ok(true)
1653            }
1654        }
1655    }
1656
1657    fn exec_delete(&mut self, plan: &PhysicalPlan, op: &DeleteExec) -> ExecResult<Vec<Row>> {
1658        let input_rows = self.execute_node(plan, op.input)?;
1659
1660        for row in &input_rows {
1661            for expr in &op.expressions {
1662                let value = {
1663                    let eval_ctx = EvalContext {
1664                        storage: &*self.ctx.storage,
1665                        params: &self.ctx.params,
1666                    };
1667                    eval_expr(expr, row, &eval_ctx)
1668                };
1669
1670                self.delete_value(value, op.detach)?;
1671            }
1672        }
1673
1674        Ok(input_rows)
1675    }
1676
1677    fn exec_set(&mut self, plan: &PhysicalPlan, op: &SetExec) -> ExecResult<Vec<Row>> {
1678        let input_rows = self.execute_node(plan, op.input)?;
1679
1680        for row in &input_rows {
1681            for item in &op.items {
1682                self.apply_set_item(row, item)?;
1683            }
1684        }
1685
1686        Ok(input_rows)
1687    }
1688
1689    fn exec_remove(&mut self, plan: &PhysicalPlan, op: &RemoveExec) -> ExecResult<Vec<Row>> {
1690        let input_rows = self.execute_node(plan, op.input)?;
1691
1692        for row in &input_rows {
1693            for item in &op.items {
1694                self.apply_remove_item(row, item)?;
1695            }
1696        }
1697
1698        Ok(input_rows)
1699    }
1700
1701    fn apply_set_item(&mut self, row: &Row, item: &ResolvedSetItem) -> ExecResult<()> {
1702        match item {
1703            ResolvedSetItem::SetProperty { target, value } => {
1704                let new_value = {
1705                    let eval_ctx = EvalContext {
1706                        storage: &*self.ctx.storage,
1707                        params: &self.ctx.params,
1708                    };
1709                    eval_expr(value, row, &eval_ctx)
1710                };
1711
1712                self.set_property_from_expr(row, target, new_value)
1713            }
1714
1715            ResolvedSetItem::SetVariable { variable, value } => {
1716                // Only need the entity's id — peek at the binding by reference.
1717                let entity_ref =
1718                    row.get(*variable)
1719                        .ok_or(ExecutorError::UnboundVariableForSet {
1720                            var: format!("{variable:?}"),
1721                        })?;
1722                let entity_target = entity_target_from_value(entity_ref)?;
1723
1724                let new_value = {
1725                    let eval_ctx = EvalContext {
1726                        storage: &*self.ctx.storage,
1727                        params: &self.ctx.params,
1728                    };
1729                    eval_expr(value, row, &eval_ctx)
1730                };
1731
1732                self.overwrite_entity_target(entity_target, new_value)
1733            }
1734
1735            ResolvedSetItem::MutateVariable { variable, value } => {
1736                let entity_ref =
1737                    row.get(*variable)
1738                        .ok_or(ExecutorError::UnboundVariableForSet {
1739                            var: format!("{variable:?}"),
1740                        })?;
1741                let entity_target = entity_target_from_value(entity_ref)?;
1742
1743                let patch = {
1744                    let eval_ctx = EvalContext {
1745                        storage: &*self.ctx.storage,
1746                        params: &self.ctx.params,
1747                    };
1748                    eval_expr(value, row, &eval_ctx)
1749                };
1750
1751                self.mutate_entity_target(entity_target, patch)
1752            }
1753
1754            ResolvedSetItem::SetLabels { variable, labels } => match row.get(*variable) {
1755                Some(LoraValue::Node(node_id)) => {
1756                    let node_id = *node_id;
1757                    for label in labels {
1758                        self.ctx.storage.add_node_label(node_id, label);
1759                    }
1760                    Ok(())
1761                }
1762                Some(other) => Err(ExecutorError::ExpectedNodeForSetLabels {
1763                    found: value_kind(other),
1764                }),
1765                None => Err(ExecutorError::UnboundVariableForSet {
1766                    var: format!("{variable:?}"),
1767                }),
1768            },
1769        }
1770    }
1771
1772    fn set_property_from_expr(
1773        &mut self,
1774        row: &Row,
1775        target_expr: &ResolvedExpr,
1776        new_value: LoraValue,
1777    ) -> ExecResult<()> {
1778        let ResolvedExpr::Property { expr, property } = target_expr else {
1779            return Err(ExecutorError::UnsupportedSetTarget);
1780        };
1781
1782        let owner = {
1783            let eval_ctx = EvalContext {
1784                storage: &*self.ctx.storage,
1785                params: &self.ctx.params,
1786            };
1787            eval_expr(expr, row, &eval_ctx)
1788        };
1789
1790        match owner {
1791            LoraValue::Node(node_id) => {
1792                let prop = lora_value_to_property(new_value)
1793                    .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1794                self.ctx
1795                    .storage
1796                    .set_node_property(node_id, property.clone(), prop);
1797                Ok(())
1798            }
1799            LoraValue::Relationship(rel_id) => {
1800                let prop = lora_value_to_property(new_value)
1801                    .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1802                self.ctx
1803                    .storage
1804                    .set_relationship_property(rel_id, property.clone(), prop);
1805                Ok(())
1806            }
1807            other => Err(ExecutorError::InvalidSetTarget {
1808                found: value_kind(&other),
1809            }),
1810        }
1811    }
1812
1813    fn remove_property_from_expr(&mut self, row: &Row, expr: &ResolvedExpr) -> ExecResult<()> {
1814        let ResolvedExpr::Property {
1815            expr: owner_expr,
1816            property,
1817        } = expr
1818        else {
1819            return Err(ExecutorError::UnsupportedRemoveTarget);
1820        };
1821
1822        let owner = {
1823            let eval_ctx = EvalContext {
1824                storage: &*self.ctx.storage,
1825                params: &self.ctx.params,
1826            };
1827            eval_expr(owner_expr, row, &eval_ctx)
1828        };
1829
1830        match owner {
1831            LoraValue::Node(node_id) => {
1832                self.ctx.storage.remove_node_property(node_id, property);
1833                Ok(())
1834            }
1835            LoraValue::Relationship(rel_id) => {
1836                self.ctx
1837                    .storage
1838                    .remove_relationship_property(rel_id, property);
1839                Ok(())
1840            }
1841            other => Err(ExecutorError::InvalidRemoveTarget {
1842                found: value_kind(&other),
1843            }),
1844        }
1845    }
1846
1847    fn overwrite_entity_target(
1848        &mut self,
1849        target: EntityTarget,
1850        new_value: LoraValue,
1851    ) -> ExecResult<()> {
1852        let LoraValue::Map(map) = new_value else {
1853            return Err(ExecutorError::ExpectedPropertyMap {
1854                found: value_kind(&new_value),
1855            });
1856        };
1857
1858        let mut props: Properties = Properties::new();
1859        for (k, v) in map {
1860            let prop = lora_value_to_property(v)
1861                .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1862            props.insert(k, prop);
1863        }
1864
1865        match target {
1866            EntityTarget::Node(node_id) => {
1867                self.ctx.storage.replace_node_properties(node_id, props);
1868            }
1869            EntityTarget::Relationship(rel_id) => {
1870                self.ctx
1871                    .storage
1872                    .replace_relationship_properties(rel_id, props);
1873            }
1874        }
1875        Ok(())
1876    }
1877
1878    fn mutate_entity_target(
1879        &mut self,
1880        target: EntityTarget,
1881        patch_value: LoraValue,
1882    ) -> ExecResult<()> {
1883        let LoraValue::Map(map) = patch_value else {
1884            return Err(ExecutorError::ExpectedPropertyMap {
1885                found: value_kind(&patch_value),
1886            });
1887        };
1888
1889        match target {
1890            EntityTarget::Node(node_id) => {
1891                for (k, v) in map {
1892                    let prop = lora_value_to_property(v)
1893                        .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1894                    self.ctx.storage.set_node_property(node_id, k, prop);
1895                }
1896            }
1897            EntityTarget::Relationship(rel_id) => {
1898                for (k, v) in map {
1899                    let prop = lora_value_to_property(v)
1900                        .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1901                    self.ctx.storage.set_relationship_property(rel_id, k, prop);
1902                }
1903            }
1904        }
1905        Ok(())
1906    }
1907
1908    fn apply_create_pattern(&mut self, row: &mut Row, pattern: &ResolvedPattern) -> ExecResult<()> {
1909        for part in &pattern.parts {
1910            self.apply_create_pattern_part(row, part)?;
1911        }
1912        Ok(())
1913    }
1914
1915    fn apply_create_pattern_part(
1916        &mut self,
1917        row: &mut Row,
1918        part: &ResolvedPatternPart,
1919    ) -> ExecResult<()> {
1920        if part.binding.is_some() {
1921            trace!("create pattern part has path binding; path materialization not implemented");
1922        }
1923
1924        let _ = self.apply_create_pattern_element(row, &part.element)?;
1925        Ok(())
1926    }
1927
1928    fn apply_create_pattern_element(
1929        &mut self,
1930        row: &mut Row,
1931        element: &ResolvedPatternElement,
1932    ) -> ExecResult<Option<LoraValue>> {
1933        match element {
1934            ResolvedPatternElement::Node {
1935                var,
1936                labels,
1937                properties,
1938            } => {
1939                let node_id =
1940                    self.materialize_node_pattern(row, *var, labels, properties.as_ref())?;
1941                Ok(Some(LoraValue::Node(node_id)))
1942            }
1943
1944            ResolvedPatternElement::NodeChain { head, chain } => {
1945                let mut current_node_id = self.materialize_node_pattern(
1946                    row,
1947                    head.var,
1948                    &head.labels,
1949                    head.properties.as_ref(),
1950                )?;
1951
1952                for link in chain {
1953                    let next_node_id = self.materialize_node_pattern(
1954                        row,
1955                        link.node.var,
1956                        &link.node.labels,
1957                        link.node.properties.as_ref(),
1958                    )?;
1959
1960                    let _ = self.materialize_relationship_pattern(
1961                        row,
1962                        current_node_id,
1963                        next_node_id,
1964                        &link.rel,
1965                    )?;
1966
1967                    current_node_id = next_node_id;
1968                }
1969
1970                Ok(Some(LoraValue::Node(current_node_id)))
1971            }
1972
1973            ResolvedPatternElement::ShortestPath { .. } => {
1974                // ShortestPath is not valid in CREATE context
1975                Ok(None)
1976            }
1977        }
1978    }
1979
1980    fn pattern_part_is_bound(&self, row: &Row, part: &ResolvedPatternPart) -> bool {
1981        match &part.element {
1982            ResolvedPatternElement::Node { var, .. } => var.and_then(|v| row.get(v)).is_some(),
1983
1984            ResolvedPatternElement::ShortestPath { .. } => false,
1985
1986            ResolvedPatternElement::NodeChain { head, chain } => {
1987                let head_ok = head.var.and_then(|v| row.get(v)).is_some();
1988
1989                let chain_ok = chain.iter().all(|link| {
1990                    let node_ok = link.node.var.and_then(|v| row.get(v)).is_some();
1991                    // For MERGE, anonymous relationships cannot be considered
1992                    // "bound" because we have no variable to check. The merge
1993                    // must search the graph to see if the relationship exists.
1994                    let rel_ok = match link.rel.var {
1995                        Some(v) => row.get(v).is_some(),
1996                        None => false,
1997                    };
1998                    node_ok && rel_ok
1999                });
2000
2001                head_ok && chain_ok
2002            }
2003        }
2004    }
2005
2006    fn materialize_node_pattern(
2007        &mut self,
2008        row: &mut Row,
2009        var: Option<lora_analyzer::symbols::VarId>,
2010        labels: &[Vec<String>],
2011        properties: Option<&ResolvedExpr>,
2012    ) -> ExecResult<u64> {
2013        if let Some(var_id) = var {
2014            if let Some(LoraValue::Node(id)) = row.get(var_id) {
2015                return Ok(*id);
2016            }
2017        }
2018
2019        let properties = match properties {
2020            Some(expr) => eval_properties_expr(expr, row, &*self.ctx.storage, &self.ctx.params)?,
2021            None => Properties::new(),
2022        };
2023
2024        let flat_labels = flatten_label_groups(labels);
2025        debug!("creating node with labels={flat_labels:?}");
2026        let created = self.ctx.storage.create_node(flat_labels, properties);
2027
2028        if let Some(var_id) = var {
2029            row.insert(var_id, LoraValue::Node(created.id));
2030        }
2031
2032        Ok(created.id)
2033    }
2034
2035    fn materialize_relationship_pattern(
2036        &mut self,
2037        row: &mut Row,
2038        left_node_id: u64,
2039        right_node_id: u64,
2040        rel: &lora_analyzer::ResolvedRel,
2041    ) -> ExecResult<u64> {
2042        if let Some(var_id) = rel.var {
2043            if let Some(LoraValue::Relationship(id)) = row.get(var_id) {
2044                let id = *id;
2045                if let Some((src, dst)) = self.ctx.storage.relationship_endpoints(id) {
2046                    let endpoints_match = match rel.direction {
2047                        Direction::Right | Direction::Undirected => {
2048                            src == left_node_id && dst == right_node_id
2049                        }
2050                        Direction::Left => src == right_node_id && dst == left_node_id,
2051                    };
2052
2053                    if endpoints_match {
2054                        return Ok(id);
2055                    }
2056                }
2057            }
2058        }
2059
2060        if rel.range.is_some() {
2061            return Err(ExecutorError::UnsupportedCreateRelationshipRange);
2062        }
2063
2064        let (src, dst) = match rel.direction {
2065            Direction::Right | Direction::Undirected => (left_node_id, right_node_id),
2066            Direction::Left => (right_node_id, left_node_id),
2067        };
2068
2069        let rel_type = rel
2070            .types
2071            .first()
2072            .ok_or(ExecutorError::MissingRelationshipType)?;
2073
2074        if rel_type.is_empty() {
2075            return Err(ExecutorError::MissingRelationshipType);
2076        }
2077
2078        let properties = match rel.properties.as_ref() {
2079            Some(expr) => eval_properties_expr(expr, row, &*self.ctx.storage, &self.ctx.params)?,
2080            None => Properties::new(),
2081        };
2082
2083        debug!("creating relationship: src={src}, dst={dst}, type={rel_type}");
2084
2085        let created = self
2086            .ctx
2087            .storage
2088            .create_relationship(src, dst, rel_type, properties)
2089            .ok_or_else(|| ExecutorError::RelationshipCreateFailed {
2090                src,
2091                dst,
2092                rel_type: rel_type.clone(),
2093            })?;
2094
2095        if let Some(var_id) = rel.var {
2096            row.insert(var_id, LoraValue::Relationship(created.id));
2097        }
2098
2099        Ok(created.id)
2100    }
2101}
2102
2103/// Lightweight target for SET property-mutation paths. Lets the SET logic
2104/// borrow the row entry (just pulling out the id) instead of cloning the
2105/// whole `LoraValue`.
2106#[derive(Clone, Copy)]
2107enum EntityTarget {
2108    Node(NodeId),
2109    Relationship(u64),
2110}
2111
2112fn entity_target_from_value(value: &LoraValue) -> ExecResult<EntityTarget> {
2113    match value {
2114        LoraValue::Node(id) => Ok(EntityTarget::Node(*id)),
2115        LoraValue::Relationship(id) => Ok(EntityTarget::Relationship(*id)),
2116        other => Err(ExecutorError::InvalidSetTarget {
2117            found: value_kind(other),
2118        }),
2119    }
2120}
2121
2122/// Dedup rows that share the same schema (same VarId set). Compares rows by
2123/// a Vec<GroupValueKey> keyed on VarId iteration order — avoids the per-row
2124/// column-name String clones of `dedup_rows`. Used by DISTINCT projection.
2125fn dedup_rows_by_vars(rows: Vec<Row>) -> Vec<Row> {
2126    let mut seen: BTreeSet<Vec<GroupValueKey>> = BTreeSet::new();
2127    let mut out = Vec::new();
2128
2129    for row in rows {
2130        let key: Vec<GroupValueKey> = row
2131            .iter()
2132            .map(|(_, val)| GroupValueKey::from_value(val))
2133            .collect();
2134        if seen.insert(key) {
2135            out.push(row);
2136        }
2137    }
2138
2139    out
2140}
2141
2142/// Dedup rows using named entries so rows with different VarIds but the same
2143/// column name + value are collapsed. Needed for UNION where each branch has
2144/// its own VarIds.
2145fn dedup_rows(rows: Vec<Row>) -> Vec<Row> {
2146    let mut seen: BTreeSet<Vec<(String, GroupValueKey)>> = BTreeSet::new();
2147    let mut out = Vec::new();
2148
2149    for row in rows {
2150        let key: Vec<(String, GroupValueKey)> = row
2151            .iter_named()
2152            .map(|(_, name, val)| (name.into_owned(), GroupValueKey::from_value(val)))
2153            .collect();
2154        if seen.insert(key) {
2155            out.push(row);
2156        }
2157    }
2158
2159    out
2160}
2161
2162fn eval_properties_expr<S: GraphStorage>(
2163    expr: &ResolvedExpr,
2164    row: &Row,
2165    storage: &S,
2166    params: &std::collections::BTreeMap<String, LoraValue>,
2167) -> ExecResult<Properties> {
2168    let eval_ctx = EvalContext { storage, params };
2169
2170    match eval_expr(expr, row, &eval_ctx) {
2171        LoraValue::Map(map) => {
2172            let mut out = Properties::new();
2173            for (k, v) in map {
2174                let prop = lora_value_to_property(v)
2175                    .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2176                out.insert(k, prop);
2177            }
2178            Ok(out)
2179        }
2180        other => Err(ExecutorError::ExpectedPropertyMap {
2181            found: value_kind(&other),
2182        }),
2183    }
2184}
2185
2186fn compute_aggregate_expr<S: GraphStorage>(
2187    expr: &ResolvedExpr,
2188    rows: &[Row],
2189    eval_ctx: &EvalContext<'_, S>,
2190) -> LoraValue {
2191    match expr {
2192        ResolvedExpr::Function {
2193            name,
2194            distinct,
2195            args,
2196        } => {
2197            let func = name.to_ascii_lowercase();
2198
2199            match func.as_str() {
2200                "count" => {
2201                    if args.is_empty() {
2202                        return LoraValue::Int(rows.len() as i64);
2203                    }
2204
2205                    let mut values = rows
2206                        .iter()
2207                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2208                        .filter(|v| !matches!(v, LoraValue::Null))
2209                        .collect::<Vec<_>>();
2210
2211                    if *distinct {
2212                        values = dedup_values(values);
2213                    }
2214
2215                    LoraValue::Int(values.len() as i64)
2216                }
2217
2218                "collect" => {
2219                    if args.is_empty() {
2220                        return LoraValue::List(Vec::new());
2221                    }
2222
2223                    let mut values = rows
2224                        .iter()
2225                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2226                        .collect::<Vec<_>>();
2227
2228                    if *distinct {
2229                        values = dedup_values(values);
2230                    }
2231
2232                    LoraValue::List(values)
2233                }
2234
2235                "sum" => {
2236                    if args.is_empty() {
2237                        return LoraValue::Null;
2238                    }
2239
2240                    let mut values = rows
2241                        .iter()
2242                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2243                        .collect::<Vec<_>>();
2244
2245                    if *distinct {
2246                        values = dedup_values(values);
2247                    }
2248
2249                    let nums = values
2250                        .into_iter()
2251                        .filter_map(as_f64_lossy)
2252                        .collect::<Vec<_>>();
2253
2254                    if nums.is_empty() {
2255                        LoraValue::Null
2256                    } else if nums.iter().all(|n| n.fract() == 0.0) {
2257                        LoraValue::Int(nums.iter().sum::<f64>() as i64)
2258                    } else {
2259                        LoraValue::Float(nums.iter().sum::<f64>())
2260                    }
2261                }
2262
2263                "avg" => {
2264                    if args.is_empty() {
2265                        return LoraValue::Null;
2266                    }
2267
2268                    let mut values = rows
2269                        .iter()
2270                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2271                        .collect::<Vec<_>>();
2272
2273                    if *distinct {
2274                        values = dedup_values(values);
2275                    }
2276
2277                    let nums = values
2278                        .into_iter()
2279                        .filter_map(as_f64_lossy)
2280                        .collect::<Vec<_>>();
2281
2282                    if nums.is_empty() {
2283                        LoraValue::Null
2284                    } else {
2285                        LoraValue::Float(nums.iter().sum::<f64>() / nums.len() as f64)
2286                    }
2287                }
2288
2289                "min" => {
2290                    if args.is_empty() {
2291                        return LoraValue::Null;
2292                    }
2293
2294                    let mut values = rows
2295                        .iter()
2296                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2297                        .filter(|v| !matches!(v, LoraValue::Null))
2298                        .collect::<Vec<_>>();
2299
2300                    if *distinct {
2301                        values = dedup_values(values);
2302                    }
2303
2304                    values
2305                        .into_iter()
2306                        .min_by(compare_values_total)
2307                        .unwrap_or(LoraValue::Null)
2308                }
2309
2310                "max" => {
2311                    if args.is_empty() {
2312                        return LoraValue::Null;
2313                    }
2314
2315                    let mut values = rows
2316                        .iter()
2317                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2318                        .filter(|v| !matches!(v, LoraValue::Null))
2319                        .collect::<Vec<_>>();
2320
2321                    if *distinct {
2322                        values = dedup_values(values);
2323                    }
2324
2325                    values
2326                        .into_iter()
2327                        .max_by(compare_values_total)
2328                        .unwrap_or(LoraValue::Null)
2329                }
2330
2331                "stdev" | "stdevp" => {
2332                    if args.is_empty() {
2333                        return LoraValue::Null;
2334                    }
2335
2336                    let nums: Vec<f64> = rows
2337                        .iter()
2338                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2339                        .filter_map(as_f64_lossy)
2340                        .collect();
2341
2342                    let is_population = func == "stdevp";
2343
2344                    if nums.is_empty() || (!is_population && nums.len() < 2) {
2345                        return LoraValue::Float(0.0);
2346                    }
2347
2348                    let mean = nums.iter().sum::<f64>() / nums.len() as f64;
2349                    let variance_sum: f64 = nums.iter().map(|x| (x - mean).powi(2)).sum();
2350                    let denom = if is_population {
2351                        nums.len() as f64
2352                    } else {
2353                        (nums.len() - 1) as f64
2354                    };
2355                    LoraValue::Float((variance_sum / denom).sqrt())
2356                }
2357
2358                "percentilecont" => {
2359                    if args.len() < 2 {
2360                        return LoraValue::Null;
2361                    }
2362
2363                    let percentile = eval_expr(&args[1], &rows[0], eval_ctx)
2364                        .as_f64()
2365                        .unwrap_or(0.5);
2366                    let mut nums: Vec<f64> = rows
2367                        .iter()
2368                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2369                        .filter_map(as_f64_lossy)
2370                        .collect();
2371
2372                    if nums.is_empty() {
2373                        return LoraValue::Null;
2374                    }
2375
2376                    nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
2377
2378                    let index = percentile * (nums.len() - 1) as f64;
2379                    let lower = index.floor() as usize;
2380                    let upper = index.ceil() as usize;
2381                    let fraction = index - lower as f64;
2382
2383                    if lower == upper || upper >= nums.len() {
2384                        LoraValue::Float(nums[lower])
2385                    } else {
2386                        LoraValue::Float(nums[lower] * (1.0 - fraction) + nums[upper] * fraction)
2387                    }
2388                }
2389
2390                "percentiledisc" => {
2391                    if args.len() < 2 {
2392                        return LoraValue::Null;
2393                    }
2394
2395                    let percentile = eval_expr(&args[1], &rows[0], eval_ctx)
2396                        .as_f64()
2397                        .unwrap_or(0.5);
2398                    let mut nums: Vec<f64> = rows
2399                        .iter()
2400                        .map(|r| eval_expr(&args[0], r, eval_ctx))
2401                        .filter_map(as_f64_lossy)
2402                        .collect();
2403
2404                    if nums.is_empty() {
2405                        return LoraValue::Null;
2406                    }
2407
2408                    nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
2409
2410                    let index = (percentile * (nums.len() - 1) as f64).round() as usize;
2411                    let index = index.min(nums.len() - 1);
2412                    LoraValue::Float(nums[index])
2413                }
2414
2415                _ => rows
2416                    .first()
2417                    .map(|r| eval_expr(expr, r, eval_ctx))
2418                    .unwrap_or(LoraValue::Null),
2419            }
2420        }
2421
2422        _ => rows
2423            .first()
2424            .map(|r| eval_expr(expr, r, eval_ctx))
2425            .unwrap_or(LoraValue::Null),
2426    }
2427}
2428
2429fn compare_sort_item<S: GraphStorage>(
2430    item: &ResolvedSortItem,
2431    a: &Row,
2432    b: &Row,
2433    eval_ctx: &EvalContext<'_, S>,
2434) -> Ordering {
2435    let av = eval_expr(&item.expr, a, eval_ctx);
2436    let bv = eval_expr(&item.expr, b, eval_ctx);
2437
2438    let ascending = matches!(item.direction, lora_ast::SortDirection::Asc);
2439    compare_values_for_sort(&av, &bv, ascending)
2440}
2441
2442fn dedup_values(values: Vec<LoraValue>) -> Vec<LoraValue> {
2443    let mut seen: BTreeSet<GroupValueKey> = BTreeSet::new();
2444    let mut out = Vec::new();
2445
2446    for value in values {
2447        let key = GroupValueKey::from_value(&value);
2448        if seen.insert(key) {
2449            out.push(value);
2450        }
2451    }
2452
2453    out
2454}
2455
2456fn as_f64_lossy(v: LoraValue) -> Option<f64> {
2457    match v {
2458        LoraValue::Int(i) => Some(i as f64),
2459        LoraValue::Float(f) => Some(f),
2460        _ => None,
2461    }
2462}
2463
2464fn compare_values_for_sort(a: &LoraValue, b: &LoraValue, ascending: bool) -> Ordering {
2465    let ord = match (a, b) {
2466        (LoraValue::Null, LoraValue::Null) => Ordering::Equal,
2467        (LoraValue::Null, _) => Ordering::Greater,
2468        (_, LoraValue::Null) => Ordering::Less,
2469        _ => compare_values_total(a, b),
2470    };
2471
2472    if ascending {
2473        ord
2474    } else {
2475        ord.reverse()
2476    }
2477}
2478
2479fn compare_values_total(a: &LoraValue, b: &LoraValue) -> Ordering {
2480    use LoraValue::*;
2481
2482    match (a, b) {
2483        (Bool(x), Bool(y)) => x.cmp(y),
2484        (Int(x), Int(y)) => x.cmp(y),
2485        (Float(x), Float(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
2486        (Int(x), Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Ordering::Equal),
2487        (Float(x), Int(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Ordering::Equal),
2488        (String(x), String(y)) => x.cmp(y),
2489        (Node(x), Node(y)) => x.cmp(y),
2490        (Relationship(x), Relationship(y)) => x.cmp(y),
2491        (Date(x), Date(y)) => x.cmp(y),
2492        (DateTime(x), DateTime(y)) => x.cmp(y),
2493        (Duration(x), Duration(y)) => x.cmp(y),
2494        (Vector(x), Vector(y)) => x.to_key_string().cmp(&y.to_key_string()),
2495        _ => type_rank(a)
2496            .cmp(&type_rank(b))
2497            .then_with(|| format!("{a:?}").cmp(&format!("{b:?}"))),
2498    }
2499}
2500
2501pub fn value_matches_property_value(expected: &LoraValue, actual: &PropertyValue) -> bool {
2502    match (expected, actual) {
2503        (LoraValue::Null, PropertyValue::Null) => true,
2504        (LoraValue::Bool(a), PropertyValue::Bool(b)) => a == b,
2505        (LoraValue::Int(a), PropertyValue::Int(b)) => a == b,
2506        (LoraValue::Float(a), PropertyValue::Float(b)) => a == b,
2507        (LoraValue::Int(a), PropertyValue::Float(b)) => (*a as f64) == *b,
2508        (LoraValue::Float(a), PropertyValue::Int(b)) => *a == (*b as f64),
2509        (LoraValue::String(a), PropertyValue::String(b)) => a == b,
2510
2511        (LoraValue::List(xs), PropertyValue::List(ys)) => {
2512            xs.len() == ys.len()
2513                && xs
2514                    .iter()
2515                    .zip(ys.iter())
2516                    .all(|(x, y)| value_matches_property_value(x, y))
2517        }
2518
2519        (LoraValue::Map(xm), PropertyValue::Map(ym)) => xm.iter().all(|(k, xv)| {
2520            ym.get(k)
2521                .map(|yv| value_matches_property_value(xv, yv))
2522                .unwrap_or(false)
2523        }),
2524
2525        (LoraValue::Date(a), PropertyValue::Date(b)) => a == b,
2526        (LoraValue::DateTime(a), PropertyValue::DateTime(b)) => a == b,
2527        (LoraValue::LocalDateTime(a), PropertyValue::LocalDateTime(b)) => a == b,
2528        (LoraValue::Time(a), PropertyValue::Time(b)) => a == b,
2529        (LoraValue::LocalTime(a), PropertyValue::LocalTime(b)) => a == b,
2530        (LoraValue::Duration(a), PropertyValue::Duration(b)) => a == b,
2531        (LoraValue::Point(a), PropertyValue::Point(b)) => a == b,
2532        (LoraValue::Vector(a), PropertyValue::Vector(b)) => a == b,
2533
2534        _ => false,
2535    }
2536}
2537
2538/// Build a LoraPath from the node and relationship variables currently in a row.
2539///
2540/// For variable-length relationships (stored as a List of Relationship values),
2541/// intermediate nodes are reconstructed from the storage by walking the
2542/// relationship chain.
2543fn build_path_value<S: GraphStorage>(
2544    row: &Row,
2545    node_vars: &[VarId],
2546    rel_vars: &[VarId],
2547    storage: &S,
2548) -> LoraValue {
2549    let mut raw_nodes = Vec::new();
2550    let mut rels = Vec::new();
2551    let mut has_var_len = false;
2552
2553    for &nv in node_vars {
2554        match row.get(nv) {
2555            Some(LoraValue::Node(id)) => raw_nodes.push(*id),
2556            Some(LoraValue::List(items)) => {
2557                for item in items {
2558                    if let LoraValue::Node(id) = item {
2559                        raw_nodes.push(*id);
2560                    }
2561                }
2562            }
2563            _ => {}
2564        }
2565    }
2566
2567    for &rv in rel_vars {
2568        match row.get(rv) {
2569            Some(LoraValue::Relationship(id)) => rels.push(*id),
2570            Some(LoraValue::List(items)) => {
2571                has_var_len = true;
2572                for item in items {
2573                    if let LoraValue::Relationship(id) = item {
2574                        rels.push(*id);
2575                    }
2576                }
2577            }
2578            _ => {}
2579        }
2580    }
2581
2582    // For variable-length paths, reconstruct the full node sequence from the
2583    // relationship chain. raw_nodes typically only has [start, end] but the
2584    // path needs all intermediate nodes as well.
2585    let nodes = if has_var_len && !rels.is_empty() && raw_nodes.len() == 2 {
2586        let start = raw_nodes[0];
2587        let mut ordered = Vec::with_capacity(rels.len() + 1);
2588        ordered.push(start);
2589        let mut current = start;
2590        for &rel_id in &rels {
2591            if let Some((src, dst)) = storage.relationship_endpoints(rel_id) {
2592                let next = if src == current { dst } else { src };
2593                ordered.push(next);
2594                current = next;
2595            }
2596        }
2597        ordered
2598    } else {
2599        raw_nodes
2600    };
2601
2602    LoraValue::Path(LoraPath { nodes, rels })
2603}
2604
2605fn type_rank(v: &LoraValue) -> u8 {
2606    match v {
2607        LoraValue::Null => 0,
2608        LoraValue::Bool(_) => 1,
2609        LoraValue::Int(_) | LoraValue::Float(_) => 2,
2610        LoraValue::String(_) => 3,
2611        LoraValue::Date(_) => 4,
2612        LoraValue::DateTime(_) => 5,
2613        LoraValue::LocalDateTime(_) => 6,
2614        LoraValue::Time(_) => 7,
2615        LoraValue::LocalTime(_) => 8,
2616        LoraValue::Duration(_) => 9,
2617        LoraValue::Point(_) => 10,
2618        LoraValue::Vector(_) => 11,
2619        LoraValue::List(_) => 12,
2620        LoraValue::Map(_) => 13,
2621        LoraValue::Node(_) => 14,
2622        LoraValue::Relationship(_) => 15,
2623        LoraValue::Path(_) => 16,
2624    }
2625}
2626
2627/// Check whether a node's labels satisfy all label groups.
2628/// Each group is a disjunction (OR): the node must have at least one label
2629/// from the group.  Groups are conjunctive (AND): all groups must be satisfied.
2630fn node_matches_label_groups(node_labels: &[String], groups: &[Vec<String>]) -> bool {
2631    groups
2632        .iter()
2633        .all(|group| group.iter().any(|l| node_labels.iter().any(|nl| nl == l)))
2634}
2635
2636/// Scan the graph for candidate node IDs matching the label groups. Uses the
2637/// label index for the pick-first-label phase and avoids cloning NodeRecords.
2638fn scan_node_ids_for_label_groups<S: GraphStorage>(
2639    storage: &S,
2640    groups: &[Vec<String>],
2641) -> Vec<lora_store::NodeId> {
2642    if groups.len() == 1 && groups[0].len() == 1 {
2643        storage.node_ids_by_label(&groups[0][0])
2644    } else if groups.len() == 1 && groups[0].len() > 1 {
2645        let mut seen = std::collections::BTreeSet::new();
2646        let mut out = Vec::new();
2647        for label in &groups[0] {
2648            for id in storage.node_ids_by_label(label) {
2649                if seen.insert(id) {
2650                    out.push(id);
2651                }
2652            }
2653        }
2654        out
2655    } else {
2656        storage.node_ids_by_label(&groups[0][0])
2657    }
2658}
2659
2660fn hydrate_node_record(node: &lora_store::NodeRecord) -> LoraValue {
2661    let mut map = BTreeMap::new();
2662    map.insert("kind".to_string(), LoraValue::String("node".to_string()));
2663    map.insert("id".to_string(), LoraValue::Int(node.id as i64));
2664    map.insert(
2665        "labels".to_string(),
2666        LoraValue::List(
2667            node.labels
2668                .iter()
2669                .map(|s| LoraValue::String(s.clone()))
2670                .collect(),
2671        ),
2672    );
2673    map.insert(
2674        "properties".to_string(),
2675        properties_to_value_map(&node.properties),
2676    );
2677    LoraValue::Map(map)
2678}
2679
2680fn hydrate_relationship_record(rel: &lora_store::RelationshipRecord) -> LoraValue {
2681    let mut map = BTreeMap::new();
2682    map.insert(
2683        "kind".to_string(),
2684        LoraValue::String("relationship".to_string()),
2685    );
2686    map.insert("id".to_string(), LoraValue::Int(rel.id as i64));
2687    map.insert("startId".to_string(), LoraValue::Int(rel.src as i64));
2688    map.insert("endId".to_string(), LoraValue::Int(rel.dst as i64));
2689    map.insert("type".to_string(), LoraValue::String(rel.rel_type.clone()));
2690    map.insert(
2691        "properties".to_string(),
2692        properties_to_value_map(&rel.properties),
2693    );
2694    LoraValue::Map(map)
2695}
2696
2697/// Flatten label groups into a simple Vec<String> (for CREATE/MERGE where
2698/// disjunction doesn't apply — all labels are created).
2699fn flatten_label_groups(groups: &[Vec<String>]) -> Vec<String> {
2700    groups.iter().flat_map(|g| g.iter().cloned()).collect()
2701}
2702
2703#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
2704enum GroupValueKey {
2705    Null,
2706    Bool(bool),
2707    Int(i64),
2708    Float(String),
2709    String(String),
2710    List(Vec<GroupValueKey>),
2711    Map(Vec<(String, GroupValueKey)>),
2712    Node(u64),
2713    Relationship(u64),
2714}
2715
2716impl GroupValueKey {
2717    fn from_value(v: &LoraValue) -> Self {
2718        match v {
2719            LoraValue::Null => Self::Null,
2720            LoraValue::Bool(x) => Self::Bool(*x),
2721            LoraValue::Int(x) => Self::Int(*x),
2722            LoraValue::Float(x) => Self::Float(x.to_string()),
2723            LoraValue::String(x) => Self::String(x.clone()),
2724            LoraValue::List(xs) => Self::List(xs.iter().map(Self::from_value).collect()),
2725            LoraValue::Map(m) => Self::Map(
2726                m.iter()
2727                    .map(|(k, v)| (k.clone(), Self::from_value(v)))
2728                    .collect(),
2729            ),
2730            LoraValue::Node(id) => Self::Node(*id),
2731            LoraValue::Relationship(id) => Self::Relationship(*id),
2732            LoraValue::Path(_) => Self::Null,
2733            // Temporal types: use their string representation as group key
2734            LoraValue::Date(d) => Self::String(d.to_string()),
2735            LoraValue::DateTime(dt) => Self::String(dt.to_string()),
2736            LoraValue::LocalDateTime(dt) => Self::String(dt.to_string()),
2737            LoraValue::Time(t) => Self::String(t.to_string()),
2738            LoraValue::LocalTime(t) => Self::String(t.to_string()),
2739            LoraValue::Duration(dur) => Self::String(dur.to_string()),
2740            LoraValue::Point(p) => Self::String(p.to_string()),
2741            LoraValue::Vector(v) => Self::String(format!("vector:{}", v.to_key_string())),
2742        }
2743    }
2744}
2745
2746/// Compute effective (min_hops, max_hops) from a `RangeLiteral`.
2747///
2748/// Lora semantics:
2749/// - `*`       → 1..∞   (start=None, end=None)
2750/// - `*2..5`   → 2..5   (start=Some(2), end=Some(5))
2751/// - `*..3`    → 1..3   (start=None, end=Some(3))
2752/// - `*2..`    → 2..∞   (start=Some(2), end=None)
2753/// - `*3`      → 3..3   (start=Some(3), end=None, no dots → exactly 3)
2754/// - `*0..1`   → 0..1
2755///
2756/// For unbounded upper, we cap at `MAX_VAR_LEN_HOPS` to prevent runaway.
2757const MAX_VAR_LEN_HOPS: u64 = 100;
2758
2759fn resolve_range(range: &RangeLiteral) -> (u64, u64) {
2760    let min_hops = range.start.unwrap_or(1);
2761    let max_hops = range.end.unwrap_or(MAX_VAR_LEN_HOPS);
2762    (min_hops, max_hops)
2763}
2764
2765/// An entry produced during BFS variable-length expansion.
2766struct VarLenResult {
2767    /// The destination node at the end of this path.
2768    dst_node_id: NodeId,
2769    /// The relationship IDs traversed (in order).
2770    rel_ids: Vec<u64>,
2771}
2772
2773/// Perform variable-length expansion from `start_node_id` following
2774/// relationships of the given `types` and `direction`, collecting all
2775/// reachable nodes at hop distances in `[min_hops, max_hops]`.
2776///
2777/// Uses BFS with relationship-uniqueness per path (each path does not
2778/// reuse the same relationship, but may revisit nodes).
2779fn variable_length_expand<S: GraphStorage>(
2780    storage: &S,
2781    start_node_id: NodeId,
2782    direction: Direction,
2783    types: &[String],
2784    min_hops: u64,
2785    max_hops: u64,
2786) -> Vec<VarLenResult> {
2787    let mut results = Vec::new();
2788
2789    // Each frontier entry: (current_node_id, relationships_used_so_far)
2790    let mut frontier: Vec<(NodeId, Vec<u64>)> = vec![(start_node_id, Vec::new())];
2791
2792    for depth in 1..=max_hops {
2793        // On the final hop we don't need to build next_frontier at all; every
2794        // path gets recorded and then the loop terminates. Avoids one full
2795        // pass of Vec clones on deep traversals.
2796        let is_last_hop = depth == max_hops;
2797        let mut next_frontier: Vec<(NodeId, Vec<u64>)> = Vec::new();
2798
2799        for (current_node, rels_used) in &frontier {
2800            // ID-only expand avoids cloning full records/properties for every
2801            // neighbour on every hop.
2802            for (rel_id, neighbor_id) in storage.expand_ids(*current_node, direction, types) {
2803                // Relationship-uniqueness: skip if this relationship was already
2804                // traversed on this particular path.
2805                if rels_used.contains(&rel_id) {
2806                    continue;
2807                }
2808
2809                if is_last_hop {
2810                    // Terminal hop: just record the result. Allocate rel_ids
2811                    // once (no duplicate clone) by extending a fresh copy.
2812                    if depth >= min_hops {
2813                        let mut rel_ids = Vec::with_capacity(rels_used.len() + 1);
2814                        rel_ids.extend_from_slice(rels_used);
2815                        rel_ids.push(rel_id);
2816                        results.push(VarLenResult {
2817                            dst_node_id: neighbor_id,
2818                            rel_ids,
2819                        });
2820                    }
2821                    continue;
2822                }
2823
2824                let mut new_rels = Vec::with_capacity(rels_used.len() + 1);
2825                new_rels.extend_from_slice(rels_used);
2826                new_rels.push(rel_id);
2827
2828                if depth >= min_hops {
2829                    results.push(VarLenResult {
2830                        dst_node_id: neighbor_id,
2831                        rel_ids: new_rels.clone(),
2832                    });
2833                }
2834
2835                next_frontier.push((neighbor_id, new_rels));
2836            }
2837        }
2838
2839        if is_last_hop || next_frontier.is_empty() {
2840            break;
2841        }
2842
2843        frontier = next_frontier;
2844    }
2845
2846    // Handle min_hops == 0: include the start node itself at depth 0.
2847    if min_hops == 0 {
2848        results.insert(
2849            0,
2850            VarLenResult {
2851                dst_node_id: start_node_id,
2852                rel_ids: Vec::new(),
2853            },
2854        );
2855    }
2856
2857    results
2858}
2859
2860/// Filter rows to keep only shortest paths.
2861/// `all` = false → keep one shortest path; `all` = true → keep all shortest.
2862fn filter_shortest_paths(rows: Vec<Row>, path_var: VarId, all: bool) -> Vec<Row> {
2863    if rows.is_empty() {
2864        return rows;
2865    }
2866
2867    // Compute path length for each row
2868    let lengths: Vec<usize> = rows
2869        .iter()
2870        .map(|row| match row.get(path_var) {
2871            Some(LoraValue::Path(p)) => p.rels.len(),
2872            _ => usize::MAX,
2873        })
2874        .collect();
2875
2876    let min_len = lengths.iter().copied().min().unwrap_or(usize::MAX);
2877
2878    let mut result: Vec<Row> = rows
2879        .into_iter()
2880        .zip(lengths.iter())
2881        .filter(|(_, len)| **len == min_len)
2882        .map(|(row, _)| row)
2883        .collect();
2884
2885    if !all && result.len() > 1 {
2886        result.truncate(1);
2887    }
2888
2889    result
2890}