1use crate::errors::{value_kind, ExecResult, ExecutorError};
2use crate::eval::{eval_expr, take_eval_error, EvalContext};
3use crate::value::{lora_value_to_property, LoraPath, LoraValue, Row};
4use crate::{project_rows, ExecuteOptions, QueryResult};
5
6use lora_analyzer::{
7 symbols::VarId, ResolvedExpr, ResolvedPattern, ResolvedPatternElement, ResolvedPatternPart,
8 ResolvedRemoveItem, ResolvedSetItem, ResolvedSortItem,
9};
10use lora_ast::{Direction, RangeLiteral};
11use lora_compiler::physical::*;
12use lora_compiler::CompiledQuery;
13use lora_store::{GraphStorage, GraphStorageMut, NodeId, Properties, PropertyValue};
14
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, BTreeSet};
17use tracing::{debug, error, trace};
18
19pub struct ExecutionContext<'a, S: GraphStorage + ?Sized> {
20 pub storage: &'a S,
21 pub params: std::collections::BTreeMap<String, LoraValue>,
22}
23
24pub struct Executor<'a, S: GraphStorage + ?Sized> {
25 ctx: ExecutionContext<'a, S>,
26}
27
28impl<'a, S: GraphStorage + ?Sized> Executor<'a, S> {
29 pub fn new(ctx: ExecutionContext<'a, S>) -> Self {
30 Self { ctx }
31 }
32
33 pub fn execute(
34 &self,
35 plan: &PhysicalPlan,
36 options: Option<ExecuteOptions>,
37 ) -> ExecResult<QueryResult> {
38 let _ = take_eval_error();
41
42 let rows = self.execute_node(plan, plan.root)?;
43 let rows = rows
44 .into_iter()
45 .map(|row| self.hydrate_row(row))
46 .collect::<Vec<_>>();
47 Ok(project_rows(rows, options.unwrap_or_default()))
48 }
49
50 fn hydrate_row(&self, row: Row) -> Row {
51 let mut out = Row::new();
52
53 for (var, name, value) in row.into_iter_named() {
54 out.insert_named(var, name, self.hydrate_value(value));
55 }
56
57 out
58 }
59
60 fn execute_node(&self, plan: &PhysicalPlan, node_id: PhysicalNodeId) -> ExecResult<Vec<Row>> {
61 trace!("read-only execute_node start: node_id={node_id:?}");
62
63 let result = match &plan.nodes[node_id] {
64 PhysicalOp::Argument(op) => self.exec_argument(op),
65 PhysicalOp::NodeScan(op) => self.exec_node_scan(plan, op),
66 PhysicalOp::NodeByLabelScan(op) => self.exec_node_by_label_scan(plan, op),
67 PhysicalOp::Expand(op) => self.exec_expand(plan, op),
68 PhysicalOp::Filter(op) => self.exec_filter(plan, op),
69 PhysicalOp::Projection(op) => self.exec_projection(plan, op),
70 PhysicalOp::Unwind(op) => self.exec_unwind(plan, op),
71 PhysicalOp::HashAggregation(op) => self.exec_hash_aggregation(plan, op),
72 PhysicalOp::Sort(op) => self.exec_sort(plan, op),
73 PhysicalOp::Limit(op) => self.exec_limit(plan, op),
74 PhysicalOp::OptionalMatch(op) => self.exec_optional_match(plan, op),
75 PhysicalOp::PathBuild(op) => self.exec_path_build(plan, op),
76 PhysicalOp::Create(_) => Err(ExecutorError::ReadOnlyCreate { node_id }),
77 PhysicalOp::Merge(_) => Err(ExecutorError::ReadOnlyMerge { node_id }),
78 PhysicalOp::Delete(_) => Err(ExecutorError::ReadOnlyDelete { node_id }),
79 PhysicalOp::Set(_) => Err(ExecutorError::ReadOnlySet { node_id }),
80 PhysicalOp::Remove(_) => Err(ExecutorError::ReadOnlyRemove { node_id }),
81 };
82
83 match &result {
84 Ok(rows) => trace!(
85 "read-only execute_node ok: node_id={node_id:?}, rows={}",
86 rows.len()
87 ),
88 Err(err) => error!("read-only execute_node failed: node_id={node_id:?}, error={err}"),
89 }
90
91 result
92 }
93
94 fn exec_argument(&self, _op: &ArgumentExec) -> ExecResult<Vec<Row>> {
95 Ok(vec![Row::new()])
96 }
97
98 fn exec_node_scan(&self, plan: &PhysicalPlan, op: &NodeScanExec) -> ExecResult<Vec<Row>> {
99 let base_rows = match op.input {
100 Some(input) => self.execute_node(plan, input)?,
101 None => vec![Row::new()],
102 };
103
104 let node_ids = self.ctx.storage.all_node_ids();
105 let mut out = Vec::new();
106
107 for row in base_rows {
108 if let Some(existing) = row.get(op.var) {
109 match existing {
110 LoraValue::Node(existing_id) => {
111 if self.ctx.storage.has_node(*existing_id) {
112 out.push(row);
113 }
114 }
115 other => {
116 return Err(ExecutorError::ExpectedNodeForExpand {
117 var: format!("{:?}", op.var),
118 found: value_kind(other),
119 });
120 }
121 }
122 continue;
123 }
124
125 for &id in &node_ids {
126 let mut new_row = row.clone();
127 new_row.insert(op.var, LoraValue::Node(id));
128 out.push(new_row);
129 }
130 }
131
132 Ok(out)
133 }
134
135 fn exec_node_by_label_scan(
136 &self,
137 plan: &PhysicalPlan,
138 op: &NodeByLabelScanExec,
139 ) -> ExecResult<Vec<Row>> {
140 let base_rows = match op.input {
141 Some(input) => self.execute_node(plan, input)?,
142 None => vec![Row::new()],
143 };
144
145 let candidate_ids = scan_node_ids_for_label_groups(self.ctx.storage, &op.labels);
146 let mut out = Vec::new();
147
148 for row in base_rows {
149 if let Some(existing) = row.get(op.var) {
150 match existing {
151 LoraValue::Node(existing_id) => {
152 let labels_ok = self
153 .ctx
154 .storage
155 .node_ref(*existing_id)
156 .map(|n| node_matches_label_groups(&n.labels, &op.labels))
157 .unwrap_or(false);
158 if labels_ok {
159 out.push(row);
160 }
161 }
162 other => {
163 return Err(ExecutorError::ExpectedNodeForExpand {
164 var: format!("{:?}", op.var),
165 found: value_kind(other),
166 });
167 }
168 }
169 continue;
170 }
171
172 for &id in &candidate_ids {
173 let labels_ok = self
174 .ctx
175 .storage
176 .node_ref(id)
177 .map(|n| node_matches_label_groups(&n.labels, &op.labels))
178 .unwrap_or(false);
179 if !labels_ok {
180 continue;
181 }
182 let mut new_row = row.clone();
183 new_row.insert(op.var, LoraValue::Node(id));
184 out.push(new_row);
185 }
186 }
187
188 Ok(out)
189 }
190
191 fn exec_expand(&self, plan: &PhysicalPlan, op: &ExpandExec) -> ExecResult<Vec<Row>> {
192 if let Some(range) = &op.range {
194 return self.exec_expand_var_len(plan, op, range);
195 }
196
197 let input_rows = self.execute_node(plan, op.input)?;
198 let mut out = Vec::new();
199
200 for row in input_rows {
201 let src_node_id = match row.get(op.src) {
202 Some(LoraValue::Node(id)) => *id,
203 Some(other) => {
204 return Err(ExecutorError::ExpectedNodeForExpand {
205 var: format!("{:?}", op.src),
206 found: value_kind(other),
207 });
208 }
209 None => continue,
210 };
211
212 for (rel_id, dst_id) in
213 self.ctx
214 .storage
215 .expand_ids(src_node_id, op.direction, &op.types)
216 {
217 if let Some(expr) = op.rel_properties.as_ref() {
218 let matches = match self.ctx.storage.relationship_ref(rel_id) {
219 Some(rel) => {
220 self.relationship_matches_properties(&rel.properties, Some(expr), &row)?
221 }
222 None => false,
223 };
224 if !matches {
225 continue;
226 }
227 }
228
229 if let Some(existing_dst) = row.get(op.dst) {
230 match existing_dst {
231 LoraValue::Node(existing_id) if *existing_id == dst_id => {}
232 LoraValue::Node(_) => continue,
233 other => {
234 return Err(ExecutorError::ExpectedNodeForExpand {
235 var: format!("{:?}", op.dst),
236 found: value_kind(other),
237 });
238 }
239 }
240 }
241
242 if let Some(rel_var) = op.rel {
243 if let Some(existing_rel) = row.get(rel_var) {
244 match existing_rel {
245 LoraValue::Relationship(existing_id) if *existing_id == rel_id => {}
246 LoraValue::Relationship(_) => continue,
247 other => {
248 return Err(ExecutorError::ExpectedRelationshipForExpand {
249 var: format!("{:?}", rel_var),
250 found: value_kind(other),
251 });
252 }
253 }
254 }
255 }
256
257 let mut new_row = row.clone();
258
259 if !new_row.contains_key(op.dst) {
260 new_row.insert(op.dst, LoraValue::Node(dst_id));
261 }
262
263 if let Some(rel_var) = op.rel {
264 if !new_row.contains_key(rel_var) {
265 new_row.insert(rel_var, LoraValue::Relationship(rel_id));
266 }
267 }
268
269 out.push(new_row);
270 }
271 }
272
273 Ok(out)
274 }
275
276 fn exec_expand_var_len(
277 &self,
278 plan: &PhysicalPlan,
279 op: &ExpandExec,
280 range: &RangeLiteral,
281 ) -> ExecResult<Vec<Row>> {
282 let input_rows = self.execute_node(plan, op.input)?;
283 let (min_hops, max_hops) = resolve_range(range);
284 let mut out = Vec::new();
285
286 for row in input_rows {
287 let src_node_id = match row.get(op.src) {
288 Some(LoraValue::Node(id)) => *id,
289 Some(other) => {
290 return Err(ExecutorError::ExpectedNodeForExpand {
291 var: format!("{:?}", op.src),
292 found: value_kind(other),
293 });
294 }
295 None => continue,
296 };
297
298 let expansions = variable_length_expand(
299 self.ctx.storage,
300 src_node_id,
301 op.direction,
302 &op.types,
303 min_hops,
304 max_hops,
305 );
306
307 for result in expansions {
308 let mut new_row = row.clone();
309 new_row.insert(op.dst, LoraValue::Node(result.dst_node_id));
310
311 if let Some(rel_var) = op.rel {
314 let rel_list = LoraValue::List(
316 result
317 .rel_ids
318 .into_iter()
319 .map(LoraValue::Relationship)
320 .collect(),
321 );
322 new_row.insert(rel_var, rel_list);
323 }
324
325 out.push(new_row);
326 }
327 }
328
329 Ok(out)
330 }
331
332 fn relationship_matches_properties(
333 &self,
334 actual: &Properties,
335 expected_expr: Option<&ResolvedExpr>,
336 row: &Row,
337 ) -> ExecResult<bool> {
338 let Some(expr) = expected_expr else {
339 return Ok(true);
340 };
341
342 let eval_ctx = EvalContext {
343 storage: self.ctx.storage,
344 params: &self.ctx.params,
345 };
346
347 let expected = eval_expr(expr, row, &eval_ctx);
348
349 let LoraValue::Map(expected_map) = expected else {
350 return Err(ExecutorError::ExpectedPropertyMap {
351 found: value_kind(&expected),
352 });
353 };
354
355 Ok(expected_map.iter().all(|(key, expected_value)| {
356 actual
357 .get(key)
358 .map(|actual_value| value_matches_property_value(expected_value, actual_value))
359 .unwrap_or(false)
360 }))
361 }
362
363 fn exec_filter(&self, plan: &PhysicalPlan, op: &FilterExec) -> ExecResult<Vec<Row>> {
364 let input_rows = self.execute_node(plan, op.input)?;
365 let eval_ctx = EvalContext {
366 storage: self.ctx.storage,
367 params: &self.ctx.params,
368 };
369
370 Ok(input_rows
371 .into_iter()
372 .filter(|row| eval_expr(&op.predicate, row, &eval_ctx).is_truthy())
373 .collect())
374 }
375
376 fn exec_projection(&self, plan: &PhysicalPlan, op: &ProjectionExec) -> ExecResult<Vec<Row>> {
377 let input_rows = self.execute_node(plan, op.input)?;
378 let eval_ctx = EvalContext {
379 storage: self.ctx.storage,
380 params: &self.ctx.params,
381 };
382
383 let mut out = Vec::with_capacity(input_rows.len());
384
385 for row in input_rows {
386 if op.include_existing {
389 let mut projected = row;
390 for item in &op.items {
391 let value = eval_expr(&item.expr, &projected, &eval_ctx);
392 if let Some(err) = take_eval_error() {
393 return Err(ExecutorError::RuntimeError(err));
394 }
395 projected.insert_named(item.output, item.name.clone(), value);
396 }
397 out.push(projected);
398 } else {
399 let mut projected = Row::new();
400 for item in &op.items {
401 let value = eval_expr(&item.expr, &row, &eval_ctx);
402 if let Some(err) = take_eval_error() {
403 return Err(ExecutorError::RuntimeError(err));
404 }
405 projected.insert_named(item.output, item.name.clone(), value);
406 }
407 out.push(projected);
408 }
409 }
410
411 Ok(if op.distinct {
412 dedup_rows_by_vars(out)
413 } else {
414 out
415 })
416 }
417
418 fn hydrate_value(&self, value: LoraValue) -> LoraValue {
419 match value {
420 LoraValue::Node(id) => self.hydrate_node(id),
421 LoraValue::Relationship(id) => self.hydrate_relationship(id),
422 LoraValue::List(values) => {
423 LoraValue::List(values.into_iter().map(|v| self.hydrate_value(v)).collect())
424 }
425 LoraValue::Map(map) => LoraValue::Map(
426 map.into_iter()
427 .map(|(k, v)| (k, self.hydrate_value(v)))
428 .collect(),
429 ),
430 other => other,
431 }
432 }
433
434 fn hydrate_node(&self, id: u64) -> LoraValue {
435 match self.ctx.storage.node_ref(id) {
436 Some(node) => hydrate_node_record(node),
437 None => LoraValue::Null,
438 }
439 }
440
441 fn hydrate_relationship(&self, id: u64) -> LoraValue {
442 match self.ctx.storage.relationship_ref(id) {
443 Some(rel) => hydrate_relationship_record(rel),
444 None => LoraValue::Null,
445 }
446 }
447
448 fn exec_unwind(&self, plan: &PhysicalPlan, op: &UnwindExec) -> ExecResult<Vec<Row>> {
449 let input_rows = self.execute_node(plan, op.input)?;
450 let eval_ctx = EvalContext {
451 storage: self.ctx.storage,
452 params: &self.ctx.params,
453 };
454
455 let mut out = Vec::new();
456
457 for row in input_rows {
458 match eval_expr(&op.expr, &row, &eval_ctx) {
459 LoraValue::List(values) => {
460 for value in values {
461 let mut new_row = row.clone();
462 new_row.insert(op.alias, value);
463 out.push(new_row);
464 }
465 }
466 LoraValue::Null => {}
467 other => {
468 let mut new_row = row;
469 new_row.insert(op.alias, other);
470 out.push(new_row);
471 }
472 }
473 }
474
475 Ok(out)
476 }
477
478 fn exec_hash_aggregation(
479 &self,
480 plan: &PhysicalPlan,
481 op: &HashAggregationExec,
482 ) -> ExecResult<Vec<Row>> {
483 let input_rows = self.execute_node(plan, op.input)?;
484 let eval_ctx = EvalContext {
485 storage: self.ctx.storage,
486 params: &self.ctx.params,
487 };
488
489 let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
490
491 if op.group_by.is_empty() {
492 groups.insert(Vec::new(), input_rows);
493 } else {
494 for row in input_rows {
495 let key = op
496 .group_by
497 .iter()
498 .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
499 .collect::<Vec<_>>();
500
501 groups.entry(key).or_default().push(row);
502 }
503 }
504
505 let mut out = Vec::new();
506
507 for rows in groups.into_values() {
508 let mut result = Row::new();
509
510 if let Some(first) = rows.first() {
511 for proj in &op.group_by {
512 let value = self.hydrate_value(eval_expr(&proj.expr, first, &eval_ctx));
513 result.insert_named(proj.output, proj.name.clone(), value);
514 }
515 }
516
517 for proj in &op.aggregates {
518 let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
519 result.insert_named(proj.output, proj.name.clone(), value);
520 }
521
522 out.push(result);
523 }
524
525 Ok(out)
526 }
527
528 fn exec_sort(&self, plan: &PhysicalPlan, op: &SortExec) -> ExecResult<Vec<Row>> {
529 let mut rows = self.execute_node(plan, op.input)?;
530 let eval_ctx = EvalContext {
531 storage: self.ctx.storage,
532 params: &self.ctx.params,
533 };
534
535 rows.sort_by(|a, b| {
536 for item in &op.items {
537 let ord = compare_sort_item(item, a, b, &eval_ctx);
538 if ord != Ordering::Equal {
539 return ord;
540 }
541 }
542 Ordering::Equal
543 });
544
545 Ok(rows)
546 }
547
548 fn exec_limit(&self, plan: &PhysicalPlan, op: &LimitExec) -> ExecResult<Vec<Row>> {
549 let mut rows = self.execute_node(plan, op.input)?;
550 let eval_ctx = EvalContext {
551 storage: self.ctx.storage,
552 params: &self.ctx.params,
553 };
554
555 let limit = op
556 .limit
557 .as_ref()
558 .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
559 .unwrap_or(rows.len() as i64)
560 .max(0) as usize;
561
562 let skip = op
563 .skip
564 .as_ref()
565 .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
566 .unwrap_or(0)
567 .max(0) as usize;
568
569 if skip >= rows.len() {
570 return Ok(Vec::new());
571 }
572
573 rows.drain(0..skip);
574 rows.truncate(limit);
575 Ok(rows)
576 }
577
578 fn exec_optional_match(
579 &self,
580 plan: &PhysicalPlan,
581 op: &OptionalMatchExec,
582 ) -> ExecResult<Vec<Row>> {
583 let input_rows = self.execute_node(plan, op.input)?;
584
585 let inner_rows = self.execute_node(plan, op.inner)?;
590
591 let mut out = Vec::new();
592
593 for input_row in input_rows {
594 let mut matched = false;
595
596 for inner_row in &inner_rows {
597 let compatible = input_row
599 .iter()
600 .all(|(var, val)| match inner_row.get(*var) {
601 Some(inner_val) => inner_val == val,
602 None => true,
603 });
604 if !compatible {
605 continue;
606 }
607
608 let mut merged = input_row.clone();
609 for (var, name, val) in inner_row.iter_named() {
610 if !merged.contains_key(*var) {
611 merged.insert_named(*var, name.into_owned(), val.clone());
612 }
613 }
614 out.push(merged);
615 matched = true;
616 }
617
618 if !matched {
619 let mut null_row = input_row;
620 for &var_id in &op.new_vars {
621 if !null_row.contains_key(var_id) {
622 null_row.insert(var_id, LoraValue::Null);
623 }
624 }
625 out.push(null_row);
626 }
627 }
628
629 Ok(out)
630 }
631
632 fn exec_path_build(&self, plan: &PhysicalPlan, op: &PathBuildExec) -> ExecResult<Vec<Row>> {
633 let input_rows = self.execute_node(plan, op.input)?;
634 let mut rows: Vec<Row> = input_rows
635 .into_iter()
636 .map(|mut row| {
637 let path = build_path_value(&row, &op.node_vars, &op.rel_vars, self.ctx.storage);
638 row.insert(op.output, path);
639 row
640 })
641 .collect();
642
643 if let Some(all) = op.shortest_path_all {
644 rows = filter_shortest_paths(rows, op.output, all);
645 }
646 Ok(rows)
647 }
648}
649
650fn properties_to_value_map(props: &Properties) -> LoraValue {
651 let mut map = BTreeMap::new();
652 for (k, v) in props.iter() {
653 map.insert(k.clone(), LoraValue::from(v));
654 }
655 LoraValue::Map(map)
656}
657
658pub struct MutableExecutionContext<'a, S: GraphStorageMut + ?Sized> {
659 pub storage: &'a mut S,
660 pub params: std::collections::BTreeMap<String, LoraValue>,
661}
662
663pub struct MutableExecutor<'a, S: GraphStorageMut + ?Sized> {
664 ctx: MutableExecutionContext<'a, S>,
665}
666
667impl<'a, S: GraphStorageMut + ?Sized> MutableExecutor<'a, S> {
668 pub fn new(ctx: MutableExecutionContext<'a, S>) -> Self {
669 Self { ctx }
670 }
671
672 pub fn execute(
673 &mut self,
674 plan: &PhysicalPlan,
675 options: Option<ExecuteOptions>,
676 ) -> ExecResult<QueryResult> {
677 let _ = take_eval_error();
680
681 let rows = self.execute_node(plan, plan.root)?;
682 let rows = rows
683 .into_iter()
684 .map(|row| self.hydrate_row(row))
685 .collect::<Vec<_>>();
686 Ok(project_rows(rows, options.unwrap_or_default()))
687 }
688
689 pub fn execute_compiled(
691 &mut self,
692 compiled: &CompiledQuery,
693 options: Option<ExecuteOptions>,
694 ) -> ExecResult<QueryResult> {
695 if compiled.unions.is_empty() {
696 return self.execute(&compiled.physical, options);
697 }
698
699 let _ = take_eval_error();
700
701 let mut all_rows = self.execute_and_hydrate(&compiled.physical)?;
703
704 let mut needs_dedup = false;
707
708 for branch in &compiled.unions {
709 let branch_rows = self.execute_and_hydrate(&branch.physical)?;
710 all_rows.extend(branch_rows);
711
712 if !branch.all {
713 needs_dedup = true;
714 }
715 }
716
717 if needs_dedup {
718 all_rows = dedup_rows(all_rows);
719 }
720
721 Ok(project_rows(all_rows, options.unwrap_or_default()))
722 }
723
724 fn execute_and_hydrate(&mut self, plan: &PhysicalPlan) -> ExecResult<Vec<Row>> {
725 let rows = self.execute_node(plan, plan.root)?;
726 Ok(rows.into_iter().map(|row| self.hydrate_row(row)).collect())
727 }
728
729 fn hydrate_row(&self, row: Row) -> Row {
730 let mut out = Row::new();
731
732 for (var, name, value) in row.into_iter_named() {
733 out.insert_named(var, name, self.hydrate_value(value));
734 }
735
736 out
737 }
738
739 fn execute_node(
740 &mut self,
741 plan: &PhysicalPlan,
742 node_id: PhysicalNodeId,
743 ) -> ExecResult<Vec<Row>> {
744 trace!("mutable execute_node start: node_id={node_id:?}");
745
746 let result = match &plan.nodes[node_id] {
747 PhysicalOp::Argument(op) => self.exec_argument(op),
748 PhysicalOp::NodeScan(op) => self.exec_node_scan(plan, op),
749 PhysicalOp::NodeByLabelScan(op) => self.exec_node_by_label_scan(plan, op),
750 PhysicalOp::Expand(op) => self.exec_expand(plan, op),
751 PhysicalOp::Filter(op) => self.exec_filter(plan, op),
752 PhysicalOp::Projection(op) => self.exec_projection(plan, op),
753 PhysicalOp::Unwind(op) => self.exec_unwind(plan, op),
754 PhysicalOp::HashAggregation(op) => self.exec_hash_aggregation(plan, op),
755 PhysicalOp::Sort(op) => self.exec_sort(plan, op),
756 PhysicalOp::Limit(op) => self.exec_limit(plan, op),
757 PhysicalOp::Create(op) => self.exec_create(plan, op),
758 PhysicalOp::Merge(op) => self.exec_merge(plan, op),
759 PhysicalOp::Delete(op) => self.exec_delete(plan, op),
760 PhysicalOp::Set(op) => self.exec_set(plan, op),
761 PhysicalOp::Remove(op) => self.exec_remove(plan, op),
762 PhysicalOp::OptionalMatch(op) => self.exec_optional_match(plan, op),
763 PhysicalOp::PathBuild(op) => self.exec_path_build(plan, op),
764 };
765
766 match &result {
767 Ok(rows) => trace!(
768 "mutable execute_node ok: node_id={node_id:?}, rows={}",
769 rows.len()
770 ),
771 Err(err) => error!("mutable execute_node failed: node_id={node_id:?}, error={err}"),
772 }
773
774 result
775 }
776
777 fn exec_argument(&self, _op: &ArgumentExec) -> ExecResult<Vec<Row>> {
778 Ok(vec![Row::new()])
779 }
780
781 fn exec_node_scan(&mut self, plan: &PhysicalPlan, op: &NodeScanExec) -> ExecResult<Vec<Row>> {
782 let base_rows = match op.input {
783 Some(input) => self.execute_node(plan, input)?,
784 None => vec![Row::new()],
785 };
786
787 let node_ids = self.ctx.storage.all_node_ids();
788 let mut out = Vec::new();
789
790 for row in base_rows {
791 if let Some(existing) = row.get(op.var) {
792 match existing {
793 LoraValue::Node(existing_id) => {
794 if self.ctx.storage.has_node(*existing_id) {
795 out.push(row);
796 }
797 }
798 other => {
799 return Err(ExecutorError::ExpectedNodeForExpand {
800 var: format!("{:?}", op.var),
801 found: value_kind(other),
802 });
803 }
804 }
805 continue;
806 }
807
808 for &id in &node_ids {
809 let mut new_row = row.clone();
810 new_row.insert(op.var, LoraValue::Node(id));
811 out.push(new_row);
812 }
813 }
814
815 Ok(out)
816 }
817
818 fn exec_node_by_label_scan(
819 &mut self,
820 plan: &PhysicalPlan,
821 op: &NodeByLabelScanExec,
822 ) -> ExecResult<Vec<Row>> {
823 let base_rows = match op.input {
824 Some(input) => self.execute_node(plan, input)?,
825 None => vec![Row::new()],
826 };
827
828 let candidate_ids = scan_node_ids_for_label_groups(&*self.ctx.storage, &op.labels);
829 let mut out = Vec::new();
830
831 for row in base_rows {
832 if let Some(existing) = row.get(op.var) {
833 match existing {
834 LoraValue::Node(existing_id) => {
835 let labels_ok = self
836 .ctx
837 .storage
838 .node_ref(*existing_id)
839 .map(|n| node_matches_label_groups(&n.labels, &op.labels))
840 .unwrap_or(false);
841 if labels_ok {
842 out.push(row);
843 }
844 }
845 other => {
846 return Err(ExecutorError::ExpectedNodeForExpand {
847 var: format!("{:?}", op.var),
848 found: value_kind(other),
849 });
850 }
851 }
852 continue;
853 }
854
855 for &id in &candidate_ids {
856 let labels_ok = self
857 .ctx
858 .storage
859 .node_ref(id)
860 .map(|n| node_matches_label_groups(&n.labels, &op.labels))
861 .unwrap_or(false);
862 if !labels_ok {
863 continue;
864 }
865 let mut new_row = row.clone();
866 new_row.insert(op.var, LoraValue::Node(id));
867 out.push(new_row);
868 }
869 }
870
871 Ok(out)
872 }
873
874 fn exec_expand(&mut self, plan: &PhysicalPlan, op: &ExpandExec) -> ExecResult<Vec<Row>> {
875 if let Some(range) = &op.range {
877 return self.exec_expand_var_len(plan, op, range);
878 }
879
880 let input_rows = self.execute_node(plan, op.input)?;
881 let mut out = Vec::new();
882
883 for row in input_rows {
884 let src_node_id = match row.get(op.src) {
885 Some(LoraValue::Node(id)) => *id,
886 Some(other) => {
887 return Err(ExecutorError::ExpectedNodeForExpand {
888 var: format!("{:?}", op.src),
889 found: value_kind(other),
890 });
891 }
892 None => continue,
893 };
894
895 for (rel_id, dst_id) in
896 self.ctx
897 .storage
898 .expand_ids(src_node_id, op.direction, &op.types)
899 {
900 if let Some(expr) = op.rel_properties.as_ref() {
901 let matches = match self.ctx.storage.relationship_ref(rel_id) {
902 Some(rel) => {
903 self.relationship_matches_properties(&rel.properties, Some(expr), &row)?
904 }
905 None => false,
906 };
907 if !matches {
908 continue;
909 }
910 }
911
912 if let Some(existing_dst) = row.get(op.dst) {
913 match existing_dst {
914 LoraValue::Node(existing_id) if *existing_id == dst_id => {}
915 LoraValue::Node(_) => continue,
916 other => {
917 return Err(ExecutorError::ExpectedNodeForExpand {
918 var: format!("{:?}", op.dst),
919 found: value_kind(other),
920 });
921 }
922 }
923 }
924
925 if let Some(rel_var) = op.rel {
926 if let Some(existing_rel) = row.get(rel_var) {
927 match existing_rel {
928 LoraValue::Relationship(existing_id) if *existing_id == rel_id => {}
929 LoraValue::Relationship(_) => continue,
930 other => {
931 return Err(ExecutorError::ExpectedRelationshipForExpand {
932 var: format!("{:?}", rel_var),
933 found: value_kind(other),
934 });
935 }
936 }
937 }
938 }
939
940 let mut new_row = row.clone();
941
942 if !new_row.contains_key(op.dst) {
943 new_row.insert(op.dst, LoraValue::Node(dst_id));
944 }
945
946 if let Some(rel_var) = op.rel {
947 if !new_row.contains_key(rel_var) {
948 new_row.insert(rel_var, LoraValue::Relationship(rel_id));
949 }
950 }
951
952 out.push(new_row);
953 }
954 }
955
956 Ok(out)
957 }
958
959 fn exec_expand_var_len(
960 &mut self,
961 plan: &PhysicalPlan,
962 op: &ExpandExec,
963 range: &RangeLiteral,
964 ) -> ExecResult<Vec<Row>> {
965 let input_rows = self.execute_node(plan, op.input)?;
966 let (min_hops, max_hops) = resolve_range(range);
967 let mut out = Vec::new();
968
969 for row in input_rows {
970 let src_node_id = match row.get(op.src) {
971 Some(LoraValue::Node(id)) => *id,
972 Some(other) => {
973 return Err(ExecutorError::ExpectedNodeForExpand {
974 var: format!("{:?}", op.src),
975 found: value_kind(other),
976 });
977 }
978 None => continue,
979 };
980
981 let expansions = variable_length_expand(
982 &*self.ctx.storage,
983 src_node_id,
984 op.direction,
985 &op.types,
986 min_hops,
987 max_hops,
988 );
989
990 for result in expansions {
991 let mut new_row = row.clone();
992 new_row.insert(op.dst, LoraValue::Node(result.dst_node_id));
993
994 if let Some(rel_var) = op.rel {
995 let rel_list = LoraValue::List(
997 result
998 .rel_ids
999 .into_iter()
1000 .map(LoraValue::Relationship)
1001 .collect(),
1002 );
1003 new_row.insert(rel_var, rel_list);
1004 }
1005
1006 out.push(new_row);
1007 }
1008 }
1009
1010 Ok(out)
1011 }
1012
1013 fn relationship_matches_properties(
1014 &self,
1015 actual: &Properties,
1016 expected_expr: Option<&ResolvedExpr>,
1017 row: &Row,
1018 ) -> ExecResult<bool> {
1019 let Some(expr) = expected_expr else {
1020 return Ok(true);
1021 };
1022
1023 let eval_ctx = EvalContext {
1024 storage: &*self.ctx.storage,
1025 params: &self.ctx.params,
1026 };
1027
1028 let expected = eval_expr(expr, row, &eval_ctx);
1029
1030 let LoraValue::Map(expected_map) = expected else {
1031 return Err(ExecutorError::ExpectedPropertyMap {
1032 found: value_kind(&expected),
1033 });
1034 };
1035
1036 Ok(expected_map.iter().all(|(key, expected_value)| {
1037 actual
1038 .get(key)
1039 .map(|actual_value| value_matches_property_value(expected_value, actual_value))
1040 .unwrap_or(false)
1041 }))
1042 }
1043
1044 fn exec_filter(&mut self, plan: &PhysicalPlan, op: &FilterExec) -> ExecResult<Vec<Row>> {
1045 let input_rows = self.execute_node(plan, op.input)?;
1046 let eval_ctx = EvalContext {
1047 storage: &*self.ctx.storage,
1048 params: &self.ctx.params,
1049 };
1050
1051 Ok(input_rows
1052 .into_iter()
1053 .filter(|row| eval_expr(&op.predicate, row, &eval_ctx).is_truthy())
1054 .collect())
1055 }
1056
1057 fn exec_projection(
1058 &mut self,
1059 plan: &PhysicalPlan,
1060 op: &ProjectionExec,
1061 ) -> ExecResult<Vec<Row>> {
1062 let input_rows = self.execute_node(plan, op.input)?;
1063 let eval_ctx = EvalContext {
1064 storage: &*self.ctx.storage,
1065 params: &self.ctx.params,
1066 };
1067
1068 let mut out = Vec::with_capacity(input_rows.len());
1069
1070 for row in input_rows {
1071 if op.include_existing {
1072 let mut projected = row;
1073 for item in &op.items {
1074 let value = eval_expr(&item.expr, &projected, &eval_ctx);
1075 if let Some(err) = take_eval_error() {
1076 return Err(ExecutorError::RuntimeError(err));
1077 }
1078 projected.insert_named(item.output, item.name.clone(), value);
1079 }
1080 out.push(projected);
1081 } else {
1082 let mut projected = Row::new();
1083 for item in &op.items {
1084 let value = eval_expr(&item.expr, &row, &eval_ctx);
1085 if let Some(err) = take_eval_error() {
1086 return Err(ExecutorError::RuntimeError(err));
1087 }
1088 projected.insert_named(item.output, item.name.clone(), value);
1089 }
1090 out.push(projected);
1091 }
1092 }
1093
1094 Ok(if op.distinct {
1095 dedup_rows_by_vars(out)
1096 } else {
1097 out
1098 })
1099 }
1100
1101 fn hydrate_value(&self, value: LoraValue) -> LoraValue {
1102 match value {
1103 LoraValue::Node(id) => self.hydrate_node(id),
1104 LoraValue::Relationship(id) => self.hydrate_relationship(id),
1105 LoraValue::List(values) => {
1106 LoraValue::List(values.into_iter().map(|v| self.hydrate_value(v)).collect())
1107 }
1108 LoraValue::Map(map) => LoraValue::Map(
1109 map.into_iter()
1110 .map(|(k, v)| (k, self.hydrate_value(v)))
1111 .collect(),
1112 ),
1113 other => other,
1114 }
1115 }
1116
1117 fn hydrate_node(&self, id: u64) -> LoraValue {
1118 match self.ctx.storage.node_ref(id) {
1119 Some(node) => hydrate_node_record(node),
1120 None => LoraValue::Null,
1121 }
1122 }
1123
1124 fn hydrate_relationship(&self, id: u64) -> LoraValue {
1125 match self.ctx.storage.relationship_ref(id) {
1126 Some(rel) => hydrate_relationship_record(rel),
1127 None => LoraValue::Null,
1128 }
1129 }
1130
1131 fn exec_unwind(&mut self, plan: &PhysicalPlan, op: &UnwindExec) -> ExecResult<Vec<Row>> {
1132 let input_rows = self.execute_node(plan, op.input)?;
1133 let eval_ctx = EvalContext {
1134 storage: &*self.ctx.storage,
1135 params: &self.ctx.params,
1136 };
1137
1138 let mut out = Vec::new();
1139
1140 for row in input_rows {
1141 match eval_expr(&op.expr, &row, &eval_ctx) {
1142 LoraValue::List(values) => {
1143 for value in values {
1144 let mut new_row = row.clone();
1145 new_row.insert(op.alias, value);
1146 out.push(new_row);
1147 }
1148 }
1149 LoraValue::Null => {}
1150 other => {
1151 let mut new_row = row;
1152 new_row.insert(op.alias, other);
1153 out.push(new_row);
1154 }
1155 }
1156 }
1157
1158 Ok(out)
1159 }
1160
1161 fn exec_hash_aggregation(
1162 &mut self,
1163 plan: &PhysicalPlan,
1164 op: &HashAggregationExec,
1165 ) -> ExecResult<Vec<Row>> {
1166 let input_rows = self.execute_node(plan, op.input)?;
1167 let eval_ctx = EvalContext {
1168 storage: &*self.ctx.storage,
1169 params: &self.ctx.params,
1170 };
1171
1172 let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
1173
1174 if op.group_by.is_empty() {
1175 groups.insert(Vec::new(), input_rows);
1176 } else {
1177 for row in input_rows {
1178 let key = op
1179 .group_by
1180 .iter()
1181 .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
1182 .collect::<Vec<_>>();
1183
1184 groups.entry(key).or_default().push(row);
1185 }
1186 }
1187
1188 let mut out = Vec::new();
1189
1190 for rows in groups.into_values() {
1191 let mut result = Row::new();
1192
1193 if let Some(first) = rows.first() {
1194 for proj in &op.group_by {
1195 let value = self.hydrate_value(eval_expr(&proj.expr, first, &eval_ctx));
1196 result.insert_named(proj.output, proj.name.clone(), value);
1197 }
1198 }
1199
1200 for proj in &op.aggregates {
1201 let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
1202 result.insert_named(proj.output, proj.name.clone(), value);
1203 }
1204
1205 out.push(result);
1206 }
1207
1208 Ok(out)
1209 }
1210
1211 fn exec_sort(&mut self, plan: &PhysicalPlan, op: &SortExec) -> ExecResult<Vec<Row>> {
1212 let mut rows = self.execute_node(plan, op.input)?;
1213 let eval_ctx = EvalContext {
1214 storage: &*self.ctx.storage,
1215 params: &self.ctx.params,
1216 };
1217
1218 rows.sort_by(|a, b| {
1219 for item in &op.items {
1220 let ord = compare_sort_item(item, a, b, &eval_ctx);
1221 if ord != Ordering::Equal {
1222 return ord;
1223 }
1224 }
1225 Ordering::Equal
1226 });
1227
1228 Ok(rows)
1229 }
1230
1231 fn exec_limit(&mut self, plan: &PhysicalPlan, op: &LimitExec) -> ExecResult<Vec<Row>> {
1232 let mut rows = self.execute_node(plan, op.input)?;
1233 let eval_ctx = EvalContext {
1234 storage: &*self.ctx.storage,
1235 params: &self.ctx.params,
1236 };
1237
1238 let limit = op
1239 .limit
1240 .as_ref()
1241 .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
1242 .unwrap_or(rows.len() as i64)
1243 .max(0) as usize;
1244
1245 let skip = op
1246 .skip
1247 .as_ref()
1248 .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
1249 .unwrap_or(0)
1250 .max(0) as usize;
1251
1252 if skip >= rows.len() {
1253 return Ok(Vec::new());
1254 }
1255
1256 rows.drain(0..skip);
1257 rows.truncate(limit);
1258 Ok(rows)
1259 }
1260
1261 fn exec_optional_match(
1262 &mut self,
1263 plan: &PhysicalPlan,
1264 op: &OptionalMatchExec,
1265 ) -> ExecResult<Vec<Row>> {
1266 let input_rows = self.execute_node(plan, op.input)?;
1267
1268 let inner_rows = self.execute_node(plan, op.inner)?;
1270
1271 let mut out = Vec::new();
1272
1273 for input_row in input_rows {
1274 let mut matched = false;
1275
1276 for inner_row in &inner_rows {
1277 let compatible = input_row
1278 .iter()
1279 .all(|(var, val)| match inner_row.get(*var) {
1280 Some(inner_val) => inner_val == val,
1281 None => true,
1282 });
1283 if !compatible {
1284 continue;
1285 }
1286
1287 let mut merged = input_row.clone();
1288 for (var, name, val) in inner_row.iter_named() {
1289 if !merged.contains_key(*var) {
1290 merged.insert_named(*var, name.into_owned(), val.clone());
1291 }
1292 }
1293 out.push(merged);
1294 matched = true;
1295 }
1296
1297 if !matched {
1298 let mut null_row = input_row;
1299 for &var_id in &op.new_vars {
1300 if !null_row.contains_key(var_id) {
1301 null_row.insert(var_id, LoraValue::Null);
1302 }
1303 }
1304 out.push(null_row);
1305 }
1306 }
1307
1308 Ok(out)
1309 }
1310
1311 fn exec_path_build(&mut self, plan: &PhysicalPlan, op: &PathBuildExec) -> ExecResult<Vec<Row>> {
1312 let input_rows = self.execute_node(plan, op.input)?;
1313 let mut rows: Vec<Row> = input_rows
1314 .into_iter()
1315 .map(|mut row| {
1316 let path = build_path_value(&row, &op.node_vars, &op.rel_vars, &*self.ctx.storage);
1317 row.insert(op.output, path);
1318 row
1319 })
1320 .collect();
1321
1322 if let Some(all) = op.shortest_path_all {
1323 rows = filter_shortest_paths(rows, op.output, all);
1324 }
1325 Ok(rows)
1326 }
1327
1328 fn exec_create(&mut self, plan: &PhysicalPlan, op: &CreateExec) -> ExecResult<Vec<Row>> {
1329 let input_rows = self.execute_node(plan, op.input)?;
1330 let mut out = Vec::with_capacity(input_rows.len());
1331
1332 for mut row in input_rows {
1333 self.apply_create_pattern(&mut row, &op.pattern)?;
1334 out.push(row);
1335 }
1336
1337 Ok(out)
1338 }
1339
1340 fn apply_remove_item(&mut self, row: &Row, item: &ResolvedRemoveItem) -> ExecResult<()> {
1341 match item {
1342 ResolvedRemoveItem::Labels { variable, labels } => match row.get(*variable) {
1343 Some(LoraValue::Node(node_id)) => {
1344 let node_id = *node_id;
1345 for label in labels {
1346 self.ctx.storage.remove_node_label(node_id, label);
1347 }
1348 Ok(())
1349 }
1350 Some(other) => Err(ExecutorError::ExpectedNodeForRemoveLabels {
1351 found: value_kind(other),
1352 }),
1353 None => Err(ExecutorError::UnboundVariableForRemove {
1354 var: format!("{variable:?}"),
1355 }),
1356 },
1357
1358 ResolvedRemoveItem::Property { expr } => self.remove_property_from_expr(row, expr),
1359 }
1360 }
1361
1362 fn delete_value(&mut self, value: LoraValue, detach: bool) -> ExecResult<()> {
1363 match value {
1364 LoraValue::Null => Ok(()),
1365
1366 LoraValue::Node(node_id) => {
1367 if detach {
1368 self.ctx.storage.detach_delete_node(node_id);
1369 Ok(())
1370 } else {
1371 let ok = self.ctx.storage.delete_node(node_id);
1372 if ok {
1373 Ok(())
1374 } else {
1375 Err(ExecutorError::DeleteNodeWithRelationships { node_id })
1376 }
1377 }
1378 }
1379
1380 LoraValue::Relationship(rel_id) => {
1381 let ok = self.ctx.storage.delete_relationship(rel_id);
1382 if ok {
1383 Ok(())
1384 } else {
1385 Err(ExecutorError::DeleteRelationshipFailed { rel_id })
1386 }
1387 }
1388
1389 LoraValue::List(values) => {
1390 for v in values {
1391 self.delete_value(v, detach)?;
1392 }
1393 Ok(())
1394 }
1395
1396 other => Err(ExecutorError::InvalidDeleteTarget {
1397 found: value_kind(&other),
1398 }),
1399 }
1400 }
1401
1402 fn exec_merge(&mut self, plan: &PhysicalPlan, op: &MergeExec) -> ExecResult<Vec<Row>> {
1403 let input_rows = self.execute_node(plan, op.input)?;
1404 let mut out = Vec::with_capacity(input_rows.len());
1405
1406 for mut row in input_rows {
1407 let already_bound = self.pattern_part_is_bound(&row, &op.pattern_part);
1409
1410 let matched = if already_bound {
1411 true
1412 } else {
1413 self.try_match_merge_pattern(&mut row, &op.pattern_part)?
1415 };
1416
1417 if !matched {
1418 self.apply_create_pattern_part(&mut row, &op.pattern_part)?;
1419 }
1420
1421 for action in &op.actions {
1422 if action.on_match == matched {
1423 for item in &action.set.items {
1424 self.apply_set_item(&row, item)?;
1425 }
1426 }
1427 }
1428
1429 out.push(row);
1430 }
1431
1432 Ok(out)
1433 }
1434
1435 fn try_match_merge_pattern(
1438 &self,
1439 row: &mut Row,
1440 part: &ResolvedPatternPart,
1441 ) -> ExecResult<bool> {
1442 match &part.element {
1443 ResolvedPatternElement::Node {
1444 var,
1445 labels,
1446 properties,
1447 } => {
1448 let candidate_ids = if labels.is_empty() {
1451 self.ctx.storage.all_node_ids()
1452 } else {
1453 scan_node_ids_for_label_groups(&*self.ctx.storage, labels)
1454 };
1455
1456 let eval_ctx = EvalContext {
1458 storage: &*self.ctx.storage,
1459 params: &self.ctx.params,
1460 };
1461 let expected_props = properties.as_ref().map(|e| eval_expr(e, row, &eval_ctx));
1462
1463 for id in candidate_ids {
1464 let Some(node) = self.ctx.storage.node_ref(id) else {
1465 continue;
1466 };
1467 if !node_matches_label_groups(&node.labels, labels) {
1468 continue;
1469 }
1470 if let Some(LoraValue::Map(expected)) = &expected_props {
1471 let all_match = expected.iter().all(|(key, expected_value)| {
1472 node.properties
1473 .get(key)
1474 .map(|actual| value_matches_property_value(expected_value, actual))
1475 .unwrap_or(false)
1476 });
1477 if !all_match {
1478 continue;
1479 }
1480 }
1481
1482 if let Some(var_id) = var {
1484 row.insert(*var_id, LoraValue::Node(node.id));
1485 }
1486 return Ok(true);
1487 }
1488
1489 Ok(false)
1490 }
1491
1492 ResolvedPatternElement::ShortestPath { .. } => {
1493 Ok(false)
1495 }
1496
1497 ResolvedPatternElement::NodeChain { head, chain } => {
1498 let head_node_id = if let Some(var_id) = head.var {
1500 if let Some(LoraValue::Node(id)) = row.get(var_id) {
1501 *id
1502 } else {
1503 let node_matched = self.try_match_merge_pattern(
1505 row,
1506 &ResolvedPatternPart {
1507 binding: None,
1508 element: ResolvedPatternElement::Node {
1509 var: head.var,
1510 labels: head.labels.clone(),
1511 properties: head.properties.clone(),
1512 },
1513 },
1514 )?;
1515 if !node_matched {
1516 return Ok(false);
1517 }
1518 match row.get(var_id) {
1519 Some(LoraValue::Node(id)) => *id,
1520 _ => return Ok(false),
1521 }
1522 }
1523 } else {
1524 return Ok(false);
1525 };
1526
1527 let mut current_node_id = head_node_id;
1528
1529 for step in chain {
1530 let eval_ctx = EvalContext {
1531 storage: &*self.ctx.storage,
1532 params: &self.ctx.params,
1533 };
1534
1535 let _ = step.rel.types.first();
1536 let direction = step.rel.direction;
1537
1538 let edges =
1541 self.ctx
1542 .storage
1543 .expand_ids(current_node_id, direction, &step.rel.types);
1544
1545 let mut found = false;
1547 for (rel_id, node_id) in edges {
1548 let Some(node_rec) = self.ctx.storage.node_ref(node_id) else {
1549 continue;
1550 };
1551 let Some(rel_rec) = self.ctx.storage.relationship_ref(rel_id) else {
1552 continue;
1553 };
1554
1555 if !node_matches_label_groups(&node_rec.labels, &step.node.labels) {
1557 continue;
1558 }
1559
1560 if let Some(props_expr) = &step.node.properties {
1562 let expected = eval_expr(props_expr, row, &eval_ctx);
1563 if let LoraValue::Map(expected_map) = &expected {
1564 let all_match = expected_map.iter().all(|(key, expected_val)| {
1565 node_rec
1566 .properties
1567 .get(key)
1568 .map(|actual| {
1569 value_matches_property_value(expected_val, actual)
1570 })
1571 .unwrap_or(false)
1572 });
1573 if !all_match {
1574 continue;
1575 }
1576 }
1577 }
1578
1579 if let Some(rel_props_expr) = &step.rel.properties {
1581 let expected = eval_expr(rel_props_expr, row, &eval_ctx);
1582 if let LoraValue::Map(expected_map) = &expected {
1583 let all_match = expected_map.iter().all(|(key, expected_val)| {
1584 rel_rec
1585 .properties
1586 .get(key)
1587 .map(|actual| {
1588 value_matches_property_value(expected_val, actual)
1589 })
1590 .unwrap_or(false)
1591 });
1592 if !all_match {
1593 continue;
1594 }
1595 }
1596 }
1597
1598 if let Some(rel_var) = step.rel.var {
1600 row.insert(rel_var, LoraValue::Relationship(rel_id));
1601 }
1602 if let Some(node_var) = step.node.var {
1603 row.insert(node_var, LoraValue::Node(node_id));
1604 }
1605 current_node_id = node_id;
1606 found = true;
1607 break;
1608 }
1609
1610 if !found {
1611 return Ok(false);
1612 }
1613 }
1614
1615 Ok(true)
1616 }
1617 }
1618 }
1619
1620 fn exec_delete(&mut self, plan: &PhysicalPlan, op: &DeleteExec) -> ExecResult<Vec<Row>> {
1621 let input_rows = self.execute_node(plan, op.input)?;
1622
1623 for row in &input_rows {
1624 for expr in &op.expressions {
1625 let value = {
1626 let eval_ctx = EvalContext {
1627 storage: &*self.ctx.storage,
1628 params: &self.ctx.params,
1629 };
1630 eval_expr(expr, row, &eval_ctx)
1631 };
1632
1633 self.delete_value(value, op.detach)?;
1634 }
1635 }
1636
1637 Ok(input_rows)
1638 }
1639
1640 fn exec_set(&mut self, plan: &PhysicalPlan, op: &SetExec) -> ExecResult<Vec<Row>> {
1641 let input_rows = self.execute_node(plan, op.input)?;
1642
1643 for row in &input_rows {
1644 for item in &op.items {
1645 self.apply_set_item(row, item)?;
1646 }
1647 }
1648
1649 Ok(input_rows)
1650 }
1651
1652 fn exec_remove(&mut self, plan: &PhysicalPlan, op: &RemoveExec) -> ExecResult<Vec<Row>> {
1653 let input_rows = self.execute_node(plan, op.input)?;
1654
1655 for row in &input_rows {
1656 for item in &op.items {
1657 self.apply_remove_item(row, item)?;
1658 }
1659 }
1660
1661 Ok(input_rows)
1662 }
1663
1664 fn apply_set_item(&mut self, row: &Row, item: &ResolvedSetItem) -> ExecResult<()> {
1665 match item {
1666 ResolvedSetItem::SetProperty { target, value } => {
1667 let new_value = {
1668 let eval_ctx = EvalContext {
1669 storage: &*self.ctx.storage,
1670 params: &self.ctx.params,
1671 };
1672 eval_expr(value, row, &eval_ctx)
1673 };
1674
1675 self.set_property_from_expr(row, target, new_value)
1676 }
1677
1678 ResolvedSetItem::SetVariable { variable, value } => {
1679 let entity_ref =
1681 row.get(*variable)
1682 .ok_or(ExecutorError::UnboundVariableForSet {
1683 var: format!("{variable:?}"),
1684 })?;
1685 let entity_target = entity_target_from_value(entity_ref)?;
1686
1687 let new_value = {
1688 let eval_ctx = EvalContext {
1689 storage: &*self.ctx.storage,
1690 params: &self.ctx.params,
1691 };
1692 eval_expr(value, row, &eval_ctx)
1693 };
1694
1695 self.overwrite_entity_target(entity_target, new_value)
1696 }
1697
1698 ResolvedSetItem::MutateVariable { variable, value } => {
1699 let entity_ref =
1700 row.get(*variable)
1701 .ok_or(ExecutorError::UnboundVariableForSet {
1702 var: format!("{variable:?}"),
1703 })?;
1704 let entity_target = entity_target_from_value(entity_ref)?;
1705
1706 let patch = {
1707 let eval_ctx = EvalContext {
1708 storage: &*self.ctx.storage,
1709 params: &self.ctx.params,
1710 };
1711 eval_expr(value, row, &eval_ctx)
1712 };
1713
1714 self.mutate_entity_target(entity_target, patch)
1715 }
1716
1717 ResolvedSetItem::SetLabels { variable, labels } => match row.get(*variable) {
1718 Some(LoraValue::Node(node_id)) => {
1719 let node_id = *node_id;
1720 for label in labels {
1721 self.ctx.storage.add_node_label(node_id, label);
1722 }
1723 Ok(())
1724 }
1725 Some(other) => Err(ExecutorError::ExpectedNodeForSetLabels {
1726 found: value_kind(other),
1727 }),
1728 None => Err(ExecutorError::UnboundVariableForSet {
1729 var: format!("{variable:?}"),
1730 }),
1731 },
1732 }
1733 }
1734
1735 fn set_property_from_expr(
1736 &mut self,
1737 row: &Row,
1738 target_expr: &ResolvedExpr,
1739 new_value: LoraValue,
1740 ) -> ExecResult<()> {
1741 let ResolvedExpr::Property { expr, property } = target_expr else {
1742 return Err(ExecutorError::UnsupportedSetTarget);
1743 };
1744
1745 let owner = {
1746 let eval_ctx = EvalContext {
1747 storage: &*self.ctx.storage,
1748 params: &self.ctx.params,
1749 };
1750 eval_expr(expr, row, &eval_ctx)
1751 };
1752
1753 match owner {
1754 LoraValue::Node(node_id) => {
1755 let prop = lora_value_to_property(new_value)
1756 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1757 self.ctx
1758 .storage
1759 .set_node_property(node_id, property.clone(), prop);
1760 Ok(())
1761 }
1762 LoraValue::Relationship(rel_id) => {
1763 let prop = lora_value_to_property(new_value)
1764 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1765 self.ctx
1766 .storage
1767 .set_relationship_property(rel_id, property.clone(), prop);
1768 Ok(())
1769 }
1770 other => Err(ExecutorError::InvalidSetTarget {
1771 found: value_kind(&other),
1772 }),
1773 }
1774 }
1775
1776 fn remove_property_from_expr(&mut self, row: &Row, expr: &ResolvedExpr) -> ExecResult<()> {
1777 let ResolvedExpr::Property {
1778 expr: owner_expr,
1779 property,
1780 } = expr
1781 else {
1782 return Err(ExecutorError::UnsupportedRemoveTarget);
1783 };
1784
1785 let owner = {
1786 let eval_ctx = EvalContext {
1787 storage: &*self.ctx.storage,
1788 params: &self.ctx.params,
1789 };
1790 eval_expr(owner_expr, row, &eval_ctx)
1791 };
1792
1793 match owner {
1794 LoraValue::Node(node_id) => {
1795 self.ctx.storage.remove_node_property(node_id, property);
1796 Ok(())
1797 }
1798 LoraValue::Relationship(rel_id) => {
1799 self.ctx
1800 .storage
1801 .remove_relationship_property(rel_id, property);
1802 Ok(())
1803 }
1804 other => Err(ExecutorError::InvalidRemoveTarget {
1805 found: value_kind(&other),
1806 }),
1807 }
1808 }
1809
1810 fn overwrite_entity_target(
1811 &mut self,
1812 target: EntityTarget,
1813 new_value: LoraValue,
1814 ) -> ExecResult<()> {
1815 let LoraValue::Map(map) = new_value else {
1816 return Err(ExecutorError::ExpectedPropertyMap {
1817 found: value_kind(&new_value),
1818 });
1819 };
1820
1821 let mut props: Properties = Properties::new();
1822 for (k, v) in map {
1823 let prop = lora_value_to_property(v)
1824 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1825 props.insert(k, prop);
1826 }
1827
1828 match target {
1829 EntityTarget::Node(node_id) => {
1830 self.ctx.storage.replace_node_properties(node_id, props);
1831 }
1832 EntityTarget::Relationship(rel_id) => {
1833 self.ctx
1834 .storage
1835 .replace_relationship_properties(rel_id, props);
1836 }
1837 }
1838 Ok(())
1839 }
1840
1841 fn mutate_entity_target(
1842 &mut self,
1843 target: EntityTarget,
1844 patch_value: LoraValue,
1845 ) -> ExecResult<()> {
1846 let LoraValue::Map(map) = patch_value else {
1847 return Err(ExecutorError::ExpectedPropertyMap {
1848 found: value_kind(&patch_value),
1849 });
1850 };
1851
1852 match target {
1853 EntityTarget::Node(node_id) => {
1854 for (k, v) in map {
1855 let prop = lora_value_to_property(v)
1856 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1857 self.ctx.storage.set_node_property(node_id, k, prop);
1858 }
1859 }
1860 EntityTarget::Relationship(rel_id) => {
1861 for (k, v) in map {
1862 let prop = lora_value_to_property(v)
1863 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1864 self.ctx.storage.set_relationship_property(rel_id, k, prop);
1865 }
1866 }
1867 }
1868 Ok(())
1869 }
1870
1871 fn apply_create_pattern(&mut self, row: &mut Row, pattern: &ResolvedPattern) -> ExecResult<()> {
1872 for part in &pattern.parts {
1873 self.apply_create_pattern_part(row, part)?;
1874 }
1875 Ok(())
1876 }
1877
1878 fn apply_create_pattern_part(
1879 &mut self,
1880 row: &mut Row,
1881 part: &ResolvedPatternPart,
1882 ) -> ExecResult<()> {
1883 if part.binding.is_some() {
1884 trace!("create pattern part has path binding; path materialization not implemented");
1885 }
1886
1887 let _ = self.apply_create_pattern_element(row, &part.element)?;
1888 Ok(())
1889 }
1890
1891 fn apply_create_pattern_element(
1892 &mut self,
1893 row: &mut Row,
1894 element: &ResolvedPatternElement,
1895 ) -> ExecResult<Option<LoraValue>> {
1896 match element {
1897 ResolvedPatternElement::Node {
1898 var,
1899 labels,
1900 properties,
1901 } => {
1902 let node_id =
1903 self.materialize_node_pattern(row, *var, labels, properties.as_ref())?;
1904 Ok(Some(LoraValue::Node(node_id)))
1905 }
1906
1907 ResolvedPatternElement::NodeChain { head, chain } => {
1908 let mut current_node_id = self.materialize_node_pattern(
1909 row,
1910 head.var,
1911 &head.labels,
1912 head.properties.as_ref(),
1913 )?;
1914
1915 for link in chain {
1916 let next_node_id = self.materialize_node_pattern(
1917 row,
1918 link.node.var,
1919 &link.node.labels,
1920 link.node.properties.as_ref(),
1921 )?;
1922
1923 let _ = self.materialize_relationship_pattern(
1924 row,
1925 current_node_id,
1926 next_node_id,
1927 &link.rel,
1928 )?;
1929
1930 current_node_id = next_node_id;
1931 }
1932
1933 Ok(Some(LoraValue::Node(current_node_id)))
1934 }
1935
1936 ResolvedPatternElement::ShortestPath { .. } => {
1937 Ok(None)
1939 }
1940 }
1941 }
1942
1943 fn pattern_part_is_bound(&self, row: &Row, part: &ResolvedPatternPart) -> bool {
1944 match &part.element {
1945 ResolvedPatternElement::Node { var, .. } => var.and_then(|v| row.get(v)).is_some(),
1946
1947 ResolvedPatternElement::ShortestPath { .. } => false,
1948
1949 ResolvedPatternElement::NodeChain { head, chain } => {
1950 let head_ok = head.var.and_then(|v| row.get(v)).is_some();
1951
1952 let chain_ok = chain.iter().all(|link| {
1953 let node_ok = link.node.var.and_then(|v| row.get(v)).is_some();
1954 let rel_ok = match link.rel.var {
1958 Some(v) => row.get(v).is_some(),
1959 None => false,
1960 };
1961 node_ok && rel_ok
1962 });
1963
1964 head_ok && chain_ok
1965 }
1966 }
1967 }
1968
1969 fn materialize_node_pattern(
1970 &mut self,
1971 row: &mut Row,
1972 var: Option<lora_analyzer::symbols::VarId>,
1973 labels: &[Vec<String>],
1974 properties: Option<&ResolvedExpr>,
1975 ) -> ExecResult<u64> {
1976 if let Some(var_id) = var {
1977 if let Some(LoraValue::Node(id)) = row.get(var_id) {
1978 return Ok(*id);
1979 }
1980 }
1981
1982 let properties = match properties {
1983 Some(expr) => eval_properties_expr(expr, row, &*self.ctx.storage, &self.ctx.params)?,
1984 None => Properties::new(),
1985 };
1986
1987 let flat_labels = flatten_label_groups(labels);
1988 debug!("creating node with labels={flat_labels:?}");
1989 let created = self.ctx.storage.create_node(flat_labels, properties);
1990
1991 if let Some(var_id) = var {
1992 row.insert(var_id, LoraValue::Node(created.id));
1993 }
1994
1995 Ok(created.id)
1996 }
1997
1998 fn materialize_relationship_pattern(
1999 &mut self,
2000 row: &mut Row,
2001 left_node_id: u64,
2002 right_node_id: u64,
2003 rel: &lora_analyzer::ResolvedRel,
2004 ) -> ExecResult<u64> {
2005 if let Some(var_id) = rel.var {
2006 if let Some(LoraValue::Relationship(id)) = row.get(var_id) {
2007 let id = *id;
2008 if let Some((src, dst)) = self.ctx.storage.relationship_endpoints(id) {
2009 let endpoints_match = match rel.direction {
2010 Direction::Right | Direction::Undirected => {
2011 src == left_node_id && dst == right_node_id
2012 }
2013 Direction::Left => src == right_node_id && dst == left_node_id,
2014 };
2015
2016 if endpoints_match {
2017 return Ok(id);
2018 }
2019 }
2020 }
2021 }
2022
2023 if rel.range.is_some() {
2024 return Err(ExecutorError::UnsupportedCreateRelationshipRange);
2025 }
2026
2027 let (src, dst) = match rel.direction {
2028 Direction::Right | Direction::Undirected => (left_node_id, right_node_id),
2029 Direction::Left => (right_node_id, left_node_id),
2030 };
2031
2032 let rel_type = rel
2033 .types
2034 .first()
2035 .ok_or(ExecutorError::MissingRelationshipType)?;
2036
2037 if rel_type.is_empty() {
2038 return Err(ExecutorError::MissingRelationshipType);
2039 }
2040
2041 let properties = match rel.properties.as_ref() {
2042 Some(expr) => eval_properties_expr(expr, row, &*self.ctx.storage, &self.ctx.params)?,
2043 None => Properties::new(),
2044 };
2045
2046 debug!("creating relationship: src={src}, dst={dst}, type={rel_type}");
2047
2048 let created = self
2049 .ctx
2050 .storage
2051 .create_relationship(src, dst, rel_type, properties)
2052 .ok_or_else(|| ExecutorError::RelationshipCreateFailed {
2053 src,
2054 dst,
2055 rel_type: rel_type.clone(),
2056 })?;
2057
2058 if let Some(var_id) = rel.var {
2059 row.insert(var_id, LoraValue::Relationship(created.id));
2060 }
2061
2062 Ok(created.id)
2063 }
2064}
2065
2066#[derive(Clone, Copy)]
2070enum EntityTarget {
2071 Node(NodeId),
2072 Relationship(u64),
2073}
2074
2075fn entity_target_from_value(value: &LoraValue) -> ExecResult<EntityTarget> {
2076 match value {
2077 LoraValue::Node(id) => Ok(EntityTarget::Node(*id)),
2078 LoraValue::Relationship(id) => Ok(EntityTarget::Relationship(*id)),
2079 other => Err(ExecutorError::InvalidSetTarget {
2080 found: value_kind(other),
2081 }),
2082 }
2083}
2084
2085fn dedup_rows_by_vars(rows: Vec<Row>) -> Vec<Row> {
2089 let mut seen: BTreeSet<Vec<GroupValueKey>> = BTreeSet::new();
2090 let mut out = Vec::new();
2091
2092 for row in rows {
2093 let key: Vec<GroupValueKey> = row
2094 .iter()
2095 .map(|(_, val)| GroupValueKey::from_value(val))
2096 .collect();
2097 if seen.insert(key) {
2098 out.push(row);
2099 }
2100 }
2101
2102 out
2103}
2104
2105fn dedup_rows(rows: Vec<Row>) -> Vec<Row> {
2109 let mut seen: BTreeSet<Vec<(String, GroupValueKey)>> = BTreeSet::new();
2110 let mut out = Vec::new();
2111
2112 for row in rows {
2113 let key: Vec<(String, GroupValueKey)> = row
2114 .iter_named()
2115 .map(|(_, name, val)| (name.into_owned(), GroupValueKey::from_value(val)))
2116 .collect();
2117 if seen.insert(key) {
2118 out.push(row);
2119 }
2120 }
2121
2122 out
2123}
2124
2125fn eval_properties_expr<S: GraphStorage + ?Sized>(
2126 expr: &ResolvedExpr,
2127 row: &Row,
2128 storage: &S,
2129 params: &std::collections::BTreeMap<String, LoraValue>,
2130) -> ExecResult<Properties> {
2131 let eval_ctx = EvalContext { storage, params };
2132
2133 match eval_expr(expr, row, &eval_ctx) {
2134 LoraValue::Map(map) => {
2135 let mut out = Properties::new();
2136 for (k, v) in map {
2137 let prop = lora_value_to_property(v)
2138 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2139 out.insert(k, prop);
2140 }
2141 Ok(out)
2142 }
2143 other => Err(ExecutorError::ExpectedPropertyMap {
2144 found: value_kind(&other),
2145 }),
2146 }
2147}
2148
2149fn compute_aggregate_expr<S: GraphStorage + ?Sized>(
2150 expr: &ResolvedExpr,
2151 rows: &[Row],
2152 eval_ctx: &EvalContext<'_, S>,
2153) -> LoraValue {
2154 match expr {
2155 ResolvedExpr::Function {
2156 name,
2157 distinct,
2158 args,
2159 } => {
2160 let func = name.to_ascii_lowercase();
2161
2162 match func.as_str() {
2163 "count" => {
2164 if args.is_empty() {
2165 return LoraValue::Int(rows.len() as i64);
2166 }
2167
2168 let mut values = rows
2169 .iter()
2170 .map(|r| eval_expr(&args[0], r, eval_ctx))
2171 .filter(|v| !matches!(v, LoraValue::Null))
2172 .collect::<Vec<_>>();
2173
2174 if *distinct {
2175 values = dedup_values(values);
2176 }
2177
2178 LoraValue::Int(values.len() as i64)
2179 }
2180
2181 "collect" => {
2182 if args.is_empty() {
2183 return LoraValue::List(Vec::new());
2184 }
2185
2186 let mut values = rows
2187 .iter()
2188 .map(|r| eval_expr(&args[0], r, eval_ctx))
2189 .collect::<Vec<_>>();
2190
2191 if *distinct {
2192 values = dedup_values(values);
2193 }
2194
2195 LoraValue::List(values)
2196 }
2197
2198 "sum" => {
2199 if args.is_empty() {
2200 return LoraValue::Null;
2201 }
2202
2203 let mut values = rows
2204 .iter()
2205 .map(|r| eval_expr(&args[0], r, eval_ctx))
2206 .collect::<Vec<_>>();
2207
2208 if *distinct {
2209 values = dedup_values(values);
2210 }
2211
2212 let nums = values
2213 .into_iter()
2214 .filter_map(as_f64_lossy)
2215 .collect::<Vec<_>>();
2216
2217 if nums.is_empty() {
2218 LoraValue::Null
2219 } else if nums.iter().all(|n| n.fract() == 0.0) {
2220 LoraValue::Int(nums.iter().sum::<f64>() as i64)
2221 } else {
2222 LoraValue::Float(nums.iter().sum::<f64>())
2223 }
2224 }
2225
2226 "avg" => {
2227 if args.is_empty() {
2228 return LoraValue::Null;
2229 }
2230
2231 let mut values = rows
2232 .iter()
2233 .map(|r| eval_expr(&args[0], r, eval_ctx))
2234 .collect::<Vec<_>>();
2235
2236 if *distinct {
2237 values = dedup_values(values);
2238 }
2239
2240 let nums = values
2241 .into_iter()
2242 .filter_map(as_f64_lossy)
2243 .collect::<Vec<_>>();
2244
2245 if nums.is_empty() {
2246 LoraValue::Null
2247 } else {
2248 LoraValue::Float(nums.iter().sum::<f64>() / nums.len() as f64)
2249 }
2250 }
2251
2252 "min" => {
2253 if args.is_empty() {
2254 return LoraValue::Null;
2255 }
2256
2257 let mut values = rows
2258 .iter()
2259 .map(|r| eval_expr(&args[0], r, eval_ctx))
2260 .filter(|v| !matches!(v, LoraValue::Null))
2261 .collect::<Vec<_>>();
2262
2263 if *distinct {
2264 values = dedup_values(values);
2265 }
2266
2267 values
2268 .into_iter()
2269 .min_by(compare_values_total)
2270 .unwrap_or(LoraValue::Null)
2271 }
2272
2273 "max" => {
2274 if args.is_empty() {
2275 return LoraValue::Null;
2276 }
2277
2278 let mut values = rows
2279 .iter()
2280 .map(|r| eval_expr(&args[0], r, eval_ctx))
2281 .filter(|v| !matches!(v, LoraValue::Null))
2282 .collect::<Vec<_>>();
2283
2284 if *distinct {
2285 values = dedup_values(values);
2286 }
2287
2288 values
2289 .into_iter()
2290 .max_by(compare_values_total)
2291 .unwrap_or(LoraValue::Null)
2292 }
2293
2294 "stdev" | "stdevp" => {
2295 if args.is_empty() {
2296 return LoraValue::Null;
2297 }
2298
2299 let nums: Vec<f64> = rows
2300 .iter()
2301 .map(|r| eval_expr(&args[0], r, eval_ctx))
2302 .filter_map(as_f64_lossy)
2303 .collect();
2304
2305 let is_population = func == "stdevp";
2306
2307 if nums.is_empty() || (!is_population && nums.len() < 2) {
2308 return LoraValue::Float(0.0);
2309 }
2310
2311 let mean = nums.iter().sum::<f64>() / nums.len() as f64;
2312 let variance_sum: f64 = nums.iter().map(|x| (x - mean).powi(2)).sum();
2313 let denom = if is_population {
2314 nums.len() as f64
2315 } else {
2316 (nums.len() - 1) as f64
2317 };
2318 LoraValue::Float((variance_sum / denom).sqrt())
2319 }
2320
2321 "percentilecont" => {
2322 if args.len() < 2 {
2323 return LoraValue::Null;
2324 }
2325
2326 let percentile = eval_expr(&args[1], &rows[0], eval_ctx)
2327 .as_f64()
2328 .unwrap_or(0.5);
2329 let mut nums: Vec<f64> = rows
2330 .iter()
2331 .map(|r| eval_expr(&args[0], r, eval_ctx))
2332 .filter_map(as_f64_lossy)
2333 .collect();
2334
2335 if nums.is_empty() {
2336 return LoraValue::Null;
2337 }
2338
2339 nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
2340
2341 let index = percentile * (nums.len() - 1) as f64;
2342 let lower = index.floor() as usize;
2343 let upper = index.ceil() as usize;
2344 let fraction = index - lower as f64;
2345
2346 if lower == upper || upper >= nums.len() {
2347 LoraValue::Float(nums[lower])
2348 } else {
2349 LoraValue::Float(nums[lower] * (1.0 - fraction) + nums[upper] * fraction)
2350 }
2351 }
2352
2353 "percentiledisc" => {
2354 if args.len() < 2 {
2355 return LoraValue::Null;
2356 }
2357
2358 let percentile = eval_expr(&args[1], &rows[0], eval_ctx)
2359 .as_f64()
2360 .unwrap_or(0.5);
2361 let mut nums: Vec<f64> = rows
2362 .iter()
2363 .map(|r| eval_expr(&args[0], r, eval_ctx))
2364 .filter_map(as_f64_lossy)
2365 .collect();
2366
2367 if nums.is_empty() {
2368 return LoraValue::Null;
2369 }
2370
2371 nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
2372
2373 let index = (percentile * (nums.len() - 1) as f64).round() as usize;
2374 let index = index.min(nums.len() - 1);
2375 LoraValue::Float(nums[index])
2376 }
2377
2378 _ => rows
2379 .first()
2380 .map(|r| eval_expr(expr, r, eval_ctx))
2381 .unwrap_or(LoraValue::Null),
2382 }
2383 }
2384
2385 _ => rows
2386 .first()
2387 .map(|r| eval_expr(expr, r, eval_ctx))
2388 .unwrap_or(LoraValue::Null),
2389 }
2390}
2391
2392fn compare_sort_item<S: GraphStorage + ?Sized>(
2393 item: &ResolvedSortItem,
2394 a: &Row,
2395 b: &Row,
2396 eval_ctx: &EvalContext<'_, S>,
2397) -> Ordering {
2398 let av = eval_expr(&item.expr, a, eval_ctx);
2399 let bv = eval_expr(&item.expr, b, eval_ctx);
2400
2401 let ascending = matches!(item.direction, lora_ast::SortDirection::Asc);
2402 compare_values_for_sort(&av, &bv, ascending)
2403}
2404
2405fn dedup_values(values: Vec<LoraValue>) -> Vec<LoraValue> {
2406 let mut seen: BTreeSet<GroupValueKey> = BTreeSet::new();
2407 let mut out = Vec::new();
2408
2409 for value in values {
2410 let key = GroupValueKey::from_value(&value);
2411 if seen.insert(key) {
2412 out.push(value);
2413 }
2414 }
2415
2416 out
2417}
2418
2419fn as_f64_lossy(v: LoraValue) -> Option<f64> {
2420 match v {
2421 LoraValue::Int(i) => Some(i as f64),
2422 LoraValue::Float(f) => Some(f),
2423 _ => None,
2424 }
2425}
2426
2427fn compare_values_for_sort(a: &LoraValue, b: &LoraValue, ascending: bool) -> Ordering {
2428 let ord = match (a, b) {
2429 (LoraValue::Null, LoraValue::Null) => Ordering::Equal,
2430 (LoraValue::Null, _) => Ordering::Greater,
2431 (_, LoraValue::Null) => Ordering::Less,
2432 _ => compare_values_total(a, b),
2433 };
2434
2435 if ascending {
2436 ord
2437 } else {
2438 ord.reverse()
2439 }
2440}
2441
2442fn compare_values_total(a: &LoraValue, b: &LoraValue) -> Ordering {
2443 use LoraValue::*;
2444
2445 match (a, b) {
2446 (Bool(x), Bool(y)) => x.cmp(y),
2447 (Int(x), Int(y)) => x.cmp(y),
2448 (Float(x), Float(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
2449 (Int(x), Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Ordering::Equal),
2450 (Float(x), Int(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Ordering::Equal),
2451 (String(x), String(y)) => x.cmp(y),
2452 (Node(x), Node(y)) => x.cmp(y),
2453 (Relationship(x), Relationship(y)) => x.cmp(y),
2454 (Date(x), Date(y)) => x.cmp(y),
2455 (DateTime(x), DateTime(y)) => x.cmp(y),
2456 (Duration(x), Duration(y)) => x.cmp(y),
2457 (Vector(x), Vector(y)) => x.to_key_string().cmp(&y.to_key_string()),
2458 _ => type_rank(a)
2459 .cmp(&type_rank(b))
2460 .then_with(|| format!("{a:?}").cmp(&format!("{b:?}"))),
2461 }
2462}
2463
2464pub fn value_matches_property_value(expected: &LoraValue, actual: &PropertyValue) -> bool {
2465 match (expected, actual) {
2466 (LoraValue::Null, PropertyValue::Null) => true,
2467 (LoraValue::Bool(a), PropertyValue::Bool(b)) => a == b,
2468 (LoraValue::Int(a), PropertyValue::Int(b)) => a == b,
2469 (LoraValue::Float(a), PropertyValue::Float(b)) => a == b,
2470 (LoraValue::Int(a), PropertyValue::Float(b)) => (*a as f64) == *b,
2471 (LoraValue::Float(a), PropertyValue::Int(b)) => *a == (*b as f64),
2472 (LoraValue::String(a), PropertyValue::String(b)) => a == b,
2473
2474 (LoraValue::List(xs), PropertyValue::List(ys)) => {
2475 xs.len() == ys.len()
2476 && xs
2477 .iter()
2478 .zip(ys.iter())
2479 .all(|(x, y)| value_matches_property_value(x, y))
2480 }
2481
2482 (LoraValue::Map(xm), PropertyValue::Map(ym)) => xm.iter().all(|(k, xv)| {
2483 ym.get(k)
2484 .map(|yv| value_matches_property_value(xv, yv))
2485 .unwrap_or(false)
2486 }),
2487
2488 (LoraValue::Date(a), PropertyValue::Date(b)) => a == b,
2489 (LoraValue::DateTime(a), PropertyValue::DateTime(b)) => a == b,
2490 (LoraValue::LocalDateTime(a), PropertyValue::LocalDateTime(b)) => a == b,
2491 (LoraValue::Time(a), PropertyValue::Time(b)) => a == b,
2492 (LoraValue::LocalTime(a), PropertyValue::LocalTime(b)) => a == b,
2493 (LoraValue::Duration(a), PropertyValue::Duration(b)) => a == b,
2494 (LoraValue::Point(a), PropertyValue::Point(b)) => a == b,
2495 (LoraValue::Vector(a), PropertyValue::Vector(b)) => a == b,
2496
2497 _ => false,
2498 }
2499}
2500
2501fn build_path_value<S: GraphStorage + ?Sized>(
2507 row: &Row,
2508 node_vars: &[VarId],
2509 rel_vars: &[VarId],
2510 storage: &S,
2511) -> LoraValue {
2512 let mut raw_nodes = Vec::new();
2513 let mut rels = Vec::new();
2514 let mut has_var_len = false;
2515
2516 for &nv in node_vars {
2517 match row.get(nv) {
2518 Some(LoraValue::Node(id)) => raw_nodes.push(*id),
2519 Some(LoraValue::List(items)) => {
2520 for item in items {
2521 if let LoraValue::Node(id) = item {
2522 raw_nodes.push(*id);
2523 }
2524 }
2525 }
2526 _ => {}
2527 }
2528 }
2529
2530 for &rv in rel_vars {
2531 match row.get(rv) {
2532 Some(LoraValue::Relationship(id)) => rels.push(*id),
2533 Some(LoraValue::List(items)) => {
2534 has_var_len = true;
2535 for item in items {
2536 if let LoraValue::Relationship(id) = item {
2537 rels.push(*id);
2538 }
2539 }
2540 }
2541 _ => {}
2542 }
2543 }
2544
2545 let nodes = if has_var_len && !rels.is_empty() && raw_nodes.len() == 2 {
2549 let start = raw_nodes[0];
2550 let mut ordered = Vec::with_capacity(rels.len() + 1);
2551 ordered.push(start);
2552 let mut current = start;
2553 for &rel_id in &rels {
2554 if let Some((src, dst)) = storage.relationship_endpoints(rel_id) {
2555 let next = if src == current { dst } else { src };
2556 ordered.push(next);
2557 current = next;
2558 }
2559 }
2560 ordered
2561 } else {
2562 raw_nodes
2563 };
2564
2565 LoraValue::Path(LoraPath { nodes, rels })
2566}
2567
2568fn type_rank(v: &LoraValue) -> u8 {
2569 match v {
2570 LoraValue::Null => 0,
2571 LoraValue::Bool(_) => 1,
2572 LoraValue::Int(_) | LoraValue::Float(_) => 2,
2573 LoraValue::String(_) => 3,
2574 LoraValue::Date(_) => 4,
2575 LoraValue::DateTime(_) => 5,
2576 LoraValue::LocalDateTime(_) => 6,
2577 LoraValue::Time(_) => 7,
2578 LoraValue::LocalTime(_) => 8,
2579 LoraValue::Duration(_) => 9,
2580 LoraValue::Point(_) => 10,
2581 LoraValue::Vector(_) => 11,
2582 LoraValue::List(_) => 12,
2583 LoraValue::Map(_) => 13,
2584 LoraValue::Node(_) => 14,
2585 LoraValue::Relationship(_) => 15,
2586 LoraValue::Path(_) => 16,
2587 }
2588}
2589
2590fn node_matches_label_groups(node_labels: &[String], groups: &[Vec<String>]) -> bool {
2594 groups
2595 .iter()
2596 .all(|group| group.iter().any(|l| node_labels.iter().any(|nl| nl == l)))
2597}
2598
2599fn scan_node_ids_for_label_groups<S: GraphStorage + ?Sized>(
2602 storage: &S,
2603 groups: &[Vec<String>],
2604) -> Vec<lora_store::NodeId> {
2605 if groups.len() == 1 && groups[0].len() == 1 {
2606 storage.node_ids_by_label(&groups[0][0])
2607 } else if groups.len() == 1 && groups[0].len() > 1 {
2608 let mut seen = std::collections::BTreeSet::new();
2609 let mut out = Vec::new();
2610 for label in &groups[0] {
2611 for id in storage.node_ids_by_label(label) {
2612 if seen.insert(id) {
2613 out.push(id);
2614 }
2615 }
2616 }
2617 out
2618 } else {
2619 storage.node_ids_by_label(&groups[0][0])
2620 }
2621}
2622
2623fn hydrate_node_record(node: &lora_store::NodeRecord) -> LoraValue {
2624 let mut map = BTreeMap::new();
2625 map.insert("kind".to_string(), LoraValue::String("node".to_string()));
2626 map.insert("id".to_string(), LoraValue::Int(node.id as i64));
2627 map.insert(
2628 "labels".to_string(),
2629 LoraValue::List(
2630 node.labels
2631 .iter()
2632 .map(|s| LoraValue::String(s.clone()))
2633 .collect(),
2634 ),
2635 );
2636 map.insert(
2637 "properties".to_string(),
2638 properties_to_value_map(&node.properties),
2639 );
2640 LoraValue::Map(map)
2641}
2642
2643fn hydrate_relationship_record(rel: &lora_store::RelationshipRecord) -> LoraValue {
2644 let mut map = BTreeMap::new();
2645 map.insert(
2646 "kind".to_string(),
2647 LoraValue::String("relationship".to_string()),
2648 );
2649 map.insert("id".to_string(), LoraValue::Int(rel.id as i64));
2650 map.insert("startId".to_string(), LoraValue::Int(rel.src as i64));
2651 map.insert("endId".to_string(), LoraValue::Int(rel.dst as i64));
2652 map.insert("type".to_string(), LoraValue::String(rel.rel_type.clone()));
2653 map.insert(
2654 "properties".to_string(),
2655 properties_to_value_map(&rel.properties),
2656 );
2657 LoraValue::Map(map)
2658}
2659
2660fn flatten_label_groups(groups: &[Vec<String>]) -> Vec<String> {
2663 groups.iter().flat_map(|g| g.iter().cloned()).collect()
2664}
2665
2666#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
2667enum GroupValueKey {
2668 Null,
2669 Bool(bool),
2670 Int(i64),
2671 Float(String),
2672 String(String),
2673 List(Vec<GroupValueKey>),
2674 Map(Vec<(String, GroupValueKey)>),
2675 Node(u64),
2676 Relationship(u64),
2677}
2678
2679impl GroupValueKey {
2680 fn from_value(v: &LoraValue) -> Self {
2681 match v {
2682 LoraValue::Null => Self::Null,
2683 LoraValue::Bool(x) => Self::Bool(*x),
2684 LoraValue::Int(x) => Self::Int(*x),
2685 LoraValue::Float(x) => Self::Float(x.to_string()),
2686 LoraValue::String(x) => Self::String(x.clone()),
2687 LoraValue::List(xs) => Self::List(xs.iter().map(Self::from_value).collect()),
2688 LoraValue::Map(m) => Self::Map(
2689 m.iter()
2690 .map(|(k, v)| (k.clone(), Self::from_value(v)))
2691 .collect(),
2692 ),
2693 LoraValue::Node(id) => Self::Node(*id),
2694 LoraValue::Relationship(id) => Self::Relationship(*id),
2695 LoraValue::Path(_) => Self::Null,
2696 LoraValue::Date(d) => Self::String(d.to_string()),
2698 LoraValue::DateTime(dt) => Self::String(dt.to_string()),
2699 LoraValue::LocalDateTime(dt) => Self::String(dt.to_string()),
2700 LoraValue::Time(t) => Self::String(t.to_string()),
2701 LoraValue::LocalTime(t) => Self::String(t.to_string()),
2702 LoraValue::Duration(dur) => Self::String(dur.to_string()),
2703 LoraValue::Point(p) => Self::String(p.to_string()),
2704 LoraValue::Vector(v) => Self::String(format!("vector:{}", v.to_key_string())),
2705 }
2706 }
2707}
2708
2709const MAX_VAR_LEN_HOPS: u64 = 100;
2721
2722fn resolve_range(range: &RangeLiteral) -> (u64, u64) {
2723 let min_hops = range.start.unwrap_or(1);
2724 let max_hops = range.end.unwrap_or(MAX_VAR_LEN_HOPS);
2725 (min_hops, max_hops)
2726}
2727
2728struct VarLenResult {
2730 dst_node_id: NodeId,
2732 rel_ids: Vec<u64>,
2734}
2735
2736fn variable_length_expand<S: GraphStorage + ?Sized>(
2743 storage: &S,
2744 start_node_id: NodeId,
2745 direction: Direction,
2746 types: &[String],
2747 min_hops: u64,
2748 max_hops: u64,
2749) -> Vec<VarLenResult> {
2750 let mut results = Vec::new();
2751
2752 let mut frontier: Vec<(NodeId, Vec<u64>)> = vec![(start_node_id, Vec::new())];
2754
2755 for depth in 1..=max_hops {
2756 let is_last_hop = depth == max_hops;
2760 let mut next_frontier: Vec<(NodeId, Vec<u64>)> = Vec::new();
2761
2762 for (current_node, rels_used) in &frontier {
2763 for (rel_id, neighbor_id) in storage.expand_ids(*current_node, direction, types) {
2766 if rels_used.contains(&rel_id) {
2769 continue;
2770 }
2771
2772 if is_last_hop {
2773 if depth >= min_hops {
2776 let mut rel_ids = Vec::with_capacity(rels_used.len() + 1);
2777 rel_ids.extend_from_slice(rels_used);
2778 rel_ids.push(rel_id);
2779 results.push(VarLenResult {
2780 dst_node_id: neighbor_id,
2781 rel_ids,
2782 });
2783 }
2784 continue;
2785 }
2786
2787 let mut new_rels = Vec::with_capacity(rels_used.len() + 1);
2788 new_rels.extend_from_slice(rels_used);
2789 new_rels.push(rel_id);
2790
2791 if depth >= min_hops {
2792 results.push(VarLenResult {
2793 dst_node_id: neighbor_id,
2794 rel_ids: new_rels.clone(),
2795 });
2796 }
2797
2798 next_frontier.push((neighbor_id, new_rels));
2799 }
2800 }
2801
2802 if is_last_hop || next_frontier.is_empty() {
2803 break;
2804 }
2805
2806 frontier = next_frontier;
2807 }
2808
2809 if min_hops == 0 {
2811 results.insert(
2812 0,
2813 VarLenResult {
2814 dst_node_id: start_node_id,
2815 rel_ids: Vec::new(),
2816 },
2817 );
2818 }
2819
2820 results
2821}
2822
2823fn filter_shortest_paths(rows: Vec<Row>, path_var: VarId, all: bool) -> Vec<Row> {
2826 if rows.is_empty() {
2827 return rows;
2828 }
2829
2830 let lengths: Vec<usize> = rows
2832 .iter()
2833 .map(|row| match row.get(path_var) {
2834 Some(LoraValue::Path(p)) => p.rels.len(),
2835 _ => usize::MAX,
2836 })
2837 .collect();
2838
2839 let min_len = lengths.iter().copied().min().unwrap_or(usize::MAX);
2840
2841 let mut result: Vec<Row> = rows
2842 .into_iter()
2843 .zip(lengths.iter())
2844 .filter(|(_, len)| **len == min_len)
2845 .map(|(row, _)| row)
2846 .collect();
2847
2848 if !all && result.len() > 1 {
2849 result.truncate(1);
2850 }
2851
2852 result
2853}