1use crate::errors::{value_kind, ExecResult, ExecutorError};
2use crate::eval::{eval_expr, take_eval_error, EvalContext};
3use crate::value::{lora_value_to_property, LoraPath, LoraValue, Row};
4use crate::{project_rows, ExecuteOptions, QueryResult};
5
6use lora_analyzer::{
7 symbols::VarId, ResolvedExpr, ResolvedPattern, ResolvedPatternElement, ResolvedPatternPart,
8 ResolvedRemoveItem, ResolvedSetItem, ResolvedSortItem,
9};
10use lora_ast::{Direction, RangeLiteral};
11use lora_compiler::physical::*;
12use lora_compiler::CompiledQuery;
13use lora_store::{GraphStorage, GraphStorageMut, NodeId, Properties, PropertyValue};
14
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, BTreeSet};
17use tracing::{debug, error, trace};
18
19pub struct ExecutionContext<'a, S: GraphStorage> {
20 pub storage: &'a S,
21 pub params: std::collections::BTreeMap<String, LoraValue>,
22}
23
24pub struct Executor<'a, S: GraphStorage> {
25 ctx: ExecutionContext<'a, S>,
26}
27
28impl<'a, S: GraphStorage> Executor<'a, S> {
29 pub fn new(ctx: ExecutionContext<'a, S>) -> Self {
30 Self { ctx }
31 }
32
33 pub fn execute(
34 &self,
35 plan: &PhysicalPlan,
36 options: Option<ExecuteOptions>,
37 ) -> ExecResult<QueryResult> {
38 let _ = take_eval_error();
41
42 let rows = self.execute_node(plan, plan.root)?;
43 let rows = rows
44 .into_iter()
45 .map(|row| self.hydrate_row(row))
46 .collect::<Vec<_>>();
47 Ok(project_rows(rows, options.unwrap_or_default()))
48 }
49
50 fn hydrate_row(&self, row: Row) -> Row {
51 let mut out = Row::new();
52
53 for (var, name, value) in row.into_iter_named() {
54 out.insert_named(var, name, self.hydrate_value(value));
55 }
56
57 out
58 }
59
60 fn execute_node(&self, plan: &PhysicalPlan, node_id: PhysicalNodeId) -> ExecResult<Vec<Row>> {
61 trace!("read-only execute_node start: node_id={node_id:?}");
62
63 let result = match &plan.nodes[node_id] {
64 PhysicalOp::Argument(op) => self.exec_argument(op),
65 PhysicalOp::NodeScan(op) => self.exec_node_scan(plan, op),
66 PhysicalOp::NodeByLabelScan(op) => self.exec_node_by_label_scan(plan, op),
67 PhysicalOp::Expand(op) => self.exec_expand(plan, op),
68 PhysicalOp::Filter(op) => self.exec_filter(plan, op),
69 PhysicalOp::Projection(op) => self.exec_projection(plan, op),
70 PhysicalOp::Unwind(op) => self.exec_unwind(plan, op),
71 PhysicalOp::HashAggregation(op) => self.exec_hash_aggregation(plan, op),
72 PhysicalOp::Sort(op) => self.exec_sort(plan, op),
73 PhysicalOp::Limit(op) => self.exec_limit(plan, op),
74 PhysicalOp::OptionalMatch(op) => self.exec_optional_match(plan, op),
75 PhysicalOp::PathBuild(op) => self.exec_path_build(plan, op),
76 PhysicalOp::Create(_) => Err(ExecutorError::ReadOnlyCreate { node_id }),
77 PhysicalOp::Merge(_) => Err(ExecutorError::ReadOnlyMerge { node_id }),
78 PhysicalOp::Delete(_) => Err(ExecutorError::ReadOnlyDelete { node_id }),
79 PhysicalOp::Set(_) => Err(ExecutorError::ReadOnlySet { node_id }),
80 PhysicalOp::Remove(_) => Err(ExecutorError::ReadOnlyRemove { node_id }),
81 };
82
83 match &result {
84 Ok(rows) => trace!(
85 "read-only execute_node ok: node_id={node_id:?}, rows={}",
86 rows.len()
87 ),
88 Err(err) => error!("read-only execute_node failed: node_id={node_id:?}, error={err}"),
89 }
90
91 result
92 }
93
94 fn exec_argument(&self, _op: &ArgumentExec) -> ExecResult<Vec<Row>> {
95 Ok(vec![Row::new()])
96 }
97
98 fn exec_node_scan(&self, plan: &PhysicalPlan, op: &NodeScanExec) -> ExecResult<Vec<Row>> {
99 let base_rows = match op.input {
100 Some(input) => self.execute_node(plan, input)?,
101 None => vec![Row::new()],
102 };
103
104 let node_ids = self.ctx.storage.all_node_ids();
105 let mut out = Vec::new();
106
107 for row in base_rows {
108 if let Some(existing) = row.get(op.var) {
109 match existing {
110 LoraValue::Node(existing_id) => {
111 if self.ctx.storage.has_node(*existing_id) {
112 out.push(row);
113 }
114 }
115 other => {
116 return Err(ExecutorError::ExpectedNodeForExpand {
117 var: format!("{:?}", op.var),
118 found: value_kind(other),
119 });
120 }
121 }
122 continue;
123 }
124
125 for &id in &node_ids {
126 let mut new_row = row.clone();
127 new_row.insert(op.var, LoraValue::Node(id));
128 out.push(new_row);
129 }
130 }
131
132 Ok(out)
133 }
134
135 fn exec_node_by_label_scan(
136 &self,
137 plan: &PhysicalPlan,
138 op: &NodeByLabelScanExec,
139 ) -> ExecResult<Vec<Row>> {
140 let base_rows = match op.input {
141 Some(input) => self.execute_node(plan, input)?,
142 None => vec![Row::new()],
143 };
144
145 let candidate_ids = scan_node_ids_for_label_groups(self.ctx.storage, &op.labels);
146 let mut out = Vec::new();
147
148 for row in base_rows {
149 if let Some(existing) = row.get(op.var) {
150 match existing {
151 LoraValue::Node(existing_id) => {
152 let labels_ok = self
153 .ctx
154 .storage
155 .with_node(*existing_id, |n| {
156 node_matches_label_groups(&n.labels, &op.labels)
157 })
158 .unwrap_or(false);
159 if labels_ok {
160 out.push(row);
161 }
162 }
163 other => {
164 return Err(ExecutorError::ExpectedNodeForExpand {
165 var: format!("{:?}", op.var),
166 found: value_kind(other),
167 });
168 }
169 }
170 continue;
171 }
172
173 for &id in &candidate_ids {
174 let labels_ok = self
175 .ctx
176 .storage
177 .with_node(id, |n| node_matches_label_groups(&n.labels, &op.labels))
178 .unwrap_or(false);
179 if !labels_ok {
180 continue;
181 }
182 let mut new_row = row.clone();
183 new_row.insert(op.var, LoraValue::Node(id));
184 out.push(new_row);
185 }
186 }
187
188 Ok(out)
189 }
190
191 fn exec_expand(&self, plan: &PhysicalPlan, op: &ExpandExec) -> ExecResult<Vec<Row>> {
192 if let Some(range) = &op.range {
194 return self.exec_expand_var_len(plan, op, range);
195 }
196
197 let input_rows = self.execute_node(plan, op.input)?;
198 let mut out = Vec::new();
199
200 for row in input_rows {
201 let src_node_id = match row.get(op.src) {
202 Some(LoraValue::Node(id)) => *id,
203 Some(other) => {
204 return Err(ExecutorError::ExpectedNodeForExpand {
205 var: format!("{:?}", op.src),
206 found: value_kind(other),
207 });
208 }
209 None => continue,
210 };
211
212 for (rel_id, dst_id) in
213 self.ctx
214 .storage
215 .expand_ids(src_node_id, op.direction, &op.types)
216 {
217 if let Some(expr) = op.rel_properties.as_ref() {
218 let actual_props = self
219 .ctx
220 .storage
221 .with_relationship(rel_id, |rel| rel.properties.clone());
222 let matches = match actual_props {
223 Some(props) => {
224 self.relationship_matches_properties(&props, Some(expr), &row)?
225 }
226 None => false,
227 };
228 if !matches {
229 continue;
230 }
231 }
232
233 if let Some(existing_dst) = row.get(op.dst) {
234 match existing_dst {
235 LoraValue::Node(existing_id) if *existing_id == dst_id => {}
236 LoraValue::Node(_) => continue,
237 other => {
238 return Err(ExecutorError::ExpectedNodeForExpand {
239 var: format!("{:?}", op.dst),
240 found: value_kind(other),
241 });
242 }
243 }
244 }
245
246 if let Some(rel_var) = op.rel {
247 if let Some(existing_rel) = row.get(rel_var) {
248 match existing_rel {
249 LoraValue::Relationship(existing_id) if *existing_id == rel_id => {}
250 LoraValue::Relationship(_) => continue,
251 other => {
252 return Err(ExecutorError::ExpectedRelationshipForExpand {
253 var: format!("{:?}", rel_var),
254 found: value_kind(other),
255 });
256 }
257 }
258 }
259 }
260
261 let mut new_row = row.clone();
262
263 if !new_row.contains_key(op.dst) {
264 new_row.insert(op.dst, LoraValue::Node(dst_id));
265 }
266
267 if let Some(rel_var) = op.rel {
268 if !new_row.contains_key(rel_var) {
269 new_row.insert(rel_var, LoraValue::Relationship(rel_id));
270 }
271 }
272
273 out.push(new_row);
274 }
275 }
276
277 Ok(out)
278 }
279
280 fn exec_expand_var_len(
281 &self,
282 plan: &PhysicalPlan,
283 op: &ExpandExec,
284 range: &RangeLiteral,
285 ) -> ExecResult<Vec<Row>> {
286 let input_rows = self.execute_node(plan, op.input)?;
287 let (min_hops, max_hops) = resolve_range(range);
288 let mut out = Vec::new();
289
290 for row in input_rows {
291 let src_node_id = match row.get(op.src) {
292 Some(LoraValue::Node(id)) => *id,
293 Some(other) => {
294 return Err(ExecutorError::ExpectedNodeForExpand {
295 var: format!("{:?}", op.src),
296 found: value_kind(other),
297 });
298 }
299 None => continue,
300 };
301
302 let expansions = variable_length_expand(
303 self.ctx.storage,
304 src_node_id,
305 op.direction,
306 &op.types,
307 min_hops,
308 max_hops,
309 );
310
311 for result in expansions {
312 let mut new_row = row.clone();
313 new_row.insert(op.dst, LoraValue::Node(result.dst_node_id));
314
315 if let Some(rel_var) = op.rel {
318 let rel_list = LoraValue::List(
320 result
321 .rel_ids
322 .into_iter()
323 .map(LoraValue::Relationship)
324 .collect(),
325 );
326 new_row.insert(rel_var, rel_list);
327 }
328
329 out.push(new_row);
330 }
331 }
332
333 Ok(out)
334 }
335
336 fn relationship_matches_properties(
337 &self,
338 actual: &Properties,
339 expected_expr: Option<&ResolvedExpr>,
340 row: &Row,
341 ) -> ExecResult<bool> {
342 let Some(expr) = expected_expr else {
343 return Ok(true);
344 };
345
346 let eval_ctx = EvalContext {
347 storage: self.ctx.storage,
348 params: &self.ctx.params,
349 };
350
351 let expected = eval_expr(expr, row, &eval_ctx);
352
353 let LoraValue::Map(expected_map) = expected else {
354 return Err(ExecutorError::ExpectedPropertyMap {
355 found: value_kind(&expected),
356 });
357 };
358
359 Ok(expected_map.iter().all(|(key, expected_value)| {
360 actual
361 .get(key)
362 .map(|actual_value| value_matches_property_value(expected_value, actual_value))
363 .unwrap_or(false)
364 }))
365 }
366
367 fn exec_filter(&self, plan: &PhysicalPlan, op: &FilterExec) -> ExecResult<Vec<Row>> {
368 let input_rows = self.execute_node(plan, op.input)?;
369 let eval_ctx = EvalContext {
370 storage: self.ctx.storage,
371 params: &self.ctx.params,
372 };
373
374 Ok(input_rows
375 .into_iter()
376 .filter(|row| eval_expr(&op.predicate, row, &eval_ctx).is_truthy())
377 .collect())
378 }
379
380 fn exec_projection(&self, plan: &PhysicalPlan, op: &ProjectionExec) -> ExecResult<Vec<Row>> {
381 let input_rows = self.execute_node(plan, op.input)?;
382 let eval_ctx = EvalContext {
383 storage: self.ctx.storage,
384 params: &self.ctx.params,
385 };
386
387 let mut out = Vec::with_capacity(input_rows.len());
388
389 for row in input_rows {
390 if op.include_existing {
393 let mut projected = row;
394 for item in &op.items {
395 let value = eval_expr(&item.expr, &projected, &eval_ctx);
396 if let Some(err) = take_eval_error() {
397 return Err(ExecutorError::RuntimeError(err));
398 }
399 projected.insert_named(item.output, item.name.clone(), value);
400 }
401 out.push(projected);
402 } else {
403 let mut projected = Row::new();
404 for item in &op.items {
405 let value = eval_expr(&item.expr, &row, &eval_ctx);
406 if let Some(err) = take_eval_error() {
407 return Err(ExecutorError::RuntimeError(err));
408 }
409 projected.insert_named(item.output, item.name.clone(), value);
410 }
411 out.push(projected);
412 }
413 }
414
415 Ok(if op.distinct {
416 dedup_rows_by_vars(out)
417 } else {
418 out
419 })
420 }
421
422 fn hydrate_value(&self, value: LoraValue) -> LoraValue {
423 match value {
424 LoraValue::Node(id) => self.hydrate_node(id),
425 LoraValue::Relationship(id) => self.hydrate_relationship(id),
426 LoraValue::List(values) => {
427 LoraValue::List(values.into_iter().map(|v| self.hydrate_value(v)).collect())
428 }
429 LoraValue::Map(map) => LoraValue::Map(
430 map.into_iter()
431 .map(|(k, v)| (k, self.hydrate_value(v)))
432 .collect(),
433 ),
434 other => other,
435 }
436 }
437
438 fn hydrate_node(&self, id: u64) -> LoraValue {
439 self.ctx
440 .storage
441 .with_node(id, hydrate_node_record)
442 .unwrap_or(LoraValue::Null)
443 }
444
445 fn hydrate_relationship(&self, id: u64) -> LoraValue {
446 self.ctx
447 .storage
448 .with_relationship(id, hydrate_relationship_record)
449 .unwrap_or(LoraValue::Null)
450 }
451
452 fn exec_unwind(&self, plan: &PhysicalPlan, op: &UnwindExec) -> ExecResult<Vec<Row>> {
453 let input_rows = self.execute_node(plan, op.input)?;
454 let eval_ctx = EvalContext {
455 storage: self.ctx.storage,
456 params: &self.ctx.params,
457 };
458
459 let mut out = Vec::new();
460
461 for row in input_rows {
462 match eval_expr(&op.expr, &row, &eval_ctx) {
463 LoraValue::List(values) => {
464 for value in values {
465 let mut new_row = row.clone();
466 new_row.insert(op.alias, value);
467 out.push(new_row);
468 }
469 }
470 LoraValue::Null => {}
471 other => {
472 let mut new_row = row;
473 new_row.insert(op.alias, other);
474 out.push(new_row);
475 }
476 }
477 }
478
479 Ok(out)
480 }
481
482 fn exec_hash_aggregation(
483 &self,
484 plan: &PhysicalPlan,
485 op: &HashAggregationExec,
486 ) -> ExecResult<Vec<Row>> {
487 let input_rows = self.execute_node(plan, op.input)?;
488 let eval_ctx = EvalContext {
489 storage: self.ctx.storage,
490 params: &self.ctx.params,
491 };
492
493 let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
494
495 if op.group_by.is_empty() {
496 groups.insert(Vec::new(), input_rows);
497 } else {
498 for row in input_rows {
499 let key = op
500 .group_by
501 .iter()
502 .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
503 .collect::<Vec<_>>();
504
505 groups.entry(key).or_default().push(row);
506 }
507 }
508
509 let mut out = Vec::new();
510
511 for rows in groups.into_values() {
512 let mut result = Row::new();
513
514 if let Some(first) = rows.first() {
515 for proj in &op.group_by {
516 let value = self.hydrate_value(eval_expr(&proj.expr, first, &eval_ctx));
517 result.insert_named(proj.output, proj.name.clone(), value);
518 }
519 }
520
521 for proj in &op.aggregates {
522 let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
523 result.insert_named(proj.output, proj.name.clone(), value);
524 }
525
526 out.push(result);
527 }
528
529 Ok(out)
530 }
531
532 fn exec_sort(&self, plan: &PhysicalPlan, op: &SortExec) -> ExecResult<Vec<Row>> {
533 let mut rows = self.execute_node(plan, op.input)?;
534 let eval_ctx = EvalContext {
535 storage: self.ctx.storage,
536 params: &self.ctx.params,
537 };
538
539 rows.sort_by(|a, b| {
540 for item in &op.items {
541 let ord = compare_sort_item(item, a, b, &eval_ctx);
542 if ord != Ordering::Equal {
543 return ord;
544 }
545 }
546 Ordering::Equal
547 });
548
549 Ok(rows)
550 }
551
552 fn exec_limit(&self, plan: &PhysicalPlan, op: &LimitExec) -> ExecResult<Vec<Row>> {
553 let mut rows = self.execute_node(plan, op.input)?;
554 let eval_ctx = EvalContext {
555 storage: self.ctx.storage,
556 params: &self.ctx.params,
557 };
558
559 let limit = op
560 .limit
561 .as_ref()
562 .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
563 .unwrap_or(rows.len() as i64)
564 .max(0) as usize;
565
566 let skip = op
567 .skip
568 .as_ref()
569 .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
570 .unwrap_or(0)
571 .max(0) as usize;
572
573 if skip >= rows.len() {
574 return Ok(Vec::new());
575 }
576
577 rows.drain(0..skip);
578 rows.truncate(limit);
579 Ok(rows)
580 }
581
582 fn exec_optional_match(
583 &self,
584 plan: &PhysicalPlan,
585 op: &OptionalMatchExec,
586 ) -> ExecResult<Vec<Row>> {
587 let input_rows = self.execute_node(plan, op.input)?;
588
589 let inner_rows = self.execute_node(plan, op.inner)?;
594
595 let mut out = Vec::new();
596
597 for input_row in input_rows {
598 let mut matched = false;
599
600 for inner_row in &inner_rows {
601 let compatible = input_row
603 .iter()
604 .all(|(var, val)| match inner_row.get(*var) {
605 Some(inner_val) => inner_val == val,
606 None => true,
607 });
608 if !compatible {
609 continue;
610 }
611
612 let mut merged = input_row.clone();
613 for (var, name, val) in inner_row.iter_named() {
614 if !merged.contains_key(*var) {
615 merged.insert_named(*var, name.into_owned(), val.clone());
616 }
617 }
618 out.push(merged);
619 matched = true;
620 }
621
622 if !matched {
623 let mut null_row = input_row;
624 for &var_id in &op.new_vars {
625 if !null_row.contains_key(var_id) {
626 null_row.insert(var_id, LoraValue::Null);
627 }
628 }
629 out.push(null_row);
630 }
631 }
632
633 Ok(out)
634 }
635
636 fn exec_path_build(&self, plan: &PhysicalPlan, op: &PathBuildExec) -> ExecResult<Vec<Row>> {
637 let input_rows = self.execute_node(plan, op.input)?;
638 let mut rows: Vec<Row> = input_rows
639 .into_iter()
640 .map(|mut row| {
641 let path = build_path_value(&row, &op.node_vars, &op.rel_vars, self.ctx.storage);
642 row.insert(op.output, path);
643 row
644 })
645 .collect();
646
647 if let Some(all) = op.shortest_path_all {
648 rows = filter_shortest_paths(rows, op.output, all);
649 }
650 Ok(rows)
651 }
652}
653
654fn properties_to_value_map(props: &Properties) -> LoraValue {
655 let mut map = BTreeMap::new();
656 for (k, v) in props.iter() {
657 map.insert(k.clone(), LoraValue::from(v));
658 }
659 LoraValue::Map(map)
660}
661
662pub struct MutableExecutionContext<'a, S: GraphStorageMut> {
663 pub storage: &'a mut S,
664 pub params: std::collections::BTreeMap<String, LoraValue>,
665}
666
667pub struct MutableExecutor<'a, S: GraphStorageMut> {
668 ctx: MutableExecutionContext<'a, S>,
669}
670
671impl<'a, S: GraphStorageMut> MutableExecutor<'a, S> {
672 pub fn new(ctx: MutableExecutionContext<'a, S>) -> Self {
673 Self { ctx }
674 }
675
676 pub fn execute(
677 &mut self,
678 plan: &PhysicalPlan,
679 options: Option<ExecuteOptions>,
680 ) -> ExecResult<QueryResult> {
681 let _ = take_eval_error();
684
685 let rows = self.execute_node(plan, plan.root)?;
686 let rows = rows
687 .into_iter()
688 .map(|row| self.hydrate_row(row))
689 .collect::<Vec<_>>();
690 Ok(project_rows(rows, options.unwrap_or_default()))
691 }
692
693 pub fn execute_compiled(
695 &mut self,
696 compiled: &CompiledQuery,
697 options: Option<ExecuteOptions>,
698 ) -> ExecResult<QueryResult> {
699 if compiled.unions.is_empty() {
700 return self.execute(&compiled.physical, options);
701 }
702
703 let _ = take_eval_error();
704
705 let mut all_rows = self.execute_and_hydrate(&compiled.physical)?;
707
708 let mut needs_dedup = false;
711
712 for branch in &compiled.unions {
713 let branch_rows = self.execute_and_hydrate(&branch.physical)?;
714 all_rows.extend(branch_rows);
715
716 if !branch.all {
717 needs_dedup = true;
718 }
719 }
720
721 if needs_dedup {
722 all_rows = dedup_rows(all_rows);
723 }
724
725 Ok(project_rows(all_rows, options.unwrap_or_default()))
726 }
727
728 fn execute_and_hydrate(&mut self, plan: &PhysicalPlan) -> ExecResult<Vec<Row>> {
729 let rows = self.execute_node(plan, plan.root)?;
730 Ok(rows.into_iter().map(|row| self.hydrate_row(row)).collect())
731 }
732
733 fn hydrate_row(&self, row: Row) -> Row {
734 let mut out = Row::new();
735
736 for (var, name, value) in row.into_iter_named() {
737 out.insert_named(var, name, self.hydrate_value(value));
738 }
739
740 out
741 }
742
743 fn execute_node(
744 &mut self,
745 plan: &PhysicalPlan,
746 node_id: PhysicalNodeId,
747 ) -> ExecResult<Vec<Row>> {
748 trace!("mutable execute_node start: node_id={node_id:?}");
749
750 let result = match &plan.nodes[node_id] {
751 PhysicalOp::Argument(op) => self.exec_argument(op),
752 PhysicalOp::NodeScan(op) => self.exec_node_scan(plan, op),
753 PhysicalOp::NodeByLabelScan(op) => self.exec_node_by_label_scan(plan, op),
754 PhysicalOp::Expand(op) => self.exec_expand(plan, op),
755 PhysicalOp::Filter(op) => self.exec_filter(plan, op),
756 PhysicalOp::Projection(op) => self.exec_projection(plan, op),
757 PhysicalOp::Unwind(op) => self.exec_unwind(plan, op),
758 PhysicalOp::HashAggregation(op) => self.exec_hash_aggregation(plan, op),
759 PhysicalOp::Sort(op) => self.exec_sort(plan, op),
760 PhysicalOp::Limit(op) => self.exec_limit(plan, op),
761 PhysicalOp::Create(op) => self.exec_create(plan, op),
762 PhysicalOp::Merge(op) => self.exec_merge(plan, op),
763 PhysicalOp::Delete(op) => self.exec_delete(plan, op),
764 PhysicalOp::Set(op) => self.exec_set(plan, op),
765 PhysicalOp::Remove(op) => self.exec_remove(plan, op),
766 PhysicalOp::OptionalMatch(op) => self.exec_optional_match(plan, op),
767 PhysicalOp::PathBuild(op) => self.exec_path_build(plan, op),
768 };
769
770 match &result {
771 Ok(rows) => trace!(
772 "mutable execute_node ok: node_id={node_id:?}, rows={}",
773 rows.len()
774 ),
775 Err(err) => error!("mutable execute_node failed: node_id={node_id:?}, error={err}"),
776 }
777
778 result
779 }
780
781 fn exec_argument(&self, _op: &ArgumentExec) -> ExecResult<Vec<Row>> {
782 Ok(vec![Row::new()])
783 }
784
785 fn exec_node_scan(&mut self, plan: &PhysicalPlan, op: &NodeScanExec) -> ExecResult<Vec<Row>> {
786 let base_rows = match op.input {
787 Some(input) => self.execute_node(plan, input)?,
788 None => vec![Row::new()],
789 };
790
791 let node_ids = self.ctx.storage.all_node_ids();
792 let mut out = Vec::new();
793
794 for row in base_rows {
795 if let Some(existing) = row.get(op.var) {
796 match existing {
797 LoraValue::Node(existing_id) => {
798 if self.ctx.storage.has_node(*existing_id) {
799 out.push(row);
800 }
801 }
802 other => {
803 return Err(ExecutorError::ExpectedNodeForExpand {
804 var: format!("{:?}", op.var),
805 found: value_kind(other),
806 });
807 }
808 }
809 continue;
810 }
811
812 for &id in &node_ids {
813 let mut new_row = row.clone();
814 new_row.insert(op.var, LoraValue::Node(id));
815 out.push(new_row);
816 }
817 }
818
819 Ok(out)
820 }
821
822 fn exec_node_by_label_scan(
823 &mut self,
824 plan: &PhysicalPlan,
825 op: &NodeByLabelScanExec,
826 ) -> ExecResult<Vec<Row>> {
827 let base_rows = match op.input {
828 Some(input) => self.execute_node(plan, input)?,
829 None => vec![Row::new()],
830 };
831
832 let candidate_ids = scan_node_ids_for_label_groups(&*self.ctx.storage, &op.labels);
833 let mut out = Vec::new();
834
835 for row in base_rows {
836 if let Some(existing) = row.get(op.var) {
837 match existing {
838 LoraValue::Node(existing_id) => {
839 let labels_ok = self
840 .ctx
841 .storage
842 .with_node(*existing_id, |n| {
843 node_matches_label_groups(&n.labels, &op.labels)
844 })
845 .unwrap_or(false);
846 if labels_ok {
847 out.push(row);
848 }
849 }
850 other => {
851 return Err(ExecutorError::ExpectedNodeForExpand {
852 var: format!("{:?}", op.var),
853 found: value_kind(other),
854 });
855 }
856 }
857 continue;
858 }
859
860 for &id in &candidate_ids {
861 let labels_ok = self
862 .ctx
863 .storage
864 .with_node(id, |n| node_matches_label_groups(&n.labels, &op.labels))
865 .unwrap_or(false);
866 if !labels_ok {
867 continue;
868 }
869 let mut new_row = row.clone();
870 new_row.insert(op.var, LoraValue::Node(id));
871 out.push(new_row);
872 }
873 }
874
875 Ok(out)
876 }
877
878 fn exec_expand(&mut self, plan: &PhysicalPlan, op: &ExpandExec) -> ExecResult<Vec<Row>> {
879 if let Some(range) = &op.range {
881 return self.exec_expand_var_len(plan, op, range);
882 }
883
884 let input_rows = self.execute_node(plan, op.input)?;
885 let mut out = Vec::new();
886
887 for row in input_rows {
888 let src_node_id = match row.get(op.src) {
889 Some(LoraValue::Node(id)) => *id,
890 Some(other) => {
891 return Err(ExecutorError::ExpectedNodeForExpand {
892 var: format!("{:?}", op.src),
893 found: value_kind(other),
894 });
895 }
896 None => continue,
897 };
898
899 for (rel_id, dst_id) in
900 self.ctx
901 .storage
902 .expand_ids(src_node_id, op.direction, &op.types)
903 {
904 if let Some(expr) = op.rel_properties.as_ref() {
905 let actual_props = self
906 .ctx
907 .storage
908 .with_relationship(rel_id, |rel| rel.properties.clone());
909 let matches = match actual_props {
910 Some(props) => {
911 self.relationship_matches_properties(&props, Some(expr), &row)?
912 }
913 None => false,
914 };
915 if !matches {
916 continue;
917 }
918 }
919
920 if let Some(existing_dst) = row.get(op.dst) {
921 match existing_dst {
922 LoraValue::Node(existing_id) if *existing_id == dst_id => {}
923 LoraValue::Node(_) => continue,
924 other => {
925 return Err(ExecutorError::ExpectedNodeForExpand {
926 var: format!("{:?}", op.dst),
927 found: value_kind(other),
928 });
929 }
930 }
931 }
932
933 if let Some(rel_var) = op.rel {
934 if let Some(existing_rel) = row.get(rel_var) {
935 match existing_rel {
936 LoraValue::Relationship(existing_id) if *existing_id == rel_id => {}
937 LoraValue::Relationship(_) => continue,
938 other => {
939 return Err(ExecutorError::ExpectedRelationshipForExpand {
940 var: format!("{:?}", rel_var),
941 found: value_kind(other),
942 });
943 }
944 }
945 }
946 }
947
948 let mut new_row = row.clone();
949
950 if !new_row.contains_key(op.dst) {
951 new_row.insert(op.dst, LoraValue::Node(dst_id));
952 }
953
954 if let Some(rel_var) = op.rel {
955 if !new_row.contains_key(rel_var) {
956 new_row.insert(rel_var, LoraValue::Relationship(rel_id));
957 }
958 }
959
960 out.push(new_row);
961 }
962 }
963
964 Ok(out)
965 }
966
967 fn exec_expand_var_len(
968 &mut self,
969 plan: &PhysicalPlan,
970 op: &ExpandExec,
971 range: &RangeLiteral,
972 ) -> ExecResult<Vec<Row>> {
973 let input_rows = self.execute_node(plan, op.input)?;
974 let (min_hops, max_hops) = resolve_range(range);
975 let mut out = Vec::new();
976
977 for row in input_rows {
978 let src_node_id = match row.get(op.src) {
979 Some(LoraValue::Node(id)) => *id,
980 Some(other) => {
981 return Err(ExecutorError::ExpectedNodeForExpand {
982 var: format!("{:?}", op.src),
983 found: value_kind(other),
984 });
985 }
986 None => continue,
987 };
988
989 let expansions = variable_length_expand(
990 &*self.ctx.storage,
991 src_node_id,
992 op.direction,
993 &op.types,
994 min_hops,
995 max_hops,
996 );
997
998 for result in expansions {
999 let mut new_row = row.clone();
1000 new_row.insert(op.dst, LoraValue::Node(result.dst_node_id));
1001
1002 if let Some(rel_var) = op.rel {
1003 let rel_list = LoraValue::List(
1005 result
1006 .rel_ids
1007 .into_iter()
1008 .map(LoraValue::Relationship)
1009 .collect(),
1010 );
1011 new_row.insert(rel_var, rel_list);
1012 }
1013
1014 out.push(new_row);
1015 }
1016 }
1017
1018 Ok(out)
1019 }
1020
1021 fn relationship_matches_properties(
1022 &self,
1023 actual: &Properties,
1024 expected_expr: Option<&ResolvedExpr>,
1025 row: &Row,
1026 ) -> ExecResult<bool> {
1027 let Some(expr) = expected_expr else {
1028 return Ok(true);
1029 };
1030
1031 let eval_ctx = EvalContext {
1032 storage: &*self.ctx.storage,
1033 params: &self.ctx.params,
1034 };
1035
1036 let expected = eval_expr(expr, row, &eval_ctx);
1037
1038 let LoraValue::Map(expected_map) = expected else {
1039 return Err(ExecutorError::ExpectedPropertyMap {
1040 found: value_kind(&expected),
1041 });
1042 };
1043
1044 Ok(expected_map.iter().all(|(key, expected_value)| {
1045 actual
1046 .get(key)
1047 .map(|actual_value| value_matches_property_value(expected_value, actual_value))
1048 .unwrap_or(false)
1049 }))
1050 }
1051
1052 fn exec_filter(&mut self, plan: &PhysicalPlan, op: &FilterExec) -> ExecResult<Vec<Row>> {
1053 let input_rows = self.execute_node(plan, op.input)?;
1054 let eval_ctx = EvalContext {
1055 storage: &*self.ctx.storage,
1056 params: &self.ctx.params,
1057 };
1058
1059 Ok(input_rows
1060 .into_iter()
1061 .filter(|row| eval_expr(&op.predicate, row, &eval_ctx).is_truthy())
1062 .collect())
1063 }
1064
1065 fn exec_projection(
1066 &mut self,
1067 plan: &PhysicalPlan,
1068 op: &ProjectionExec,
1069 ) -> ExecResult<Vec<Row>> {
1070 let input_rows = self.execute_node(plan, op.input)?;
1071 let eval_ctx = EvalContext {
1072 storage: &*self.ctx.storage,
1073 params: &self.ctx.params,
1074 };
1075
1076 let mut out = Vec::with_capacity(input_rows.len());
1077
1078 for row in input_rows {
1079 if op.include_existing {
1080 let mut projected = row;
1081 for item in &op.items {
1082 let value = eval_expr(&item.expr, &projected, &eval_ctx);
1083 if let Some(err) = take_eval_error() {
1084 return Err(ExecutorError::RuntimeError(err));
1085 }
1086 projected.insert_named(item.output, item.name.clone(), value);
1087 }
1088 out.push(projected);
1089 } else {
1090 let mut projected = Row::new();
1091 for item in &op.items {
1092 let value = eval_expr(&item.expr, &row, &eval_ctx);
1093 if let Some(err) = take_eval_error() {
1094 return Err(ExecutorError::RuntimeError(err));
1095 }
1096 projected.insert_named(item.output, item.name.clone(), value);
1097 }
1098 out.push(projected);
1099 }
1100 }
1101
1102 Ok(if op.distinct {
1103 dedup_rows_by_vars(out)
1104 } else {
1105 out
1106 })
1107 }
1108
1109 fn hydrate_value(&self, value: LoraValue) -> LoraValue {
1110 match value {
1111 LoraValue::Node(id) => self.hydrate_node(id),
1112 LoraValue::Relationship(id) => self.hydrate_relationship(id),
1113 LoraValue::List(values) => {
1114 LoraValue::List(values.into_iter().map(|v| self.hydrate_value(v)).collect())
1115 }
1116 LoraValue::Map(map) => LoraValue::Map(
1117 map.into_iter()
1118 .map(|(k, v)| (k, self.hydrate_value(v)))
1119 .collect(),
1120 ),
1121 other => other,
1122 }
1123 }
1124
1125 fn hydrate_node(&self, id: u64) -> LoraValue {
1126 self.ctx
1127 .storage
1128 .with_node(id, hydrate_node_record)
1129 .unwrap_or(LoraValue::Null)
1130 }
1131
1132 fn hydrate_relationship(&self, id: u64) -> LoraValue {
1133 self.ctx
1134 .storage
1135 .with_relationship(id, hydrate_relationship_record)
1136 .unwrap_or(LoraValue::Null)
1137 }
1138
1139 fn exec_unwind(&mut self, plan: &PhysicalPlan, op: &UnwindExec) -> ExecResult<Vec<Row>> {
1140 let input_rows = self.execute_node(plan, op.input)?;
1141 let eval_ctx = EvalContext {
1142 storage: &*self.ctx.storage,
1143 params: &self.ctx.params,
1144 };
1145
1146 let mut out = Vec::new();
1147
1148 for row in input_rows {
1149 match eval_expr(&op.expr, &row, &eval_ctx) {
1150 LoraValue::List(values) => {
1151 for value in values {
1152 let mut new_row = row.clone();
1153 new_row.insert(op.alias, value);
1154 out.push(new_row);
1155 }
1156 }
1157 LoraValue::Null => {}
1158 other => {
1159 let mut new_row = row;
1160 new_row.insert(op.alias, other);
1161 out.push(new_row);
1162 }
1163 }
1164 }
1165
1166 Ok(out)
1167 }
1168
1169 fn exec_hash_aggregation(
1170 &mut self,
1171 plan: &PhysicalPlan,
1172 op: &HashAggregationExec,
1173 ) -> ExecResult<Vec<Row>> {
1174 let input_rows = self.execute_node(plan, op.input)?;
1175 let eval_ctx = EvalContext {
1176 storage: &*self.ctx.storage,
1177 params: &self.ctx.params,
1178 };
1179
1180 let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
1181
1182 if op.group_by.is_empty() {
1183 groups.insert(Vec::new(), input_rows);
1184 } else {
1185 for row in input_rows {
1186 let key = op
1187 .group_by
1188 .iter()
1189 .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
1190 .collect::<Vec<_>>();
1191
1192 groups.entry(key).or_default().push(row);
1193 }
1194 }
1195
1196 let mut out = Vec::new();
1197
1198 for rows in groups.into_values() {
1199 let mut result = Row::new();
1200
1201 if let Some(first) = rows.first() {
1202 for proj in &op.group_by {
1203 let value = self.hydrate_value(eval_expr(&proj.expr, first, &eval_ctx));
1204 result.insert_named(proj.output, proj.name.clone(), value);
1205 }
1206 }
1207
1208 for proj in &op.aggregates {
1209 let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
1210 result.insert_named(proj.output, proj.name.clone(), value);
1211 }
1212
1213 out.push(result);
1214 }
1215
1216 Ok(out)
1217 }
1218
1219 fn exec_sort(&mut self, plan: &PhysicalPlan, op: &SortExec) -> ExecResult<Vec<Row>> {
1220 let mut rows = self.execute_node(plan, op.input)?;
1221 let eval_ctx = EvalContext {
1222 storage: &*self.ctx.storage,
1223 params: &self.ctx.params,
1224 };
1225
1226 rows.sort_by(|a, b| {
1227 for item in &op.items {
1228 let ord = compare_sort_item(item, a, b, &eval_ctx);
1229 if ord != Ordering::Equal {
1230 return ord;
1231 }
1232 }
1233 Ordering::Equal
1234 });
1235
1236 Ok(rows)
1237 }
1238
1239 fn exec_limit(&mut self, plan: &PhysicalPlan, op: &LimitExec) -> ExecResult<Vec<Row>> {
1240 let mut rows = self.execute_node(plan, op.input)?;
1241 let eval_ctx = EvalContext {
1242 storage: &*self.ctx.storage,
1243 params: &self.ctx.params,
1244 };
1245
1246 let limit = op
1247 .limit
1248 .as_ref()
1249 .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
1250 .unwrap_or(rows.len() as i64)
1251 .max(0) as usize;
1252
1253 let skip = op
1254 .skip
1255 .as_ref()
1256 .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
1257 .unwrap_or(0)
1258 .max(0) as usize;
1259
1260 if skip >= rows.len() {
1261 return Ok(Vec::new());
1262 }
1263
1264 rows.drain(0..skip);
1265 rows.truncate(limit);
1266 Ok(rows)
1267 }
1268
1269 fn exec_optional_match(
1270 &mut self,
1271 plan: &PhysicalPlan,
1272 op: &OptionalMatchExec,
1273 ) -> ExecResult<Vec<Row>> {
1274 let input_rows = self.execute_node(plan, op.input)?;
1275
1276 let inner_rows = self.execute_node(plan, op.inner)?;
1278
1279 let mut out = Vec::new();
1280
1281 for input_row in input_rows {
1282 let mut matched = false;
1283
1284 for inner_row in &inner_rows {
1285 let compatible = input_row
1286 .iter()
1287 .all(|(var, val)| match inner_row.get(*var) {
1288 Some(inner_val) => inner_val == val,
1289 None => true,
1290 });
1291 if !compatible {
1292 continue;
1293 }
1294
1295 let mut merged = input_row.clone();
1296 for (var, name, val) in inner_row.iter_named() {
1297 if !merged.contains_key(*var) {
1298 merged.insert_named(*var, name.into_owned(), val.clone());
1299 }
1300 }
1301 out.push(merged);
1302 matched = true;
1303 }
1304
1305 if !matched {
1306 let mut null_row = input_row;
1307 for &var_id in &op.new_vars {
1308 if !null_row.contains_key(var_id) {
1309 null_row.insert(var_id, LoraValue::Null);
1310 }
1311 }
1312 out.push(null_row);
1313 }
1314 }
1315
1316 Ok(out)
1317 }
1318
1319 fn exec_path_build(&mut self, plan: &PhysicalPlan, op: &PathBuildExec) -> ExecResult<Vec<Row>> {
1320 let input_rows = self.execute_node(plan, op.input)?;
1321 let mut rows: Vec<Row> = input_rows
1322 .into_iter()
1323 .map(|mut row| {
1324 let path = build_path_value(&row, &op.node_vars, &op.rel_vars, &*self.ctx.storage);
1325 row.insert(op.output, path);
1326 row
1327 })
1328 .collect();
1329
1330 if let Some(all) = op.shortest_path_all {
1331 rows = filter_shortest_paths(rows, op.output, all);
1332 }
1333 Ok(rows)
1334 }
1335
1336 fn exec_create(&mut self, plan: &PhysicalPlan, op: &CreateExec) -> ExecResult<Vec<Row>> {
1337 let input_rows = self.execute_node(plan, op.input)?;
1338 let mut out = Vec::with_capacity(input_rows.len());
1339
1340 for mut row in input_rows {
1341 self.apply_create_pattern(&mut row, &op.pattern)?;
1342 out.push(row);
1343 }
1344
1345 Ok(out)
1346 }
1347
1348 fn apply_remove_item(&mut self, row: &Row, item: &ResolvedRemoveItem) -> ExecResult<()> {
1349 match item {
1350 ResolvedRemoveItem::Labels { variable, labels } => match row.get(*variable) {
1351 Some(LoraValue::Node(node_id)) => {
1352 let node_id = *node_id;
1353 for label in labels {
1354 self.ctx.storage.remove_node_label(node_id, label);
1355 }
1356 Ok(())
1357 }
1358 Some(other) => Err(ExecutorError::ExpectedNodeForRemoveLabels {
1359 found: value_kind(other),
1360 }),
1361 None => Err(ExecutorError::UnboundVariableForRemove {
1362 var: format!("{variable:?}"),
1363 }),
1364 },
1365
1366 ResolvedRemoveItem::Property { expr } => self.remove_property_from_expr(row, expr),
1367 }
1368 }
1369
1370 fn delete_value(&mut self, value: LoraValue, detach: bool) -> ExecResult<()> {
1371 match value {
1372 LoraValue::Null => Ok(()),
1373
1374 LoraValue::Node(node_id) => {
1375 if detach {
1376 self.ctx.storage.detach_delete_node(node_id);
1377 Ok(())
1378 } else {
1379 let ok = self.ctx.storage.delete_node(node_id);
1380 if ok {
1381 Ok(())
1382 } else {
1383 Err(ExecutorError::DeleteNodeWithRelationships { node_id })
1384 }
1385 }
1386 }
1387
1388 LoraValue::Relationship(rel_id) => {
1389 let ok = self.ctx.storage.delete_relationship(rel_id);
1390 if ok {
1391 Ok(())
1392 } else {
1393 Err(ExecutorError::DeleteRelationshipFailed { rel_id })
1394 }
1395 }
1396
1397 LoraValue::List(values) => {
1398 for v in values {
1399 self.delete_value(v, detach)?;
1400 }
1401 Ok(())
1402 }
1403
1404 other => Err(ExecutorError::InvalidDeleteTarget {
1405 found: value_kind(&other),
1406 }),
1407 }
1408 }
1409
1410 fn exec_merge(&mut self, plan: &PhysicalPlan, op: &MergeExec) -> ExecResult<Vec<Row>> {
1411 let input_rows = self.execute_node(plan, op.input)?;
1412 let mut out = Vec::with_capacity(input_rows.len());
1413
1414 for mut row in input_rows {
1415 let already_bound = self.pattern_part_is_bound(&row, &op.pattern_part);
1417
1418 let matched = if already_bound {
1419 true
1420 } else {
1421 self.try_match_merge_pattern(&mut row, &op.pattern_part)?
1423 };
1424
1425 if !matched {
1426 self.apply_create_pattern_part(&mut row, &op.pattern_part)?;
1427 }
1428
1429 for action in &op.actions {
1430 if action.on_match == matched {
1431 for item in &action.set.items {
1432 self.apply_set_item(&row, item)?;
1433 }
1434 }
1435 }
1436
1437 out.push(row);
1438 }
1439
1440 Ok(out)
1441 }
1442
1443 fn try_match_merge_pattern(
1446 &self,
1447 row: &mut Row,
1448 part: &ResolvedPatternPart,
1449 ) -> ExecResult<bool> {
1450 match &part.element {
1451 ResolvedPatternElement::Node {
1452 var,
1453 labels,
1454 properties,
1455 } => {
1456 let candidate_ids = if labels.is_empty() {
1459 self.ctx.storage.all_node_ids()
1460 } else {
1461 scan_node_ids_for_label_groups(&*self.ctx.storage, labels)
1462 };
1463
1464 let eval_ctx = EvalContext {
1466 storage: &*self.ctx.storage,
1467 params: &self.ctx.params,
1468 };
1469 let expected_props = properties.as_ref().map(|e| eval_expr(e, row, &eval_ctx));
1470
1471 for id in candidate_ids {
1472 let matched = self
1473 .ctx
1474 .storage
1475 .with_node(id, |node| {
1476 if !node_matches_label_groups(&node.labels, labels) {
1477 return false;
1478 }
1479 if let Some(LoraValue::Map(expected)) = &expected_props {
1480 let all_match = expected.iter().all(|(key, expected_value)| {
1481 node.properties
1482 .get(key)
1483 .map(|actual| {
1484 value_matches_property_value(expected_value, actual)
1485 })
1486 .unwrap_or(false)
1487 });
1488 if !all_match {
1489 return false;
1490 }
1491 }
1492 true
1493 })
1494 .unwrap_or(false);
1495
1496 if !matched {
1497 continue;
1498 }
1499
1500 if let Some(var_id) = var {
1502 row.insert(*var_id, LoraValue::Node(id));
1503 }
1504 return Ok(true);
1505 }
1506
1507 Ok(false)
1508 }
1509
1510 ResolvedPatternElement::ShortestPath { .. } => {
1511 Ok(false)
1513 }
1514
1515 ResolvedPatternElement::NodeChain { head, chain } => {
1516 let head_node_id = if let Some(var_id) = head.var {
1518 if let Some(LoraValue::Node(id)) = row.get(var_id) {
1519 *id
1520 } else {
1521 let node_matched = self.try_match_merge_pattern(
1523 row,
1524 &ResolvedPatternPart {
1525 binding: None,
1526 element: ResolvedPatternElement::Node {
1527 var: head.var,
1528 labels: head.labels.clone(),
1529 properties: head.properties.clone(),
1530 },
1531 },
1532 )?;
1533 if !node_matched {
1534 return Ok(false);
1535 }
1536 match row.get(var_id) {
1537 Some(LoraValue::Node(id)) => *id,
1538 _ => return Ok(false),
1539 }
1540 }
1541 } else {
1542 return Ok(false);
1543 };
1544
1545 let mut current_node_id = head_node_id;
1546
1547 for step in chain {
1548 let eval_ctx = EvalContext {
1549 storage: &*self.ctx.storage,
1550 params: &self.ctx.params,
1551 };
1552
1553 let _ = step.rel.types.first();
1554 let direction = step.rel.direction;
1555
1556 let edges =
1559 self.ctx
1560 .storage
1561 .expand_ids(current_node_id, direction, &step.rel.types);
1562
1563 let mut found = false;
1565 for (rel_id, node_id) in edges {
1566 let node_ok = self
1568 .ctx
1569 .storage
1570 .with_node(node_id, |node_rec| {
1571 if !node_matches_label_groups(&node_rec.labels, &step.node.labels) {
1572 return false;
1573 }
1574 if let Some(props_expr) = &step.node.properties {
1575 let expected = eval_expr(props_expr, row, &eval_ctx);
1576 if let LoraValue::Map(expected_map) = &expected {
1577 let all_match =
1578 expected_map.iter().all(|(key, expected_val)| {
1579 node_rec
1580 .properties
1581 .get(key)
1582 .map(|actual| {
1583 value_matches_property_value(
1584 expected_val,
1585 actual,
1586 )
1587 })
1588 .unwrap_or(false)
1589 });
1590 if !all_match {
1591 return false;
1592 }
1593 }
1594 }
1595 true
1596 })
1597 .unwrap_or(false);
1598 if !node_ok {
1599 continue;
1600 }
1601
1602 let rel_ok = self
1604 .ctx
1605 .storage
1606 .with_relationship(rel_id, |rel_rec| {
1607 if let Some(rel_props_expr) = &step.rel.properties {
1608 let expected = eval_expr(rel_props_expr, row, &eval_ctx);
1609 if let LoraValue::Map(expected_map) = &expected {
1610 let all_match =
1611 expected_map.iter().all(|(key, expected_val)| {
1612 rel_rec
1613 .properties
1614 .get(key)
1615 .map(|actual| {
1616 value_matches_property_value(
1617 expected_val,
1618 actual,
1619 )
1620 })
1621 .unwrap_or(false)
1622 });
1623 if !all_match {
1624 return false;
1625 }
1626 }
1627 }
1628 true
1629 })
1630 .unwrap_or(false);
1631 if !rel_ok {
1632 continue;
1633 }
1634
1635 if let Some(rel_var) = step.rel.var {
1637 row.insert(rel_var, LoraValue::Relationship(rel_id));
1638 }
1639 if let Some(node_var) = step.node.var {
1640 row.insert(node_var, LoraValue::Node(node_id));
1641 }
1642 current_node_id = node_id;
1643 found = true;
1644 break;
1645 }
1646
1647 if !found {
1648 return Ok(false);
1649 }
1650 }
1651
1652 Ok(true)
1653 }
1654 }
1655 }
1656
1657 fn exec_delete(&mut self, plan: &PhysicalPlan, op: &DeleteExec) -> ExecResult<Vec<Row>> {
1658 let input_rows = self.execute_node(plan, op.input)?;
1659
1660 for row in &input_rows {
1661 for expr in &op.expressions {
1662 let value = {
1663 let eval_ctx = EvalContext {
1664 storage: &*self.ctx.storage,
1665 params: &self.ctx.params,
1666 };
1667 eval_expr(expr, row, &eval_ctx)
1668 };
1669
1670 self.delete_value(value, op.detach)?;
1671 }
1672 }
1673
1674 Ok(input_rows)
1675 }
1676
1677 fn exec_set(&mut self, plan: &PhysicalPlan, op: &SetExec) -> ExecResult<Vec<Row>> {
1678 let input_rows = self.execute_node(plan, op.input)?;
1679
1680 for row in &input_rows {
1681 for item in &op.items {
1682 self.apply_set_item(row, item)?;
1683 }
1684 }
1685
1686 Ok(input_rows)
1687 }
1688
1689 fn exec_remove(&mut self, plan: &PhysicalPlan, op: &RemoveExec) -> ExecResult<Vec<Row>> {
1690 let input_rows = self.execute_node(plan, op.input)?;
1691
1692 for row in &input_rows {
1693 for item in &op.items {
1694 self.apply_remove_item(row, item)?;
1695 }
1696 }
1697
1698 Ok(input_rows)
1699 }
1700
1701 fn apply_set_item(&mut self, row: &Row, item: &ResolvedSetItem) -> ExecResult<()> {
1702 match item {
1703 ResolvedSetItem::SetProperty { target, value } => {
1704 let new_value = {
1705 let eval_ctx = EvalContext {
1706 storage: &*self.ctx.storage,
1707 params: &self.ctx.params,
1708 };
1709 eval_expr(value, row, &eval_ctx)
1710 };
1711
1712 self.set_property_from_expr(row, target, new_value)
1713 }
1714
1715 ResolvedSetItem::SetVariable { variable, value } => {
1716 let entity_ref =
1718 row.get(*variable)
1719 .ok_or(ExecutorError::UnboundVariableForSet {
1720 var: format!("{variable:?}"),
1721 })?;
1722 let entity_target = entity_target_from_value(entity_ref)?;
1723
1724 let new_value = {
1725 let eval_ctx = EvalContext {
1726 storage: &*self.ctx.storage,
1727 params: &self.ctx.params,
1728 };
1729 eval_expr(value, row, &eval_ctx)
1730 };
1731
1732 self.overwrite_entity_target(entity_target, new_value)
1733 }
1734
1735 ResolvedSetItem::MutateVariable { variable, value } => {
1736 let entity_ref =
1737 row.get(*variable)
1738 .ok_or(ExecutorError::UnboundVariableForSet {
1739 var: format!("{variable:?}"),
1740 })?;
1741 let entity_target = entity_target_from_value(entity_ref)?;
1742
1743 let patch = {
1744 let eval_ctx = EvalContext {
1745 storage: &*self.ctx.storage,
1746 params: &self.ctx.params,
1747 };
1748 eval_expr(value, row, &eval_ctx)
1749 };
1750
1751 self.mutate_entity_target(entity_target, patch)
1752 }
1753
1754 ResolvedSetItem::SetLabels { variable, labels } => match row.get(*variable) {
1755 Some(LoraValue::Node(node_id)) => {
1756 let node_id = *node_id;
1757 for label in labels {
1758 self.ctx.storage.add_node_label(node_id, label);
1759 }
1760 Ok(())
1761 }
1762 Some(other) => Err(ExecutorError::ExpectedNodeForSetLabels {
1763 found: value_kind(other),
1764 }),
1765 None => Err(ExecutorError::UnboundVariableForSet {
1766 var: format!("{variable:?}"),
1767 }),
1768 },
1769 }
1770 }
1771
1772 fn set_property_from_expr(
1773 &mut self,
1774 row: &Row,
1775 target_expr: &ResolvedExpr,
1776 new_value: LoraValue,
1777 ) -> ExecResult<()> {
1778 let ResolvedExpr::Property { expr, property } = target_expr else {
1779 return Err(ExecutorError::UnsupportedSetTarget);
1780 };
1781
1782 let owner = {
1783 let eval_ctx = EvalContext {
1784 storage: &*self.ctx.storage,
1785 params: &self.ctx.params,
1786 };
1787 eval_expr(expr, row, &eval_ctx)
1788 };
1789
1790 match owner {
1791 LoraValue::Node(node_id) => {
1792 let prop = lora_value_to_property(new_value)
1793 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1794 self.ctx
1795 .storage
1796 .set_node_property(node_id, property.clone(), prop);
1797 Ok(())
1798 }
1799 LoraValue::Relationship(rel_id) => {
1800 let prop = lora_value_to_property(new_value)
1801 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1802 self.ctx
1803 .storage
1804 .set_relationship_property(rel_id, property.clone(), prop);
1805 Ok(())
1806 }
1807 other => Err(ExecutorError::InvalidSetTarget {
1808 found: value_kind(&other),
1809 }),
1810 }
1811 }
1812
1813 fn remove_property_from_expr(&mut self, row: &Row, expr: &ResolvedExpr) -> ExecResult<()> {
1814 let ResolvedExpr::Property {
1815 expr: owner_expr,
1816 property,
1817 } = expr
1818 else {
1819 return Err(ExecutorError::UnsupportedRemoveTarget);
1820 };
1821
1822 let owner = {
1823 let eval_ctx = EvalContext {
1824 storage: &*self.ctx.storage,
1825 params: &self.ctx.params,
1826 };
1827 eval_expr(owner_expr, row, &eval_ctx)
1828 };
1829
1830 match owner {
1831 LoraValue::Node(node_id) => {
1832 self.ctx.storage.remove_node_property(node_id, property);
1833 Ok(())
1834 }
1835 LoraValue::Relationship(rel_id) => {
1836 self.ctx
1837 .storage
1838 .remove_relationship_property(rel_id, property);
1839 Ok(())
1840 }
1841 other => Err(ExecutorError::InvalidRemoveTarget {
1842 found: value_kind(&other),
1843 }),
1844 }
1845 }
1846
1847 fn overwrite_entity_target(
1848 &mut self,
1849 target: EntityTarget,
1850 new_value: LoraValue,
1851 ) -> ExecResult<()> {
1852 let LoraValue::Map(map) = new_value else {
1853 return Err(ExecutorError::ExpectedPropertyMap {
1854 found: value_kind(&new_value),
1855 });
1856 };
1857
1858 let mut props: Properties = Properties::new();
1859 for (k, v) in map {
1860 let prop = lora_value_to_property(v)
1861 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1862 props.insert(k, prop);
1863 }
1864
1865 match target {
1866 EntityTarget::Node(node_id) => {
1867 self.ctx.storage.replace_node_properties(node_id, props);
1868 }
1869 EntityTarget::Relationship(rel_id) => {
1870 self.ctx
1871 .storage
1872 .replace_relationship_properties(rel_id, props);
1873 }
1874 }
1875 Ok(())
1876 }
1877
1878 fn mutate_entity_target(
1879 &mut self,
1880 target: EntityTarget,
1881 patch_value: LoraValue,
1882 ) -> ExecResult<()> {
1883 let LoraValue::Map(map) = patch_value else {
1884 return Err(ExecutorError::ExpectedPropertyMap {
1885 found: value_kind(&patch_value),
1886 });
1887 };
1888
1889 match target {
1890 EntityTarget::Node(node_id) => {
1891 for (k, v) in map {
1892 let prop = lora_value_to_property(v)
1893 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1894 self.ctx.storage.set_node_property(node_id, k, prop);
1895 }
1896 }
1897 EntityTarget::Relationship(rel_id) => {
1898 for (k, v) in map {
1899 let prop = lora_value_to_property(v)
1900 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
1901 self.ctx.storage.set_relationship_property(rel_id, k, prop);
1902 }
1903 }
1904 }
1905 Ok(())
1906 }
1907
1908 fn apply_create_pattern(&mut self, row: &mut Row, pattern: &ResolvedPattern) -> ExecResult<()> {
1909 for part in &pattern.parts {
1910 self.apply_create_pattern_part(row, part)?;
1911 }
1912 Ok(())
1913 }
1914
1915 fn apply_create_pattern_part(
1916 &mut self,
1917 row: &mut Row,
1918 part: &ResolvedPatternPart,
1919 ) -> ExecResult<()> {
1920 if part.binding.is_some() {
1921 trace!("create pattern part has path binding; path materialization not implemented");
1922 }
1923
1924 let _ = self.apply_create_pattern_element(row, &part.element)?;
1925 Ok(())
1926 }
1927
1928 fn apply_create_pattern_element(
1929 &mut self,
1930 row: &mut Row,
1931 element: &ResolvedPatternElement,
1932 ) -> ExecResult<Option<LoraValue>> {
1933 match element {
1934 ResolvedPatternElement::Node {
1935 var,
1936 labels,
1937 properties,
1938 } => {
1939 let node_id =
1940 self.materialize_node_pattern(row, *var, labels, properties.as_ref())?;
1941 Ok(Some(LoraValue::Node(node_id)))
1942 }
1943
1944 ResolvedPatternElement::NodeChain { head, chain } => {
1945 let mut current_node_id = self.materialize_node_pattern(
1946 row,
1947 head.var,
1948 &head.labels,
1949 head.properties.as_ref(),
1950 )?;
1951
1952 for link in chain {
1953 let next_node_id = self.materialize_node_pattern(
1954 row,
1955 link.node.var,
1956 &link.node.labels,
1957 link.node.properties.as_ref(),
1958 )?;
1959
1960 let _ = self.materialize_relationship_pattern(
1961 row,
1962 current_node_id,
1963 next_node_id,
1964 &link.rel,
1965 )?;
1966
1967 current_node_id = next_node_id;
1968 }
1969
1970 Ok(Some(LoraValue::Node(current_node_id)))
1971 }
1972
1973 ResolvedPatternElement::ShortestPath { .. } => {
1974 Ok(None)
1976 }
1977 }
1978 }
1979
1980 fn pattern_part_is_bound(&self, row: &Row, part: &ResolvedPatternPart) -> bool {
1981 match &part.element {
1982 ResolvedPatternElement::Node { var, .. } => var.and_then(|v| row.get(v)).is_some(),
1983
1984 ResolvedPatternElement::ShortestPath { .. } => false,
1985
1986 ResolvedPatternElement::NodeChain { head, chain } => {
1987 let head_ok = head.var.and_then(|v| row.get(v)).is_some();
1988
1989 let chain_ok = chain.iter().all(|link| {
1990 let node_ok = link.node.var.and_then(|v| row.get(v)).is_some();
1991 let rel_ok = match link.rel.var {
1995 Some(v) => row.get(v).is_some(),
1996 None => false,
1997 };
1998 node_ok && rel_ok
1999 });
2000
2001 head_ok && chain_ok
2002 }
2003 }
2004 }
2005
2006 fn materialize_node_pattern(
2007 &mut self,
2008 row: &mut Row,
2009 var: Option<lora_analyzer::symbols::VarId>,
2010 labels: &[Vec<String>],
2011 properties: Option<&ResolvedExpr>,
2012 ) -> ExecResult<u64> {
2013 if let Some(var_id) = var {
2014 if let Some(LoraValue::Node(id)) = row.get(var_id) {
2015 return Ok(*id);
2016 }
2017 }
2018
2019 let properties = match properties {
2020 Some(expr) => eval_properties_expr(expr, row, &*self.ctx.storage, &self.ctx.params)?,
2021 None => Properties::new(),
2022 };
2023
2024 let flat_labels = flatten_label_groups(labels);
2025 debug!("creating node with labels={flat_labels:?}");
2026 let created = self.ctx.storage.create_node(flat_labels, properties);
2027
2028 if let Some(var_id) = var {
2029 row.insert(var_id, LoraValue::Node(created.id));
2030 }
2031
2032 Ok(created.id)
2033 }
2034
2035 fn materialize_relationship_pattern(
2036 &mut self,
2037 row: &mut Row,
2038 left_node_id: u64,
2039 right_node_id: u64,
2040 rel: &lora_analyzer::ResolvedRel,
2041 ) -> ExecResult<u64> {
2042 if let Some(var_id) = rel.var {
2043 if let Some(LoraValue::Relationship(id)) = row.get(var_id) {
2044 let id = *id;
2045 if let Some((src, dst)) = self.ctx.storage.relationship_endpoints(id) {
2046 let endpoints_match = match rel.direction {
2047 Direction::Right | Direction::Undirected => {
2048 src == left_node_id && dst == right_node_id
2049 }
2050 Direction::Left => src == right_node_id && dst == left_node_id,
2051 };
2052
2053 if endpoints_match {
2054 return Ok(id);
2055 }
2056 }
2057 }
2058 }
2059
2060 if rel.range.is_some() {
2061 return Err(ExecutorError::UnsupportedCreateRelationshipRange);
2062 }
2063
2064 let (src, dst) = match rel.direction {
2065 Direction::Right | Direction::Undirected => (left_node_id, right_node_id),
2066 Direction::Left => (right_node_id, left_node_id),
2067 };
2068
2069 let rel_type = rel
2070 .types
2071 .first()
2072 .ok_or(ExecutorError::MissingRelationshipType)?;
2073
2074 if rel_type.is_empty() {
2075 return Err(ExecutorError::MissingRelationshipType);
2076 }
2077
2078 let properties = match rel.properties.as_ref() {
2079 Some(expr) => eval_properties_expr(expr, row, &*self.ctx.storage, &self.ctx.params)?,
2080 None => Properties::new(),
2081 };
2082
2083 debug!("creating relationship: src={src}, dst={dst}, type={rel_type}");
2084
2085 let created = self
2086 .ctx
2087 .storage
2088 .create_relationship(src, dst, rel_type, properties)
2089 .ok_or_else(|| ExecutorError::RelationshipCreateFailed {
2090 src,
2091 dst,
2092 rel_type: rel_type.clone(),
2093 })?;
2094
2095 if let Some(var_id) = rel.var {
2096 row.insert(var_id, LoraValue::Relationship(created.id));
2097 }
2098
2099 Ok(created.id)
2100 }
2101}
2102
2103#[derive(Clone, Copy)]
2107enum EntityTarget {
2108 Node(NodeId),
2109 Relationship(u64),
2110}
2111
2112fn entity_target_from_value(value: &LoraValue) -> ExecResult<EntityTarget> {
2113 match value {
2114 LoraValue::Node(id) => Ok(EntityTarget::Node(*id)),
2115 LoraValue::Relationship(id) => Ok(EntityTarget::Relationship(*id)),
2116 other => Err(ExecutorError::InvalidSetTarget {
2117 found: value_kind(other),
2118 }),
2119 }
2120}
2121
2122fn dedup_rows_by_vars(rows: Vec<Row>) -> Vec<Row> {
2126 let mut seen: BTreeSet<Vec<GroupValueKey>> = BTreeSet::new();
2127 let mut out = Vec::new();
2128
2129 for row in rows {
2130 let key: Vec<GroupValueKey> = row
2131 .iter()
2132 .map(|(_, val)| GroupValueKey::from_value(val))
2133 .collect();
2134 if seen.insert(key) {
2135 out.push(row);
2136 }
2137 }
2138
2139 out
2140}
2141
2142fn dedup_rows(rows: Vec<Row>) -> Vec<Row> {
2146 let mut seen: BTreeSet<Vec<(String, GroupValueKey)>> = BTreeSet::new();
2147 let mut out = Vec::new();
2148
2149 for row in rows {
2150 let key: Vec<(String, GroupValueKey)> = row
2151 .iter_named()
2152 .map(|(_, name, val)| (name.into_owned(), GroupValueKey::from_value(val)))
2153 .collect();
2154 if seen.insert(key) {
2155 out.push(row);
2156 }
2157 }
2158
2159 out
2160}
2161
2162fn eval_properties_expr<S: GraphStorage>(
2163 expr: &ResolvedExpr,
2164 row: &Row,
2165 storage: &S,
2166 params: &std::collections::BTreeMap<String, LoraValue>,
2167) -> ExecResult<Properties> {
2168 let eval_ctx = EvalContext { storage, params };
2169
2170 match eval_expr(expr, row, &eval_ctx) {
2171 LoraValue::Map(map) => {
2172 let mut out = Properties::new();
2173 for (k, v) in map {
2174 let prop = lora_value_to_property(v)
2175 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2176 out.insert(k, prop);
2177 }
2178 Ok(out)
2179 }
2180 other => Err(ExecutorError::ExpectedPropertyMap {
2181 found: value_kind(&other),
2182 }),
2183 }
2184}
2185
2186fn compute_aggregate_expr<S: GraphStorage>(
2187 expr: &ResolvedExpr,
2188 rows: &[Row],
2189 eval_ctx: &EvalContext<'_, S>,
2190) -> LoraValue {
2191 match expr {
2192 ResolvedExpr::Function {
2193 name,
2194 distinct,
2195 args,
2196 } => {
2197 let func = name.to_ascii_lowercase();
2198
2199 match func.as_str() {
2200 "count" => {
2201 if args.is_empty() {
2202 return LoraValue::Int(rows.len() as i64);
2203 }
2204
2205 let mut values = rows
2206 .iter()
2207 .map(|r| eval_expr(&args[0], r, eval_ctx))
2208 .filter(|v| !matches!(v, LoraValue::Null))
2209 .collect::<Vec<_>>();
2210
2211 if *distinct {
2212 values = dedup_values(values);
2213 }
2214
2215 LoraValue::Int(values.len() as i64)
2216 }
2217
2218 "collect" => {
2219 if args.is_empty() {
2220 return LoraValue::List(Vec::new());
2221 }
2222
2223 let mut values = rows
2224 .iter()
2225 .map(|r| eval_expr(&args[0], r, eval_ctx))
2226 .collect::<Vec<_>>();
2227
2228 if *distinct {
2229 values = dedup_values(values);
2230 }
2231
2232 LoraValue::List(values)
2233 }
2234
2235 "sum" => {
2236 if args.is_empty() {
2237 return LoraValue::Null;
2238 }
2239
2240 let mut values = rows
2241 .iter()
2242 .map(|r| eval_expr(&args[0], r, eval_ctx))
2243 .collect::<Vec<_>>();
2244
2245 if *distinct {
2246 values = dedup_values(values);
2247 }
2248
2249 let nums = values
2250 .into_iter()
2251 .filter_map(as_f64_lossy)
2252 .collect::<Vec<_>>();
2253
2254 if nums.is_empty() {
2255 LoraValue::Null
2256 } else if nums.iter().all(|n| n.fract() == 0.0) {
2257 LoraValue::Int(nums.iter().sum::<f64>() as i64)
2258 } else {
2259 LoraValue::Float(nums.iter().sum::<f64>())
2260 }
2261 }
2262
2263 "avg" => {
2264 if args.is_empty() {
2265 return LoraValue::Null;
2266 }
2267
2268 let mut values = rows
2269 .iter()
2270 .map(|r| eval_expr(&args[0], r, eval_ctx))
2271 .collect::<Vec<_>>();
2272
2273 if *distinct {
2274 values = dedup_values(values);
2275 }
2276
2277 let nums = values
2278 .into_iter()
2279 .filter_map(as_f64_lossy)
2280 .collect::<Vec<_>>();
2281
2282 if nums.is_empty() {
2283 LoraValue::Null
2284 } else {
2285 LoraValue::Float(nums.iter().sum::<f64>() / nums.len() as f64)
2286 }
2287 }
2288
2289 "min" => {
2290 if args.is_empty() {
2291 return LoraValue::Null;
2292 }
2293
2294 let mut values = rows
2295 .iter()
2296 .map(|r| eval_expr(&args[0], r, eval_ctx))
2297 .filter(|v| !matches!(v, LoraValue::Null))
2298 .collect::<Vec<_>>();
2299
2300 if *distinct {
2301 values = dedup_values(values);
2302 }
2303
2304 values
2305 .into_iter()
2306 .min_by(compare_values_total)
2307 .unwrap_or(LoraValue::Null)
2308 }
2309
2310 "max" => {
2311 if args.is_empty() {
2312 return LoraValue::Null;
2313 }
2314
2315 let mut values = rows
2316 .iter()
2317 .map(|r| eval_expr(&args[0], r, eval_ctx))
2318 .filter(|v| !matches!(v, LoraValue::Null))
2319 .collect::<Vec<_>>();
2320
2321 if *distinct {
2322 values = dedup_values(values);
2323 }
2324
2325 values
2326 .into_iter()
2327 .max_by(compare_values_total)
2328 .unwrap_or(LoraValue::Null)
2329 }
2330
2331 "stdev" | "stdevp" => {
2332 if args.is_empty() {
2333 return LoraValue::Null;
2334 }
2335
2336 let nums: Vec<f64> = rows
2337 .iter()
2338 .map(|r| eval_expr(&args[0], r, eval_ctx))
2339 .filter_map(as_f64_lossy)
2340 .collect();
2341
2342 let is_population = func == "stdevp";
2343
2344 if nums.is_empty() || (!is_population && nums.len() < 2) {
2345 return LoraValue::Float(0.0);
2346 }
2347
2348 let mean = nums.iter().sum::<f64>() / nums.len() as f64;
2349 let variance_sum: f64 = nums.iter().map(|x| (x - mean).powi(2)).sum();
2350 let denom = if is_population {
2351 nums.len() as f64
2352 } else {
2353 (nums.len() - 1) as f64
2354 };
2355 LoraValue::Float((variance_sum / denom).sqrt())
2356 }
2357
2358 "percentilecont" => {
2359 if args.len() < 2 {
2360 return LoraValue::Null;
2361 }
2362
2363 let percentile = eval_expr(&args[1], &rows[0], eval_ctx)
2364 .as_f64()
2365 .unwrap_or(0.5);
2366 let mut nums: Vec<f64> = rows
2367 .iter()
2368 .map(|r| eval_expr(&args[0], r, eval_ctx))
2369 .filter_map(as_f64_lossy)
2370 .collect();
2371
2372 if nums.is_empty() {
2373 return LoraValue::Null;
2374 }
2375
2376 nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
2377
2378 let index = percentile * (nums.len() - 1) as f64;
2379 let lower = index.floor() as usize;
2380 let upper = index.ceil() as usize;
2381 let fraction = index - lower as f64;
2382
2383 if lower == upper || upper >= nums.len() {
2384 LoraValue::Float(nums[lower])
2385 } else {
2386 LoraValue::Float(nums[lower] * (1.0 - fraction) + nums[upper] * fraction)
2387 }
2388 }
2389
2390 "percentiledisc" => {
2391 if args.len() < 2 {
2392 return LoraValue::Null;
2393 }
2394
2395 let percentile = eval_expr(&args[1], &rows[0], eval_ctx)
2396 .as_f64()
2397 .unwrap_or(0.5);
2398 let mut nums: Vec<f64> = rows
2399 .iter()
2400 .map(|r| eval_expr(&args[0], r, eval_ctx))
2401 .filter_map(as_f64_lossy)
2402 .collect();
2403
2404 if nums.is_empty() {
2405 return LoraValue::Null;
2406 }
2407
2408 nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
2409
2410 let index = (percentile * (nums.len() - 1) as f64).round() as usize;
2411 let index = index.min(nums.len() - 1);
2412 LoraValue::Float(nums[index])
2413 }
2414
2415 _ => rows
2416 .first()
2417 .map(|r| eval_expr(expr, r, eval_ctx))
2418 .unwrap_or(LoraValue::Null),
2419 }
2420 }
2421
2422 _ => rows
2423 .first()
2424 .map(|r| eval_expr(expr, r, eval_ctx))
2425 .unwrap_or(LoraValue::Null),
2426 }
2427}
2428
2429fn compare_sort_item<S: GraphStorage>(
2430 item: &ResolvedSortItem,
2431 a: &Row,
2432 b: &Row,
2433 eval_ctx: &EvalContext<'_, S>,
2434) -> Ordering {
2435 let av = eval_expr(&item.expr, a, eval_ctx);
2436 let bv = eval_expr(&item.expr, b, eval_ctx);
2437
2438 let ascending = matches!(item.direction, lora_ast::SortDirection::Asc);
2439 compare_values_for_sort(&av, &bv, ascending)
2440}
2441
2442fn dedup_values(values: Vec<LoraValue>) -> Vec<LoraValue> {
2443 let mut seen: BTreeSet<GroupValueKey> = BTreeSet::new();
2444 let mut out = Vec::new();
2445
2446 for value in values {
2447 let key = GroupValueKey::from_value(&value);
2448 if seen.insert(key) {
2449 out.push(value);
2450 }
2451 }
2452
2453 out
2454}
2455
2456fn as_f64_lossy(v: LoraValue) -> Option<f64> {
2457 match v {
2458 LoraValue::Int(i) => Some(i as f64),
2459 LoraValue::Float(f) => Some(f),
2460 _ => None,
2461 }
2462}
2463
2464fn compare_values_for_sort(a: &LoraValue, b: &LoraValue, ascending: bool) -> Ordering {
2465 let ord = match (a, b) {
2466 (LoraValue::Null, LoraValue::Null) => Ordering::Equal,
2467 (LoraValue::Null, _) => Ordering::Greater,
2468 (_, LoraValue::Null) => Ordering::Less,
2469 _ => compare_values_total(a, b),
2470 };
2471
2472 if ascending {
2473 ord
2474 } else {
2475 ord.reverse()
2476 }
2477}
2478
2479fn compare_values_total(a: &LoraValue, b: &LoraValue) -> Ordering {
2480 use LoraValue::*;
2481
2482 match (a, b) {
2483 (Bool(x), Bool(y)) => x.cmp(y),
2484 (Int(x), Int(y)) => x.cmp(y),
2485 (Float(x), Float(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
2486 (Int(x), Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Ordering::Equal),
2487 (Float(x), Int(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Ordering::Equal),
2488 (String(x), String(y)) => x.cmp(y),
2489 (Node(x), Node(y)) => x.cmp(y),
2490 (Relationship(x), Relationship(y)) => x.cmp(y),
2491 (Date(x), Date(y)) => x.cmp(y),
2492 (DateTime(x), DateTime(y)) => x.cmp(y),
2493 (Duration(x), Duration(y)) => x.cmp(y),
2494 (Vector(x), Vector(y)) => x.to_key_string().cmp(&y.to_key_string()),
2495 _ => type_rank(a)
2496 .cmp(&type_rank(b))
2497 .then_with(|| format!("{a:?}").cmp(&format!("{b:?}"))),
2498 }
2499}
2500
2501pub fn value_matches_property_value(expected: &LoraValue, actual: &PropertyValue) -> bool {
2502 match (expected, actual) {
2503 (LoraValue::Null, PropertyValue::Null) => true,
2504 (LoraValue::Bool(a), PropertyValue::Bool(b)) => a == b,
2505 (LoraValue::Int(a), PropertyValue::Int(b)) => a == b,
2506 (LoraValue::Float(a), PropertyValue::Float(b)) => a == b,
2507 (LoraValue::Int(a), PropertyValue::Float(b)) => (*a as f64) == *b,
2508 (LoraValue::Float(a), PropertyValue::Int(b)) => *a == (*b as f64),
2509 (LoraValue::String(a), PropertyValue::String(b)) => a == b,
2510
2511 (LoraValue::List(xs), PropertyValue::List(ys)) => {
2512 xs.len() == ys.len()
2513 && xs
2514 .iter()
2515 .zip(ys.iter())
2516 .all(|(x, y)| value_matches_property_value(x, y))
2517 }
2518
2519 (LoraValue::Map(xm), PropertyValue::Map(ym)) => xm.iter().all(|(k, xv)| {
2520 ym.get(k)
2521 .map(|yv| value_matches_property_value(xv, yv))
2522 .unwrap_or(false)
2523 }),
2524
2525 (LoraValue::Date(a), PropertyValue::Date(b)) => a == b,
2526 (LoraValue::DateTime(a), PropertyValue::DateTime(b)) => a == b,
2527 (LoraValue::LocalDateTime(a), PropertyValue::LocalDateTime(b)) => a == b,
2528 (LoraValue::Time(a), PropertyValue::Time(b)) => a == b,
2529 (LoraValue::LocalTime(a), PropertyValue::LocalTime(b)) => a == b,
2530 (LoraValue::Duration(a), PropertyValue::Duration(b)) => a == b,
2531 (LoraValue::Point(a), PropertyValue::Point(b)) => a == b,
2532 (LoraValue::Vector(a), PropertyValue::Vector(b)) => a == b,
2533
2534 _ => false,
2535 }
2536}
2537
2538fn build_path_value<S: GraphStorage>(
2544 row: &Row,
2545 node_vars: &[VarId],
2546 rel_vars: &[VarId],
2547 storage: &S,
2548) -> LoraValue {
2549 let mut raw_nodes = Vec::new();
2550 let mut rels = Vec::new();
2551 let mut has_var_len = false;
2552
2553 for &nv in node_vars {
2554 match row.get(nv) {
2555 Some(LoraValue::Node(id)) => raw_nodes.push(*id),
2556 Some(LoraValue::List(items)) => {
2557 for item in items {
2558 if let LoraValue::Node(id) = item {
2559 raw_nodes.push(*id);
2560 }
2561 }
2562 }
2563 _ => {}
2564 }
2565 }
2566
2567 for &rv in rel_vars {
2568 match row.get(rv) {
2569 Some(LoraValue::Relationship(id)) => rels.push(*id),
2570 Some(LoraValue::List(items)) => {
2571 has_var_len = true;
2572 for item in items {
2573 if let LoraValue::Relationship(id) = item {
2574 rels.push(*id);
2575 }
2576 }
2577 }
2578 _ => {}
2579 }
2580 }
2581
2582 let nodes = if has_var_len && !rels.is_empty() && raw_nodes.len() == 2 {
2586 let start = raw_nodes[0];
2587 let mut ordered = Vec::with_capacity(rels.len() + 1);
2588 ordered.push(start);
2589 let mut current = start;
2590 for &rel_id in &rels {
2591 if let Some((src, dst)) = storage.relationship_endpoints(rel_id) {
2592 let next = if src == current { dst } else { src };
2593 ordered.push(next);
2594 current = next;
2595 }
2596 }
2597 ordered
2598 } else {
2599 raw_nodes
2600 };
2601
2602 LoraValue::Path(LoraPath { nodes, rels })
2603}
2604
2605fn type_rank(v: &LoraValue) -> u8 {
2606 match v {
2607 LoraValue::Null => 0,
2608 LoraValue::Bool(_) => 1,
2609 LoraValue::Int(_) | LoraValue::Float(_) => 2,
2610 LoraValue::String(_) => 3,
2611 LoraValue::Date(_) => 4,
2612 LoraValue::DateTime(_) => 5,
2613 LoraValue::LocalDateTime(_) => 6,
2614 LoraValue::Time(_) => 7,
2615 LoraValue::LocalTime(_) => 8,
2616 LoraValue::Duration(_) => 9,
2617 LoraValue::Point(_) => 10,
2618 LoraValue::Vector(_) => 11,
2619 LoraValue::List(_) => 12,
2620 LoraValue::Map(_) => 13,
2621 LoraValue::Node(_) => 14,
2622 LoraValue::Relationship(_) => 15,
2623 LoraValue::Path(_) => 16,
2624 }
2625}
2626
2627fn node_matches_label_groups(node_labels: &[String], groups: &[Vec<String>]) -> bool {
2631 groups
2632 .iter()
2633 .all(|group| group.iter().any(|l| node_labels.iter().any(|nl| nl == l)))
2634}
2635
2636fn scan_node_ids_for_label_groups<S: GraphStorage>(
2639 storage: &S,
2640 groups: &[Vec<String>],
2641) -> Vec<lora_store::NodeId> {
2642 if groups.len() == 1 && groups[0].len() == 1 {
2643 storage.node_ids_by_label(&groups[0][0])
2644 } else if groups.len() == 1 && groups[0].len() > 1 {
2645 let mut seen = std::collections::BTreeSet::new();
2646 let mut out = Vec::new();
2647 for label in &groups[0] {
2648 for id in storage.node_ids_by_label(label) {
2649 if seen.insert(id) {
2650 out.push(id);
2651 }
2652 }
2653 }
2654 out
2655 } else {
2656 storage.node_ids_by_label(&groups[0][0])
2657 }
2658}
2659
2660fn hydrate_node_record(node: &lora_store::NodeRecord) -> LoraValue {
2661 let mut map = BTreeMap::new();
2662 map.insert("kind".to_string(), LoraValue::String("node".to_string()));
2663 map.insert("id".to_string(), LoraValue::Int(node.id as i64));
2664 map.insert(
2665 "labels".to_string(),
2666 LoraValue::List(
2667 node.labels
2668 .iter()
2669 .map(|s| LoraValue::String(s.clone()))
2670 .collect(),
2671 ),
2672 );
2673 map.insert(
2674 "properties".to_string(),
2675 properties_to_value_map(&node.properties),
2676 );
2677 LoraValue::Map(map)
2678}
2679
2680fn hydrate_relationship_record(rel: &lora_store::RelationshipRecord) -> LoraValue {
2681 let mut map = BTreeMap::new();
2682 map.insert(
2683 "kind".to_string(),
2684 LoraValue::String("relationship".to_string()),
2685 );
2686 map.insert("id".to_string(), LoraValue::Int(rel.id as i64));
2687 map.insert("startId".to_string(), LoraValue::Int(rel.src as i64));
2688 map.insert("endId".to_string(), LoraValue::Int(rel.dst as i64));
2689 map.insert("type".to_string(), LoraValue::String(rel.rel_type.clone()));
2690 map.insert(
2691 "properties".to_string(),
2692 properties_to_value_map(&rel.properties),
2693 );
2694 LoraValue::Map(map)
2695}
2696
2697fn flatten_label_groups(groups: &[Vec<String>]) -> Vec<String> {
2700 groups.iter().flat_map(|g| g.iter().cloned()).collect()
2701}
2702
2703#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
2704enum GroupValueKey {
2705 Null,
2706 Bool(bool),
2707 Int(i64),
2708 Float(String),
2709 String(String),
2710 List(Vec<GroupValueKey>),
2711 Map(Vec<(String, GroupValueKey)>),
2712 Node(u64),
2713 Relationship(u64),
2714}
2715
2716impl GroupValueKey {
2717 fn from_value(v: &LoraValue) -> Self {
2718 match v {
2719 LoraValue::Null => Self::Null,
2720 LoraValue::Bool(x) => Self::Bool(*x),
2721 LoraValue::Int(x) => Self::Int(*x),
2722 LoraValue::Float(x) => Self::Float(x.to_string()),
2723 LoraValue::String(x) => Self::String(x.clone()),
2724 LoraValue::List(xs) => Self::List(xs.iter().map(Self::from_value).collect()),
2725 LoraValue::Map(m) => Self::Map(
2726 m.iter()
2727 .map(|(k, v)| (k.clone(), Self::from_value(v)))
2728 .collect(),
2729 ),
2730 LoraValue::Node(id) => Self::Node(*id),
2731 LoraValue::Relationship(id) => Self::Relationship(*id),
2732 LoraValue::Path(_) => Self::Null,
2733 LoraValue::Date(d) => Self::String(d.to_string()),
2735 LoraValue::DateTime(dt) => Self::String(dt.to_string()),
2736 LoraValue::LocalDateTime(dt) => Self::String(dt.to_string()),
2737 LoraValue::Time(t) => Self::String(t.to_string()),
2738 LoraValue::LocalTime(t) => Self::String(t.to_string()),
2739 LoraValue::Duration(dur) => Self::String(dur.to_string()),
2740 LoraValue::Point(p) => Self::String(p.to_string()),
2741 LoraValue::Vector(v) => Self::String(format!("vector:{}", v.to_key_string())),
2742 }
2743 }
2744}
2745
2746const MAX_VAR_LEN_HOPS: u64 = 100;
2758
2759fn resolve_range(range: &RangeLiteral) -> (u64, u64) {
2760 let min_hops = range.start.unwrap_or(1);
2761 let max_hops = range.end.unwrap_or(MAX_VAR_LEN_HOPS);
2762 (min_hops, max_hops)
2763}
2764
2765struct VarLenResult {
2767 dst_node_id: NodeId,
2769 rel_ids: Vec<u64>,
2771}
2772
2773fn variable_length_expand<S: GraphStorage>(
2780 storage: &S,
2781 start_node_id: NodeId,
2782 direction: Direction,
2783 types: &[String],
2784 min_hops: u64,
2785 max_hops: u64,
2786) -> Vec<VarLenResult> {
2787 let mut results = Vec::new();
2788
2789 let mut frontier: Vec<(NodeId, Vec<u64>)> = vec![(start_node_id, Vec::new())];
2791
2792 for depth in 1..=max_hops {
2793 let is_last_hop = depth == max_hops;
2797 let mut next_frontier: Vec<(NodeId, Vec<u64>)> = Vec::new();
2798
2799 for (current_node, rels_used) in &frontier {
2800 for (rel_id, neighbor_id) in storage.expand_ids(*current_node, direction, types) {
2803 if rels_used.contains(&rel_id) {
2806 continue;
2807 }
2808
2809 if is_last_hop {
2810 if depth >= min_hops {
2813 let mut rel_ids = Vec::with_capacity(rels_used.len() + 1);
2814 rel_ids.extend_from_slice(rels_used);
2815 rel_ids.push(rel_id);
2816 results.push(VarLenResult {
2817 dst_node_id: neighbor_id,
2818 rel_ids,
2819 });
2820 }
2821 continue;
2822 }
2823
2824 let mut new_rels = Vec::with_capacity(rels_used.len() + 1);
2825 new_rels.extend_from_slice(rels_used);
2826 new_rels.push(rel_id);
2827
2828 if depth >= min_hops {
2829 results.push(VarLenResult {
2830 dst_node_id: neighbor_id,
2831 rel_ids: new_rels.clone(),
2832 });
2833 }
2834
2835 next_frontier.push((neighbor_id, new_rels));
2836 }
2837 }
2838
2839 if is_last_hop || next_frontier.is_empty() {
2840 break;
2841 }
2842
2843 frontier = next_frontier;
2844 }
2845
2846 if min_hops == 0 {
2848 results.insert(
2849 0,
2850 VarLenResult {
2851 dst_node_id: start_node_id,
2852 rel_ids: Vec::new(),
2853 },
2854 );
2855 }
2856
2857 results
2858}
2859
2860fn filter_shortest_paths(rows: Vec<Row>, path_var: VarId, all: bool) -> Vec<Row> {
2863 if rows.is_empty() {
2864 return rows;
2865 }
2866
2867 let lengths: Vec<usize> = rows
2869 .iter()
2870 .map(|row| match row.get(path_var) {
2871 Some(LoraValue::Path(p)) => p.rels.len(),
2872 _ => usize::MAX,
2873 })
2874 .collect();
2875
2876 let min_len = lengths.iter().copied().min().unwrap_or(usize::MAX);
2877
2878 let mut result: Vec<Row> = rows
2879 .into_iter()
2880 .zip(lengths.iter())
2881 .filter(|(_, len)| **len == min_len)
2882 .map(|(row, _)| row)
2883 .collect();
2884
2885 if !all && result.len() > 1 {
2886 result.truncate(1);
2887 }
2888
2889 result
2890}