1use crate::errors::{value_kind, ExecResult, ExecutorError};
2use crate::eval::{eval_expr, take_eval_error, EvalContext};
3use crate::value::{lora_value_to_property, LoraPath, LoraValue, Row};
4use crate::{project_rows, ExecuteOptions, QueryResult};
5
6use lora_analyzer::{
7 symbols::VarId, ResolvedExpr, ResolvedPattern, ResolvedPatternElement, ResolvedPatternPart,
8 ResolvedRemoveItem, ResolvedSetItem, ResolvedSortItem,
9};
10use lora_ast::{Direction, RangeLiteral};
11use lora_compiler::physical::*;
12use lora_compiler::CompiledQuery;
13use lora_store::{GraphStorage, GraphStorageMut, NodeId, Properties, PropertyValue};
14
15use std::cmp::Ordering;
16use std::collections::{BTreeMap, BTreeSet};
17use std::time::Instant;
18use tracing::{debug, error, trace};
19
20pub struct ExecutionContext<'a, S: GraphStorage> {
21 pub storage: &'a S,
22 pub params: std::collections::BTreeMap<String, LoraValue>,
23}
24
25pub struct Executor<'a, S: GraphStorage> {
26 ctx: ExecutionContext<'a, S>,
27 deadline: Option<Instant>,
28}
29
30impl<'a, S: GraphStorage> Executor<'a, S> {
31 pub fn new(ctx: ExecutionContext<'a, S>) -> Self {
32 Self {
33 ctx,
34 deadline: None,
35 }
36 }
37
38 pub fn with_deadline(ctx: ExecutionContext<'a, S>, deadline: Option<Instant>) -> Self {
39 Self { ctx, deadline }
40 }
41
42 #[inline]
43 fn check_deadline(&self) -> ExecResult<()> {
44 if let Some(deadline) = self.deadline {
45 check_deadline_at(deadline)
46 } else {
47 Ok(())
48 }
49 }
50
51 #[inline]
52 fn check_loop_deadline(deadline: Option<Instant>) -> ExecResult<()> {
53 if let Some(deadline) = deadline {
54 check_deadline_at(deadline)
55 } else {
56 Ok(())
57 }
58 }
59}
60
61#[inline]
62fn check_deadline_at(deadline: Instant) -> ExecResult<()> {
63 if Instant::now() >= deadline {
64 Err(ExecutorError::QueryTimeout)
65 } else {
66 Ok(())
67 }
68}
69
70impl<'a, S: GraphStorage> Executor<'a, S> {
71 pub fn execute(
72 &self,
73 plan: &PhysicalPlan,
74 options: Option<ExecuteOptions>,
75 ) -> ExecResult<QueryResult> {
76 let rows = self.execute_rows(plan)?;
77 Ok(project_rows(rows, options.unwrap_or_default()))
78 }
79
80 pub fn execute_compiled(
81 &self,
82 compiled: &CompiledQuery,
83 options: Option<ExecuteOptions>,
84 ) -> ExecResult<QueryResult> {
85 let rows = self.execute_compiled_rows(compiled)?;
86 Ok(project_rows(rows, options.unwrap_or_default()))
87 }
88
89 pub fn execute_compiled_rows(&self, compiled: &CompiledQuery) -> ExecResult<Vec<Row>> {
90 self.check_deadline()?;
91 if compiled.unions.is_empty() {
92 return self.execute_rows(&compiled.physical);
93 }
94
95 let _ = take_eval_error();
96
97 let mut all_rows = self.execute_rows(&compiled.physical)?;
98 let mut needs_dedup = false;
99
100 for branch in &compiled.unions {
101 self.check_deadline()?;
102 let branch_rows = self.execute_rows(&branch.physical)?;
103 all_rows.extend(branch_rows);
104
105 if !branch.all {
106 needs_dedup = true;
107 }
108 }
109
110 if needs_dedup {
111 all_rows = dedup_rows(all_rows);
112 }
113
114 Ok(all_rows)
115 }
116
117 pub fn execute_rows(&self, plan: &PhysicalPlan) -> ExecResult<Vec<Row>> {
118 self.check_deadline()?;
119 let _ = take_eval_error();
122
123 let rows = self.execute_node(plan, plan.root)?;
124 Ok(rows
125 .into_iter()
126 .map(|row| self.hydrate_row(row))
127 .collect::<Vec<_>>())
128 }
129
130 fn hydrate_row(&self, row: Row) -> Row {
131 let mut out = Row::new();
132
133 for (var, name, value) in row.into_iter_named() {
134 out.insert_named(var, name, self.hydrate_value(value));
135 }
136
137 out
138 }
139
140 pub(crate) fn execute_subtree(
144 &self,
145 plan: &PhysicalPlan,
146 node_id: PhysicalNodeId,
147 ) -> ExecResult<Vec<Row>> {
148 self.execute_node(plan, node_id)
149 }
150
151 fn execute_node(&self, plan: &PhysicalPlan, node_id: PhysicalNodeId) -> ExecResult<Vec<Row>> {
152 self.check_deadline()?;
153 trace!("read-only execute_node start: node_id={node_id:?}");
154
155 let result = match &plan.nodes[node_id] {
156 PhysicalOp::Argument(op) => self.exec_argument(op),
157 PhysicalOp::NodeScan(op) => self.exec_node_scan(plan, op),
158 PhysicalOp::NodeByLabelScan(op) => self.exec_node_by_label_scan(plan, op),
159 PhysicalOp::NodeByPropertyScan(op) => self.exec_node_by_property_scan(plan, op),
160 PhysicalOp::Expand(op) => self.exec_expand(plan, op),
161 PhysicalOp::Filter(op) => self.exec_filter(plan, op),
162 PhysicalOp::Projection(op) => self.exec_projection(plan, op),
163 PhysicalOp::Unwind(op) => self.exec_unwind(plan, op),
164 PhysicalOp::HashAggregation(op) => self.exec_hash_aggregation(plan, op),
165 PhysicalOp::Sort(op) => self.exec_sort(plan, op),
166 PhysicalOp::Limit(op) => self.exec_limit(plan, op),
167 PhysicalOp::OptionalMatch(op) => self.exec_optional_match(plan, op),
168 PhysicalOp::PathBuild(op) => self.exec_path_build(plan, op),
169 PhysicalOp::Create(_) => Err(ExecutorError::ReadOnlyCreate { node_id }),
170 PhysicalOp::Merge(_) => Err(ExecutorError::ReadOnlyMerge { node_id }),
171 PhysicalOp::Delete(_) => Err(ExecutorError::ReadOnlyDelete { node_id }),
172 PhysicalOp::Set(_) => Err(ExecutorError::ReadOnlySet { node_id }),
173 PhysicalOp::Remove(_) => Err(ExecutorError::ReadOnlyRemove { node_id }),
174 };
175
176 match &result {
177 Ok(rows) => trace!(
178 "read-only execute_node ok: node_id={node_id:?}, rows={}",
179 rows.len()
180 ),
181 Err(err) => error!("read-only execute_node failed: node_id={node_id:?}, error={err}"),
182 }
183
184 result
185 }
186
187 fn exec_argument(&self, _op: &ArgumentExec) -> ExecResult<Vec<Row>> {
188 Ok(vec![Row::new()])
189 }
190
191 fn exec_node_scan(&self, plan: &PhysicalPlan, op: &NodeScanExec) -> ExecResult<Vec<Row>> {
192 let base_rows = match op.input {
193 Some(input) => self.execute_node(plan, input)?,
194 None => vec![Row::new()],
195 };
196
197 let node_ids = self.ctx.storage.all_node_ids();
198 let mut out = Vec::new();
199
200 let deadline = self.deadline;
201 for row in base_rows {
202 Self::check_loop_deadline(deadline)?;
203 if let Some(existing) = row.get(op.var) {
204 match existing {
205 LoraValue::Node(existing_id) => {
206 if self.ctx.storage.has_node(*existing_id) {
207 out.push(row);
208 }
209 }
210 other => {
211 return Err(ExecutorError::ExpectedNodeForExpand {
212 var: format!("{:?}", op.var),
213 found: value_kind(other),
214 });
215 }
216 }
217 continue;
218 }
219
220 for &id in &node_ids {
221 Self::check_loop_deadline(deadline)?;
222 let mut new_row = row.clone();
223 new_row.insert(op.var, LoraValue::Node(id));
224 out.push(new_row);
225 }
226 }
227
228 Ok(out)
229 }
230
231 fn exec_node_by_label_scan(
232 &self,
233 plan: &PhysicalPlan,
234 op: &NodeByLabelScanExec,
235 ) -> ExecResult<Vec<Row>> {
236 let base_rows = match op.input {
237 Some(input) => self.execute_node(plan, input)?,
238 None => vec![Row::new()],
239 };
240
241 let candidate_ids = scan_node_ids_for_label_groups(self.ctx.storage, &op.labels);
242 let candidates_prefiltered = label_group_candidates_prefiltered(&op.labels);
243 let mut out = Vec::new();
244
245 match self.deadline {
246 Some(deadline) => {
247 for row in base_rows {
248 check_deadline_at(deadline)?;
249 if let Some(existing) = row.get(op.var) {
250 match existing {
251 LoraValue::Node(existing_id) => {
252 let labels_ok = self
253 .ctx
254 .storage
255 .with_node(*existing_id, |n| {
256 node_matches_label_groups(&n.labels, &op.labels)
257 })
258 .unwrap_or(false);
259 if labels_ok {
260 out.push(row);
261 }
262 }
263 other => {
264 return Err(ExecutorError::ExpectedNodeForExpand {
265 var: format!("{:?}", op.var),
266 found: value_kind(other),
267 });
268 }
269 }
270 continue;
271 }
272
273 for &id in &candidate_ids {
274 check_deadline_at(deadline)?;
275 if !candidates_prefiltered {
276 let labels_ok = self
277 .ctx
278 .storage
279 .with_node(id, |n| node_matches_label_groups(&n.labels, &op.labels))
280 .unwrap_or(false);
281 if !labels_ok {
282 continue;
283 }
284 }
285 let mut new_row = row.clone();
286 new_row.insert(op.var, LoraValue::Node(id));
287 out.push(new_row);
288 }
289 }
290 }
291 None => {
292 for row in base_rows {
293 if let Some(existing) = row.get(op.var) {
294 match existing {
295 LoraValue::Node(existing_id) => {
296 let labels_ok = self
297 .ctx
298 .storage
299 .with_node(*existing_id, |n| {
300 node_matches_label_groups(&n.labels, &op.labels)
301 })
302 .unwrap_or(false);
303 if labels_ok {
304 out.push(row);
305 }
306 }
307 other => {
308 return Err(ExecutorError::ExpectedNodeForExpand {
309 var: format!("{:?}", op.var),
310 found: value_kind(other),
311 });
312 }
313 }
314 continue;
315 }
316
317 for &id in &candidate_ids {
318 if !candidates_prefiltered {
319 let labels_ok = self
320 .ctx
321 .storage
322 .with_node(id, |n| node_matches_label_groups(&n.labels, &op.labels))
323 .unwrap_or(false);
324 if !labels_ok {
325 continue;
326 }
327 }
328 let mut new_row = row.clone();
329 new_row.insert(op.var, LoraValue::Node(id));
330 out.push(new_row);
331 }
332 }
333 }
334 }
335
336 Ok(out)
337 }
338
339 fn exec_node_by_property_scan(
340 &self,
341 plan: &PhysicalPlan,
342 op: &NodeByPropertyScanExec,
343 ) -> ExecResult<Vec<Row>> {
344 let base_rows = match op.input {
345 Some(input) => self.execute_node(plan, input)?,
346 None => vec![Row::new()],
347 };
348
349 let eval_ctx = EvalContext {
350 storage: self.ctx.storage,
351 params: &self.ctx.params,
352 };
353 let mut out = Vec::new();
354
355 let deadline = self.deadline;
356 for row in base_rows {
357 Self::check_loop_deadline(deadline)?;
358 let expected = eval_expr(&op.value, &row, &eval_ctx);
359
360 if let Some(existing) = row.get(op.var) {
361 match existing {
362 LoraValue::Node(existing_id) => {
363 if node_matches_property_filter(
364 self.ctx.storage,
365 *existing_id,
366 &op.labels,
367 &op.key,
368 &expected,
369 ) {
370 out.push(row);
371 }
372 }
373 other => {
374 return Err(ExecutorError::ExpectedNodeForExpand {
375 var: format!("{:?}", op.var),
376 found: value_kind(other),
377 });
378 }
379 }
380 continue;
381 }
382
383 let candidates =
384 indexed_node_property_candidates(self.ctx.storage, &op.labels, &op.key, &expected);
385 for id in candidates.ids {
386 Self::check_loop_deadline(deadline)?;
387 if !candidates.prefiltered
388 && !node_matches_property_filter(
389 self.ctx.storage,
390 id,
391 &op.labels,
392 &op.key,
393 &expected,
394 )
395 {
396 continue;
397 }
398 let mut new_row = row.clone();
399 new_row.insert(op.var, LoraValue::Node(id));
400 out.push(new_row);
401 }
402 }
403
404 Ok(out)
405 }
406
407 fn exec_expand(&self, plan: &PhysicalPlan, op: &ExpandExec) -> ExecResult<Vec<Row>> {
408 if let Some(range) = &op.range {
410 return self.exec_expand_var_len(plan, op, range);
411 }
412
413 let input_rows = self.execute_node(plan, op.input)?;
414 let mut out = Vec::new();
415
416 for row in input_rows {
417 let src_node_id = match row.get(op.src) {
418 Some(LoraValue::Node(id)) => *id,
419 Some(other) => {
420 return Err(ExecutorError::ExpectedNodeForExpand {
421 var: format!("{:?}", op.src),
422 found: value_kind(other),
423 });
424 }
425 None => continue,
426 };
427
428 for (rel_id, dst_id) in
429 self.ctx
430 .storage
431 .expand_ids(src_node_id, op.direction, &op.types)
432 {
433 if let Some(expr) = op.rel_properties.as_ref() {
434 let actual_props = self
435 .ctx
436 .storage
437 .with_relationship(rel_id, |rel| rel.properties.clone());
438 let matches = match actual_props {
439 Some(props) => {
440 self.relationship_matches_properties(&props, Some(expr), &row)?
441 }
442 None => false,
443 };
444 if !matches {
445 continue;
446 }
447 }
448
449 if let Some(existing_dst) = row.get(op.dst) {
450 match existing_dst {
451 LoraValue::Node(existing_id) if *existing_id == dst_id => {}
452 LoraValue::Node(_) => continue,
453 other => {
454 return Err(ExecutorError::ExpectedNodeForExpand {
455 var: format!("{:?}", op.dst),
456 found: value_kind(other),
457 });
458 }
459 }
460 }
461
462 if let Some(rel_var) = op.rel {
463 if let Some(existing_rel) = row.get(rel_var) {
464 match existing_rel {
465 LoraValue::Relationship(existing_id) if *existing_id == rel_id => {}
466 LoraValue::Relationship(_) => continue,
467 other => {
468 return Err(ExecutorError::ExpectedRelationshipForExpand {
469 var: format!("{:?}", rel_var),
470 found: value_kind(other),
471 });
472 }
473 }
474 }
475 }
476
477 let mut new_row = row.clone();
478
479 if !new_row.contains_key(op.dst) {
480 new_row.insert(op.dst, LoraValue::Node(dst_id));
481 }
482
483 if let Some(rel_var) = op.rel {
484 if !new_row.contains_key(rel_var) {
485 new_row.insert(rel_var, LoraValue::Relationship(rel_id));
486 }
487 }
488
489 out.push(new_row);
490 }
491 }
492
493 Ok(out)
494 }
495
496 fn exec_expand_var_len(
497 &self,
498 plan: &PhysicalPlan,
499 op: &ExpandExec,
500 range: &RangeLiteral,
501 ) -> ExecResult<Vec<Row>> {
502 let input_rows = self.execute_node(plan, op.input)?;
503 let (min_hops, max_hops) = resolve_range(range);
504 let mut out = Vec::new();
505
506 for row in input_rows {
507 let src_node_id = match row.get(op.src) {
508 Some(LoraValue::Node(id)) => *id,
509 Some(other) => {
510 return Err(ExecutorError::ExpectedNodeForExpand {
511 var: format!("{:?}", op.src),
512 found: value_kind(other),
513 });
514 }
515 None => continue,
516 };
517
518 let expansions = variable_length_expand(
519 self.ctx.storage,
520 src_node_id,
521 op.direction,
522 &op.types,
523 min_hops,
524 max_hops,
525 );
526
527 for result in expansions {
528 let mut new_row = row.clone();
529 new_row.insert(op.dst, LoraValue::Node(result.dst_node_id));
530
531 if let Some(rel_var) = op.rel {
534 let rel_list = LoraValue::List(
536 result
537 .rel_ids
538 .into_iter()
539 .map(LoraValue::Relationship)
540 .collect(),
541 );
542 new_row.insert(rel_var, rel_list);
543 }
544
545 out.push(new_row);
546 }
547 }
548
549 Ok(out)
550 }
551
552 fn relationship_matches_properties(
553 &self,
554 actual: &Properties,
555 expected_expr: Option<&ResolvedExpr>,
556 row: &Row,
557 ) -> ExecResult<bool> {
558 let Some(expr) = expected_expr else {
559 return Ok(true);
560 };
561
562 let eval_ctx = EvalContext {
563 storage: self.ctx.storage,
564 params: &self.ctx.params,
565 };
566
567 let expected = eval_expr(expr, row, &eval_ctx);
568
569 let LoraValue::Map(expected_map) = expected else {
570 return Err(ExecutorError::ExpectedPropertyMap {
571 found: value_kind(&expected),
572 });
573 };
574
575 Ok(expected_map.iter().all(|(key, expected_value)| {
576 actual
577 .get(key)
578 .map(|actual_value| value_matches_property_value(expected_value, actual_value))
579 .unwrap_or(false)
580 }))
581 }
582
583 fn exec_filter(&self, plan: &PhysicalPlan, op: &FilterExec) -> ExecResult<Vec<Row>> {
584 let input_rows = self.execute_node(plan, op.input)?;
585 let eval_ctx = EvalContext {
586 storage: self.ctx.storage,
587 params: &self.ctx.params,
588 };
589
590 Ok(input_rows
591 .into_iter()
592 .filter(|row| eval_expr(&op.predicate, row, &eval_ctx).is_truthy())
593 .collect())
594 }
595
596 fn exec_projection(&self, plan: &PhysicalPlan, op: &ProjectionExec) -> ExecResult<Vec<Row>> {
597 let input_rows = self.execute_node(plan, op.input)?;
598 let eval_ctx = EvalContext {
599 storage: self.ctx.storage,
600 params: &self.ctx.params,
601 };
602
603 let mut out = Vec::with_capacity(input_rows.len());
604
605 for row in input_rows {
606 if op.include_existing {
609 let mut projected = row;
610 for item in &op.items {
611 let value = eval_expr(&item.expr, &projected, &eval_ctx);
612 if let Some(err) = take_eval_error() {
613 return Err(ExecutorError::RuntimeError(err));
614 }
615 projected.insert_named(item.output, item.name.clone(), value);
616 }
617 out.push(projected);
618 } else {
619 let mut projected = Row::new();
620 for item in &op.items {
621 let value = eval_expr(&item.expr, &row, &eval_ctx);
622 if let Some(err) = take_eval_error() {
623 return Err(ExecutorError::RuntimeError(err));
624 }
625 projected.insert_named(item.output, item.name.clone(), value);
626 }
627 out.push(projected);
628 }
629 }
630
631 Ok(if op.distinct {
632 dedup_rows_by_vars(out)
633 } else {
634 out
635 })
636 }
637
638 fn hydrate_value(&self, value: LoraValue) -> LoraValue {
639 match value {
640 LoraValue::Node(id) => self.hydrate_node(id),
641 LoraValue::Relationship(id) => self.hydrate_relationship(id),
642 LoraValue::List(values) => {
643 LoraValue::List(values.into_iter().map(|v| self.hydrate_value(v)).collect())
644 }
645 LoraValue::Map(map) => LoraValue::Map(
646 map.into_iter()
647 .map(|(k, v)| (k, self.hydrate_value(v)))
648 .collect(),
649 ),
650 other => other,
651 }
652 }
653
654 fn hydrate_node(&self, id: u64) -> LoraValue {
655 self.ctx
656 .storage
657 .with_node(id, hydrate_node_record)
658 .unwrap_or(LoraValue::Null)
659 }
660
661 fn hydrate_relationship(&self, id: u64) -> LoraValue {
662 self.ctx
663 .storage
664 .with_relationship(id, hydrate_relationship_record)
665 .unwrap_or(LoraValue::Null)
666 }
667
668 fn exec_unwind(&self, plan: &PhysicalPlan, op: &UnwindExec) -> ExecResult<Vec<Row>> {
669 let input_rows = self.execute_node(plan, op.input)?;
670 let eval_ctx = EvalContext {
671 storage: self.ctx.storage,
672 params: &self.ctx.params,
673 };
674
675 let mut out = Vec::new();
676
677 for row in input_rows {
678 match eval_expr(&op.expr, &row, &eval_ctx) {
679 LoraValue::List(values) => {
680 for value in values {
681 let mut new_row = row.clone();
682 new_row.insert(op.alias, value);
683 out.push(new_row);
684 }
685 }
686 LoraValue::Null => {}
687 other => {
688 let mut new_row = row;
689 new_row.insert(op.alias, other);
690 out.push(new_row);
691 }
692 }
693 }
694
695 Ok(out)
696 }
697
698 fn exec_hash_aggregation(
699 &self,
700 plan: &PhysicalPlan,
701 op: &HashAggregationExec,
702 ) -> ExecResult<Vec<Row>> {
703 let input_rows = self.execute_node(plan, op.input)?;
704 let eval_ctx = EvalContext {
705 storage: self.ctx.storage,
706 params: &self.ctx.params,
707 };
708
709 let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
710
711 if op.group_by.is_empty() {
712 groups.insert(Vec::new(), input_rows);
713 } else {
714 for row in input_rows {
715 let key = op
716 .group_by
717 .iter()
718 .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
719 .collect::<Vec<_>>();
720
721 groups.entry(key).or_default().push(row);
722 }
723 }
724
725 let mut out = Vec::new();
726
727 for rows in groups.into_values() {
728 let mut result = Row::new();
729
730 if let Some(first) = rows.first() {
731 for proj in &op.group_by {
732 let value = self.hydrate_value(eval_expr(&proj.expr, first, &eval_ctx));
733 result.insert_named(proj.output, proj.name.clone(), value);
734 }
735 }
736
737 for proj in &op.aggregates {
738 let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
739 result.insert_named(proj.output, proj.name.clone(), value);
740 }
741
742 out.push(result);
743 }
744
745 Ok(out)
746 }
747
748 fn exec_sort(&self, plan: &PhysicalPlan, op: &SortExec) -> ExecResult<Vec<Row>> {
749 let mut rows = self.execute_node(plan, op.input)?;
750 let eval_ctx = EvalContext {
751 storage: self.ctx.storage,
752 params: &self.ctx.params,
753 };
754
755 rows.sort_by(|a, b| {
756 for item in &op.items {
757 let ord = compare_sort_item(item, a, b, &eval_ctx);
758 if ord != Ordering::Equal {
759 return ord;
760 }
761 }
762 Ordering::Equal
763 });
764
765 Ok(rows)
766 }
767
768 fn exec_limit(&self, plan: &PhysicalPlan, op: &LimitExec) -> ExecResult<Vec<Row>> {
769 let mut rows = self.execute_node(plan, op.input)?;
770 let eval_ctx = EvalContext {
771 storage: self.ctx.storage,
772 params: &self.ctx.params,
773 };
774
775 let limit = op
776 .limit
777 .as_ref()
778 .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
779 .unwrap_or(rows.len() as i64)
780 .max(0) as usize;
781
782 let skip = op
783 .skip
784 .as_ref()
785 .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
786 .unwrap_or(0)
787 .max(0) as usize;
788
789 if skip >= rows.len() {
790 return Ok(Vec::new());
791 }
792
793 rows.drain(0..skip);
794 rows.truncate(limit);
795 Ok(rows)
796 }
797
798 fn exec_optional_match(
799 &self,
800 plan: &PhysicalPlan,
801 op: &OptionalMatchExec,
802 ) -> ExecResult<Vec<Row>> {
803 let input_rows = self.execute_node(plan, op.input)?;
804
805 let inner_rows = self.execute_node(plan, op.inner)?;
810
811 let mut out = Vec::new();
812
813 for input_row in input_rows {
814 let mut matched = false;
815
816 for inner_row in &inner_rows {
817 let compatible = input_row
819 .iter()
820 .all(|(var, val)| match inner_row.get(*var) {
821 Some(inner_val) => inner_val == val,
822 None => true,
823 });
824 if !compatible {
825 continue;
826 }
827
828 let mut merged = input_row.clone();
829 for (var, name, val) in inner_row.iter_named() {
830 if !merged.contains_key(*var) {
831 merged.insert_named(*var, name.into_owned(), val.clone());
832 }
833 }
834 out.push(merged);
835 matched = true;
836 }
837
838 if !matched {
839 let mut null_row = input_row;
840 for &var_id in &op.new_vars {
841 if !null_row.contains_key(var_id) {
842 null_row.insert(var_id, LoraValue::Null);
843 }
844 }
845 out.push(null_row);
846 }
847 }
848
849 Ok(out)
850 }
851
852 fn exec_path_build(&self, plan: &PhysicalPlan, op: &PathBuildExec) -> ExecResult<Vec<Row>> {
853 let input_rows = self.execute_node(plan, op.input)?;
854 let mut rows: Vec<Row> = input_rows
855 .into_iter()
856 .map(|mut row| {
857 let path = build_path_value(&row, &op.node_vars, &op.rel_vars, self.ctx.storage);
858 row.insert(op.output, path);
859 row
860 })
861 .collect();
862
863 if let Some(all) = op.shortest_path_all {
864 rows = filter_shortest_paths(rows, op.output, all);
865 }
866 Ok(rows)
867 }
868}
869
870fn properties_to_value_map(props: &Properties) -> LoraValue {
871 let mut map = BTreeMap::new();
872 for (k, v) in props.iter() {
873 map.insert(k.clone(), LoraValue::from(v));
874 }
875 LoraValue::Map(map)
876}
877
878pub struct MutableExecutionContext<'a, S: GraphStorageMut> {
879 pub storage: &'a mut S,
880 pub params: std::collections::BTreeMap<String, LoraValue>,
881}
882
883pub struct MutableExecutor<'a, S: GraphStorageMut> {
884 ctx: MutableExecutionContext<'a, S>,
885 deadline: Option<Instant>,
886}
887
888impl<'a, S: GraphStorageMut> MutableExecutor<'a, S> {
889 pub fn new(ctx: MutableExecutionContext<'a, S>) -> Self {
890 Self {
891 ctx,
892 deadline: None,
893 }
894 }
895
896 pub fn with_deadline(ctx: MutableExecutionContext<'a, S>, deadline: Option<Instant>) -> Self {
897 Self { ctx, deadline }
898 }
899
900 #[inline]
901 fn check_deadline(&self) -> ExecResult<()> {
902 if let Some(deadline) = self.deadline {
903 check_deadline_at(deadline)
904 } else {
905 Ok(())
906 }
907 }
908
909 #[inline]
910 fn check_loop_deadline(deadline: Option<Instant>) -> ExecResult<()> {
911 if let Some(deadline) = deadline {
912 check_deadline_at(deadline)
913 } else {
914 Ok(())
915 }
916 }
917
918 pub fn execute(
919 &mut self,
920 plan: &PhysicalPlan,
921 options: Option<ExecuteOptions>,
922 ) -> ExecResult<QueryResult> {
923 let rows = self.execute_rows(plan)?;
924 Ok(project_rows(rows, options.unwrap_or_default()))
925 }
926
927 pub fn execute_rows(&mut self, plan: &PhysicalPlan) -> ExecResult<Vec<Row>> {
928 self.check_deadline()?;
929 let _ = take_eval_error();
932
933 let rows = self.execute_node(plan, plan.root)?;
934 Ok(rows
935 .into_iter()
936 .map(|row| self.hydrate_row(row))
937 .collect::<Vec<_>>())
938 }
939
940 pub fn execute_compiled(
942 &mut self,
943 compiled: &CompiledQuery,
944 options: Option<ExecuteOptions>,
945 ) -> ExecResult<QueryResult> {
946 let rows = self.execute_compiled_rows(compiled)?;
947 Ok(project_rows(rows, options.unwrap_or_default()))
948 }
949
950 pub fn execute_compiled_rows(&mut self, compiled: &CompiledQuery) -> ExecResult<Vec<Row>> {
951 self.check_deadline()?;
952 if compiled.unions.is_empty() {
953 return self.execute_rows(&compiled.physical);
954 }
955
956 let _ = take_eval_error();
957
958 let mut all_rows = self.execute_and_hydrate(&compiled.physical)?;
960
961 let mut needs_dedup = false;
964
965 for branch in &compiled.unions {
966 self.check_deadline()?;
967 let branch_rows = self.execute_and_hydrate(&branch.physical)?;
968 all_rows.extend(branch_rows);
969
970 if !branch.all {
971 needs_dedup = true;
972 }
973 }
974
975 if needs_dedup {
976 all_rows = dedup_rows(all_rows);
977 }
978
979 Ok(all_rows)
980 }
981
982 fn execute_and_hydrate(&mut self, plan: &PhysicalPlan) -> ExecResult<Vec<Row>> {
983 self.check_deadline()?;
984 let rows = self.execute_node(plan, plan.root)?;
985 Ok(rows.into_iter().map(|row| self.hydrate_row(row)).collect())
986 }
987
988 pub(crate) fn hydrate_row(&self, row: Row) -> Row {
989 let mut out = Row::new();
990
991 for (var, name, value) in row.into_iter_named() {
992 out.insert_named(var, name, self.hydrate_value(value));
993 }
994
995 out
996 }
997
998 fn execute_node(
999 &mut self,
1000 plan: &PhysicalPlan,
1001 node_id: PhysicalNodeId,
1002 ) -> ExecResult<Vec<Row>> {
1003 self.check_deadline()?;
1004 trace!("mutable execute_node start: node_id={node_id:?}");
1005
1006 let result = match &plan.nodes[node_id] {
1007 PhysicalOp::Argument(op) => self.exec_argument(op),
1008 PhysicalOp::NodeScan(op) => self.exec_node_scan(plan, op),
1009 PhysicalOp::NodeByLabelScan(op) => self.exec_node_by_label_scan(plan, op),
1010 PhysicalOp::NodeByPropertyScan(op) => self.exec_node_by_property_scan(plan, op),
1011 PhysicalOp::Expand(op) => self.exec_expand(plan, op),
1012 PhysicalOp::Filter(op) => self.exec_filter(plan, op),
1013 PhysicalOp::Projection(op) => self.exec_projection(plan, op),
1014 PhysicalOp::Unwind(op) => self.exec_unwind(plan, op),
1015 PhysicalOp::HashAggregation(op) => self.exec_hash_aggregation(plan, op),
1016 PhysicalOp::Sort(op) => self.exec_sort(plan, op),
1017 PhysicalOp::Limit(op) => self.exec_limit(plan, op),
1018 PhysicalOp::Create(op) => self.exec_create(plan, op),
1019 PhysicalOp::Merge(op) => self.exec_merge(plan, op),
1020 PhysicalOp::Delete(op) => self.exec_delete(plan, op),
1021 PhysicalOp::Set(op) => self.exec_set(plan, op),
1022 PhysicalOp::Remove(op) => self.exec_remove(plan, op),
1023 PhysicalOp::OptionalMatch(op) => self.exec_optional_match(plan, op),
1024 PhysicalOp::PathBuild(op) => self.exec_path_build(plan, op),
1025 };
1026
1027 match &result {
1028 Ok(rows) => trace!(
1029 "mutable execute_node ok: node_id={node_id:?}, rows={}",
1030 rows.len()
1031 ),
1032 Err(err) => error!("mutable execute_node failed: node_id={node_id:?}, error={err}"),
1033 }
1034
1035 result
1036 }
1037
1038 fn exec_argument(&self, _op: &ArgumentExec) -> ExecResult<Vec<Row>> {
1039 Ok(vec![Row::new()])
1040 }
1041
1042 fn exec_node_scan(&mut self, plan: &PhysicalPlan, op: &NodeScanExec) -> ExecResult<Vec<Row>> {
1043 let base_rows = match op.input {
1044 Some(input) => self.execute_node(plan, input)?,
1045 None => vec![Row::new()],
1046 };
1047
1048 let node_ids = self.ctx.storage.all_node_ids();
1049 let mut out = Vec::new();
1050
1051 let deadline = self.deadline;
1052 for row in base_rows {
1053 Self::check_loop_deadline(deadline)?;
1054 if let Some(existing) = row.get(op.var) {
1055 match existing {
1056 LoraValue::Node(existing_id) => {
1057 if self.ctx.storage.has_node(*existing_id) {
1058 out.push(row);
1059 }
1060 }
1061 other => {
1062 return Err(ExecutorError::ExpectedNodeForExpand {
1063 var: format!("{:?}", op.var),
1064 found: value_kind(other),
1065 });
1066 }
1067 }
1068 continue;
1069 }
1070
1071 for &id in &node_ids {
1072 Self::check_loop_deadline(deadline)?;
1073 let mut new_row = row.clone();
1074 new_row.insert(op.var, LoraValue::Node(id));
1075 out.push(new_row);
1076 }
1077 }
1078
1079 Ok(out)
1080 }
1081
1082 fn exec_node_by_label_scan(
1083 &mut self,
1084 plan: &PhysicalPlan,
1085 op: &NodeByLabelScanExec,
1086 ) -> ExecResult<Vec<Row>> {
1087 let base_rows = match op.input {
1088 Some(input) => self.execute_node(plan, input)?,
1089 None => vec![Row::new()],
1090 };
1091
1092 let candidate_ids = scan_node_ids_for_label_groups(&*self.ctx.storage, &op.labels);
1093 let candidates_prefiltered = label_group_candidates_prefiltered(&op.labels);
1094 let mut out = Vec::new();
1095
1096 let deadline = self.deadline;
1097 for row in base_rows {
1098 Self::check_loop_deadline(deadline)?;
1099 if let Some(existing) = row.get(op.var) {
1100 match existing {
1101 LoraValue::Node(existing_id) => {
1102 let labels_ok = self
1103 .ctx
1104 .storage
1105 .with_node(*existing_id, |n| {
1106 node_matches_label_groups(&n.labels, &op.labels)
1107 })
1108 .unwrap_or(false);
1109 if labels_ok {
1110 out.push(row);
1111 }
1112 }
1113 other => {
1114 return Err(ExecutorError::ExpectedNodeForExpand {
1115 var: format!("{:?}", op.var),
1116 found: value_kind(other),
1117 });
1118 }
1119 }
1120 continue;
1121 }
1122
1123 for &id in &candidate_ids {
1124 Self::check_loop_deadline(deadline)?;
1125 if !candidates_prefiltered {
1126 let labels_ok = self
1127 .ctx
1128 .storage
1129 .with_node(id, |n| node_matches_label_groups(&n.labels, &op.labels))
1130 .unwrap_or(false);
1131 if !labels_ok {
1132 continue;
1133 }
1134 }
1135 let mut new_row = row.clone();
1136 new_row.insert(op.var, LoraValue::Node(id));
1137 out.push(new_row);
1138 }
1139 }
1140
1141 Ok(out)
1142 }
1143
1144 fn exec_node_by_property_scan(
1145 &mut self,
1146 plan: &PhysicalPlan,
1147 op: &NodeByPropertyScanExec,
1148 ) -> ExecResult<Vec<Row>> {
1149 let base_rows = match op.input {
1150 Some(input) => self.execute_node(plan, input)?,
1151 None => vec![Row::new()],
1152 };
1153
1154 let mut out = Vec::new();
1155
1156 let deadline = self.deadline;
1157 for row in base_rows {
1158 Self::check_loop_deadline(deadline)?;
1159 let expected = {
1160 let eval_ctx = EvalContext {
1161 storage: &*self.ctx.storage,
1162 params: &self.ctx.params,
1163 };
1164 eval_expr(&op.value, &row, &eval_ctx)
1165 };
1166
1167 if let Some(existing) = row.get(op.var) {
1168 match existing {
1169 LoraValue::Node(existing_id) => {
1170 if node_matches_property_filter(
1171 &*self.ctx.storage,
1172 *existing_id,
1173 &op.labels,
1174 &op.key,
1175 &expected,
1176 ) {
1177 out.push(row);
1178 }
1179 }
1180 other => {
1181 return Err(ExecutorError::ExpectedNodeForExpand {
1182 var: format!("{:?}", op.var),
1183 found: value_kind(other),
1184 });
1185 }
1186 }
1187 continue;
1188 }
1189
1190 let candidates = indexed_node_property_candidates(
1191 &*self.ctx.storage,
1192 &op.labels,
1193 &op.key,
1194 &expected,
1195 );
1196 for id in candidates.ids {
1197 Self::check_loop_deadline(deadline)?;
1198 if !candidates.prefiltered
1199 && !node_matches_property_filter(
1200 &*self.ctx.storage,
1201 id,
1202 &op.labels,
1203 &op.key,
1204 &expected,
1205 )
1206 {
1207 continue;
1208 }
1209 let mut new_row = row.clone();
1210 new_row.insert(op.var, LoraValue::Node(id));
1211 out.push(new_row);
1212 }
1213 }
1214
1215 Ok(out)
1216 }
1217
1218 fn exec_expand(&mut self, plan: &PhysicalPlan, op: &ExpandExec) -> ExecResult<Vec<Row>> {
1219 if let Some(range) = &op.range {
1221 return self.exec_expand_var_len(plan, op, range);
1222 }
1223
1224 let input_rows = self.execute_node(plan, op.input)?;
1225 let mut out = Vec::new();
1226
1227 for row in input_rows {
1228 let src_node_id = match row.get(op.src) {
1229 Some(LoraValue::Node(id)) => *id,
1230 Some(other) => {
1231 return Err(ExecutorError::ExpectedNodeForExpand {
1232 var: format!("{:?}", op.src),
1233 found: value_kind(other),
1234 });
1235 }
1236 None => continue,
1237 };
1238
1239 for (rel_id, dst_id) in
1240 self.ctx
1241 .storage
1242 .expand_ids(src_node_id, op.direction, &op.types)
1243 {
1244 if let Some(expr) = op.rel_properties.as_ref() {
1245 let actual_props = self
1246 .ctx
1247 .storage
1248 .with_relationship(rel_id, |rel| rel.properties.clone());
1249 let matches = match actual_props {
1250 Some(props) => {
1251 self.relationship_matches_properties(&props, Some(expr), &row)?
1252 }
1253 None => false,
1254 };
1255 if !matches {
1256 continue;
1257 }
1258 }
1259
1260 if let Some(existing_dst) = row.get(op.dst) {
1261 match existing_dst {
1262 LoraValue::Node(existing_id) if *existing_id == dst_id => {}
1263 LoraValue::Node(_) => continue,
1264 other => {
1265 return Err(ExecutorError::ExpectedNodeForExpand {
1266 var: format!("{:?}", op.dst),
1267 found: value_kind(other),
1268 });
1269 }
1270 }
1271 }
1272
1273 if let Some(rel_var) = op.rel {
1274 if let Some(existing_rel) = row.get(rel_var) {
1275 match existing_rel {
1276 LoraValue::Relationship(existing_id) if *existing_id == rel_id => {}
1277 LoraValue::Relationship(_) => continue,
1278 other => {
1279 return Err(ExecutorError::ExpectedRelationshipForExpand {
1280 var: format!("{:?}", rel_var),
1281 found: value_kind(other),
1282 });
1283 }
1284 }
1285 }
1286 }
1287
1288 let mut new_row = row.clone();
1289
1290 if !new_row.contains_key(op.dst) {
1291 new_row.insert(op.dst, LoraValue::Node(dst_id));
1292 }
1293
1294 if let Some(rel_var) = op.rel {
1295 if !new_row.contains_key(rel_var) {
1296 new_row.insert(rel_var, LoraValue::Relationship(rel_id));
1297 }
1298 }
1299
1300 out.push(new_row);
1301 }
1302 }
1303
1304 Ok(out)
1305 }
1306
1307 fn exec_expand_var_len(
1308 &mut self,
1309 plan: &PhysicalPlan,
1310 op: &ExpandExec,
1311 range: &RangeLiteral,
1312 ) -> ExecResult<Vec<Row>> {
1313 let input_rows = self.execute_node(plan, op.input)?;
1314 let (min_hops, max_hops) = resolve_range(range);
1315 let mut out = Vec::new();
1316
1317 for row in input_rows {
1318 let src_node_id = match row.get(op.src) {
1319 Some(LoraValue::Node(id)) => *id,
1320 Some(other) => {
1321 return Err(ExecutorError::ExpectedNodeForExpand {
1322 var: format!("{:?}", op.src),
1323 found: value_kind(other),
1324 });
1325 }
1326 None => continue,
1327 };
1328
1329 let expansions = variable_length_expand(
1330 &*self.ctx.storage,
1331 src_node_id,
1332 op.direction,
1333 &op.types,
1334 min_hops,
1335 max_hops,
1336 );
1337
1338 for result in expansions {
1339 let mut new_row = row.clone();
1340 new_row.insert(op.dst, LoraValue::Node(result.dst_node_id));
1341
1342 if let Some(rel_var) = op.rel {
1343 let rel_list = LoraValue::List(
1345 result
1346 .rel_ids
1347 .into_iter()
1348 .map(LoraValue::Relationship)
1349 .collect(),
1350 );
1351 new_row.insert(rel_var, rel_list);
1352 }
1353
1354 out.push(new_row);
1355 }
1356 }
1357
1358 Ok(out)
1359 }
1360
1361 fn relationship_matches_properties(
1362 &self,
1363 actual: &Properties,
1364 expected_expr: Option<&ResolvedExpr>,
1365 row: &Row,
1366 ) -> ExecResult<bool> {
1367 let Some(expr) = expected_expr else {
1368 return Ok(true);
1369 };
1370
1371 let eval_ctx = EvalContext {
1372 storage: &*self.ctx.storage,
1373 params: &self.ctx.params,
1374 };
1375
1376 let expected = eval_expr(expr, row, &eval_ctx);
1377
1378 let LoraValue::Map(expected_map) = expected else {
1379 return Err(ExecutorError::ExpectedPropertyMap {
1380 found: value_kind(&expected),
1381 });
1382 };
1383
1384 Ok(expected_map.iter().all(|(key, expected_value)| {
1385 actual
1386 .get(key)
1387 .map(|actual_value| value_matches_property_value(expected_value, actual_value))
1388 .unwrap_or(false)
1389 }))
1390 }
1391
1392 fn exec_filter(&mut self, plan: &PhysicalPlan, op: &FilterExec) -> ExecResult<Vec<Row>> {
1393 let input_rows = self.execute_node(plan, op.input)?;
1394 let eval_ctx = EvalContext {
1395 storage: &*self.ctx.storage,
1396 params: &self.ctx.params,
1397 };
1398
1399 Ok(input_rows
1400 .into_iter()
1401 .filter(|row| eval_expr(&op.predicate, row, &eval_ctx).is_truthy())
1402 .collect())
1403 }
1404
1405 fn exec_projection(
1406 &mut self,
1407 plan: &PhysicalPlan,
1408 op: &ProjectionExec,
1409 ) -> ExecResult<Vec<Row>> {
1410 let input_rows = self.execute_node(plan, op.input)?;
1411 let eval_ctx = EvalContext {
1412 storage: &*self.ctx.storage,
1413 params: &self.ctx.params,
1414 };
1415
1416 let mut out = Vec::with_capacity(input_rows.len());
1417
1418 for row in input_rows {
1419 if op.include_existing {
1420 let mut projected = row;
1421 for item in &op.items {
1422 let value = eval_expr(&item.expr, &projected, &eval_ctx);
1423 if let Some(err) = take_eval_error() {
1424 return Err(ExecutorError::RuntimeError(err));
1425 }
1426 projected.insert_named(item.output, item.name.clone(), value);
1427 }
1428 out.push(projected);
1429 } else {
1430 let mut projected = Row::new();
1431 for item in &op.items {
1432 let value = eval_expr(&item.expr, &row, &eval_ctx);
1433 if let Some(err) = take_eval_error() {
1434 return Err(ExecutorError::RuntimeError(err));
1435 }
1436 projected.insert_named(item.output, item.name.clone(), value);
1437 }
1438 out.push(projected);
1439 }
1440 }
1441
1442 Ok(if op.distinct {
1443 dedup_rows_by_vars(out)
1444 } else {
1445 out
1446 })
1447 }
1448
1449 fn hydrate_value(&self, value: LoraValue) -> LoraValue {
1450 match value {
1451 LoraValue::Node(id) => self.hydrate_node(id),
1452 LoraValue::Relationship(id) => self.hydrate_relationship(id),
1453 LoraValue::List(values) => {
1454 LoraValue::List(values.into_iter().map(|v| self.hydrate_value(v)).collect())
1455 }
1456 LoraValue::Map(map) => LoraValue::Map(
1457 map.into_iter()
1458 .map(|(k, v)| (k, self.hydrate_value(v)))
1459 .collect(),
1460 ),
1461 other => other,
1462 }
1463 }
1464
1465 fn hydrate_node(&self, id: u64) -> LoraValue {
1466 self.ctx
1467 .storage
1468 .with_node(id, hydrate_node_record)
1469 .unwrap_or(LoraValue::Null)
1470 }
1471
1472 fn hydrate_relationship(&self, id: u64) -> LoraValue {
1473 self.ctx
1474 .storage
1475 .with_relationship(id, hydrate_relationship_record)
1476 .unwrap_or(LoraValue::Null)
1477 }
1478
1479 fn exec_unwind(&mut self, plan: &PhysicalPlan, op: &UnwindExec) -> ExecResult<Vec<Row>> {
1480 let input_rows = self.execute_node(plan, op.input)?;
1481 let eval_ctx = EvalContext {
1482 storage: &*self.ctx.storage,
1483 params: &self.ctx.params,
1484 };
1485
1486 let mut out = Vec::new();
1487
1488 for row in input_rows {
1489 match eval_expr(&op.expr, &row, &eval_ctx) {
1490 LoraValue::List(values) => {
1491 for value in values {
1492 let mut new_row = row.clone();
1493 new_row.insert(op.alias, value);
1494 out.push(new_row);
1495 }
1496 }
1497 LoraValue::Null => {}
1498 other => {
1499 let mut new_row = row;
1500 new_row.insert(op.alias, other);
1501 out.push(new_row);
1502 }
1503 }
1504 }
1505
1506 Ok(out)
1507 }
1508
1509 fn exec_hash_aggregation(
1510 &mut self,
1511 plan: &PhysicalPlan,
1512 op: &HashAggregationExec,
1513 ) -> ExecResult<Vec<Row>> {
1514 let input_rows = self.execute_node(plan, op.input)?;
1515 let eval_ctx = EvalContext {
1516 storage: &*self.ctx.storage,
1517 params: &self.ctx.params,
1518 };
1519
1520 let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
1521
1522 if op.group_by.is_empty() {
1523 groups.insert(Vec::new(), input_rows);
1524 } else {
1525 for row in input_rows {
1526 let key = op
1527 .group_by
1528 .iter()
1529 .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
1530 .collect::<Vec<_>>();
1531
1532 groups.entry(key).or_default().push(row);
1533 }
1534 }
1535
1536 let mut out = Vec::new();
1537
1538 for rows in groups.into_values() {
1539 let mut result = Row::new();
1540
1541 if let Some(first) = rows.first() {
1542 for proj in &op.group_by {
1543 let value = self.hydrate_value(eval_expr(&proj.expr, first, &eval_ctx));
1544 result.insert_named(proj.output, proj.name.clone(), value);
1545 }
1546 }
1547
1548 for proj in &op.aggregates {
1549 let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
1550 result.insert_named(proj.output, proj.name.clone(), value);
1551 }
1552
1553 out.push(result);
1554 }
1555
1556 Ok(out)
1557 }
1558
1559 fn exec_sort(&mut self, plan: &PhysicalPlan, op: &SortExec) -> ExecResult<Vec<Row>> {
1560 let mut rows = self.execute_node(plan, op.input)?;
1561 let eval_ctx = EvalContext {
1562 storage: &*self.ctx.storage,
1563 params: &self.ctx.params,
1564 };
1565
1566 rows.sort_by(|a, b| {
1567 for item in &op.items {
1568 let ord = compare_sort_item(item, a, b, &eval_ctx);
1569 if ord != Ordering::Equal {
1570 return ord;
1571 }
1572 }
1573 Ordering::Equal
1574 });
1575
1576 Ok(rows)
1577 }
1578
1579 fn exec_limit(&mut self, plan: &PhysicalPlan, op: &LimitExec) -> ExecResult<Vec<Row>> {
1580 let mut rows = self.execute_node(plan, op.input)?;
1581 let eval_ctx = EvalContext {
1582 storage: &*self.ctx.storage,
1583 params: &self.ctx.params,
1584 };
1585
1586 let limit = op
1587 .limit
1588 .as_ref()
1589 .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
1590 .unwrap_or(rows.len() as i64)
1591 .max(0) as usize;
1592
1593 let skip = op
1594 .skip
1595 .as_ref()
1596 .and_then(|e| eval_expr(e, &Row::new(), &eval_ctx).as_i64())
1597 .unwrap_or(0)
1598 .max(0) as usize;
1599
1600 if skip >= rows.len() {
1601 return Ok(Vec::new());
1602 }
1603
1604 rows.drain(0..skip);
1605 rows.truncate(limit);
1606 Ok(rows)
1607 }
1608
1609 fn exec_optional_match(
1610 &mut self,
1611 plan: &PhysicalPlan,
1612 op: &OptionalMatchExec,
1613 ) -> ExecResult<Vec<Row>> {
1614 let input_rows = self.execute_node(plan, op.input)?;
1615
1616 let inner_rows = self.execute_node(plan, op.inner)?;
1618
1619 let mut out = Vec::new();
1620
1621 for input_row in input_rows {
1622 let mut matched = false;
1623
1624 for inner_row in &inner_rows {
1625 let compatible = input_row
1626 .iter()
1627 .all(|(var, val)| match inner_row.get(*var) {
1628 Some(inner_val) => inner_val == val,
1629 None => true,
1630 });
1631 if !compatible {
1632 continue;
1633 }
1634
1635 let mut merged = input_row.clone();
1636 for (var, name, val) in inner_row.iter_named() {
1637 if !merged.contains_key(*var) {
1638 merged.insert_named(*var, name.into_owned(), val.clone());
1639 }
1640 }
1641 out.push(merged);
1642 matched = true;
1643 }
1644
1645 if !matched {
1646 let mut null_row = input_row;
1647 for &var_id in &op.new_vars {
1648 if !null_row.contains_key(var_id) {
1649 null_row.insert(var_id, LoraValue::Null);
1650 }
1651 }
1652 out.push(null_row);
1653 }
1654 }
1655
1656 Ok(out)
1657 }
1658
1659 fn exec_path_build(&mut self, plan: &PhysicalPlan, op: &PathBuildExec) -> ExecResult<Vec<Row>> {
1660 let input_rows = self.execute_node(plan, op.input)?;
1661 let mut rows: Vec<Row> = input_rows
1662 .into_iter()
1663 .map(|mut row| {
1664 let path = build_path_value(&row, &op.node_vars, &op.rel_vars, &*self.ctx.storage);
1665 row.insert(op.output, path);
1666 row
1667 })
1668 .collect();
1669
1670 if let Some(all) = op.shortest_path_all {
1671 rows = filter_shortest_paths(rows, op.output, all);
1672 }
1673 Ok(rows)
1674 }
1675
1676 fn exec_create(&mut self, plan: &PhysicalPlan, op: &CreateExec) -> ExecResult<Vec<Row>> {
1677 if crate::pull::subtree_is_fully_streaming(plan, op.input) {
1683 return self.exec_create_streaming_input(plan, op);
1684 }
1685
1686 let input_rows = self.execute_node(plan, op.input)?;
1687 let mut out = Vec::with_capacity(input_rows.len());
1688
1689 for mut row in input_rows {
1690 self.apply_create_pattern(&mut row, &op.pattern)?;
1691 out.push(row);
1692 }
1693
1694 Ok(out)
1695 }
1696
1697 fn streaming_apply<F>(
1717 &mut self,
1718 plan: &PhysicalPlan,
1719 input: PhysicalNodeId,
1720 mut apply: F,
1721 ) -> ExecResult<Vec<Row>>
1722 where
1723 F: FnMut(&mut Self, &mut Row) -> ExecResult<()>,
1724 {
1725 use std::sync::Arc;
1726
1727 let storage_ptr: *mut S = self.ctx.storage as *mut S;
1728 let params = Arc::new(self.ctx.params.clone());
1729
1730 let storage_ref: &S = unsafe { &*storage_ptr };
1732 let mut upstream = crate::pull::build_streaming(plan, input, storage_ref, params)?;
1733
1734 let mut out = Vec::new();
1735 while let Some(mut row) = upstream.next_row()? {
1736 apply(self, &mut row)?;
1737 out.push(row);
1738 }
1739
1740 Ok(out)
1741 }
1742
1743 fn exec_create_streaming_input(
1746 &mut self,
1747 plan: &PhysicalPlan,
1748 op: &CreateExec,
1749 ) -> ExecResult<Vec<Row>> {
1750 self.streaming_apply(plan, op.input, |this, row| {
1751 this.apply_create_pattern(row, &op.pattern)
1752 })
1753 }
1754
1755 fn apply_remove_item(&mut self, row: &Row, item: &ResolvedRemoveItem) -> ExecResult<()> {
1756 match item {
1757 ResolvedRemoveItem::Labels { variable, labels } => match row.get(*variable) {
1758 Some(LoraValue::Node(node_id)) => {
1759 let node_id = *node_id;
1760 for label in labels {
1761 self.ctx.storage.remove_node_label(node_id, label);
1762 }
1763 Ok(())
1764 }
1765 Some(other) => Err(ExecutorError::ExpectedNodeForRemoveLabels {
1766 found: value_kind(other),
1767 }),
1768 None => Err(ExecutorError::UnboundVariableForRemove {
1769 var: format!("{variable:?}"),
1770 }),
1771 },
1772
1773 ResolvedRemoveItem::Property { expr } => self.remove_property_from_expr(row, expr),
1774 }
1775 }
1776
1777 fn delete_value(&mut self, value: LoraValue, detach: bool) -> ExecResult<()> {
1778 match value {
1779 LoraValue::Null => Ok(()),
1780
1781 LoraValue::Node(node_id) => {
1782 if detach {
1783 self.ctx.storage.detach_delete_node(node_id);
1784 Ok(())
1785 } else {
1786 let ok = self.ctx.storage.delete_node(node_id);
1787 if ok {
1788 Ok(())
1789 } else {
1790 Err(ExecutorError::DeleteNodeWithRelationships { node_id })
1791 }
1792 }
1793 }
1794
1795 LoraValue::Relationship(rel_id) => {
1796 let ok = self.ctx.storage.delete_relationship(rel_id);
1797 if ok {
1798 Ok(())
1799 } else {
1800 Err(ExecutorError::DeleteRelationshipFailed { rel_id })
1801 }
1802 }
1803
1804 LoraValue::List(values) => {
1805 for v in values {
1806 self.delete_value(v, detach)?;
1807 }
1808 Ok(())
1809 }
1810
1811 other => Err(ExecutorError::InvalidDeleteTarget {
1812 found: value_kind(&other),
1813 }),
1814 }
1815 }
1816
1817 fn exec_merge(&mut self, plan: &PhysicalPlan, op: &MergeExec) -> ExecResult<Vec<Row>> {
1818 if crate::pull::subtree_is_fully_streaming(plan, op.input) {
1823 return self.streaming_apply(plan, op.input, |this, row| {
1824 let already_bound = this.pattern_part_is_bound(row, &op.pattern_part);
1825 let matched = if already_bound {
1826 true
1827 } else {
1828 this.try_match_merge_pattern(row, &op.pattern_part)?
1829 };
1830 if !matched {
1831 this.apply_create_pattern_part(row, &op.pattern_part)?;
1832 }
1833 for action in &op.actions {
1834 if action.on_match == matched {
1835 for item in &action.set.items {
1836 this.apply_set_item(row, item)?;
1837 }
1838 }
1839 }
1840 Ok(())
1841 });
1842 }
1843
1844 let input_rows = self.execute_node(plan, op.input)?;
1845 let mut out = Vec::with_capacity(input_rows.len());
1846
1847 for mut row in input_rows {
1848 let already_bound = self.pattern_part_is_bound(&row, &op.pattern_part);
1850
1851 let matched = if already_bound {
1852 true
1853 } else {
1854 self.try_match_merge_pattern(&mut row, &op.pattern_part)?
1856 };
1857
1858 if !matched {
1859 self.apply_create_pattern_part(&mut row, &op.pattern_part)?;
1860 }
1861
1862 for action in &op.actions {
1863 if action.on_match == matched {
1864 for item in &action.set.items {
1865 self.apply_set_item(&row, item)?;
1866 }
1867 }
1868 }
1869
1870 out.push(row);
1871 }
1872
1873 Ok(out)
1874 }
1875
1876 fn try_match_merge_pattern(
1879 &self,
1880 row: &mut Row,
1881 part: &ResolvedPatternPart,
1882 ) -> ExecResult<bool> {
1883 match &part.element {
1884 ResolvedPatternElement::Node {
1885 var,
1886 labels,
1887 properties,
1888 } => {
1889 let candidate_ids = if labels.is_empty() {
1892 self.ctx.storage.all_node_ids()
1893 } else {
1894 scan_node_ids_for_label_groups(&*self.ctx.storage, labels)
1895 };
1896
1897 let eval_ctx = EvalContext {
1899 storage: &*self.ctx.storage,
1900 params: &self.ctx.params,
1901 };
1902 let expected_props = properties.as_ref().map(|e| eval_expr(e, row, &eval_ctx));
1903
1904 for id in candidate_ids {
1905 let matched = self
1906 .ctx
1907 .storage
1908 .with_node(id, |node| {
1909 if !node_matches_label_groups(&node.labels, labels) {
1910 return false;
1911 }
1912 if let Some(LoraValue::Map(expected)) = &expected_props {
1913 let all_match = expected.iter().all(|(key, expected_value)| {
1914 node.properties
1915 .get(key)
1916 .map(|actual| {
1917 value_matches_property_value(expected_value, actual)
1918 })
1919 .unwrap_or(false)
1920 });
1921 if !all_match {
1922 return false;
1923 }
1924 }
1925 true
1926 })
1927 .unwrap_or(false);
1928
1929 if !matched {
1930 continue;
1931 }
1932
1933 if let Some(var_id) = var {
1935 row.insert(*var_id, LoraValue::Node(id));
1936 }
1937 return Ok(true);
1938 }
1939
1940 Ok(false)
1941 }
1942
1943 ResolvedPatternElement::ShortestPath { .. } => {
1944 Ok(false)
1946 }
1947
1948 ResolvedPatternElement::NodeChain { head, chain } => {
1949 let head_node_id = if let Some(var_id) = head.var {
1951 if let Some(LoraValue::Node(id)) = row.get(var_id) {
1952 *id
1953 } else {
1954 let node_matched = self.try_match_merge_pattern(
1956 row,
1957 &ResolvedPatternPart {
1958 binding: None,
1959 element: ResolvedPatternElement::Node {
1960 var: head.var,
1961 labels: head.labels.clone(),
1962 properties: head.properties.clone(),
1963 },
1964 },
1965 )?;
1966 if !node_matched {
1967 return Ok(false);
1968 }
1969 match row.get(var_id) {
1970 Some(LoraValue::Node(id)) => *id,
1971 _ => return Ok(false),
1972 }
1973 }
1974 } else {
1975 return Ok(false);
1976 };
1977
1978 let mut current_node_id = head_node_id;
1979
1980 for step in chain {
1981 let eval_ctx = EvalContext {
1982 storage: &*self.ctx.storage,
1983 params: &self.ctx.params,
1984 };
1985
1986 let _ = step.rel.types.first();
1987 let direction = step.rel.direction;
1988
1989 let edges =
1992 self.ctx
1993 .storage
1994 .expand_ids(current_node_id, direction, &step.rel.types);
1995
1996 let mut found = false;
1998 for (rel_id, node_id) in edges {
1999 let node_ok = self
2001 .ctx
2002 .storage
2003 .with_node(node_id, |node_rec| {
2004 if !node_matches_label_groups(&node_rec.labels, &step.node.labels) {
2005 return false;
2006 }
2007 if let Some(props_expr) = &step.node.properties {
2008 let expected = eval_expr(props_expr, row, &eval_ctx);
2009 if let LoraValue::Map(expected_map) = &expected {
2010 let all_match =
2011 expected_map.iter().all(|(key, expected_val)| {
2012 node_rec
2013 .properties
2014 .get(key)
2015 .map(|actual| {
2016 value_matches_property_value(
2017 expected_val,
2018 actual,
2019 )
2020 })
2021 .unwrap_or(false)
2022 });
2023 if !all_match {
2024 return false;
2025 }
2026 }
2027 }
2028 true
2029 })
2030 .unwrap_or(false);
2031 if !node_ok {
2032 continue;
2033 }
2034
2035 let rel_ok = self
2037 .ctx
2038 .storage
2039 .with_relationship(rel_id, |rel_rec| {
2040 if let Some(rel_props_expr) = &step.rel.properties {
2041 let expected = eval_expr(rel_props_expr, row, &eval_ctx);
2042 if let LoraValue::Map(expected_map) = &expected {
2043 let all_match =
2044 expected_map.iter().all(|(key, expected_val)| {
2045 rel_rec
2046 .properties
2047 .get(key)
2048 .map(|actual| {
2049 value_matches_property_value(
2050 expected_val,
2051 actual,
2052 )
2053 })
2054 .unwrap_or(false)
2055 });
2056 if !all_match {
2057 return false;
2058 }
2059 }
2060 }
2061 true
2062 })
2063 .unwrap_or(false);
2064 if !rel_ok {
2065 continue;
2066 }
2067
2068 if let Some(rel_var) = step.rel.var {
2070 row.insert(rel_var, LoraValue::Relationship(rel_id));
2071 }
2072 if let Some(node_var) = step.node.var {
2073 row.insert(node_var, LoraValue::Node(node_id));
2074 }
2075 current_node_id = node_id;
2076 found = true;
2077 break;
2078 }
2079
2080 if !found {
2081 return Ok(false);
2082 }
2083 }
2084
2085 Ok(true)
2086 }
2087 }
2088 }
2089
2090 fn exec_delete(&mut self, plan: &PhysicalPlan, op: &DeleteExec) -> ExecResult<Vec<Row>> {
2091 if crate::pull::subtree_is_fully_streaming(plan, op.input) {
2092 let detach = op.detach;
2093 return self.streaming_apply(plan, op.input, |this, row| {
2094 for expr in &op.expressions {
2095 let value = {
2096 let eval_ctx = EvalContext {
2097 storage: &*this.ctx.storage,
2098 params: &this.ctx.params,
2099 };
2100 eval_expr(expr, row, &eval_ctx)
2101 };
2102 this.delete_value(value, detach)?;
2103 }
2104 Ok(())
2105 });
2106 }
2107
2108 let input_rows = self.execute_node(plan, op.input)?;
2109
2110 for row in &input_rows {
2111 for expr in &op.expressions {
2112 let value = {
2113 let eval_ctx = EvalContext {
2114 storage: &*self.ctx.storage,
2115 params: &self.ctx.params,
2116 };
2117 eval_expr(expr, row, &eval_ctx)
2118 };
2119
2120 self.delete_value(value, op.detach)?;
2121 }
2122 }
2123
2124 Ok(input_rows)
2125 }
2126
2127 fn exec_set(&mut self, plan: &PhysicalPlan, op: &SetExec) -> ExecResult<Vec<Row>> {
2128 if crate::pull::subtree_is_fully_streaming(plan, op.input) {
2129 return self.streaming_apply(plan, op.input, |this, row| {
2130 for item in &op.items {
2131 this.apply_set_item(row, item)?;
2132 }
2133 Ok(())
2134 });
2135 }
2136
2137 let input_rows = self.execute_node(plan, op.input)?;
2138
2139 for row in &input_rows {
2140 for item in &op.items {
2141 self.apply_set_item(row, item)?;
2142 }
2143 }
2144
2145 Ok(input_rows)
2146 }
2147
2148 fn exec_remove(&mut self, plan: &PhysicalPlan, op: &RemoveExec) -> ExecResult<Vec<Row>> {
2149 if crate::pull::subtree_is_fully_streaming(plan, op.input) {
2150 return self.streaming_apply(plan, op.input, |this, row| {
2151 for item in &op.items {
2152 this.apply_remove_item(row, item)?;
2153 }
2154 Ok(())
2155 });
2156 }
2157
2158 let input_rows = self.execute_node(plan, op.input)?;
2159
2160 for row in &input_rows {
2161 for item in &op.items {
2162 self.apply_remove_item(row, item)?;
2163 }
2164 }
2165
2166 Ok(input_rows)
2167 }
2168
2169 fn apply_set_item(&mut self, row: &Row, item: &ResolvedSetItem) -> ExecResult<()> {
2170 match item {
2171 ResolvedSetItem::SetProperty { target, value } => {
2172 let new_value = {
2173 let eval_ctx = EvalContext {
2174 storage: &*self.ctx.storage,
2175 params: &self.ctx.params,
2176 };
2177 eval_expr(value, row, &eval_ctx)
2178 };
2179
2180 self.set_property_from_expr(row, target, new_value)
2181 }
2182
2183 ResolvedSetItem::SetVariable { variable, value } => {
2184 let entity_ref =
2186 row.get(*variable)
2187 .ok_or(ExecutorError::UnboundVariableForSet {
2188 var: format!("{variable:?}"),
2189 })?;
2190 let entity_target = entity_target_from_value(entity_ref)?;
2191
2192 let new_value = {
2193 let eval_ctx = EvalContext {
2194 storage: &*self.ctx.storage,
2195 params: &self.ctx.params,
2196 };
2197 eval_expr(value, row, &eval_ctx)
2198 };
2199
2200 self.overwrite_entity_target(entity_target, new_value)
2201 }
2202
2203 ResolvedSetItem::MutateVariable { variable, value } => {
2204 let entity_ref =
2205 row.get(*variable)
2206 .ok_or(ExecutorError::UnboundVariableForSet {
2207 var: format!("{variable:?}"),
2208 })?;
2209 let entity_target = entity_target_from_value(entity_ref)?;
2210
2211 let patch = {
2212 let eval_ctx = EvalContext {
2213 storage: &*self.ctx.storage,
2214 params: &self.ctx.params,
2215 };
2216 eval_expr(value, row, &eval_ctx)
2217 };
2218
2219 self.mutate_entity_target(entity_target, patch)
2220 }
2221
2222 ResolvedSetItem::SetLabels { variable, labels } => match row.get(*variable) {
2223 Some(LoraValue::Node(node_id)) => {
2224 let node_id = *node_id;
2225 for label in labels {
2226 self.ctx.storage.add_node_label(node_id, label);
2227 }
2228 Ok(())
2229 }
2230 Some(other) => Err(ExecutorError::ExpectedNodeForSetLabels {
2231 found: value_kind(other),
2232 }),
2233 None => Err(ExecutorError::UnboundVariableForSet {
2234 var: format!("{variable:?}"),
2235 }),
2236 },
2237 }
2238 }
2239
2240 fn set_property_from_expr(
2241 &mut self,
2242 row: &Row,
2243 target_expr: &ResolvedExpr,
2244 new_value: LoraValue,
2245 ) -> ExecResult<()> {
2246 let ResolvedExpr::Property { expr, property } = target_expr else {
2247 return Err(ExecutorError::UnsupportedSetTarget);
2248 };
2249
2250 let owner = {
2251 let eval_ctx = EvalContext {
2252 storage: &*self.ctx.storage,
2253 params: &self.ctx.params,
2254 };
2255 eval_expr(expr, row, &eval_ctx)
2256 };
2257
2258 match owner {
2259 LoraValue::Node(node_id) => {
2260 let prop = lora_value_to_property(new_value)
2261 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2262 self.ctx
2263 .storage
2264 .set_node_property(node_id, property.clone(), prop);
2265 Ok(())
2266 }
2267 LoraValue::Relationship(rel_id) => {
2268 let prop = lora_value_to_property(new_value)
2269 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2270 self.ctx
2271 .storage
2272 .set_relationship_property(rel_id, property.clone(), prop);
2273 Ok(())
2274 }
2275 other => Err(ExecutorError::InvalidSetTarget {
2276 found: value_kind(&other),
2277 }),
2278 }
2279 }
2280
2281 fn remove_property_from_expr(&mut self, row: &Row, expr: &ResolvedExpr) -> ExecResult<()> {
2282 let ResolvedExpr::Property {
2283 expr: owner_expr,
2284 property,
2285 } = expr
2286 else {
2287 return Err(ExecutorError::UnsupportedRemoveTarget);
2288 };
2289
2290 let owner = {
2291 let eval_ctx = EvalContext {
2292 storage: &*self.ctx.storage,
2293 params: &self.ctx.params,
2294 };
2295 eval_expr(owner_expr, row, &eval_ctx)
2296 };
2297
2298 match owner {
2299 LoraValue::Node(node_id) => {
2300 self.ctx.storage.remove_node_property(node_id, property);
2301 Ok(())
2302 }
2303 LoraValue::Relationship(rel_id) => {
2304 self.ctx
2305 .storage
2306 .remove_relationship_property(rel_id, property);
2307 Ok(())
2308 }
2309 other => Err(ExecutorError::InvalidRemoveTarget {
2310 found: value_kind(&other),
2311 }),
2312 }
2313 }
2314
2315 fn overwrite_entity_target(
2316 &mut self,
2317 target: EntityTarget,
2318 new_value: LoraValue,
2319 ) -> ExecResult<()> {
2320 let LoraValue::Map(map) = new_value else {
2321 return Err(ExecutorError::ExpectedPropertyMap {
2322 found: value_kind(&new_value),
2323 });
2324 };
2325
2326 let mut props: Properties = Properties::new();
2327 for (k, v) in map {
2328 let prop = lora_value_to_property(v)
2329 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2330 props.insert(k, prop);
2331 }
2332
2333 match target {
2334 EntityTarget::Node(node_id) => {
2335 self.ctx.storage.replace_node_properties(node_id, props);
2336 }
2337 EntityTarget::Relationship(rel_id) => {
2338 self.ctx
2339 .storage
2340 .replace_relationship_properties(rel_id, props);
2341 }
2342 }
2343 Ok(())
2344 }
2345
2346 fn mutate_entity_target(
2347 &mut self,
2348 target: EntityTarget,
2349 patch_value: LoraValue,
2350 ) -> ExecResult<()> {
2351 let LoraValue::Map(map) = patch_value else {
2352 return Err(ExecutorError::ExpectedPropertyMap {
2353 found: value_kind(&patch_value),
2354 });
2355 };
2356
2357 match target {
2358 EntityTarget::Node(node_id) => {
2359 for (k, v) in map {
2360 let prop = lora_value_to_property(v)
2361 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2362 self.ctx.storage.set_node_property(node_id, k, prop);
2363 }
2364 }
2365 EntityTarget::Relationship(rel_id) => {
2366 for (k, v) in map {
2367 let prop = lora_value_to_property(v)
2368 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2369 self.ctx.storage.set_relationship_property(rel_id, k, prop);
2370 }
2371 }
2372 }
2373 Ok(())
2374 }
2375
2376 pub(crate) fn apply_create_pattern(
2377 &mut self,
2378 row: &mut Row,
2379 pattern: &ResolvedPattern,
2380 ) -> ExecResult<()> {
2381 for part in &pattern.parts {
2382 self.apply_create_pattern_part(row, part)?;
2383 }
2384 Ok(())
2385 }
2386
2387 pub(crate) fn apply_write_op(&mut self, op: &PhysicalOp, row: &mut Row) -> ExecResult<()> {
2393 match op {
2394 PhysicalOp::Create(c) => self.apply_create_pattern(row, &c.pattern),
2395 PhysicalOp::Set(s) => {
2396 for item in &s.items {
2397 self.apply_set_item(row, item)?;
2398 }
2399 Ok(())
2400 }
2401 PhysicalOp::Delete(d) => {
2402 let detach = d.detach;
2403 for expr in &d.expressions {
2404 let value = {
2405 let eval_ctx = EvalContext {
2406 storage: &*self.ctx.storage,
2407 params: &self.ctx.params,
2408 };
2409 eval_expr(expr, row, &eval_ctx)
2410 };
2411 self.delete_value(value, detach)?;
2412 }
2413 Ok(())
2414 }
2415 PhysicalOp::Remove(r) => {
2416 for item in &r.items {
2417 self.apply_remove_item(row, item)?;
2418 }
2419 Ok(())
2420 }
2421 PhysicalOp::Merge(m) => {
2422 let already_bound = self.pattern_part_is_bound(row, &m.pattern_part);
2423 let matched = if already_bound {
2424 true
2425 } else {
2426 self.try_match_merge_pattern(row, &m.pattern_part)?
2427 };
2428 if !matched {
2429 self.apply_create_pattern_part(row, &m.pattern_part)?;
2430 }
2431 for action in &m.actions {
2432 if action.on_match == matched {
2433 for item in &action.set.items {
2434 self.apply_set_item(row, item)?;
2435 }
2436 }
2437 }
2438 Ok(())
2439 }
2440 other => Err(ExecutorError::RuntimeError(format!(
2441 "apply_write_op called on non-write op: {other:?}"
2442 ))),
2443 }
2444 }
2445
2446 fn apply_create_pattern_part(
2447 &mut self,
2448 row: &mut Row,
2449 part: &ResolvedPatternPart,
2450 ) -> ExecResult<()> {
2451 if part.binding.is_some() {
2452 trace!("create pattern part has path binding; path materialization not implemented");
2453 }
2454
2455 let _ = self.apply_create_pattern_element(row, &part.element)?;
2456 Ok(())
2457 }
2458
2459 fn apply_create_pattern_element(
2460 &mut self,
2461 row: &mut Row,
2462 element: &ResolvedPatternElement,
2463 ) -> ExecResult<Option<LoraValue>> {
2464 match element {
2465 ResolvedPatternElement::Node {
2466 var,
2467 labels,
2468 properties,
2469 } => {
2470 let node_id =
2471 self.materialize_node_pattern(row, *var, labels, properties.as_ref())?;
2472 Ok(Some(LoraValue::Node(node_id)))
2473 }
2474
2475 ResolvedPatternElement::NodeChain { head, chain } => {
2476 let mut current_node_id = self.materialize_node_pattern(
2477 row,
2478 head.var,
2479 &head.labels,
2480 head.properties.as_ref(),
2481 )?;
2482
2483 for link in chain {
2484 let next_node_id = self.materialize_node_pattern(
2485 row,
2486 link.node.var,
2487 &link.node.labels,
2488 link.node.properties.as_ref(),
2489 )?;
2490
2491 let _ = self.materialize_relationship_pattern(
2492 row,
2493 current_node_id,
2494 next_node_id,
2495 &link.rel,
2496 )?;
2497
2498 current_node_id = next_node_id;
2499 }
2500
2501 Ok(Some(LoraValue::Node(current_node_id)))
2502 }
2503
2504 ResolvedPatternElement::ShortestPath { .. } => {
2505 Ok(None)
2507 }
2508 }
2509 }
2510
2511 fn pattern_part_is_bound(&self, row: &Row, part: &ResolvedPatternPart) -> bool {
2512 match &part.element {
2513 ResolvedPatternElement::Node { var, .. } => var.and_then(|v| row.get(v)).is_some(),
2514
2515 ResolvedPatternElement::ShortestPath { .. } => false,
2516
2517 ResolvedPatternElement::NodeChain { head, chain } => {
2518 let head_ok = head.var.and_then(|v| row.get(v)).is_some();
2519
2520 let chain_ok = chain.iter().all(|link| {
2521 let node_ok = link.node.var.and_then(|v| row.get(v)).is_some();
2522 let rel_ok = match link.rel.var {
2526 Some(v) => row.get(v).is_some(),
2527 None => false,
2528 };
2529 node_ok && rel_ok
2530 });
2531
2532 head_ok && chain_ok
2533 }
2534 }
2535 }
2536
2537 fn materialize_node_pattern(
2538 &mut self,
2539 row: &mut Row,
2540 var: Option<lora_analyzer::symbols::VarId>,
2541 labels: &[Vec<String>],
2542 properties: Option<&ResolvedExpr>,
2543 ) -> ExecResult<u64> {
2544 if let Some(var_id) = var {
2545 if let Some(LoraValue::Node(id)) = row.get(var_id) {
2546 return Ok(*id);
2547 }
2548 }
2549
2550 let properties = match properties {
2551 Some(expr) => eval_properties_expr(expr, row, &*self.ctx.storage, &self.ctx.params)?,
2552 None => Properties::new(),
2553 };
2554
2555 let flat_labels = flatten_label_groups(labels);
2556 debug!("creating node with labels={flat_labels:?}");
2557 let created = self.ctx.storage.create_node(flat_labels, properties);
2558
2559 if let Some(var_id) = var {
2560 row.insert(var_id, LoraValue::Node(created.id));
2561 }
2562
2563 Ok(created.id)
2564 }
2565
2566 fn materialize_relationship_pattern(
2567 &mut self,
2568 row: &mut Row,
2569 left_node_id: u64,
2570 right_node_id: u64,
2571 rel: &lora_analyzer::ResolvedRel,
2572 ) -> ExecResult<u64> {
2573 if let Some(var_id) = rel.var {
2574 if let Some(LoraValue::Relationship(id)) = row.get(var_id) {
2575 let id = *id;
2576 if let Some((src, dst)) = self.ctx.storage.relationship_endpoints(id) {
2577 let endpoints_match = match rel.direction {
2578 Direction::Right | Direction::Undirected => {
2579 src == left_node_id && dst == right_node_id
2580 }
2581 Direction::Left => src == right_node_id && dst == left_node_id,
2582 };
2583
2584 if endpoints_match {
2585 return Ok(id);
2586 }
2587 }
2588 }
2589 }
2590
2591 if rel.range.is_some() {
2592 return Err(ExecutorError::UnsupportedCreateRelationshipRange);
2593 }
2594
2595 let (src, dst) = match rel.direction {
2596 Direction::Right | Direction::Undirected => (left_node_id, right_node_id),
2597 Direction::Left => (right_node_id, left_node_id),
2598 };
2599
2600 let rel_type = rel
2601 .types
2602 .first()
2603 .ok_or(ExecutorError::MissingRelationshipType)?;
2604
2605 if rel_type.is_empty() {
2606 return Err(ExecutorError::MissingRelationshipType);
2607 }
2608
2609 let properties = match rel.properties.as_ref() {
2610 Some(expr) => eval_properties_expr(expr, row, &*self.ctx.storage, &self.ctx.params)?,
2611 None => Properties::new(),
2612 };
2613
2614 debug!("creating relationship: src={src}, dst={dst}, type={rel_type}");
2615
2616 let created = self
2617 .ctx
2618 .storage
2619 .create_relationship(src, dst, rel_type, properties)
2620 .ok_or_else(|| ExecutorError::RelationshipCreateFailed {
2621 src,
2622 dst,
2623 rel_type: rel_type.clone(),
2624 })?;
2625
2626 if let Some(var_id) = rel.var {
2627 row.insert(var_id, LoraValue::Relationship(created.id));
2628 }
2629
2630 Ok(created.id)
2631 }
2632}
2633
/// Identifies either a node or a relationship by its id.
#[derive(Clone, Copy)]
enum EntityTarget {
    /// A node, identified by its id.
    Node(NodeId),
    /// A relationship, identified by its id.
    Relationship(u64),
}
2642
2643fn entity_target_from_value(value: &LoraValue) -> ExecResult<EntityTarget> {
2644 match value {
2645 LoraValue::Node(id) => Ok(EntityTarget::Node(*id)),
2646 LoraValue::Relationship(id) => Ok(EntityTarget::Relationship(*id)),
2647 other => Err(ExecutorError::InvalidSetTarget {
2648 found: value_kind(other),
2649 }),
2650 }
2651}
2652
2653pub(crate) fn dedup_rows_by_vars(rows: Vec<Row>) -> Vec<Row> {
2657 let mut seen: BTreeSet<Vec<GroupValueKey>> = BTreeSet::new();
2658 let mut out = Vec::new();
2659
2660 for row in rows {
2661 let key: Vec<GroupValueKey> = row
2662 .iter()
2663 .map(|(_, val)| GroupValueKey::from_value(val))
2664 .collect();
2665 if seen.insert(key) {
2666 out.push(row);
2667 }
2668 }
2669
2670 out
2671}
2672
2673pub(crate) fn dedup_rows(rows: Vec<Row>) -> Vec<Row> {
2677 let mut seen: BTreeSet<Vec<(String, GroupValueKey)>> = BTreeSet::new();
2678 let mut out = Vec::new();
2679
2680 for row in rows {
2681 let key: Vec<(String, GroupValueKey)> = row
2682 .iter_named()
2683 .map(|(_, name, val)| (name.into_owned(), GroupValueKey::from_value(val)))
2684 .collect();
2685 if seen.insert(key) {
2686 out.push(row);
2687 }
2688 }
2689
2690 out
2691}
2692
2693fn eval_properties_expr<S: GraphStorage>(
2694 expr: &ResolvedExpr,
2695 row: &Row,
2696 storage: &S,
2697 params: &std::collections::BTreeMap<String, LoraValue>,
2698) -> ExecResult<Properties> {
2699 let eval_ctx = EvalContext { storage, params };
2700
2701 match eval_expr(expr, row, &eval_ctx) {
2702 LoraValue::Map(map) => {
2703 let mut out = Properties::new();
2704 for (k, v) in map {
2705 let prop = lora_value_to_property(v)
2706 .map_err(|e| ExecutorError::RuntimeError(e.to_string()))?;
2707 out.insert(k, prop);
2708 }
2709 Ok(out)
2710 }
2711 other => Err(ExecutorError::ExpectedPropertyMap {
2712 found: value_kind(&other),
2713 }),
2714 }
2715}
2716
2717pub(crate) fn compute_aggregate_expr<S: GraphStorage>(
2718 expr: &ResolvedExpr,
2719 rows: &[Row],
2720 eval_ctx: &EvalContext<'_, S>,
2721) -> LoraValue {
2722 match expr {
2723 ResolvedExpr::Function {
2724 name,
2725 distinct,
2726 args,
2727 } => {
2728 let func = name.to_ascii_lowercase();
2729
2730 match func.as_str() {
2731 "count" => {
2732 if args.is_empty() {
2733 return LoraValue::Int(rows.len() as i64);
2734 }
2735
2736 let mut values = rows
2737 .iter()
2738 .map(|r| eval_expr(&args[0], r, eval_ctx))
2739 .filter(|v| !matches!(v, LoraValue::Null))
2740 .collect::<Vec<_>>();
2741
2742 if *distinct {
2743 values = dedup_values(values);
2744 }
2745
2746 LoraValue::Int(values.len() as i64)
2747 }
2748
2749 "collect" => {
2750 if args.is_empty() {
2751 return LoraValue::List(Vec::new());
2752 }
2753
2754 let mut values = rows
2755 .iter()
2756 .map(|r| eval_expr(&args[0], r, eval_ctx))
2757 .collect::<Vec<_>>();
2758
2759 if *distinct {
2760 values = dedup_values(values);
2761 }
2762
2763 LoraValue::List(values)
2764 }
2765
2766 "sum" => {
2767 if args.is_empty() {
2768 return LoraValue::Null;
2769 }
2770
2771 let mut values = rows
2772 .iter()
2773 .map(|r| eval_expr(&args[0], r, eval_ctx))
2774 .collect::<Vec<_>>();
2775
2776 if *distinct {
2777 values = dedup_values(values);
2778 }
2779
2780 let nums = values
2781 .into_iter()
2782 .filter_map(as_f64_lossy)
2783 .collect::<Vec<_>>();
2784
2785 if nums.is_empty() {
2786 LoraValue::Null
2787 } else if nums.iter().all(|n| n.fract() == 0.0) {
2788 LoraValue::Int(nums.iter().sum::<f64>() as i64)
2789 } else {
2790 LoraValue::Float(nums.iter().sum::<f64>())
2791 }
2792 }
2793
2794 "avg" => {
2795 if args.is_empty() {
2796 return LoraValue::Null;
2797 }
2798
2799 let mut values = rows
2800 .iter()
2801 .map(|r| eval_expr(&args[0], r, eval_ctx))
2802 .collect::<Vec<_>>();
2803
2804 if *distinct {
2805 values = dedup_values(values);
2806 }
2807
2808 let nums = values
2809 .into_iter()
2810 .filter_map(as_f64_lossy)
2811 .collect::<Vec<_>>();
2812
2813 if nums.is_empty() {
2814 LoraValue::Null
2815 } else {
2816 LoraValue::Float(nums.iter().sum::<f64>() / nums.len() as f64)
2817 }
2818 }
2819
2820 "min" => {
2821 if args.is_empty() {
2822 return LoraValue::Null;
2823 }
2824
2825 let mut values = rows
2826 .iter()
2827 .map(|r| eval_expr(&args[0], r, eval_ctx))
2828 .filter(|v| !matches!(v, LoraValue::Null))
2829 .collect::<Vec<_>>();
2830
2831 if *distinct {
2832 values = dedup_values(values);
2833 }
2834
2835 values
2836 .into_iter()
2837 .min_by(compare_values_total)
2838 .unwrap_or(LoraValue::Null)
2839 }
2840
2841 "max" => {
2842 if args.is_empty() {
2843 return LoraValue::Null;
2844 }
2845
2846 let mut values = rows
2847 .iter()
2848 .map(|r| eval_expr(&args[0], r, eval_ctx))
2849 .filter(|v| !matches!(v, LoraValue::Null))
2850 .collect::<Vec<_>>();
2851
2852 if *distinct {
2853 values = dedup_values(values);
2854 }
2855
2856 values
2857 .into_iter()
2858 .max_by(compare_values_total)
2859 .unwrap_or(LoraValue::Null)
2860 }
2861
2862 "stdev" | "stdevp" => {
2863 if args.is_empty() {
2864 return LoraValue::Null;
2865 }
2866
2867 let nums: Vec<f64> = rows
2868 .iter()
2869 .map(|r| eval_expr(&args[0], r, eval_ctx))
2870 .filter_map(as_f64_lossy)
2871 .collect();
2872
2873 let is_population = func == "stdevp";
2874
2875 if nums.is_empty() || (!is_population && nums.len() < 2) {
2876 return LoraValue::Float(0.0);
2877 }
2878
2879 let mean = nums.iter().sum::<f64>() / nums.len() as f64;
2880 let variance_sum: f64 = nums.iter().map(|x| (x - mean).powi(2)).sum();
2881 let denom = if is_population {
2882 nums.len() as f64
2883 } else {
2884 (nums.len() - 1) as f64
2885 };
2886 LoraValue::Float((variance_sum / denom).sqrt())
2887 }
2888
2889 "percentilecont" => {
2890 if args.len() < 2 {
2891 return LoraValue::Null;
2892 }
2893
2894 let percentile = eval_expr(&args[1], &rows[0], eval_ctx)
2895 .as_f64()
2896 .unwrap_or(0.5);
2897 let mut nums: Vec<f64> = rows
2898 .iter()
2899 .map(|r| eval_expr(&args[0], r, eval_ctx))
2900 .filter_map(as_f64_lossy)
2901 .collect();
2902
2903 if nums.is_empty() {
2904 return LoraValue::Null;
2905 }
2906
2907 nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
2908
2909 let index = percentile * (nums.len() - 1) as f64;
2910 let lower = index.floor() as usize;
2911 let upper = index.ceil() as usize;
2912 let fraction = index - lower as f64;
2913
2914 if lower == upper || upper >= nums.len() {
2915 LoraValue::Float(nums[lower])
2916 } else {
2917 LoraValue::Float(nums[lower] * (1.0 - fraction) + nums[upper] * fraction)
2918 }
2919 }
2920
2921 "percentiledisc" => {
2922 if args.len() < 2 {
2923 return LoraValue::Null;
2924 }
2925
2926 let percentile = eval_expr(&args[1], &rows[0], eval_ctx)
2927 .as_f64()
2928 .unwrap_or(0.5);
2929 let mut nums: Vec<f64> = rows
2930 .iter()
2931 .map(|r| eval_expr(&args[0], r, eval_ctx))
2932 .filter_map(as_f64_lossy)
2933 .collect();
2934
2935 if nums.is_empty() {
2936 return LoraValue::Null;
2937 }
2938
2939 nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
2940
2941 let index = (percentile * (nums.len() - 1) as f64).round() as usize;
2942 let index = index.min(nums.len() - 1);
2943 LoraValue::Float(nums[index])
2944 }
2945
2946 _ => rows
2947 .first()
2948 .map(|r| eval_expr(expr, r, eval_ctx))
2949 .unwrap_or(LoraValue::Null),
2950 }
2951 }
2952
2953 _ => rows
2954 .first()
2955 .map(|r| eval_expr(expr, r, eval_ctx))
2956 .unwrap_or(LoraValue::Null),
2957 }
2958}
2959
2960pub(crate) fn compare_sort_item<S: GraphStorage>(
2961 item: &ResolvedSortItem,
2962 a: &Row,
2963 b: &Row,
2964 eval_ctx: &EvalContext<'_, S>,
2965) -> Ordering {
2966 let av = eval_expr(&item.expr, a, eval_ctx);
2967 let bv = eval_expr(&item.expr, b, eval_ctx);
2968
2969 let ascending = matches!(item.direction, lora_ast::SortDirection::Asc);
2970 compare_values_for_sort(&av, &bv, ascending)
2971}
2972
2973fn dedup_values(values: Vec<LoraValue>) -> Vec<LoraValue> {
2974 let mut seen: BTreeSet<GroupValueKey> = BTreeSet::new();
2975 let mut out = Vec::new();
2976
2977 for value in values {
2978 let key = GroupValueKey::from_value(&value);
2979 if seen.insert(key) {
2980 out.push(value);
2981 }
2982 }
2983
2984 out
2985}
2986
2987fn as_f64_lossy(v: LoraValue) -> Option<f64> {
2988 match v {
2989 LoraValue::Int(i) => Some(i as f64),
2990 LoraValue::Float(f) => Some(f),
2991 _ => None,
2992 }
2993}
2994
2995fn compare_values_for_sort(a: &LoraValue, b: &LoraValue, ascending: bool) -> Ordering {
2996 let ord = match (a, b) {
2997 (LoraValue::Null, LoraValue::Null) => Ordering::Equal,
2998 (LoraValue::Null, _) => Ordering::Greater,
2999 (_, LoraValue::Null) => Ordering::Less,
3000 _ => compare_values_total(a, b),
3001 };
3002
3003 if ascending {
3004 ord
3005 } else {
3006 ord.reverse()
3007 }
3008}
3009
3010fn compare_values_total(a: &LoraValue, b: &LoraValue) -> Ordering {
3011 use LoraValue::*;
3012
3013 match (a, b) {
3014 (Bool(x), Bool(y)) => x.cmp(y),
3015 (Int(x), Int(y)) => x.cmp(y),
3016 (Float(x), Float(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
3017 (Int(x), Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Ordering::Equal),
3018 (Float(x), Int(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Ordering::Equal),
3019 (String(x), String(y)) => x.cmp(y),
3020 (Node(x), Node(y)) => x.cmp(y),
3021 (Relationship(x), Relationship(y)) => x.cmp(y),
3022 (Date(x), Date(y)) => x.cmp(y),
3023 (DateTime(x), DateTime(y)) => x.cmp(y),
3024 (Duration(x), Duration(y)) => x.cmp(y),
3025 (Vector(x), Vector(y)) => x.to_key_string().cmp(&y.to_key_string()),
3026 _ => type_rank(a)
3027 .cmp(&type_rank(b))
3028 .then_with(|| format!("{a:?}").cmp(&format!("{b:?}"))),
3029 }
3030}
3031
3032pub fn value_matches_property_value(expected: &LoraValue, actual: &PropertyValue) -> bool {
3033 match (expected, actual) {
3034 (LoraValue::Null, PropertyValue::Null) => true,
3035 (LoraValue::Bool(a), PropertyValue::Bool(b)) => a == b,
3036 (LoraValue::Int(a), PropertyValue::Int(b)) => a == b,
3037 (LoraValue::Float(a), PropertyValue::Float(b)) => a == b,
3038 (LoraValue::Int(a), PropertyValue::Float(b)) => (*a as f64) == *b,
3039 (LoraValue::Float(a), PropertyValue::Int(b)) => *a == (*b as f64),
3040 (LoraValue::String(a), PropertyValue::String(b)) => a == b,
3041
3042 (LoraValue::List(xs), PropertyValue::List(ys)) => {
3043 xs.len() == ys.len()
3044 && xs
3045 .iter()
3046 .zip(ys.iter())
3047 .all(|(x, y)| value_matches_property_value(x, y))
3048 }
3049
3050 (LoraValue::Map(xm), PropertyValue::Map(ym)) => xm.iter().all(|(k, xv)| {
3051 ym.get(k)
3052 .map(|yv| value_matches_property_value(xv, yv))
3053 .unwrap_or(false)
3054 }),
3055
3056 (LoraValue::Date(a), PropertyValue::Date(b)) => a == b,
3057 (LoraValue::DateTime(a), PropertyValue::DateTime(b)) => a == b,
3058 (LoraValue::LocalDateTime(a), PropertyValue::LocalDateTime(b)) => a == b,
3059 (LoraValue::Time(a), PropertyValue::Time(b)) => a == b,
3060 (LoraValue::LocalTime(a), PropertyValue::LocalTime(b)) => a == b,
3061 (LoraValue::Duration(a), PropertyValue::Duration(b)) => a == b,
3062 (LoraValue::Point(a), PropertyValue::Point(b)) => a == b,
3063 (LoraValue::Vector(a), PropertyValue::Vector(b)) => a == b,
3064
3065 _ => false,
3066 }
3067}
3068
3069pub(crate) fn node_matches_property_filter<S: GraphStorage>(
3070 storage: &S,
3071 node_id: NodeId,
3072 labels: &[Vec<String>],
3073 key: &str,
3074 expected: &LoraValue,
3075) -> bool {
3076 storage
3077 .with_node(node_id, |node| {
3078 node_matches_label_groups(&node.labels, labels)
3079 && node
3080 .properties
3081 .get(key)
3082 .map(|actual| value_matches_property_value(expected, actual))
3083 .unwrap_or(false)
3084 })
3085 .unwrap_or(false)
3086}
3087
/// Returns the label when the filter consists of exactly one group holding
/// exactly one label, and `None` for every other shape.
fn single_label_hint(labels: &[Vec<String>]) -> Option<&str> {
    match labels {
        [only_group] => match only_group.as_slice() {
            [only_label] => Some(only_label.as_str()),
            _ => None,
        },
        _ => None,
    }
}
3095
3096fn property_lookup_values(expected: &LoraValue) -> Option<Vec<PropertyValue>> {
3097 let property = lora_value_to_property(expected.clone()).ok()?;
3098 let mut values = vec![property.clone()];
3099
3100 match property {
3101 PropertyValue::Int(i) => {
3102 values.push(PropertyValue::Float(i as f64));
3103 }
3104 PropertyValue::Float(f)
3105 if f.is_finite()
3106 && f.fract() == 0.0
3107 && f >= i64::MIN as f64
3108 && f <= i64::MAX as f64 =>
3109 {
3110 values.push(PropertyValue::Int(f as i64));
3111 }
3112 _ => {}
3113 }
3114
3115 Some(values)
3116}
3117
/// Candidate node ids produced for a label + property equality filter.
pub(crate) struct NodePropertyCandidates {
    /// Ids of the candidate nodes.
    pub(crate) ids: Vec<NodeId>,
    /// NOTE(review): presumably `true` when `ids` already satisfy the filter
    /// and callers may skip re-checking each node — confirm against callers.
    pub(crate) prefiltered: bool,
}
3122
3123pub(crate) fn indexed_node_property_candidates<S: GraphStorage>(
3124 storage: &S,
3125 labels: &[Vec<String>],
3126 key: &str,
3127 expected: &LoraValue,
3128) -> NodePropertyCandidates {
3129 let Some(values) = property_lookup_values(expected) else {
3130 return NodePropertyCandidates {
3131 ids: scan_node_ids_for_label_groups(storage, labels),
3132 prefiltered: false,
3133 };
3134 };
3135
3136 let label_hint = single_label_hint(labels);
3137 let mut seen = BTreeSet::new();
3138 let mut out = Vec::new();
3139 for value in values {
3140 for id in storage.find_node_ids_by_property(label_hint, key, &value) {
3141 if seen.insert(id) {
3142 out.push(id);
3143 }
3144 }
3145 }
3146 NodePropertyCandidates {
3147 ids: out,
3148 prefiltered: labels.is_empty() || label_hint.is_some(),
3149 }
3150}
3151
3152pub(crate) fn build_path_value<S: GraphStorage>(
3158 row: &Row,
3159 node_vars: &[VarId],
3160 rel_vars: &[VarId],
3161 storage: &S,
3162) -> LoraValue {
3163 let mut raw_nodes = Vec::new();
3164 let mut rels = Vec::new();
3165 let mut has_var_len = false;
3166
3167 for &nv in node_vars {
3168 match row.get(nv) {
3169 Some(LoraValue::Node(id)) => raw_nodes.push(*id),
3170 Some(LoraValue::List(items)) => {
3171 for item in items {
3172 if let LoraValue::Node(id) = item {
3173 raw_nodes.push(*id);
3174 }
3175 }
3176 }
3177 _ => {}
3178 }
3179 }
3180
3181 for &rv in rel_vars {
3182 match row.get(rv) {
3183 Some(LoraValue::Relationship(id)) => rels.push(*id),
3184 Some(LoraValue::List(items)) => {
3185 has_var_len = true;
3186 for item in items {
3187 if let LoraValue::Relationship(id) = item {
3188 rels.push(*id);
3189 }
3190 }
3191 }
3192 _ => {}
3193 }
3194 }
3195
3196 let nodes = if has_var_len && !rels.is_empty() && raw_nodes.len() == 2 {
3200 let start = raw_nodes[0];
3201 let mut ordered = Vec::with_capacity(rels.len() + 1);
3202 ordered.push(start);
3203 let mut current = start;
3204 for &rel_id in &rels {
3205 if let Some((src, dst)) = storage.relationship_endpoints(rel_id) {
3206 let next = if src == current { dst } else { src };
3207 ordered.push(next);
3208 current = next;
3209 }
3210 }
3211 ordered
3212 } else {
3213 raw_nodes
3214 };
3215
3216 LoraValue::Path(LoraPath { nodes, rels })
3217}
3218
3219fn type_rank(v: &LoraValue) -> u8 {
3220 match v {
3221 LoraValue::Null => 0,
3222 LoraValue::Bool(_) => 1,
3223 LoraValue::Int(_) | LoraValue::Float(_) => 2,
3224 LoraValue::String(_) => 3,
3225 LoraValue::Date(_) => 4,
3226 LoraValue::DateTime(_) => 5,
3227 LoraValue::LocalDateTime(_) => 6,
3228 LoraValue::Time(_) => 7,
3229 LoraValue::LocalTime(_) => 8,
3230 LoraValue::Duration(_) => 9,
3231 LoraValue::Point(_) => 10,
3232 LoraValue::Vector(_) => 11,
3233 LoraValue::List(_) => 12,
3234 LoraValue::Map(_) => 13,
3235 LoraValue::Node(_) => 14,
3236 LoraValue::Relationship(_) => 15,
3237 LoraValue::Path(_) => 16,
3238 }
3239}
3240
/// Tests a node's labels against a label filter.
///
/// The filter matches when every group contains at least one label the node
/// carries. An empty filter matches every node; an empty group matches none.
pub(crate) fn node_matches_label_groups(node_labels: &[String], groups: &[Vec<String>]) -> bool {
    for group in groups {
        let group_satisfied = group.iter().any(|wanted| node_labels.contains(wanted));
        if !group_satisfied {
            return false;
        }
    }

    true
}
3249
3250pub(crate) fn scan_node_ids_for_label_groups<S: GraphStorage>(
3253 storage: &S,
3254 groups: &[Vec<String>],
3255) -> Vec<lora_store::NodeId> {
3256 if groups.is_empty() {
3257 storage.all_node_ids()
3258 } else if groups.len() == 1 && groups[0].len() == 1 {
3259 storage.node_ids_by_label(&groups[0][0])
3260 } else if groups.len() == 1 && groups[0].len() > 1 {
3261 let mut seen = std::collections::BTreeSet::new();
3262 let mut out = Vec::new();
3263 for label in &groups[0] {
3264 for id in storage.node_ids_by_label(label) {
3265 if seen.insert(id) {
3266 out.push(id);
3267 }
3268 }
3269 }
3270 out
3271 } else {
3272 storage.node_ids_by_label(&groups[0][0])
3273 }
3274}
3275
/// Returns `true` when the label filter has at most one group.
pub(crate) fn label_group_candidates_prefiltered(groups: &[Vec<String>]) -> bool {
    groups.len() < 2
}
3279
3280pub(crate) fn hydrate_node_record(node: &lora_store::NodeRecord) -> LoraValue {
3281 let mut map = BTreeMap::new();
3282 map.insert("kind".to_string(), LoraValue::String("node".to_string()));
3283 map.insert("id".to_string(), LoraValue::Int(node.id as i64));
3284 map.insert(
3285 "labels".to_string(),
3286 LoraValue::List(
3287 node.labels
3288 .iter()
3289 .map(|s| LoraValue::String(s.clone()))
3290 .collect(),
3291 ),
3292 );
3293 map.insert(
3294 "properties".to_string(),
3295 properties_to_value_map(&node.properties),
3296 );
3297 LoraValue::Map(map)
3298}
3299
3300pub(crate) fn hydrate_relationship_record(rel: &lora_store::RelationshipRecord) -> LoraValue {
3301 let mut map = BTreeMap::new();
3302 map.insert(
3303 "kind".to_string(),
3304 LoraValue::String("relationship".to_string()),
3305 );
3306 map.insert("id".to_string(), LoraValue::Int(rel.id as i64));
3307 map.insert("startId".to_string(), LoraValue::Int(rel.src as i64));
3308 map.insert("endId".to_string(), LoraValue::Int(rel.dst as i64));
3309 map.insert("type".to_string(), LoraValue::String(rel.rel_type.clone()));
3310 map.insert(
3311 "properties".to_string(),
3312 properties_to_value_map(&rel.properties),
3313 );
3314 LoraValue::Map(map)
3315}
3316
/// Concatenates all label groups into one list, preserving group order and
/// the order of labels within each group. Duplicates are kept.
fn flatten_label_groups(groups: &[Vec<String>]) -> Vec<String> {
    groups.concat()
}
3322
/// Totally ordered key derived from a runtime value, so values can be stored
/// in ordered sets and maps for grouping and de-duplication.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum GroupValueKey {
    /// Key for a null value.
    Null,
    /// Key for a boolean.
    Bool(bool),
    /// Key for an integer.
    Int(i64),
    /// Key for a float, held as text so the key can be `Eq` and `Ord`.
    Float(String),
    /// Key for a string; also used for values that are keyed by their text
    /// form.
    String(String),
    /// Key for a list, element by element.
    List(Vec<GroupValueKey>),
    /// Key for a map, as `(key, value key)` pairs.
    Map(Vec<(String, GroupValueKey)>),
    /// Key for a node, by id.
    Node(u64),
    /// Key for a relationship, by id.
    Relationship(u64),
}
3335
3336impl GroupValueKey {
3337 pub(crate) fn from_value(v: &LoraValue) -> Self {
3338 match v {
3339 LoraValue::Null => Self::Null,
3340 LoraValue::Bool(x) => Self::Bool(*x),
3341 LoraValue::Int(x) => Self::Int(*x),
3342 LoraValue::Float(x) => Self::Float(x.to_string()),
3343 LoraValue::String(x) => Self::String(x.clone()),
3344 LoraValue::List(xs) => Self::List(xs.iter().map(Self::from_value).collect()),
3345 LoraValue::Map(m) => Self::Map(
3346 m.iter()
3347 .map(|(k, v)| (k.clone(), Self::from_value(v)))
3348 .collect(),
3349 ),
3350 LoraValue::Node(id) => Self::Node(*id),
3351 LoraValue::Relationship(id) => Self::Relationship(*id),
3352 LoraValue::Path(_) => Self::Null,
3353 LoraValue::Date(d) => Self::String(d.to_string()),
3355 LoraValue::DateTime(dt) => Self::String(dt.to_string()),
3356 LoraValue::LocalDateTime(dt) => Self::String(dt.to_string()),
3357 LoraValue::Time(t) => Self::String(t.to_string()),
3358 LoraValue::LocalTime(t) => Self::String(t.to_string()),
3359 LoraValue::Duration(dur) => Self::String(dur.to_string()),
3360 LoraValue::Point(p) => Self::String(p.to_string()),
3361 LoraValue::Vector(v) => Self::String(format!("vector:{}", v.to_key_string())),
3362 }
3363 }
3364}
3365
/// Hop limit applied by `resolve_range` when a variable-length range has no
/// explicit upper bound.
const MAX_VAR_LEN_HOPS: u64 = 100;
3378
3379pub(crate) fn resolve_range(range: &RangeLiteral) -> (u64, u64) {
3380 let min_hops = range.start.unwrap_or(1);
3381 let max_hops = range.end.unwrap_or(MAX_VAR_LEN_HOPS);
3382 (min_hops, max_hops)
3383}
3384
/// One path found by `variable_length_expand`.
pub(crate) struct VarLenResult {
    /// Node at which the path ends.
    pub(crate) dst_node_id: NodeId,
    /// Ids of the relationships traversed, in traversal order.
    pub(crate) rel_ids: Vec<u64>,
}
3392
3393pub(crate) fn variable_length_expand<S: GraphStorage>(
3400 storage: &S,
3401 start_node_id: NodeId,
3402 direction: Direction,
3403 types: &[String],
3404 min_hops: u64,
3405 max_hops: u64,
3406) -> Vec<VarLenResult> {
3407 let mut results = Vec::new();
3408
3409 let mut frontier: Vec<(NodeId, Vec<u64>)> = vec![(start_node_id, Vec::new())];
3411
3412 for depth in 1..=max_hops {
3413 let is_last_hop = depth == max_hops;
3417 let mut next_frontier: Vec<(NodeId, Vec<u64>)> = Vec::new();
3418
3419 for (current_node, rels_used) in &frontier {
3420 for (rel_id, neighbor_id) in storage.expand_ids(*current_node, direction, types) {
3423 if rels_used.contains(&rel_id) {
3426 continue;
3427 }
3428
3429 if is_last_hop {
3430 if depth >= min_hops {
3433 let mut rel_ids = Vec::with_capacity(rels_used.len() + 1);
3434 rel_ids.extend_from_slice(rels_used);
3435 rel_ids.push(rel_id);
3436 results.push(VarLenResult {
3437 dst_node_id: neighbor_id,
3438 rel_ids,
3439 });
3440 }
3441 continue;
3442 }
3443
3444 let mut new_rels = Vec::with_capacity(rels_used.len() + 1);
3445 new_rels.extend_from_slice(rels_used);
3446 new_rels.push(rel_id);
3447
3448 if depth >= min_hops {
3449 results.push(VarLenResult {
3450 dst_node_id: neighbor_id,
3451 rel_ids: new_rels.clone(),
3452 });
3453 }
3454
3455 next_frontier.push((neighbor_id, new_rels));
3456 }
3457 }
3458
3459 if is_last_hop || next_frontier.is_empty() {
3460 break;
3461 }
3462
3463 frontier = next_frontier;
3464 }
3465
3466 if min_hops == 0 {
3468 results.insert(
3469 0,
3470 VarLenResult {
3471 dst_node_id: start_node_id,
3472 rel_ids: Vec::new(),
3473 },
3474 );
3475 }
3476
3477 results
3478}
3479
3480pub(crate) fn filter_shortest_paths(rows: Vec<Row>, path_var: VarId, all: bool) -> Vec<Row> {
3483 if rows.is_empty() {
3484 return rows;
3485 }
3486
3487 let lengths: Vec<usize> = rows
3489 .iter()
3490 .map(|row| match row.get(path_var) {
3491 Some(LoraValue::Path(p)) => p.rels.len(),
3492 _ => usize::MAX,
3493 })
3494 .collect();
3495
3496 let min_len = lengths.iter().copied().min().unwrap_or(usize::MAX);
3497
3498 let mut result: Vec<Row> = rows
3499 .into_iter()
3500 .zip(lengths.iter())
3501 .filter(|(_, len)| **len == min_len)
3502 .map(|(row, _)| row)
3503 .collect();
3504
3505 if !all && result.len() > 1 {
3506 result.truncate(1);
3507 }
3508
3509 result
3510}