1use crate::{
2 error::{Error, Result},
3 eval::{compare_values, eval_expr, row_key, to_bool, value_key, values_equal, EvalCtx},
4 procedures::ProcedureRegistry,
5 reader::GraphReader,
6 value::{ParamMap, Row, Value},
7 writer::GraphWriter,
8};
9use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
10use meshdb_cypher::{
11 AggregateArg, AggregateFn, AggregateSpec, BinaryOp, CallArgs, CompareOp, CreateEdgeSpec,
12 CreateNodeSpec, Direction, Expr, Literal, LogicalPlan, RemoveSpec, ReturnItem, SetAssignment,
13 SortItem, UnaryOp, YieldSpec,
14};
15use meshdb_storage::RocksDbStorageEngine;
16use std::cell::RefCell;
17use std::cmp::Ordering;
18use std::collections::{HashMap, HashSet};
19
20#[derive(Default)]
26pub struct Tombstones {
27 pub nodes: RefCell<HashSet<meshdb_core::NodeId>>,
28 pub edges: RefCell<HashSet<meshdb_core::EdgeId>>,
29}
30
31pub struct ExecCtx<'a> {
32 pub store: &'a dyn GraphReader,
33 pub writer: &'a dyn GraphWriter,
34 pub params: &'a ParamMap,
39 pub procedures: &'a ProcedureRegistry,
44 pub outer_rows: &'a [&'a Row],
52 pub tombstones: &'a Tombstones,
56}
57
58pub(crate) struct NoOpWriter;
59impl GraphWriter for NoOpWriter {
60 fn put_node(&self, _: &Node) -> Result<()> {
61 Ok(())
62 }
63 fn put_edge(&self, _: &Edge) -> Result<()> {
64 Ok(())
65 }
66 fn delete_edge(&self, _: EdgeId) -> Result<()> {
67 Ok(())
68 }
69 fn detach_delete_node(&self, _: NodeId) -> Result<()> {
70 Ok(())
71 }
72}
73
74impl<'a> ExecCtx<'a> {
75 pub(crate) fn eval_ctx<'b>(&self, row: &'b Row) -> EvalCtx<'b>
81 where
82 'a: 'b,
83 {
84 EvalCtx {
85 row,
86 params: self.params,
87 reader: self.store,
88 procedures: self.procedures,
89 outer_rows: self.outer_rows,
90 tombstones: self.tombstones,
91 }
92 }
93
94 pub(crate) fn lookup_binding<'r>(&'r self, row: &'r Row, name: &str) -> Option<&'r Value> {
100 if let Some(v) = row.get(name) {
101 return Some(v);
102 }
103 for outer in self.outer_rows {
104 if let Some(v) = outer.get(name) {
105 return Some(v);
106 }
107 }
108 None
109 }
110}
111
112pub trait Operator {
113 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>>;
114}
115
116pub fn execute(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
120 let params = ParamMap::new();
121 execute_with_reader(
122 plan,
123 store as &dyn GraphReader,
124 store as &dyn GraphWriter,
125 ¶ms,
126 )
127}
128
129pub fn execute_with_writer(
134 plan: &LogicalPlan,
135 store: &RocksDbStorageEngine,
136 writer: &dyn GraphWriter,
137) -> Result<Vec<Row>> {
138 let params = ParamMap::new();
139 execute_with_reader(plan, store as &dyn GraphReader, writer, ¶ms)
140}
141
142pub fn explain(plan: &LogicalPlan) -> Vec<Row> {
147 let text = meshdb_cypher::format_plan(plan);
148 let mut row = Row::new();
149 row.insert("plan".to_string(), Value::Property(Property::String(text)));
150 vec![row]
151}
152
153pub fn profile(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
154 let result_rows = execute(plan, store)?;
155 let row_count = result_rows.len() as i64;
156 let plan_text = meshdb_cypher::format_plan(plan);
157 let summary = format!("{plan_text}\nRows: {row_count}");
158 let mut row = Row::new();
159 row.insert(
160 "profile".to_string(),
161 Value::Property(Property::String(summary)),
162 );
163 row.insert(
164 "rows".to_string(),
165 Value::Property(Property::Int64(row_count)),
166 );
167 Ok(vec![row])
168}
169
170pub fn execute_with_reader(
171 plan: &LogicalPlan,
172 reader: &dyn GraphReader,
173 writer: &dyn GraphWriter,
174 params: &ParamMap,
175) -> Result<Vec<Row>> {
176 let empty_procs = ProcedureRegistry::new();
177 execute_with_reader_and_procs(plan, reader, writer, params, &empty_procs)
178}
179
180pub fn execute_with_reader_and_procs(
186 plan: &LogicalPlan,
187 reader: &dyn GraphReader,
188 writer: &dyn GraphWriter,
189 params: &ParamMap,
190 procedures: &ProcedureRegistry,
191) -> Result<Vec<Row>> {
192 crate::eval::reset_statement_time();
197 if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
202 return Ok(rows);
203 }
204 let suppress_output = is_write_only_plan(plan);
205 let mut op = build_op(plan);
206 let tombstones = Tombstones::default();
207 let ctx = ExecCtx {
208 store: reader,
209 writer,
210 params,
211 procedures,
212 outer_rows: &[],
213 tombstones: &tombstones,
214 };
215 let mut rows = Vec::new();
216 while let Some(row) = op.next(&ctx)? {
217 rows.push(row);
218 }
219 if suppress_output {
220 Ok(Vec::new())
221 } else {
222 Ok(rows)
223 }
224}
225
226fn is_write_only_plan(plan: &LogicalPlan) -> bool {
227 match plan {
231 LogicalPlan::CreatePath { .. }
232 | LogicalPlan::Delete { .. }
233 | LogicalPlan::SetProperty { .. }
234 | LogicalPlan::Remove { .. }
235 | LogicalPlan::Foreach { .. }
236 | LogicalPlan::MergeNode { .. }
237 | LogicalPlan::MergeEdge { .. } => true,
238 _ => false,
239 }
240}
241
242fn try_execute_ddl(
252 plan: &LogicalPlan,
253 reader: &dyn GraphReader,
254 writer: &dyn GraphWriter,
255) -> Result<Option<Vec<Row>>> {
256 match plan {
257 LogicalPlan::CreatePropertyIndex { label, property } => {
258 writer.create_property_index(label, property)?;
259 Ok(Some(vec![ddl_ack_row("created", label, property)]))
260 }
261 LogicalPlan::DropPropertyIndex { label, property } => {
262 writer.drop_property_index(label, property)?;
263 Ok(Some(vec![ddl_ack_row("dropped", label, property)]))
264 }
265 LogicalPlan::ShowPropertyIndexes => {
266 let specs = reader.list_property_indexes()?;
270 let rows = specs
271 .into_iter()
272 .map(|(label, property)| {
273 let mut row = Row::default();
274 row.insert("label".into(), Value::Property(Property::String(label)));
275 row.insert(
276 "property".into(),
277 Value::Property(Property::String(property)),
278 );
279 row.insert(
280 "state".into(),
281 Value::Property(Property::String("online".into())),
282 );
283 row
284 })
285 .collect();
286 Ok(Some(rows))
287 }
288 _ => Ok(None),
289 }
290}
291
292fn ddl_ack_row(state: &str, label: &str, property: &str) -> Row {
293 let mut row = Row::default();
294 row.insert(
295 "state".into(),
296 Value::Property(Property::String(state.into())),
297 );
298 row.insert(
299 "label".into(),
300 Value::Property(Property::String(label.into())),
301 );
302 row.insert(
303 "property".into(),
304 Value::Property(Property::String(property.into())),
305 );
306 row
307}
308
309fn build_op(plan: &LogicalPlan) -> Box<dyn Operator> {
310 build_op_inner(plan, None)
311}
312
313pub(crate) fn build_op_inner(plan: &LogicalPlan, seed: Option<&Row>) -> Box<dyn Operator> {
314 macro_rules! child {
315 ($p:expr) => {
316 build_op_inner($p, seed)
317 };
318 }
319 match plan {
320 LogicalPlan::CreatePath {
321 input,
322 nodes,
323 edges,
324 } => Box::new(CreatePathOp::new(
325 input.as_ref().map(|p| child!(p)),
326 nodes.clone(),
327 edges.clone(),
328 )),
329 LogicalPlan::CartesianProduct { left, right } => {
330 Box::new(CartesianProductOp::new(child!(left), (**right).clone()))
331 }
332 LogicalPlan::Delete {
333 input,
334 detach,
335 vars,
336 exprs,
337 } => Box::new(DeleteOp::new(
338 child!(input),
339 *detach,
340 vars.clone(),
341 exprs.clone(),
342 )),
343 LogicalPlan::SetProperty { input, assignments } => {
344 Box::new(SetPropertyOp::new(child!(input), assignments.clone()))
345 }
346 LogicalPlan::Remove { input, items } => {
347 Box::new(RemoveOp::new(child!(input), items.clone()))
348 }
349 LogicalPlan::LoadCsv {
350 input,
351 path_expr,
352 var,
353 with_headers,
354 } => Box::new(LoadCsvOp::new(
355 input.as_ref().map(|p| child!(p)),
356 path_expr.clone(),
357 var.clone(),
358 *with_headers,
359 )),
360 LogicalPlan::Foreach {
361 input,
362 var,
363 list_expr,
364 set_assignments,
365 remove_items,
366 } => Box::new(ForeachOp::new(
367 child!(input),
368 var.clone(),
369 list_expr.clone(),
370 set_assignments.clone(),
371 remove_items.clone(),
372 )),
373 LogicalPlan::CallSubquery { input, body } => {
374 Box::new(CallSubqueryOp::new(child!(input), (**body).clone()))
375 }
376 LogicalPlan::OptionalApply {
377 input,
378 body,
379 null_vars,
380 } => Box::new(OptionalApplyOp::new(
381 child!(input),
382 (**body).clone(),
383 null_vars.clone(),
384 )),
385 LogicalPlan::ProcedureCall {
386 input,
387 qualified_name,
388 args,
389 yield_spec,
390 standalone,
391 } => Box::new(ProcedureCallOp::new(
392 input.as_ref().map(|p| child!(p)),
393 qualified_name.clone(),
394 args.clone(),
395 yield_spec.clone(),
396 *standalone,
397 )),
398 LogicalPlan::SeedRow => match seed {
399 Some(r) => Box::new(SeededRowOp {
400 row: Some(r.clone()),
401 }),
402 None => Box::new(SeedRowOp { done: false }),
403 },
404 LogicalPlan::NodeScanAll { var } => Box::new(NodeScanAllOp::new(var.clone())),
405 LogicalPlan::NodeScanByLabels { var, labels } => {
406 Box::new(NodeScanByLabelsOp::new(var.clone(), labels.clone()))
407 }
408 LogicalPlan::EdgeExpand {
409 input,
410 src_var,
411 edge_var,
412 dst_var,
413 dst_labels,
414 edge_properties,
415 edge_types,
416 direction,
417 edge_constraint_var,
418 } => Box::new(EdgeExpandOp::new(
419 child!(input),
420 src_var.clone(),
421 edge_var.clone(),
422 dst_var.clone(),
423 dst_labels.clone(),
424 edge_properties.clone(),
425 edge_types.clone(),
426 *direction,
427 edge_constraint_var.clone(),
428 )),
429 LogicalPlan::OptionalEdgeExpand {
430 input,
431 src_var,
432 edge_var,
433 dst_var,
434 dst_labels,
435 dst_properties,
436 edge_types,
437 direction,
438 dst_constraint_var,
439 edge_constraint_var,
440 } => Box::new(OptionalEdgeExpandOp::new(
441 child!(input),
442 src_var.clone(),
443 edge_var.clone(),
444 dst_var.clone(),
445 dst_labels.clone(),
446 dst_properties.clone(),
447 edge_types.clone(),
448 *direction,
449 dst_constraint_var.clone(),
450 edge_constraint_var.clone(),
451 )),
452 LogicalPlan::VarLengthExpand {
453 input,
454 src_var,
455 edge_var,
456 dst_var,
457 dst_labels,
458 edge_types,
459 edge_properties,
460 direction,
461 min_hops,
462 max_hops,
463 path_var,
464 optional,
465 dst_constraint_var,
466 bound_edge_list_var,
467 excluded_edge_vars,
468 } => Box::new(VarLengthExpandOp::new(
469 child!(input),
470 src_var.clone(),
471 edge_var.clone(),
472 dst_var.clone(),
473 dst_labels.clone(),
474 edge_types.clone(),
475 edge_properties.clone(),
476 *direction,
477 *min_hops,
478 *max_hops,
479 path_var.clone(),
480 *optional,
481 dst_constraint_var.clone(),
482 bound_edge_list_var.clone(),
483 excluded_edge_vars.clone(),
484 )),
485 LogicalPlan::Filter { input, predicate } => {
486 Box::new(FilterOp::new(child!(input), predicate.clone()))
487 }
488 LogicalPlan::Project { input, items } => {
489 Box::new(ProjectOp::new(child!(input), items.clone()))
490 }
491 LogicalPlan::Aggregate {
492 input,
493 group_keys,
494 aggregates,
495 } => Box::new(AggregateOp::new(
496 child!(input),
497 group_keys.clone(),
498 aggregates.clone(),
499 )),
500 LogicalPlan::Identity { input } => Box::new(IdentityOp::new(child!(input))),
501 LogicalPlan::CoalesceNullRow { input, null_vars } => {
502 Box::new(CoalesceNullRowOp::new(child!(input), null_vars.clone()))
503 }
504 LogicalPlan::Distinct { input } => Box::new(DistinctOp::new(child!(input))),
505 LogicalPlan::OrderBy { input, sort_items } => {
506 Box::new(OrderByOp::new(child!(input), sort_items.clone()))
507 }
508 LogicalPlan::Skip { input, count } => Box::new(SkipOp::new(child!(input), count.clone())),
509 LogicalPlan::Limit { input, count } => Box::new(LimitOp::new(child!(input), count.clone())),
510 LogicalPlan::MergeNode {
511 input,
512 var,
513 labels,
514 properties,
515 on_create,
516 on_match,
517 } => Box::new(MergeNodeOp::new(
518 input.as_ref().map(|p| child!(p)),
519 var.clone(),
520 labels.clone(),
521 properties.clone(),
522 on_create.clone(),
523 on_match.clone(),
524 )),
525 LogicalPlan::MergeEdge {
526 input,
527 edge_var,
528 src_var,
529 dst_var,
530 edge_type,
531 undirected,
532 properties,
533 on_create,
534 on_match,
535 } => Box::new(MergeEdgeOp::new(
536 child!(input),
537 edge_var.clone(),
538 src_var.clone(),
539 dst_var.clone(),
540 edge_type.clone(),
541 *undirected,
542 properties.clone(),
543 on_create.clone(),
544 on_match.clone(),
545 )),
546 LogicalPlan::Unwind { var, expr } => Box::new(UnwindOp::new(var.clone(), expr.clone())),
547 LogicalPlan::UnwindChain { input, var, expr } => {
548 Box::new(UnwindChainOp::new(child!(input), var.clone(), expr.clone()))
549 }
550 LogicalPlan::IndexSeek {
551 var,
552 label,
553 property,
554 value,
555 } => Box::new(IndexSeekOp::new(
556 var.clone(),
557 label.clone(),
558 property.clone(),
559 value.clone(),
560 )),
561 LogicalPlan::Union { branches, all } => {
566 let branch_ops: Vec<Box<dyn Operator>> = branches.iter().map(|b| child!(b)).collect();
567 Box::new(UnionOp::new(branch_ops, *all))
568 }
569 LogicalPlan::BindPath {
570 input,
571 path_var,
572 node_vars,
573 edge_vars,
574 } => Box::new(BindPathOp::new(
575 child!(input),
576 path_var.clone(),
577 node_vars.clone(),
578 edge_vars.clone(),
579 )),
580 LogicalPlan::ShortestPath {
581 input,
582 src_var,
583 dst_var,
584 path_var,
585 edge_types,
586 direction,
587 max_hops,
588 kind,
589 } => Box::new(ShortestPathOp::new(
590 child!(input),
591 src_var.clone(),
592 dst_var.clone(),
593 path_var.clone(),
594 edge_types.clone(),
595 *direction,
596 *max_hops,
597 *kind,
598 )),
599 LogicalPlan::CreatePropertyIndex { .. }
600 | LogicalPlan::DropPropertyIndex { .. }
601 | LogicalPlan::ShowPropertyIndexes => {
602 panic!("schema DDL must be dispatched via try_execute_ddl before build_op")
603 }
604 }
605}
606
607struct UnwindOp {
608 var: String,
609 expr: Expr,
610 items: Option<Vec<Value>>,
611 cursor: usize,
612}
613
614impl UnwindOp {
615 fn new(var: String, expr: Expr) -> Self {
616 Self {
617 var,
618 expr,
619 items: None,
620 cursor: 0,
621 }
622 }
623}
624
625impl Operator for UnwindOp {
626 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
627 if self.items.is_none() {
628 let empty = Row::new();
629 let ectx = EvalCtx {
630 row: &empty,
631 params: ctx.params,
632 reader: ctx.store,
633 procedures: ctx.procedures,
634 outer_rows: ctx.outer_rows,
635 tombstones: ctx.tombstones,
636 };
637 let val = eval_expr(&self.expr, &ectx)?;
638 self.items = Some(coerce_unwind_list(val)?);
639 }
640 let items = self.items.as_ref().unwrap();
641 if self.cursor < items.len() {
642 let v = items[self.cursor].clone();
643 self.cursor += 1;
644 let mut row = Row::new();
645 row.insert(self.var.clone(), v);
646 Ok(Some(row))
647 } else {
648 Ok(None)
649 }
650 }
651}
652
653struct UnwindChainOp {
659 input: Box<dyn Operator>,
660 var: String,
661 expr: Expr,
662 current_row: Option<Row>,
663 items: Vec<Value>,
664 cursor: usize,
665}
666
667impl UnwindChainOp {
668 fn new(input: Box<dyn Operator>, var: String, expr: Expr) -> Self {
669 Self {
670 input,
671 var,
672 expr,
673 current_row: None,
674 items: Vec::new(),
675 cursor: 0,
676 }
677 }
678}
679
680impl Operator for UnwindChainOp {
681 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
682 loop {
683 if let Some(base) = &self.current_row {
684 if self.cursor < self.items.len() {
685 let v = self.items[self.cursor].clone();
686 self.cursor += 1;
687 let mut row = base.clone();
688 row.insert(self.var.clone(), v);
689 return Ok(Some(row));
690 }
691 self.current_row = None;
692 self.items.clear();
693 self.cursor = 0;
694 }
695 let base = match self.input.next(ctx)? {
696 Some(r) => r,
697 None => return Ok(None),
698 };
699 let ectx = EvalCtx {
700 row: &base,
701 params: ctx.params,
702 reader: ctx.store,
703 procedures: ctx.procedures,
704 outer_rows: ctx.outer_rows,
705 tombstones: ctx.tombstones,
706 };
707 let val = eval_expr(&self.expr, &ectx)?;
708 self.items = coerce_unwind_list(val)?;
709 self.current_row = Some(base);
710 }
711 }
712}
713
714fn coerce_unwind_list(val: Value) -> Result<Vec<Value>> {
719 match val {
720 Value::List(items) => Ok(items),
721 Value::Property(Property::List(props)) => {
722 Ok(props.into_iter().map(Value::Property).collect())
723 }
724 Value::Null => Ok(Vec::new()),
725 _ => Err(Error::TypeMismatch),
726 }
727}
728
729struct CreatePathOp {
730 input: Option<Box<dyn Operator>>,
731 nodes: Vec<CreateNodeSpec>,
732 edges: Vec<CreateEdgeSpec>,
733 done: bool,
734 buffered: Option<Vec<Row>>,
735 cursor: usize,
736}
737
738impl CreatePathOp {
739 fn new(
740 input: Option<Box<dyn Operator>>,
741 nodes: Vec<CreateNodeSpec>,
742 edges: Vec<CreateEdgeSpec>,
743 ) -> Self {
744 Self {
745 input,
746 nodes,
747 edges,
748 done: false,
749 buffered: None,
750 cursor: 0,
751 }
752 }
753
754 fn apply(&self, ctx: &ExecCtx, row: &Row) -> Result<Row> {
755 let mut out = row.clone();
756 let mut node_ids: Vec<NodeId> = Vec::with_capacity(self.nodes.len());
757 for spec in &self.nodes {
758 match spec {
759 CreateNodeSpec::New {
760 var,
761 labels,
762 properties,
763 } => {
764 let mut node = Node::new();
765 for label in labels {
766 node.labels.push(label.clone());
767 }
768 for (k, expr) in properties {
776 let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
777 let prop = value_to_property(value)?;
778 if matches!(prop, Property::Null) {
779 continue;
780 }
781 node.properties.insert(k.clone(), prop);
782 }
783 ctx.writer.put_node(&node)?;
784 node_ids.push(node.id);
785 if let Some(v) = var {
786 out.insert(v.clone(), Value::Node(node));
787 }
788 }
789 CreateNodeSpec::Reference(name) => {
790 let id = match out.get(name) {
791 Some(Value::Node(n)) => n.id,
792 _ => return Err(Error::UnboundVariable(name.clone())),
793 };
794 node_ids.push(id);
795 }
796 }
797 }
798 for spec in &self.edges {
799 let src = node_ids[spec.src_idx];
800 let dst = node_ids[spec.dst_idx];
801 let mut edge = Edge::new(spec.edge_type.clone(), src, dst);
802 for (k, expr) in &spec.properties {
803 let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
804 let prop = value_to_property(value)?;
805 if matches!(prop, Property::Null) {
806 continue;
807 }
808 edge.properties.insert(k.clone(), prop);
809 }
810 ctx.writer.put_edge(&edge)?;
811 if let Some(v) = &spec.var {
812 out.insert(v.clone(), Value::Edge(edge));
813 }
814 }
815 Ok(out)
816 }
817}
818
819impl Operator for CreatePathOp {
820 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
821 if self.input.is_some() {
822 if let Some(buffered) = self.buffered.as_mut() {
826 if self.cursor < buffered.len() {
827 let row = buffered[self.cursor].clone();
828 self.cursor += 1;
829 return Ok(Some(self.apply(ctx, &row)?));
830 }
831 return Ok(None);
832 }
833 let mut rows: Vec<Row> = Vec::new();
834 {
835 let input = self.input.as_mut().unwrap();
836 while let Some(row) = input.next(ctx)? {
837 rows.push(row);
838 }
839 }
840 self.buffered = Some(rows);
841 self.cursor = 0;
842 self.next(ctx)
844 } else {
845 if self.done {
846 return Ok(None);
847 }
848 self.done = true;
849 let empty = Row::new();
850 Ok(Some(self.apply(ctx, &empty)?))
851 }
852 }
853}
854
855struct CartesianProductOp {
856 left: Box<dyn Operator>,
857 right_plan: LogicalPlan,
858 left_row: Option<Row>,
859 right_op: Option<Box<dyn Operator>>,
860}
861
862impl CartesianProductOp {
863 fn new(left: Box<dyn Operator>, right_plan: LogicalPlan) -> Self {
864 Self {
865 left,
866 right_plan,
867 left_row: None,
868 right_op: None,
869 }
870 }
871}
872
873impl Operator for CartesianProductOp {
874 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
875 loop {
876 if self.left_row.is_none() {
877 match self.left.next(ctx)? {
878 None => return Ok(None),
879 Some(row) => {
880 self.left_row = Some(row);
881 self.right_op = Some(build_op(&self.right_plan));
882 }
883 }
884 }
885 let right_op = self.right_op.as_mut().expect("right_op set");
886 let left_ref = self.left_row.as_ref().unwrap();
891 let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
892 stacked.push(left_ref);
893 stacked.extend_from_slice(ctx.outer_rows);
894 let inner_ctx = ExecCtx {
895 store: ctx.store,
896 writer: ctx.writer,
897 params: ctx.params,
898 procedures: ctx.procedures,
899 outer_rows: &stacked,
900 tombstones: ctx.tombstones,
901 };
902 match right_op.next(&inner_ctx)? {
903 Some(right_row) => {
904 let mut combined = left_ref.clone();
905 for (k, v) in right_row {
906 combined.insert(k, v);
907 }
908 return Ok(Some(combined));
909 }
910 None => {
911 self.left_row = None;
912 self.right_op = None;
913 }
914 }
915 }
916 }
917}
918
919struct DeleteOp {
920 input: Box<dyn Operator>,
921 detach: bool,
922 #[allow(dead_code)]
923 vars: Vec<String>,
924 exprs: Vec<Expr>,
925 buffered: Option<Vec<Row>>,
933 cursor: usize,
934}
935
936impl DeleteOp {
937 fn new(input: Box<dyn Operator>, detach: bool, vars: Vec<String>, exprs: Vec<Expr>) -> Self {
938 Self {
939 input,
940 detach,
941 vars,
942 exprs,
943 buffered: None,
944 cursor: 0,
945 }
946 }
947
948 fn apply_deletes(
958 &self,
959 ctx: &ExecCtx,
960 row: &Row,
961 seen_edges: &mut HashSet<meshdb_core::EdgeId>,
962 seen_nodes: &mut HashSet<meshdb_core::NodeId>,
963 ) -> Result<()> {
964 let mut edge_ids: Vec<meshdb_core::EdgeId> = Vec::new();
965 let mut node_ids: Vec<meshdb_core::NodeId> = Vec::new();
966 for expr in &self.exprs {
967 let v = eval_expr(expr, &ctx.eval_ctx(row))?;
968 match v {
969 Value::Node(n) => node_ids.push(n.id),
970 Value::Edge(e) => edge_ids.push(e.id),
971 Value::Path { nodes, edges } => {
972 for e in edges {
973 edge_ids.push(e.id);
974 }
975 for n in nodes {
976 node_ids.push(n.id);
977 }
978 }
979 Value::Null | Value::Property(Property::Null) => continue,
980 _ => return Err(Error::TypeMismatch),
981 }
982 }
983 for eid in &edge_ids {
984 if seen_edges.insert(*eid) {
985 ctx.writer.delete_edge(*eid)?;
986 ctx.tombstones.edges.borrow_mut().insert(*eid);
987 }
988 }
989 for nid in &node_ids {
990 if !seen_nodes.insert(*nid) {
991 continue;
992 }
993 if self.detach {
994 for (eid, _) in ctx.store.outgoing(*nid)? {
999 ctx.tombstones.edges.borrow_mut().insert(eid);
1000 }
1001 for (eid, _) in ctx.store.incoming(*nid)? {
1002 ctx.tombstones.edges.borrow_mut().insert(eid);
1003 }
1004 ctx.writer.detach_delete_node(*nid)?;
1005 } else {
1006 let out = ctx.store.outgoing(*nid)?;
1007 let inc = ctx.store.incoming(*nid)?;
1008 let still_attached = out
1009 .iter()
1010 .chain(inc.iter())
1011 .any(|(eid, _)| !seen_edges.contains(eid));
1012 if still_attached {
1013 return Err(Error::CannotDeleteAttachedNode);
1014 }
1015 ctx.writer.detach_delete_node(*nid)?;
1016 }
1017 ctx.tombstones.nodes.borrow_mut().insert(*nid);
1018 }
1019 Ok(())
1020 }
1021}
1022
1023impl Operator for DeleteOp {
1024 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1025 if self.buffered.is_none() {
1033 let mut rows: Vec<Row> = Vec::new();
1034 while let Some(row) = self.input.next(ctx)? {
1035 rows.push(row);
1036 }
1037 let mut seen_edges: HashSet<meshdb_core::EdgeId> = HashSet::new();
1038 let mut seen_nodes: HashSet<meshdb_core::NodeId> = HashSet::new();
1039 for row in &rows {
1040 self.apply_deletes(ctx, row, &mut seen_edges, &mut seen_nodes)?;
1041 }
1042 self.buffered = Some(rows);
1043 self.cursor = 0;
1044 }
1045 let rows = self.buffered.as_ref().unwrap();
1046 if self.cursor < rows.len() {
1047 let row = rows[self.cursor].clone();
1048 self.cursor += 1;
1049 return Ok(Some(row));
1050 }
1051 Ok(None)
1052 }
1053}
1054
1055struct SetPropertyOp {
1056 input: Box<dyn Operator>,
1057 assignments: Vec<SetAssignment>,
1058}
1059
1060impl SetPropertyOp {
1061 fn new(input: Box<dyn Operator>, assignments: Vec<SetAssignment>) -> Self {
1062 Self { input, assignments }
1063 }
1064}
1065
1066impl Operator for SetPropertyOp {
1067 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1068 match self.input.next(ctx)? {
1069 None => Ok(None),
1070 Some(mut row) => {
1071 enum Action {
1073 SetKey {
1074 var: String,
1075 key: String,
1076 prop: Property,
1077 },
1078 AddLabels {
1079 var: String,
1080 labels: Vec<String>,
1081 },
1082 Replace {
1083 var: String,
1084 props: Vec<(String, Property)>,
1085 },
1086 Merge {
1087 var: String,
1088 props: Vec<(String, Property)>,
1089 },
1090 }
1091 let mut actions: Vec<Action> = Vec::with_capacity(self.assignments.len());
1092 for a in &self.assignments {
1093 match a {
1094 SetAssignment::Property { var, key, value } => {
1095 let evaluated = eval_expr(value, &ctx.eval_ctx(&row))?;
1096 let prop = value_to_property(evaluated)?;
1097 actions.push(Action::SetKey {
1098 var: var.clone(),
1099 key: key.clone(),
1100 prop,
1101 });
1102 }
1103 SetAssignment::Labels { var, labels } => {
1104 actions.push(Action::AddLabels {
1105 var: var.clone(),
1106 labels: labels.clone(),
1107 });
1108 }
1109 SetAssignment::Replace { var, properties } => {
1110 let props = properties
1115 .iter()
1116 .map(|(k, expr)| {
1117 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1118 Ok((k.clone(), value_to_property(v)?))
1119 })
1120 .collect::<Result<Vec<(String, Property)>>>()?;
1121 actions.push(Action::Replace {
1122 var: var.clone(),
1123 props,
1124 });
1125 }
1126 SetAssignment::Merge { var, properties } => {
1127 let props = properties
1128 .iter()
1129 .map(|(k, expr)| {
1130 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1131 Ok((k.clone(), value_to_property(v)?))
1132 })
1133 .collect::<Result<Vec<(String, Property)>>>()?;
1134 actions.push(Action::Merge {
1135 var: var.clone(),
1136 props,
1137 });
1138 }
1139 SetAssignment::ReplaceFromExpr {
1140 var,
1141 source,
1142 replace,
1143 } => {
1144 let v = eval_expr(source, &ctx.eval_ctx(&row))?;
1145 let props = extract_property_map(&v)?;
1146 if *replace {
1147 actions.push(Action::Replace {
1148 var: var.clone(),
1149 props,
1150 });
1151 } else {
1152 actions.push(Action::Merge {
1153 var: var.clone(),
1154 props,
1155 });
1156 }
1157 }
1158 }
1159 }
1160
1161 let mut updated_nodes: HashSet<String> = HashSet::new();
1163 let mut updated_edges: HashSet<String> = HashSet::new();
1164 for action in actions {
1165 match action {
1166 Action::SetKey { var, key, prop } => match row.get_mut(&var) {
1167 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1171 continue
1172 }
1173 Some(Value::Node(n)) => {
1176 if matches!(prop, Property::Null) {
1177 n.properties.remove(&key);
1178 } else {
1179 n.properties.insert(key, prop);
1180 }
1181 updated_nodes.insert(var);
1182 }
1183 Some(Value::Edge(e)) => {
1184 if matches!(prop, Property::Null) {
1185 e.properties.remove(&key);
1186 } else {
1187 e.properties.insert(key, prop);
1188 }
1189 updated_edges.insert(var);
1190 }
1191 _ => return Err(Error::UnboundVariable(var)),
1192 },
1193 Action::AddLabels { var, labels } => match row.get_mut(&var) {
1194 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1195 continue
1196 }
1197 Some(Value::Node(n)) => {
1198 for label in labels {
1199 if !n.labels.contains(&label) {
1200 n.labels.push(label);
1201 }
1202 }
1203 updated_nodes.insert(var);
1204 }
1205 _ => return Err(Error::UnboundVariable(var)),
1206 },
1207 Action::Replace { var, props } => match row.get_mut(&var) {
1208 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1209 continue
1210 }
1211 Some(Value::Node(n)) => {
1212 n.properties.clear();
1213 for (k, v) in props {
1214 if !matches!(v, Property::Null) {
1215 n.properties.insert(k, v);
1216 }
1217 }
1218 updated_nodes.insert(var);
1219 }
1220 Some(Value::Edge(e)) => {
1221 e.properties.clear();
1222 for (k, v) in props {
1223 if !matches!(v, Property::Null) {
1224 e.properties.insert(k, v);
1225 }
1226 }
1227 updated_edges.insert(var);
1228 }
1229 _ => return Err(Error::UnboundVariable(var)),
1230 },
1231 Action::Merge { var, props } => match row.get_mut(&var) {
1232 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1233 continue
1234 }
1235 Some(Value::Node(n)) => {
1236 for (k, v) in props {
1237 if matches!(v, Property::Null) {
1238 n.properties.remove(&k);
1239 } else {
1240 n.properties.insert(k, v);
1241 }
1242 }
1243 updated_nodes.insert(var);
1244 }
1245 Some(Value::Edge(e)) => {
1246 for (k, v) in props {
1247 if matches!(v, Property::Null) {
1248 e.properties.remove(&k);
1249 } else {
1250 e.properties.insert(k, v);
1251 }
1252 }
1253 updated_edges.insert(var);
1254 }
1255 _ => return Err(Error::UnboundVariable(var)),
1256 },
1257 }
1258 }
1259
1260 for var in &updated_nodes {
1262 if let Some(Value::Node(n)) = row.get(var) {
1263 ctx.writer.put_node(n)?;
1264 }
1265 }
1266 for var in &updated_edges {
1267 if let Some(Value::Edge(e)) = row.get(var) {
1268 ctx.writer.put_edge(e)?;
1269 }
1270 }
1271
1272 Ok(Some(row))
1273 }
1274 }
1275 }
1276}
1277
1278struct RemoveOp {
1279 input: Box<dyn Operator>,
1280 items: Vec<RemoveSpec>,
1281}
1282
1283impl RemoveOp {
1284 fn new(input: Box<dyn Operator>, items: Vec<RemoveSpec>) -> Self {
1285 Self { input, items }
1286 }
1287}
1288
1289impl Operator for RemoveOp {
1290 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1291 match self.input.next(ctx)? {
1292 None => Ok(None),
1293 Some(mut row) => {
1294 let mut updated_nodes: HashSet<String> = HashSet::new();
1295 let mut updated_edges: HashSet<String> = HashSet::new();
1296 for item in &self.items {
1297 match item {
1298 RemoveSpec::Property { var, key } => match row.get_mut(var) {
1299 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1302 continue
1303 }
1304 Some(Value::Node(n)) => {
1305 n.properties.remove(key);
1306 updated_nodes.insert(var.clone());
1307 }
1308 Some(Value::Edge(e)) => {
1309 e.properties.remove(key);
1310 updated_edges.insert(var.clone());
1311 }
1312 _ => return Err(Error::UnboundVariable(var.clone())),
1313 },
1314 RemoveSpec::Labels { var, labels } => match row.get_mut(var) {
1315 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1316 continue
1317 }
1318 Some(Value::Node(n)) => {
1319 n.labels.retain(|l| !labels.contains(l));
1320 updated_nodes.insert(var.clone());
1321 }
1322 _ => return Err(Error::UnboundVariable(var.clone())),
1323 },
1324 }
1325 }
1326 for var in &updated_nodes {
1327 if let Some(Value::Node(n)) = row.get(var) {
1328 ctx.writer.put_node(n)?;
1329 }
1330 }
1331 for var in &updated_edges {
1332 if let Some(Value::Edge(e)) = row.get(var) {
1333 ctx.writer.put_edge(e)?;
1334 }
1335 }
1336 Ok(Some(row))
1337 }
1338 }
1339 }
1340}
1341
1342struct LoadCsvOp {
1343 input: Option<Box<dyn Operator>>,
1344 path_expr: Expr,
1345 var: String,
1346 with_headers: bool,
1347 rows: Option<Vec<Value>>,
1348 cursor: usize,
1349}
1350
1351impl LoadCsvOp {
1352 fn new(
1353 input: Option<Box<dyn Operator>>,
1354 path_expr: Expr,
1355 var: String,
1356 with_headers: bool,
1357 ) -> Self {
1358 Self {
1359 input,
1360 path_expr,
1361 var,
1362 with_headers,
1363 rows: None,
1364 cursor: 0,
1365 }
1366 }
1367
1368 fn load(&mut self, ctx: &ExecCtx, base_row: &Row) -> Result<()> {
1369 let ectx = ctx.eval_ctx(base_row);
1370 let path_val = eval_expr(&self.path_expr, &ectx)?;
1371 let path = match path_val {
1372 Value::Property(Property::String(s)) => s,
1373 _ => return Err(Error::TypeMismatch),
1374 };
1375 let content = std::fs::read_to_string(&path).map_err(|e| {
1376 Error::Unsupported(format!("LOAD CSV: cannot read file '{}': {}", path, e))
1377 })?;
1378 let mut lines = content.lines();
1379 let headers: Option<Vec<String>> = if self.with_headers {
1380 lines
1381 .next()
1382 .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
1383 } else {
1384 None
1385 };
1386 let mut csv_rows = Vec::new();
1387 for line in lines {
1388 if line.trim().is_empty() {
1389 continue;
1390 }
1391 let fields: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
1392 if let Some(hdrs) = &headers {
1393 let mut map = std::collections::HashMap::new();
1394 for (i, h) in hdrs.iter().enumerate() {
1395 let val = fields.get(i).cloned().unwrap_or_default();
1396 map.insert(h.clone(), Property::String(val));
1397 }
1398 csv_rows.push(Value::Property(Property::Map(map)));
1399 } else {
1400 let list: Vec<Value> = fields
1401 .into_iter()
1402 .map(|f| Value::Property(Property::String(f)))
1403 .collect();
1404 csv_rows.push(Value::List(list));
1405 }
1406 }
1407 self.rows = Some(csv_rows);
1408 self.cursor = 0;
1409 Ok(())
1410 }
1411}
1412
1413impl Operator for LoadCsvOp {
1414 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1415 if self.rows.is_none() {
1416 let base = if let Some(input) = &mut self.input {
1417 match input.next(ctx)? {
1418 Some(r) => r,
1419 None => return Ok(None),
1420 }
1421 } else {
1422 Row::new()
1423 };
1424 self.load(ctx, &base)?;
1425 }
1426 let rows = self.rows.as_ref().unwrap();
1427 if self.cursor < rows.len() {
1428 let val = rows[self.cursor].clone();
1429 self.cursor += 1;
1430 let mut row = Row::new();
1431 row.insert(self.var.clone(), val);
1432 Ok(Some(row))
1433 } else {
1434 Ok(None)
1435 }
1436 }
1437}
1438
1439struct ForeachOp {
1440 input: Box<dyn Operator>,
1441 var: String,
1442 list_expr: Expr,
1443 set_assignments: Vec<SetAssignment>,
1444 remove_items: Vec<RemoveSpec>,
1445}
1446
1447impl ForeachOp {
1448 fn new(
1449 input: Box<dyn Operator>,
1450 var: String,
1451 list_expr: Expr,
1452 set_assignments: Vec<SetAssignment>,
1453 remove_items: Vec<RemoveSpec>,
1454 ) -> Self {
1455 Self {
1456 input,
1457 var,
1458 list_expr,
1459 set_assignments,
1460 remove_items,
1461 }
1462 }
1463}
1464
1465impl Operator for ForeachOp {
1466 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1467 let Some(row) = self.input.next(ctx)? else {
1468 return Ok(None);
1469 };
1470 let ectx = ctx.eval_ctx(&row);
1471 let list_val = eval_expr(&self.list_expr, &ectx)?;
1472 let items = match list_val {
1473 Value::List(items) => items,
1474 Value::Property(Property::List(props)) => {
1475 props.into_iter().map(Value::Property).collect()
1476 }
1477 Value::Null | Value::Property(Property::Null) => Vec::new(),
1478 _ => return Err(Error::TypeMismatch),
1479 };
1480 for item in items {
1481 let mut scratch = row.clone();
1482 scratch.insert(self.var.clone(), item);
1483 for a in &self.set_assignments {
1484 match a {
1485 SetAssignment::Property { var, key, value } => {
1486 let evaluated = eval_expr(value, &ctx.eval_ctx(&scratch))?;
1487 let prop = value_to_property(evaluated)?;
1488 match scratch.get_mut(var) {
1489 Some(Value::Node(n)) => {
1490 n.properties.insert(key.clone(), prop);
1491 }
1492 Some(Value::Edge(e)) => {
1493 e.properties.insert(key.clone(), prop);
1494 }
1495 _ => return Err(Error::UnboundVariable(var.clone())),
1496 }
1497 }
1498 SetAssignment::Labels { var, labels } => {
1499 if let Some(Value::Node(n)) = scratch.get_mut(var) {
1500 for l in labels {
1501 if !n.labels.contains(l) {
1502 n.labels.push(l.clone());
1503 }
1504 }
1505 }
1506 }
1507 _ => {}
1508 }
1509 }
1510 for ri in &self.remove_items {
1511 match ri {
1512 RemoveSpec::Property { var, key } => {
1513 if let Some(Value::Node(n)) = scratch.get_mut(var) {
1514 n.properties.remove(key);
1515 } else if let Some(Value::Edge(e)) = scratch.get_mut(var) {
1516 e.properties.remove(key);
1517 }
1518 }
1519 RemoveSpec::Labels { var, labels } => {
1520 if let Some(Value::Node(n)) = scratch.get_mut(var) {
1521 n.labels.retain(|l| !labels.contains(l));
1522 }
1523 }
1524 }
1525 }
1526 for (_, val) in scratch.iter() {
1528 match val {
1529 Value::Node(n) => ctx.writer.put_node(n)?,
1530 Value::Edge(e) => ctx.writer.put_edge(e)?,
1531 _ => {}
1532 }
1533 }
1534 }
1535 Ok(Some(row))
1536 }
1537}
1538
1539struct SeedRowOp {
1540 done: bool,
1541}
1542
1543impl Operator for SeedRowOp {
1544 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1545 if self.done {
1546 return Ok(None);
1547 }
1548 self.done = true;
1549 Ok(Some(Row::new()))
1550 }
1551}
1552
1553struct SeededRowOp {
1554 row: Option<Row>,
1555}
1556
1557impl Operator for SeededRowOp {
1558 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1559 Ok(self.row.take())
1560 }
1561}
1562
1563struct CallSubqueryOp {
1564 input: Box<dyn Operator>,
1565 body_plan: LogicalPlan,
1566 pending: Vec<Row>,
1567 pending_idx: usize,
1568}
1569
1570impl CallSubqueryOp {
1571 fn new(input: Box<dyn Operator>, body_plan: LogicalPlan) -> Self {
1572 Self {
1573 input,
1574 body_plan,
1575 pending: Vec::new(),
1576 pending_idx: 0,
1577 }
1578 }
1579}
1580
1581impl Operator for CallSubqueryOp {
1582 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1583 loop {
1584 if self.pending_idx < self.pending.len() {
1585 let row = self.pending[self.pending_idx].clone();
1586 self.pending_idx += 1;
1587 return Ok(Some(row));
1588 }
1589 let outer_row = match self.input.next(ctx)? {
1590 Some(r) => r,
1591 None => return Ok(None),
1592 };
1593 let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1594 let mut results = Vec::new();
1595 while let Some(body_row) = body_op.next(ctx)? {
1596 let mut merged = outer_row.clone();
1597 for (k, v) in body_row {
1598 merged.insert(k, v);
1599 }
1600 results.push(merged);
1601 }
1602 if results.is_empty() {
1603 continue;
1604 }
1605 self.pending = results;
1606 self.pending_idx = 0;
1607 }
1608 }
1609}
1610
1611struct OptionalApplyOp {
1618 input: Box<dyn Operator>,
1619 body_plan: LogicalPlan,
1620 null_vars: Vec<String>,
1621 pending: Vec<Row>,
1622 pending_idx: usize,
1623}
1624
1625impl OptionalApplyOp {
1626 fn new(input: Box<dyn Operator>, body_plan: LogicalPlan, null_vars: Vec<String>) -> Self {
1627 Self {
1628 input,
1629 body_plan,
1630 null_vars,
1631 pending: Vec::new(),
1632 pending_idx: 0,
1633 }
1634 }
1635}
1636
1637impl Operator for OptionalApplyOp {
1638 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1639 loop {
1640 if self.pending_idx < self.pending.len() {
1641 let row = self.pending[self.pending_idx].clone();
1642 self.pending_idx += 1;
1643 return Ok(Some(row));
1644 }
1645 let outer_row = match self.input.next(ctx)? {
1646 Some(r) => r,
1647 None => return Ok(None),
1648 };
1649 let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1650 let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1654 stacked.push(&outer_row);
1655 stacked.extend_from_slice(ctx.outer_rows);
1656 let inner_ctx = ExecCtx {
1657 store: ctx.store,
1658 writer: ctx.writer,
1659 params: ctx.params,
1660 procedures: ctx.procedures,
1661 outer_rows: &stacked,
1662 tombstones: ctx.tombstones,
1663 };
1664 let mut results = Vec::new();
1665 while let Some(body_row) = body_op.next(&inner_ctx)? {
1666 let mut merged = outer_row.clone();
1667 for (k, v) in body_row {
1668 merged.insert(k, v);
1669 }
1670 results.push(merged);
1671 }
1672 if results.is_empty() {
1673 let mut fallback = outer_row;
1674 for v in &self.null_vars {
1675 fallback.insert(v.clone(), Value::Null);
1676 }
1677 return Ok(Some(fallback));
1678 }
1679 self.pending = results;
1680 self.pending_idx = 0;
1681 }
1682 }
1683}
1684
1685struct ProcedureCallOp {
1704 input: Option<Box<dyn Operator>>,
1705 qualified_name: Vec<String>,
1706 args: Option<Vec<Expr>>,
1707 yield_spec: Option<YieldSpec>,
1708 standalone: bool,
1709 buffered: Vec<Row>,
1710 buffered_idx: usize,
1711 done: bool,
1714}
1715
1716impl ProcedureCallOp {
1717 fn new(
1718 input: Option<Box<dyn Operator>>,
1719 qualified_name: Vec<String>,
1720 args: Option<Vec<Expr>>,
1721 yield_spec: Option<YieldSpec>,
1722 standalone: bool,
1723 ) -> Self {
1724 Self {
1725 input,
1726 qualified_name,
1727 args,
1728 yield_spec,
1729 standalone,
1730 buffered: Vec::new(),
1731 buffered_idx: 0,
1732 done: false,
1733 }
1734 }
1735
1736 fn resolve_projection(
1742 &self,
1743 proc: &crate::procedures::Procedure,
1744 ) -> Result<Vec<(String, String)>> {
1745 match &self.yield_spec {
1746 None => {
1747 if !self.standalone {
1748 if proc.outputs.is_empty() {
1757 return Ok(Vec::new());
1758 }
1759 return Err(Error::Procedure(format!(
1760 "procedure '{}' has outputs but no YIELD clause",
1761 self.qualified_name.join(".")
1762 )));
1763 }
1764 Ok(proc
1765 .outputs
1766 .iter()
1767 .map(|o| (o.name.clone(), o.name.clone()))
1768 .collect())
1769 }
1770 Some(YieldSpec::Star) => {
1771 if !self.standalone {
1772 return Err(Error::Procedure(
1773 "YIELD * is only allowed on standalone CALL".into(),
1774 ));
1775 }
1776 Ok(proc
1777 .outputs
1778 .iter()
1779 .map(|o| (o.name.clone(), o.name.clone()))
1780 .collect())
1781 }
1782 Some(YieldSpec::Items(items)) => {
1783 let mut projection = Vec::with_capacity(items.len());
1784 let mut seen_aliases: std::collections::HashSet<String> =
1785 std::collections::HashSet::new();
1786 for yi in items {
1787 if !proc.outputs.iter().any(|o| o.name == yi.column) {
1788 return Err(Error::Procedure(format!(
1789 "procedure '{}' has no output column '{}'",
1790 self.qualified_name.join("."),
1791 yi.column
1792 )));
1793 }
1794 let alias = yi.alias.clone().unwrap_or_else(|| yi.column.clone());
1795 if !seen_aliases.insert(alias.clone()) {
1796 return Err(Error::Procedure(format!(
1797 "variable '{alias}' already bound by YIELD"
1798 )));
1799 }
1800 projection.push((yi.column.clone(), alias));
1801 }
1802 Ok(projection)
1803 }
1804 }
1805 }
1806
1807 fn evaluate_args(
1814 &self,
1815 ctx: &ExecCtx,
1816 row: &Row,
1817 proc: &crate::procedures::Procedure,
1818 ) -> Result<Vec<Value>> {
1819 match &self.args {
1820 Some(exprs) => {
1821 if exprs.len() != proc.inputs.len() {
1822 return Err(Error::Procedure(format!(
1823 "procedure '{}' expects {} argument(s), got {}",
1824 self.qualified_name.join("."),
1825 proc.inputs.len(),
1826 exprs.len()
1827 )));
1828 }
1829 let eval_ctx = ctx.eval_ctx(row);
1830 let mut values = Vec::with_capacity(exprs.len());
1831 for (expr, spec) in exprs.iter().zip(proc.inputs.iter()) {
1832 let v = eval_expr(expr, &eval_ctx)?;
1833 if !spec.ty.accepts(&v) {
1834 return Err(Error::Procedure(format!(
1835 "argument '{}' has wrong type for procedure '{}'",
1836 spec.name,
1837 self.qualified_name.join(".")
1838 )));
1839 }
1840 values.push(coerce_arg(v, spec.ty));
1841 }
1842 Ok(values)
1843 }
1844 None => {
1845 if !self.standalone {
1847 return Err(Error::Procedure(
1848 "in-query CALL requires explicit argument list".into(),
1849 ));
1850 }
1851 let mut values = Vec::with_capacity(proc.inputs.len());
1852 for spec in &proc.inputs {
1853 let v = ctx.params.get(&spec.name).cloned().ok_or_else(|| {
1854 Error::Procedure(format!(
1855 "missing parameter ${} for procedure '{}'",
1856 spec.name,
1857 self.qualified_name.join(".")
1858 ))
1859 })?;
1860 if !spec.ty.accepts(&v) {
1861 return Err(Error::Procedure(format!(
1862 "parameter '{}' has wrong type",
1863 spec.name
1864 )));
1865 }
1866 values.push(coerce_arg(v, spec.ty));
1867 }
1868 Ok(values)
1869 }
1870 }
1871 }
1872
1873 fn invoke_once(
1878 &self,
1879 ctx: &ExecCtx,
1880 input_row: &Row,
1881 proc: &crate::procedures::Procedure,
1882 projection: &[(String, String)],
1883 out: &mut Vec<Row>,
1884 ) -> Result<()> {
1885 if proc.outputs.is_empty() {
1889 if !self.standalone {
1890 out.push(input_row.clone());
1891 }
1892 return Ok(());
1893 }
1894 let args = self.evaluate_args(ctx, input_row, proc)?;
1895 for proc_row in &proc.rows {
1896 if !proc.row_matches(proc_row, &args) {
1897 continue;
1898 }
1899 let mut merged = if self.standalone {
1900 Row::new()
1901 } else {
1902 input_row.clone()
1903 };
1904 for (src, alias) in projection {
1905 let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
1906 merged.insert(alias.clone(), v);
1907 }
1908 out.push(merged);
1909 }
1910 Ok(())
1911 }
1912}
1913
1914fn coerce_arg(v: Value, ty: crate::procedures::ProcType) -> Value {
1919 use crate::procedures::ProcType;
1920 if matches!(ty, ProcType::Float) {
1921 if let Value::Property(Property::Int64(n)) = v {
1922 return Value::Property(Property::Float64(n as f64));
1923 }
1924 }
1925 v
1926}
1927
1928impl Operator for ProcedureCallOp {
1929 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1930 loop {
1931 if self.buffered_idx < self.buffered.len() {
1932 let row = self.buffered[self.buffered_idx].clone();
1933 self.buffered_idx += 1;
1934 return Ok(Some(row));
1935 }
1936 self.buffered.clear();
1937 self.buffered_idx = 0;
1938
1939 let proc = match ctx.procedures.get(&self.qualified_name) {
1940 Some(p) => p,
1941 None => {
1942 return Err(Error::Procedure(format!(
1943 "procedure '{}' not found",
1944 self.qualified_name.join(".")
1945 )));
1946 }
1947 };
1948 let projection = self.resolve_projection(proc)?;
1949
1950 let input_row = match &mut self.input {
1951 Some(inp) => match inp.next(ctx)? {
1952 Some(r) => r,
1953 None => return Ok(None),
1954 },
1955 None => {
1956 if self.done {
1957 return Ok(None);
1958 }
1959 self.done = true;
1960 Row::new()
1961 }
1962 };
1963
1964 let mut produced = Vec::new();
1965 self.invoke_once(ctx, &input_row, proc, &projection, &mut produced)?;
1966 if produced.is_empty() {
1967 if self.input.is_some() {
1968 continue;
1969 }
1970 return Ok(None);
1971 }
1972 self.buffered = produced;
1973 }
1974 }
1975}
1976
1977fn extract_property_map(v: &Value) -> Result<Vec<(String, Property)>> {
1983 match v {
1984 Value::Node(n) => Ok(n.properties.clone().into_iter().collect()),
1985 Value::Edge(e) => Ok(e.properties.clone().into_iter().collect()),
1986 Value::Map(pairs) => pairs
1987 .iter()
1988 .map(|(k, vv)| Ok((k.clone(), value_to_property(vv.clone())?)))
1989 .collect(),
1990 Value::Property(Property::Map(entries)) => Ok(entries
1991 .iter()
1992 .map(|(k, p)| (k.clone(), p.clone()))
1993 .collect()),
1994 Value::Null | Value::Property(Property::Null) => Ok(Vec::new()),
1995 _ => Err(Error::InvalidSetValue),
1996 }
1997}
1998
1999fn value_to_property(v: Value) -> Result<Property> {
2000 match v {
2001 Value::Property(Property::Map(_)) => Err(Error::InvalidSetValue),
2002 Value::Property(p) => Ok(p),
2003 Value::Null => Ok(Property::Null),
2004 Value::List(items) => {
2005 let props: Vec<Property> = items
2006 .into_iter()
2007 .map(value_to_property)
2008 .collect::<Result<_>>()?;
2009 Ok(Property::List(props))
2010 }
2011 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
2015 Err(Error::InvalidSetValue)
2016 }
2017 }
2018}
2019
2020struct NodeScanAllOp {
2021 var: String,
2022 ids: Option<Vec<NodeId>>,
2023 cursor: usize,
2024}
2025
2026impl NodeScanAllOp {
2027 fn new(var: String) -> Self {
2028 Self {
2029 var,
2030 ids: None,
2031 cursor: 0,
2032 }
2033 }
2034}
2035
2036impl Operator for NodeScanAllOp {
2037 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2038 if self.ids.is_none() {
2039 self.ids = Some(ctx.store.all_node_ids()?);
2040 }
2041 let ids = self.ids.as_ref().unwrap();
2042 while self.cursor < ids.len() {
2043 let id = ids[self.cursor];
2044 self.cursor += 1;
2045 if let Some(node) = ctx.store.get_node(id)? {
2046 let mut row = Row::new();
2047 row.insert(self.var.clone(), Value::Node(node));
2048 return Ok(Some(row));
2049 }
2050 }
2051 Ok(None)
2052 }
2053}
2054
2055struct NodeScanByLabelsOp {
2056 var: String,
2057 labels: Vec<String>,
2058 ids: Option<Vec<NodeId>>,
2059 cursor: usize,
2060}
2061
2062impl NodeScanByLabelsOp {
2063 fn new(var: String, labels: Vec<String>) -> Self {
2064 Self {
2065 var,
2066 labels,
2067 ids: None,
2068 cursor: 0,
2069 }
2070 }
2071}
2072
2073impl Operator for NodeScanByLabelsOp {
2074 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2075 if self.ids.is_none() {
2076 let primary = self
2078 .labels
2079 .first()
2080 .expect("NodeScanByLabels must have at least one label");
2081 self.ids = Some(ctx.store.nodes_by_label(primary)?);
2082 }
2083 let ids = self.ids.as_ref().unwrap();
2084 while self.cursor < ids.len() {
2085 let id = ids[self.cursor];
2086 self.cursor += 1;
2087 if let Some(node) = ctx.store.get_node(id)? {
2088 if has_all_labels(&node, &self.labels) {
2089 let mut row = Row::new();
2090 row.insert(self.var.clone(), Value::Node(node));
2091 return Ok(Some(row));
2092 }
2093 }
2094 }
2095 Ok(None)
2096 }
2097}
2098
2099fn has_all_labels(node: &Node, labels: &[String]) -> bool {
2100 labels.iter().all(|l| node.labels.contains(l))
2101}
2102
2103struct IndexSeekOp {
2115 var: String,
2116 label: String,
2117 property: String,
2118 value_expr: Expr,
2119 results: Option<Vec<NodeId>>,
2120 cursor: usize,
2121}
2122
2123impl IndexSeekOp {
2124 fn new(var: String, label: String, property: String, value_expr: Expr) -> Self {
2125 Self {
2126 var,
2127 label,
2128 property,
2129 value_expr,
2130 results: None,
2131 cursor: 0,
2132 }
2133 }
2134}
2135
2136impl Operator for IndexSeekOp {
2137 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2138 if self.results.is_none() {
2139 let empty = Row::new();
2140 let value = eval_expr(&self.value_expr, &ctx.eval_ctx(&empty))?;
2141 let property = match value {
2142 Value::Property(p) => p,
2143 Value::Null => Property::Null,
2144 Value::Node(_)
2145 | Value::Edge(_)
2146 | Value::List(_)
2147 | Value::Map(_)
2148 | Value::Path { .. } => {
2149 return Err(Error::InvalidSetValue);
2150 }
2151 };
2152 let ids = ctx
2153 .store
2154 .nodes_by_property(&self.label, &self.property, &property)?;
2155 self.results = Some(ids);
2156 }
2157 let ids = self.results.as_ref().unwrap();
2158 while self.cursor < ids.len() {
2159 let id = ids[self.cursor];
2160 self.cursor += 1;
2161 if let Some(node) = ctx.store.get_node(id)? {
2162 let mut row = Row::new();
2163 row.insert(self.var.clone(), Value::Node(node));
2164 return Ok(Some(row));
2165 }
2166 }
2167 Ok(None)
2168 }
2169}
2170
2171fn matches_pattern_props(node: &Node, props: &[(String, Property)]) -> bool {
2172 props.iter().all(|(k, v)| {
2173 node.properties
2174 .get(k)
2175 .map(|stored| stored == v)
2176 .unwrap_or(false)
2177 })
2178}
2179
2180struct MergeNodeOp {
2181 var: String,
2182 labels: Vec<String>,
2183 properties: Vec<(String, Expr)>,
2187 on_create: Vec<SetAssignment>,
2192 on_match: Vec<SetAssignment>,
2196 input: Option<Box<dyn Operator>>,
2203 merged_nodes: Vec<Node>,
2210 merge_done: bool,
2214 current_input_row: Option<Row>,
2218 cursor: usize,
2219}
2220
2221impl MergeNodeOp {
2222 fn new(
2223 input: Option<Box<dyn Operator>>,
2224 var: String,
2225 labels: Vec<String>,
2226 properties: Vec<(String, Expr)>,
2227 on_create: Vec<SetAssignment>,
2228 on_match: Vec<SetAssignment>,
2229 ) -> Self {
2230 Self {
2231 var,
2232 labels,
2233 properties,
2234 on_create,
2235 on_match,
2236 input,
2237 merged_nodes: Vec::new(),
2238 merge_done: false,
2239 current_input_row: None,
2240 cursor: 0,
2241 }
2242 }
2243
2244 fn run_merge_for(&mut self, ctx: &ExecCtx, base: &Row) -> Result<Vec<Node>> {
2256 let resolved_props: Vec<(String, Property)> = self
2257 .properties
2258 .iter()
2259 .map(|(k, expr)| {
2260 let v = eval_expr(expr, &ctx.eval_ctx(base))?;
2261 Ok((k.clone(), value_to_property(v)?))
2262 })
2263 .collect::<Result<Vec<_>>>()?;
2264
2265 let candidate_ids: Vec<NodeId> = if let Some(primary) = self.labels.first() {
2266 ctx.store.nodes_by_label(primary)?
2267 } else {
2268 ctx.store.all_node_ids()?
2269 };
2270 let mut merged_nodes: Vec<Node> = Vec::new();
2271 for id in candidate_ids {
2272 if let Some(node) = ctx.store.get_node(id)? {
2273 if has_all_labels(&node, &self.labels)
2274 && matches_pattern_props(&node, &resolved_props)
2275 {
2276 merged_nodes.push(node);
2277 }
2278 }
2279 }
2280
2281 if merged_nodes.is_empty() {
2282 let mut node = Node::new();
2283 for label in &self.labels {
2284 node.labels.push(label.clone());
2285 }
2286 for (k, prop) in resolved_props {
2287 node.properties.insert(k, prop);
2288 }
2289 apply_merge_actions(&mut node, &self.on_create, &self.var, ctx, base)?;
2290 ctx.writer.put_node(&node)?;
2291 merged_nodes.push(node);
2292 } else if !self.on_match.is_empty() {
2293 for node in merged_nodes.iter_mut() {
2294 apply_merge_actions(node, &self.on_match, &self.var, ctx, base)?;
2295 ctx.writer.put_node(node)?;
2296 }
2297 }
2298 Ok(merged_nodes)
2299 }
2300}
2301
2302impl Operator for MergeNodeOp {
2303 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2304 if self.input.is_none() {
2309 if !self.merge_done {
2310 let empty = Row::new();
2311 let nodes = self.run_merge_for(ctx, &empty)?;
2312 self.merged_nodes = nodes;
2313 self.merge_done = true;
2314 }
2315 if self.cursor < self.merged_nodes.len() {
2316 let node = self.merged_nodes[self.cursor].clone();
2317 self.cursor += 1;
2318 let mut row = Row::new();
2319 row.insert(self.var.clone(), Value::Node(node));
2320 return Ok(Some(row));
2321 }
2322 return Ok(None);
2323 }
2324
2325 loop {
2332 if let Some(base) = self.current_input_row.as_ref() {
2333 if self.cursor < self.merged_nodes.len() {
2334 let node = self.merged_nodes[self.cursor].clone();
2335 self.cursor += 1;
2336 let mut row = base.clone();
2337 row.insert(self.var.clone(), Value::Node(node));
2338 return Ok(Some(row));
2339 }
2340 }
2341 match self.input.as_mut().unwrap().next(ctx)? {
2342 None => return Ok(None),
2343 Some(row) => {
2344 let nodes = self.run_merge_for(ctx, &row)?;
2345 self.merged_nodes = nodes;
2346 self.cursor = 0;
2347 self.current_input_row = Some(row);
2348 }
2349 }
2350 }
2351 }
2352}
2353
2354struct MergeEdgeOp {
2373 input: Box<dyn Operator>,
2374 edge_var: String,
2375 src_var: String,
2376 dst_var: String,
2377 edge_type: String,
2378 undirected: bool,
2379 properties: Vec<(String, Expr)>,
2383 on_create: Vec<SetAssignment>,
2384 on_match: Vec<SetAssignment>,
2385 pending: std::collections::VecDeque<Row>,
2392}
2393
2394impl MergeEdgeOp {
2395 #[allow(clippy::too_many_arguments)]
2396 fn new(
2397 input: Box<dyn Operator>,
2398 edge_var: String,
2399 src_var: String,
2400 dst_var: String,
2401 edge_type: String,
2402 undirected: bool,
2403 properties: Vec<(String, Expr)>,
2404 on_create: Vec<SetAssignment>,
2405 on_match: Vec<SetAssignment>,
2406 ) -> Self {
2407 Self {
2408 input,
2409 edge_var,
2410 src_var,
2411 dst_var,
2412 edge_type,
2413 undirected,
2414 properties,
2415 on_create,
2416 on_match,
2417 pending: std::collections::VecDeque::new(),
2418 }
2419 }
2420}
2421
2422impl Operator for MergeEdgeOp {
2423 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2424 loop {
2425 if let Some(row) = self.pending.pop_front() {
2426 return Ok(Some(row));
2427 }
2428 let Some(row) = self.input.next(ctx)? else {
2429 return Ok(None);
2430 };
2431 let src_node = match row.get(&self.src_var) {
2436 Some(Value::Node(n)) => n.clone(),
2437 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
2438 };
2439 let dst_node = match row.get(&self.dst_var) {
2440 Some(Value::Node(n)) => n.clone(),
2441 _ => return Err(Error::UnboundVariable(self.dst_var.clone())),
2442 };
2443
2444 let required_props: Vec<(String, Property)> = self
2448 .properties
2449 .iter()
2450 .map(|(k, expr)| {
2451 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
2452 Ok((k.clone(), value_to_property(v)?))
2453 })
2454 .collect::<Result<Vec<_>>>()?;
2455 let edge_matches = |edge: &Edge| -> bool {
2456 required_props.iter().all(|(k, want)| {
2457 edge.properties
2458 .get(k)
2459 .map(|have| have == want)
2460 .unwrap_or(false)
2461 })
2462 };
2463
2464 let mut matched: Vec<Edge> = Vec::new();
2471 for (edge_id, neighbor_id) in ctx.store.outgoing(src_node.id)? {
2472 if neighbor_id != dst_node.id {
2473 continue;
2474 }
2475 if let Some(edge) = ctx.store.get_edge(edge_id)? {
2476 if edge.edge_type == self.edge_type && edge_matches(&edge) {
2477 matched.push(edge);
2478 }
2479 }
2480 }
2481 if self.undirected {
2482 for (edge_id, neighbor_id) in ctx.store.incoming(src_node.id)? {
2483 if neighbor_id != dst_node.id {
2484 continue;
2485 }
2486 if let Some(edge) = ctx.store.get_edge(edge_id)? {
2487 if edge.edge_type == self.edge_type && edge_matches(&edge) {
2488 matched.push(edge);
2489 }
2490 }
2491 }
2492 }
2493
2494 if matched.is_empty() {
2495 let mut new_edge = Edge::new(&self.edge_type, src_node.id, dst_node.id);
2496 for (k, p) in &required_props {
2497 new_edge.properties.insert(k.clone(), p.clone());
2498 }
2499 let mut row_out = row.clone();
2500 apply_merge_edge_actions(
2501 &mut new_edge,
2502 &self.on_create,
2503 &self.edge_var,
2504 ctx,
2505 &mut row_out,
2506 )?;
2507 ctx.writer.put_edge(&new_edge)?;
2508 row_out.insert(self.edge_var.clone(), Value::Edge(new_edge));
2509 self.pending.push_back(row_out);
2510 } else {
2511 for mut existing in matched {
2512 let mut row_out = row.clone();
2513 if !self.on_match.is_empty() {
2514 apply_merge_edge_actions(
2515 &mut existing,
2516 &self.on_match,
2517 &self.edge_var,
2518 ctx,
2519 &mut row_out,
2520 )?;
2521 ctx.writer.put_edge(&existing)?;
2522 }
2523 row_out.insert(self.edge_var.clone(), Value::Edge(existing));
2524 self.pending.push_back(row_out);
2525 }
2526 }
2527 }
2528 }
2529}
2530
2531fn apply_merge_edge_actions(
2541 edge: &mut Edge,
2542 actions: &[SetAssignment],
2543 var: &str,
2544 exec_ctx: &ExecCtx,
2545 outer: &mut Row,
2546) -> Result<()> {
2547 if actions.is_empty() {
2548 return Ok(());
2549 }
2550 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2553 for action in actions {
2554 match action {
2555 SetAssignment::Property {
2556 var: target,
2557 key,
2558 value,
2559 } => {
2560 let sub_ctx = exec_ctx.eval_ctx(outer);
2561 let evaluated = eval_expr(value, &sub_ctx)?;
2562 let prop = value_to_property(evaluated)?;
2563 if target == var {
2564 if matches!(prop, Property::Null) {
2565 edge.properties.remove(key);
2566 } else {
2567 edge.properties.insert(key.clone(), prop);
2568 }
2569 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2570 } else {
2571 apply_set_prop_to_outer(outer, exec_ctx, target, key, prop)?;
2572 }
2573 }
2574 SetAssignment::Merge {
2575 var: target,
2576 properties,
2577 } => {
2578 let sub_ctx = exec_ctx.eval_ctx(outer);
2579 let resolved: Vec<(String, Property)> = properties
2580 .iter()
2581 .map(|(k, expr)| {
2582 let v = eval_expr(expr, &sub_ctx)?;
2583 Ok((k.clone(), value_to_property(v)?))
2584 })
2585 .collect::<Result<Vec<_>>>()?;
2586 if target == var {
2587 for (k, p) in resolved {
2588 edge.properties.insert(k, p);
2589 }
2590 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2591 } else {
2592 apply_set_map_to_outer(outer, exec_ctx, target, resolved, false)?;
2593 }
2594 }
2595 SetAssignment::Replace {
2596 var: target,
2597 properties,
2598 } => {
2599 let sub_ctx = exec_ctx.eval_ctx(outer);
2600 let resolved: Vec<(String, Property)> = properties
2601 .iter()
2602 .map(|(k, expr)| {
2603 let v = eval_expr(expr, &sub_ctx)?;
2604 Ok((k.clone(), value_to_property(v)?))
2605 })
2606 .collect::<Result<Vec<_>>>()?;
2607 if target == var {
2608 edge.properties.clear();
2609 for (k, p) in resolved {
2610 edge.properties.insert(k, p);
2611 }
2612 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2613 } else {
2614 apply_set_map_to_outer(outer, exec_ctx, target, resolved, true)?;
2615 }
2616 }
2617 SetAssignment::Labels {
2618 var: target,
2619 labels,
2620 } => {
2621 if target == var {
2622 return Err(Error::UnboundVariable(target.clone()));
2624 }
2625 apply_set_labels_to_outer(outer, exec_ctx, target, labels)?;
2626 }
2627 SetAssignment::ReplaceFromExpr {
2628 var: target,
2629 source,
2630 replace,
2631 } => {
2632 let sub_ctx = exec_ctx.eval_ctx(outer);
2633 let v = eval_expr(source, &sub_ctx)?;
2634 let props = extract_property_map(&v)?;
2635 if target == var {
2636 if *replace {
2637 edge.properties.clear();
2638 }
2639 for (k, p) in props {
2640 edge.properties.insert(k, p);
2641 }
2642 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2643 } else {
2644 apply_set_map_to_outer(outer, exec_ctx, target, props, *replace)?;
2645 }
2646 }
2647 }
2648 }
2649 Ok(())
2650}
2651
2652fn apply_set_prop_to_outer(
2657 outer: &mut Row,
2658 exec_ctx: &ExecCtx,
2659 target: &str,
2660 key: &str,
2661 prop: Property,
2662) -> Result<()> {
2663 match outer.get_mut(target) {
2664 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
2665 return Ok(());
2668 }
2669 Some(Value::Node(n)) => {
2670 if matches!(prop, Property::Null) {
2671 n.properties.remove(key);
2672 } else {
2673 n.properties.insert(key.to_string(), prop);
2674 }
2675 exec_ctx.writer.put_node(n)?;
2676 }
2677 Some(Value::Edge(e)) => {
2678 if matches!(prop, Property::Null) {
2679 e.properties.remove(key);
2680 } else {
2681 e.properties.insert(key.to_string(), prop);
2682 }
2683 exec_ctx.writer.put_edge(e)?;
2684 }
2685 _ => return Err(Error::UnboundVariable(target.to_string())),
2686 }
2687 Ok(())
2688}
2689
2690fn apply_set_map_to_outer(
2694 outer: &mut Row,
2695 exec_ctx: &ExecCtx,
2696 target: &str,
2697 props: Vec<(String, Property)>,
2698 replace: bool,
2699) -> Result<()> {
2700 match outer.get_mut(target) {
2701 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
2702 Some(Value::Node(n)) => {
2703 if replace {
2704 n.properties.clear();
2705 }
2706 for (k, p) in props {
2707 if replace || !matches!(p, Property::Null) {
2708 n.properties.insert(k, p);
2709 } else {
2710 n.properties.remove(&k);
2711 }
2712 }
2713 exec_ctx.writer.put_node(n)?;
2714 Ok(())
2715 }
2716 Some(Value::Edge(e)) => {
2717 if replace {
2718 e.properties.clear();
2719 }
2720 for (k, p) in props {
2721 if replace || !matches!(p, Property::Null) {
2722 e.properties.insert(k, p);
2723 } else {
2724 e.properties.remove(&k);
2725 }
2726 }
2727 exec_ctx.writer.put_edge(e)?;
2728 Ok(())
2729 }
2730 _ => Err(Error::UnboundVariable(target.to_string())),
2731 }
2732}
2733
2734fn apply_set_labels_to_outer(
2736 outer: &mut Row,
2737 exec_ctx: &ExecCtx,
2738 target: &str,
2739 labels: &[String],
2740) -> Result<()> {
2741 match outer.get_mut(target) {
2742 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
2743 Some(Value::Node(n)) => {
2744 for label in labels {
2745 if !n.labels.contains(label) {
2746 n.labels.push(label.clone());
2747 }
2748 }
2749 exec_ctx.writer.put_node(n)?;
2750 Ok(())
2751 }
2752 _ => Err(Error::UnboundVariable(target.to_string())),
2753 }
2754}
2755
2756fn apply_merge_actions(
2765 node: &mut Node,
2766 actions: &[SetAssignment],
2767 var: &str,
2768 exec_ctx: &ExecCtx,
2769 base_row: &Row,
2770) -> Result<()> {
2771 if actions.is_empty() {
2772 return Ok(());
2773 }
2774 let mut row = base_row.clone();
2777 row.insert(var.to_string(), Value::Node(node.clone()));
2778 for action in actions {
2779 let sub_ctx = exec_ctx.eval_ctx(&row);
2780 match action {
2781 SetAssignment::Property {
2782 var: target,
2783 key,
2784 value,
2785 } => {
2786 if target != var {
2787 return Err(Error::UnboundVariable(target.clone()));
2788 }
2789 let evaluated = eval_expr(value, &sub_ctx)?;
2790 let prop = value_to_property(evaluated)?;
2791 node.properties.insert(key.clone(), prop);
2792 row.insert(var.to_string(), Value::Node(node.clone()));
2793 }
2794 SetAssignment::Labels {
2795 var: target,
2796 labels,
2797 } => {
2798 if target != var {
2799 return Err(Error::UnboundVariable(target.clone()));
2800 }
2801 for label in labels {
2802 if !node.labels.contains(label) {
2803 node.labels.push(label.clone());
2804 }
2805 }
2806 row.insert(var.to_string(), Value::Node(node.clone()));
2807 }
2808 SetAssignment::Replace {
2809 var: target,
2810 properties,
2811 } => {
2812 if target != var {
2813 return Err(Error::UnboundVariable(target.clone()));
2814 }
2815 let resolved: Vec<(String, Property)> = properties
2816 .iter()
2817 .map(|(k, expr)| {
2818 let v = eval_expr(expr, &sub_ctx)?;
2819 Ok((k.clone(), value_to_property(v)?))
2820 })
2821 .collect::<Result<Vec<_>>>()?;
2822 node.properties.clear();
2823 for (k, p) in resolved {
2824 node.properties.insert(k, p);
2825 }
2826 row.insert(var.to_string(), Value::Node(node.clone()));
2827 }
2828 SetAssignment::Merge {
2829 var: target,
2830 properties,
2831 } => {
2832 if target != var {
2833 return Err(Error::UnboundVariable(target.clone()));
2834 }
2835 let resolved: Vec<(String, Property)> = properties
2836 .iter()
2837 .map(|(k, expr)| {
2838 let v = eval_expr(expr, &sub_ctx)?;
2839 Ok((k.clone(), value_to_property(v)?))
2840 })
2841 .collect::<Result<Vec<_>>>()?;
2842 for (k, p) in resolved {
2843 node.properties.insert(k, p);
2844 }
2845 row.insert(var.to_string(), Value::Node(node.clone()));
2846 }
2847 SetAssignment::ReplaceFromExpr {
2848 var: target,
2849 source,
2850 replace,
2851 } => {
2852 if target != var {
2853 return Err(Error::UnboundVariable(target.clone()));
2854 }
2855 let v = eval_expr(source, &sub_ctx)?;
2856 let props = extract_property_map(&v)?;
2857 if *replace {
2858 node.properties.clear();
2859 }
2860 for (k, p) in props {
2861 node.properties.insert(k, p);
2862 }
2863 row.insert(var.to_string(), Value::Node(node.clone()));
2864 }
2865 }
2866 }
2867 Ok(())
2868}
2869
2870struct EdgeExpandOp {
2871 input: Box<dyn Operator>,
2872 src_var: String,
2873 edge_var: Option<String>,
2874 dst_var: String,
2875 dst_labels: Vec<String>,
2876 edge_properties: Vec<(String, Expr)>,
2877 edge_types: Vec<String>,
2878 direction: Direction,
2879 edge_constraint_var: Option<String>,
2885 current_row: Option<Row>,
2886 pending: Vec<(EdgeId, NodeId)>,
2887 pending_idx: usize,
2888}
2889
2890impl EdgeExpandOp {
2891 #[allow(clippy::too_many_arguments)]
2892 fn new(
2893 input: Box<dyn Operator>,
2894 src_var: String,
2895 edge_var: Option<String>,
2896 dst_var: String,
2897 dst_labels: Vec<String>,
2898 edge_properties: Vec<(String, Expr)>,
2899 edge_types: Vec<String>,
2900 direction: Direction,
2901 edge_constraint_var: Option<String>,
2902 ) -> Self {
2903 Self {
2904 input,
2905 src_var,
2906 edge_var,
2907 dst_var,
2908 dst_labels,
2909 edge_properties,
2910 edge_types,
2911 direction,
2912 edge_constraint_var,
2913 current_row: None,
2914 pending: Vec::new(),
2915 pending_idx: 0,
2916 }
2917 }
2918}
2919
2920impl Operator for EdgeExpandOp {
2921 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2922 loop {
2923 while self.pending_idx < self.pending.len() {
2924 let (edge_id, neighbor_id) = self.pending[self.pending_idx];
2925 self.pending_idx += 1;
2926
2927 let edge = match ctx.store.get_edge(edge_id)? {
2928 Some(e) => e,
2929 None => continue,
2930 };
2931 if !self.edge_types.is_empty()
2932 && !self.edge_types.iter().any(|t| t == &edge.edge_type)
2933 {
2934 continue;
2935 }
2936 if let Some(constraint_var) = &self.edge_constraint_var {
2944 let base = self
2945 .current_row
2946 .as_ref()
2947 .expect("pending edges without source row");
2948 let expected = match ctx.lookup_binding(base, constraint_var) {
2949 Some(Value::Edge(e)) => Some(e.id),
2950 _ => None,
2951 };
2952 match expected {
2953 Some(id) if id != edge.id => continue,
2954 None => continue,
2955 _ => {}
2956 }
2957 }
2958 if !self.edge_properties.is_empty() {
2963 let base = self
2964 .current_row
2965 .as_ref()
2966 .expect("pending edges without source row");
2967 let ectx = ctx.eval_ctx(base);
2968 let mut ok = true;
2969 for (key, expr) in &self.edge_properties {
2970 let expected = eval_expr(expr, &ectx)?;
2971 let actual = match edge.properties.get(key) {
2972 Some(v) => Value::Property(v.clone()),
2973 None => {
2974 ok = false;
2975 break;
2976 }
2977 };
2978 if !values_equal(&actual, &expected) {
2979 ok = false;
2980 break;
2981 }
2982 }
2983 if !ok {
2984 continue;
2985 }
2986 }
2987
2988 let neighbor = match ctx.store.get_node(neighbor_id)? {
2989 Some(n) => n,
2990 None => continue,
2991 };
2992 if !has_all_labels(&neighbor, &self.dst_labels) {
2993 continue;
2994 }
2995
2996 let base = self
2997 .current_row
2998 .as_ref()
2999 .expect("pending edges without source row");
3000 let mut out = base.clone();
3001 if let Some(ev) = &self.edge_var {
3002 out.insert(ev.clone(), Value::Edge(edge));
3003 }
3004 out.insert(self.dst_var.clone(), Value::Node(neighbor));
3005 return Ok(Some(out));
3006 }
3007
3008 match self.input.next(ctx)? {
3009 None => return Ok(None),
3010 Some(row) => {
3011 let src_id = match row.get(&self.src_var) {
3012 Some(Value::Node(n)) => n.id,
3013 Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
3018 continue
3019 }
3020 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3021 };
3022 self.pending = match self.direction {
3023 Direction::Outgoing => ctx.store.outgoing(src_id)?,
3024 Direction::Incoming => ctx.store.incoming(src_id)?,
3025 Direction::Both => {
3026 let mut all = ctx.store.outgoing(src_id)?;
3032 let mut seen: std::collections::HashSet<EdgeId> =
3033 all.iter().map(|(e, _)| *e).collect();
3034 for (e, n) in ctx.store.incoming(src_id)? {
3035 if seen.insert(e) {
3036 all.push((e, n));
3037 }
3038 }
3039 all
3040 }
3041 };
3042 self.pending_idx = 0;
3043 self.current_row = Some(row);
3044 }
3045 }
3046 }
3047 }
3048}
3049
3050struct OptionalEdgeExpandOp {
3065 input: Box<dyn Operator>,
3066 src_var: String,
3067 edge_var: Option<String>,
3068 dst_var: String,
3069 dst_labels: Vec<String>,
3070 dst_properties: Vec<(String, Expr)>,
3071 edge_types: Vec<String>,
3072 direction: Direction,
3073 dst_constraint_var: Option<String>,
3079 edge_constraint_var: Option<String>,
3084 current_row: Option<Row>,
3085 pending: Vec<(EdgeId, NodeId)>,
3086 pending_idx: usize,
3087 yielded_for_current: bool,
3088}
3089
3090impl OptionalEdgeExpandOp {
3091 #[allow(clippy::too_many_arguments)]
3092 fn new(
3093 input: Box<dyn Operator>,
3094 src_var: String,
3095 edge_var: Option<String>,
3096 dst_var: String,
3097 dst_labels: Vec<String>,
3098 dst_properties: Vec<(String, Expr)>,
3099 edge_types: Vec<String>,
3100 direction: Direction,
3101 dst_constraint_var: Option<String>,
3102 edge_constraint_var: Option<String>,
3103 ) -> Self {
3104 Self {
3105 input,
3106 src_var,
3107 edge_var,
3108 dst_var,
3109 dst_labels,
3110 dst_properties,
3111 edge_types,
3112 direction,
3113 dst_constraint_var,
3114 edge_constraint_var,
3115 current_row: None,
3116 pending: Vec::new(),
3117 pending_idx: 0,
3118 yielded_for_current: false,
3119 }
3120 }
3121}
3122
3123impl Operator for OptionalEdgeExpandOp {
3124 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3125 loop {
3126 while self.pending_idx < self.pending.len() {
3127 let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3128 self.pending_idx += 1;
3129
3130 let edge = match ctx.store.get_edge(edge_id)? {
3131 Some(e) => e,
3132 None => continue,
3133 };
3134 if !self.edge_types.is_empty()
3135 && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3136 {
3137 continue;
3138 }
3139 if let Some(constraint_var) = &self.edge_constraint_var {
3145 let base = self
3146 .current_row
3147 .as_ref()
3148 .expect("pending without source row");
3149 let expected = match ctx.lookup_binding(base, constraint_var) {
3150 Some(Value::Edge(e)) => Some(e.id),
3151 _ => None,
3152 };
3153 match expected {
3154 Some(id) if id != edge.id => continue,
3155 None => continue,
3156 _ => {}
3157 }
3158 }
3159
3160 let neighbor = match ctx.store.get_node(neighbor_id)? {
3161 Some(n) => n,
3162 None => continue,
3163 };
3164 if !has_all_labels(&neighbor, &self.dst_labels) {
3165 continue;
3166 }
3167 if let Some(constraint_var) = &self.dst_constraint_var {
3174 let base = self
3175 .current_row
3176 .as_ref()
3177 .expect("pending without source row");
3178 let bound_id = match base.get(constraint_var) {
3179 Some(Value::Node(n)) => Some(n.id),
3180 Some(Value::Null)
3181 | Some(Value::Property(meshdb_core::Property::Null))
3182 | None => None,
3183 _ => None,
3184 };
3185 match bound_id {
3186 Some(id) if id != neighbor.id => continue,
3187 None => continue,
3188 _ => {}
3189 }
3190 }
3191 if !self.dst_properties.is_empty() {
3192 let base = self
3193 .current_row
3194 .as_ref()
3195 .expect("pending without source row");
3196 let ectx = ctx.eval_ctx(base);
3197 let mut props_ok = true;
3198 for (key, expr) in &self.dst_properties {
3199 let expected = eval_expr(expr, &ectx)?;
3200 let actual = neighbor
3201 .properties
3202 .get(key)
3203 .cloned()
3204 .map(Value::Property)
3205 .unwrap_or(Value::Null);
3206 if !values_equal(&expected, &actual) {
3207 props_ok = false;
3208 break;
3209 }
3210 }
3211 if !props_ok {
3212 continue;
3213 }
3214 }
3215
3216 let base = self
3217 .current_row
3218 .as_ref()
3219 .expect("pending edges without source row");
3220 let mut out = base.clone();
3221 if let Some(ev) = &self.edge_var {
3222 out.insert(ev.clone(), Value::Edge(edge));
3223 }
3224 out.insert(self.dst_var.clone(), Value::Node(neighbor));
3225 self.yielded_for_current = true;
3226 return Ok(Some(out));
3227 }
3228
3229 if let Some(base) = self.current_row.take() {
3239 if !self.yielded_for_current {
3240 let mut out = base;
3241 if let Some(ev) = &self.edge_var {
3242 let preserve = self
3243 .edge_constraint_var
3244 .as_ref()
3245 .map(|c| c == ev)
3246 .unwrap_or(false);
3247 if !preserve {
3248 out.insert(ev.clone(), Value::Null);
3249 }
3250 }
3251 let preserve_dst = self
3252 .dst_constraint_var
3253 .as_ref()
3254 .map(|c| c == &self.dst_var)
3255 .unwrap_or(false);
3256 if !preserve_dst {
3257 out.insert(self.dst_var.clone(), Value::Null);
3258 }
3259 self.yielded_for_current = true;
3260 return Ok(Some(out));
3261 }
3262 }
3263
3264 match self.input.next(ctx)? {
3265 None => return Ok(None),
3266 Some(row) => {
3267 let src_id = match row.get(&self.src_var) {
3268 Some(Value::Node(n)) => n.id,
3269 Some(Value::Null) => {
3276 self.pending = Vec::new();
3277 self.pending_idx = 0;
3278 self.yielded_for_current = false;
3279 self.current_row = Some(row);
3280 continue;
3281 }
3282 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3283 };
3284 self.pending = match self.direction {
3285 Direction::Outgoing => ctx.store.outgoing(src_id)?,
3286 Direction::Incoming => ctx.store.incoming(src_id)?,
3287 Direction::Both => {
3288 let mut all = ctx.store.outgoing(src_id)?;
3294 let mut seen: std::collections::HashSet<EdgeId> =
3295 all.iter().map(|(e, _)| *e).collect();
3296 for (e, n) in ctx.store.incoming(src_id)? {
3297 if seen.insert(e) {
3298 all.push((e, n));
3299 }
3300 }
3301 all
3302 }
3303 };
3304 self.pending_idx = 0;
3305 self.yielded_for_current = false;
3306 self.current_row = Some(row);
3307 }
3308 }
3309 }
3310 }
3311}
3312
3313struct VarLengthExpandOp {
3314 input: Box<dyn Operator>,
3315 src_var: String,
3316 edge_var: Option<String>,
3317 dst_var: String,
3318 dst_labels: Vec<String>,
3319 edge_types: Vec<String>,
3320 edge_properties: Vec<(String, Expr)>,
3326 direction: Direction,
3327 min_hops: u64,
3328 max_hops: u64,
3329 path_var: Option<String>,
3330 optional: bool,
3336 dst_constraint_var: Option<String>,
3343 bound_edge_list_var: Option<String>,
3347 excluded_edge_vars: Vec<String>,
3355 current_row: Option<Row>,
3356 pending_paths: Vec<Vec<Edge>>,
3357 pending_node_paths: Vec<Vec<NodeId>>,
3358 pending_targets: Vec<NodeId>,
3359 pending_idx: usize,
3360}
3361
3362impl VarLengthExpandOp {
3363 #[allow(clippy::too_many_arguments)]
3364 fn new(
3365 input: Box<dyn Operator>,
3366 src_var: String,
3367 edge_var: Option<String>,
3368 dst_var: String,
3369 dst_labels: Vec<String>,
3370 edge_types: Vec<String>,
3371 edge_properties: Vec<(String, Expr)>,
3372 direction: Direction,
3373 min_hops: u64,
3374 max_hops: u64,
3375 path_var: Option<String>,
3376 optional: bool,
3377 dst_constraint_var: Option<String>,
3378 bound_edge_list_var: Option<String>,
3379 excluded_edge_vars: Vec<String>,
3380 ) -> Self {
3381 Self {
3382 input,
3383 src_var,
3384 edge_var,
3385 dst_var,
3386 dst_labels,
3387 edge_types,
3388 edge_properties,
3389 direction,
3390 min_hops,
3391 max_hops,
3392 path_var,
3393 optional,
3394 dst_constraint_var,
3395 bound_edge_list_var,
3396 excluded_edge_vars,
3397 current_row: None,
3398 pending_paths: Vec::new(),
3399 pending_node_paths: Vec::new(),
3400 pending_targets: Vec::new(),
3401 pending_idx: 0,
3402 }
3403 }
3404
3405 fn enumerate(
3406 &self,
3407 ctx: &ExecCtx,
3408 start: NodeId,
3409 input_row: &Row,
3410 ) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
3411 let mut paths: Vec<Vec<Edge>> = Vec::new();
3412 let mut node_paths: Vec<Vec<NodeId>> = Vec::new();
3413 let mut targets: Vec<NodeId> = Vec::new();
3414 let mut edge_buf: Vec<Edge> = Vec::new();
3415 let mut node_buf: Vec<NodeId> = vec![start];
3416 let mut used: HashSet<EdgeId> = HashSet::new();
3423 for var in &self.excluded_edge_vars {
3424 match ctx.lookup_binding(input_row, var) {
3425 Some(Value::Edge(e)) => {
3426 used.insert(e.id);
3427 }
3428 Some(Value::List(items)) => {
3429 for item in items {
3430 if let Value::Edge(e) = item {
3431 used.insert(e.id);
3432 }
3433 }
3434 }
3435 _ => {}
3436 }
3437 }
3438 let expected_edge_props: Vec<(String, Value)> = if self.edge_properties.is_empty() {
3442 Vec::new()
3443 } else {
3444 let ectx = ctx.eval_ctx(input_row);
3445 self.edge_properties
3446 .iter()
3447 .map(|(k, expr)| eval_expr(expr, &ectx).map(|v| (k.clone(), v)))
3448 .collect::<Result<Vec<_>>>()?
3449 };
3450 self.dfs(
3451 ctx,
3452 start,
3453 &expected_edge_props,
3454 &mut edge_buf,
3455 &mut node_buf,
3456 &mut used,
3457 &mut paths,
3458 &mut node_paths,
3459 &mut targets,
3460 )?;
3461 Ok((paths, node_paths, targets))
3462 }
3463
3464 #[allow(clippy::too_many_arguments)]
3465 fn dfs(
3466 &self,
3467 ctx: &ExecCtx,
3468 current_node: NodeId,
3469 expected_edge_props: &[(String, Value)],
3470 edge_buf: &mut Vec<Edge>,
3471 node_buf: &mut Vec<NodeId>,
3472 used: &mut HashSet<EdgeId>,
3473 out_paths: &mut Vec<Vec<Edge>>,
3474 out_node_paths: &mut Vec<Vec<NodeId>>,
3475 out_targets: &mut Vec<NodeId>,
3476 ) -> Result<()> {
3477 let depth = edge_buf.len() as u64;
3478
3479 if depth >= self.min_hops && depth <= self.max_hops {
3480 let terminal_ok = match ctx.store.get_node(current_node)? {
3481 Some(node) => has_all_labels(&node, &self.dst_labels),
3482 None => false,
3483 };
3484 if terminal_ok {
3485 out_paths.push(edge_buf.clone());
3486 out_node_paths.push(node_buf.clone());
3487 out_targets.push(current_node);
3488 }
3489 }
3490
3491 if depth >= self.max_hops {
3492 return Ok(());
3493 }
3494
3495 let neighbors = match self.direction {
3496 Direction::Outgoing => ctx.store.outgoing(current_node)?,
3497 Direction::Incoming => ctx.store.incoming(current_node)?,
3498 Direction::Both => {
3499 let mut all = ctx.store.outgoing(current_node)?;
3500 all.extend(ctx.store.incoming(current_node)?);
3501 all
3502 }
3503 };
3504
3505 for (eid, neighbor_id) in neighbors {
3506 if used.contains(&eid) {
3507 continue;
3508 }
3509 let edge = match ctx.store.get_edge(eid)? {
3510 Some(e) => e,
3511 None => continue,
3512 };
3513 if !self.edge_types.is_empty() && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3514 {
3515 continue;
3516 }
3517 if !expected_edge_props.is_empty() {
3522 let mut ok = true;
3523 for (key, expected) in expected_edge_props {
3524 let actual = match edge.properties.get(key) {
3525 Some(v) => Value::Property(v.clone()),
3526 None => {
3527 ok = false;
3528 break;
3529 }
3530 };
3531 if !values_equal(&actual, expected) {
3532 ok = false;
3533 break;
3534 }
3535 }
3536 if !ok {
3537 continue;
3538 }
3539 }
3540 used.insert(eid);
3541 edge_buf.push(edge);
3542 node_buf.push(neighbor_id);
3543 self.dfs(
3544 ctx,
3545 neighbor_id,
3546 expected_edge_props,
3547 edge_buf,
3548 node_buf,
3549 used,
3550 out_paths,
3551 out_node_paths,
3552 out_targets,
3553 )?;
3554 edge_buf.pop();
3555 node_buf.pop();
3556 used.remove(&eid);
3557 }
3558
3559 Ok(())
3560 }
3561}
3562
3563fn replay_edge_list(
3581 ctx: &ExecCtx,
3582 row: &Row,
3583 list_var: &str,
3584 src_id: Option<NodeId>,
3585 direction: Direction,
3586 edge_types: &[String],
3587) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
3588 let start = match src_id {
3589 Some(id) => id,
3590 None => return Ok((Vec::new(), Vec::new(), Vec::new())),
3591 };
3592 let list = match ctx.lookup_binding(row, list_var) {
3593 Some(Value::List(items)) => items.clone(),
3594 Some(Value::Property(meshdb_core::Property::List(items))) => items
3595 .iter()
3596 .cloned()
3597 .map(Value::Property)
3598 .collect::<Vec<_>>(),
3599 _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
3600 };
3601 let mut edge_buf: Vec<Edge> = Vec::with_capacity(list.len());
3602 let mut node_buf: Vec<NodeId> = Vec::with_capacity(list.len() + 1);
3603 node_buf.push(start);
3604 let mut current = start;
3605 for item in list {
3606 let edge = match item {
3607 Value::Edge(e) => e,
3608 _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
3609 };
3610 if !edge_types.is_empty() && !edge_types.iter().any(|t| t == &edge.edge_type) {
3611 return Ok((Vec::new(), Vec::new(), Vec::new()));
3612 }
3613 let next_node = match direction {
3614 Direction::Outgoing => {
3615 if edge.source != current {
3616 return Ok((Vec::new(), Vec::new(), Vec::new()));
3617 }
3618 edge.target
3619 }
3620 Direction::Incoming => {
3621 if edge.target != current {
3622 return Ok((Vec::new(), Vec::new(), Vec::new()));
3623 }
3624 edge.source
3625 }
3626 Direction::Both => {
3627 if edge.source == current {
3628 edge.target
3629 } else if edge.target == current {
3630 edge.source
3631 } else {
3632 return Ok((Vec::new(), Vec::new(), Vec::new()));
3633 }
3634 }
3635 };
3636 if ctx.store.get_node(next_node)?.is_none() {
3640 return Ok((Vec::new(), Vec::new(), Vec::new()));
3641 }
3642 edge_buf.push(edge);
3643 node_buf.push(next_node);
3644 current = next_node;
3645 }
3646 Ok((vec![edge_buf], vec![node_buf], vec![current]))
3647}
3648
3649impl Operator for VarLengthExpandOp {
3650 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3651 loop {
3652 while self.pending_idx < self.pending_paths.len() {
3653 let i = self.pending_idx;
3654 self.pending_idx += 1;
3655
3656 let target_id = self.pending_targets[i];
3657 let target = match ctx.store.get_node(target_id)? {
3658 Some(n) => n,
3659 None => continue,
3660 };
3661
3662 let base = self
3663 .current_row
3664 .as_ref()
3665 .expect("pending without source row");
3666 let mut out = base.clone();
3667 out.insert(self.dst_var.clone(), Value::Node(target.clone()));
3668 if let Some(ev) = &self.edge_var {
3669 let edges: Vec<Value> = self.pending_paths[i]
3670 .iter()
3671 .cloned()
3672 .map(Value::Edge)
3673 .collect();
3674 out.insert(ev.clone(), Value::List(edges));
3675 }
3676 if let Some(pv) = &self.path_var {
3677 let mut nodes = Vec::with_capacity(self.pending_node_paths[i].len());
3678 for nid in &self.pending_node_paths[i] {
3679 match ctx.store.get_node(*nid)? {
3680 Some(n) => nodes.push(n),
3681 None => continue,
3682 }
3683 }
3684 let edges = self.pending_paths[i].clone();
3685 out.insert(pv.clone(), Value::Path { nodes, edges });
3686 }
3687 return Ok(Some(out));
3688 }
3689
3690 match self.input.next(ctx)? {
3691 None => return Ok(None),
3692 Some(row) => {
3693 let src_id = match row.get(&self.src_var) {
3694 Some(Value::Node(n)) => Some(n.id),
3695 Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
3701 None
3702 }
3703 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3704 };
3705 let (mut paths, mut node_paths, mut targets) =
3713 if let Some(list_var) = &self.bound_edge_list_var {
3714 replay_edge_list(
3715 ctx,
3716 &row,
3717 list_var,
3718 src_id,
3719 self.direction,
3720 &self.edge_types,
3721 )?
3722 } else {
3723 match src_id {
3724 Some(id) => self.enumerate(ctx, id, &row)?,
3725 None => (Vec::new(), Vec::new(), Vec::new()),
3726 }
3727 };
3728 if let Some(constraint_var) = &self.dst_constraint_var {
3735 let target_id = match row.get(constraint_var) {
3736 Some(Value::Node(n)) => Some(n.id),
3737 _ => None,
3738 };
3739 match target_id {
3740 Some(id) => {
3741 let mut kept_paths = Vec::new();
3742 let mut kept_node_paths = Vec::new();
3743 let mut kept_targets = Vec::new();
3744 for ((p, np), t) in paths
3745 .drain(..)
3746 .zip(node_paths.drain(..))
3747 .zip(targets.drain(..))
3748 {
3749 if t == id {
3750 kept_paths.push(p);
3751 kept_node_paths.push(np);
3752 kept_targets.push(t);
3753 }
3754 }
3755 paths = kept_paths;
3756 node_paths = kept_node_paths;
3757 targets = kept_targets;
3758 }
3759 None => {
3760 paths.clear();
3761 node_paths.clear();
3762 targets.clear();
3763 }
3764 }
3765 }
3766 if paths.is_empty() && self.optional {
3767 let mut out = row;
3772 if let Some(ev) = &self.edge_var {
3773 out.insert(ev.clone(), Value::Null);
3774 }
3775 out.insert(self.dst_var.clone(), Value::Null);
3776 if let Some(pv) = &self.path_var {
3777 out.insert(pv.clone(), Value::Null);
3778 }
3779 return Ok(Some(out));
3780 }
3781 self.pending_paths = paths;
3782 self.pending_node_paths = node_paths;
3783 self.pending_targets = targets;
3784 self.pending_idx = 0;
3785 self.current_row = Some(row);
3786 }
3787 }
3788 }
3789 }
3790}
3791
3792struct FilterOp {
3793 input: Box<dyn Operator>,
3794 predicate: Expr,
3795}
3796
3797impl FilterOp {
3798 fn new(input: Box<dyn Operator>, predicate: Expr) -> Self {
3799 Self { input, predicate }
3800 }
3801}
3802
3803impl Operator for FilterOp {
3804 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3805 while let Some(row) = self.input.next(ctx)? {
3806 let v = match eval_expr(&self.predicate, &ctx.eval_ctx(&row)) {
3807 Ok(v) => v,
3808 Err(Error::TypeMismatch) | Err(Error::NotBoolean) => Value::Null,
3811 Err(e) => return Err(e),
3812 };
3813 if to_bool(&v).unwrap_or(false) {
3814 return Ok(Some(row));
3815 }
3816 }
3817 Ok(None)
3818 }
3819}
3820
3821struct IdentityOp {
3824 input: Box<dyn Operator>,
3825}
3826
3827impl IdentityOp {
3828 fn new(input: Box<dyn Operator>) -> Self {
3829 Self { input }
3830 }
3831}
3832
3833impl Operator for IdentityOp {
3834 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3835 self.input.next(ctx)
3836 }
3837}
3838
3839struct CoalesceNullRowOp {
3845 input: Box<dyn Operator>,
3846 null_vars: Vec<String>,
3847 produced_any: bool,
3848 done: bool,
3849}
3850
3851impl CoalesceNullRowOp {
3852 fn new(input: Box<dyn Operator>, null_vars: Vec<String>) -> Self {
3853 Self {
3854 input,
3855 null_vars,
3856 produced_any: false,
3857 done: false,
3858 }
3859 }
3860}
3861
3862impl Operator for CoalesceNullRowOp {
3863 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3864 if self.done {
3865 return Ok(None);
3866 }
3867 match self.input.next(ctx)? {
3868 Some(row) => {
3869 self.produced_any = true;
3870 Ok(Some(row))
3871 }
3872 None => {
3873 self.done = true;
3874 if self.produced_any {
3875 Ok(None)
3876 } else {
3877 let mut row = Row::new();
3878 for v in &self.null_vars {
3879 row.insert(v.clone(), Value::Null);
3880 }
3881 Ok(Some(row))
3882 }
3883 }
3884 }
3885 }
3886}
3887
3888struct ProjectOp {
3889 input: Box<dyn Operator>,
3890 items: Vec<ReturnItem>,
3891}
3892
3893impl ProjectOp {
3894 fn new(input: Box<dyn Operator>, items: Vec<ReturnItem>) -> Self {
3895 Self { input, items }
3896 }
3897}
3898
3899impl Operator for ProjectOp {
3900 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3901 match self.input.next(ctx)? {
3902 Some(row) => {
3903 let mut out = Row::new();
3904 for (i, item) in self.items.iter().enumerate() {
3905 let name = item.alias.clone().unwrap_or_else(|| {
3906 item.raw_text
3907 .clone()
3908 .unwrap_or_else(|| default_name(&item.expr, i))
3909 });
3910 let value = eval_expr(&item.expr, &ctx.eval_ctx(&row))?;
3911 out.insert(name, value);
3912 }
3913 Ok(Some(out))
3914 }
3915 None => Ok(None),
3916 }
3917 }
3918}
3919
3920fn default_name(expr: &Expr, idx: usize) -> String {
3921 render_expr_name(expr).unwrap_or_else(|| format!("col{}", idx))
3922}
3923
3924fn render_expr_name(expr: &Expr) -> Option<String> {
3925 Some(match expr {
3926 Expr::Identifier(s) => s.clone(),
3927 Expr::Property { var, key } => format!("{var}.{key}"),
3928 Expr::PropertyAccess { base, key } => {
3929 if matches!(
3933 base.as_ref(),
3934 Expr::IndexAccess { .. } | Expr::SliceAccess { .. }
3935 ) {
3936 format!("({}).{key}", render_expr_name(base)?)
3937 } else {
3938 format!("{}.{key}", render_expr_name(base)?)
3939 }
3940 }
3941 Expr::Parameter(name) => format!("${name}"),
3942 Expr::Literal(Literal::String(s)) => format!("'{s}'"),
3943 Expr::Literal(Literal::Integer(i)) => i.to_string(),
3944 Expr::Literal(Literal::Float(f)) => f.to_string(),
3945 Expr::Literal(Literal::Boolean(b)) => b.to_string(),
3946 Expr::Literal(Literal::Null) => "NULL".into(),
3947 Expr::Call { name, args } => {
3948 let arg_str = match args {
3949 CallArgs::Star => "*".into(),
3950 CallArgs::Exprs(es) | CallArgs::DistinctExprs(es) => {
3951 let prefix = if matches!(args, CallArgs::DistinctExprs(_)) {
3952 "DISTINCT "
3953 } else {
3954 ""
3955 };
3956 let inner: Vec<String> = es.iter().filter_map(render_expr_name).collect();
3957 if inner.len() != es.len() {
3958 return None;
3959 }
3960 format!("{prefix}{}", inner.join(", "))
3961 }
3962 };
3963 format!("{name}({arg_str})")
3964 }
3965 Expr::BinaryOp { op, left, right } => {
3966 let op_str = match op {
3967 BinaryOp::Add => " + ",
3968 BinaryOp::Sub => " - ",
3969 BinaryOp::Mul => " * ",
3970 BinaryOp::Div => " / ",
3971 BinaryOp::Mod => " % ",
3972 BinaryOp::Pow => " ^ ",
3973 };
3974 format!(
3975 "{}{op_str}{}",
3976 render_expr_name(left)?,
3977 render_expr_name(right)?
3978 )
3979 }
3980 Expr::UnaryOp { op, operand } => {
3981 let op_str = match op {
3982 UnaryOp::Neg => "-",
3983 };
3984 format!("{op_str}{}", render_expr_name(operand)?)
3985 }
3986 Expr::Not(inner) => format!("NOT {}", render_expr_name(inner)?),
3987 Expr::IsNull { negated, inner } => {
3988 if *negated {
3989 format!("{} IS NOT NULL", render_expr_name(inner)?)
3990 } else {
3991 format!("{} IS NULL", render_expr_name(inner)?)
3992 }
3993 }
3994 Expr::Compare { op, left, right } => {
3995 let op_str = match op {
3996 CompareOp::Eq => " = ",
3997 CompareOp::Ne => " <> ",
3998 CompareOp::Lt => " < ",
3999 CompareOp::Le => " <= ",
4000 CompareOp::Gt => " > ",
4001 CompareOp::Ge => " >= ",
4002 CompareOp::StartsWith => " STARTS WITH ",
4003 CompareOp::EndsWith => " ENDS WITH ",
4004 CompareOp::Contains => " CONTAINS ",
4005 CompareOp::RegexMatch => " =~ ",
4006 };
4007 format!(
4008 "{}{op_str}{}",
4009 render_expr_name(left)?,
4010 render_expr_name(right)?
4011 )
4012 }
4013 Expr::List(items) => {
4014 let inner: Vec<String> = items.iter().filter_map(render_expr_name).collect();
4015 if inner.len() != items.len() {
4016 return None;
4017 }
4018 format!("[{}]", inner.join(", "))
4019 }
4020 Expr::Map(entries) => {
4021 let inner: Vec<String> = entries
4022 .iter()
4023 .map(|(k, v)| render_expr_name(v).map(|vn| format!("{k}: {vn}")))
4024 .collect::<Option<Vec<_>>>()?;
4025 format!("{{{}}}", inner.join(", "))
4026 }
4027 Expr::IndexAccess { base, index } => {
4028 format!("{}[{}]", render_expr_name(base)?, render_expr_name(index)?)
4029 }
4030 Expr::InList { element, list } => {
4031 format!(
4032 "{} IN {}",
4033 render_expr_name(element)?,
4034 render_expr_name(list)?
4035 )
4036 }
4037 Expr::HasLabels { expr, labels } => {
4038 let mut s = format!("({}", render_expr_name(expr)?);
4039 for l in labels {
4040 s.push(':');
4041 s.push_str(l);
4042 }
4043 s.push(')');
4044 s
4045 }
4046 _ => return None,
4047 })
4048}
4049
4050struct DistinctOp {
4051 input: Box<dyn Operator>,
4052 seen: HashSet<String>,
4053}
4054
4055impl DistinctOp {
4056 fn new(input: Box<dyn Operator>) -> Self {
4057 Self {
4058 input,
4059 seen: HashSet::new(),
4060 }
4061 }
4062}
4063
4064impl Operator for DistinctOp {
4065 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4066 while let Some(row) = self.input.next(ctx)? {
4067 let key = row_key(&row);
4068 if self.seen.insert(key) {
4069 return Ok(Some(row));
4070 }
4071 }
4072 Ok(None)
4073 }
4074}
4075
4076struct BindPathOp {
4092 input: Box<dyn Operator>,
4093 path_var: String,
4094 node_vars: Vec<String>,
4095 edge_vars: Vec<String>,
4096}
4097
4098impl BindPathOp {
4099 fn new(
4100 input: Box<dyn Operator>,
4101 path_var: String,
4102 node_vars: Vec<String>,
4103 edge_vars: Vec<String>,
4104 ) -> Self {
4105 Self {
4106 input,
4107 path_var,
4108 node_vars,
4109 edge_vars,
4110 }
4111 }
4112}
4113
4114impl Operator for BindPathOp {
4115 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4116 let Some(mut row) = self.input.next(ctx)? else {
4117 return Ok(None);
4118 };
4119 let mut nodes: Vec<meshdb_core::Node> = Vec::new();
4123 let mut edges: Vec<meshdb_core::Edge> = Vec::new();
4124 let mut abort = false;
4125 if let Some(Value::Node(n)) = row.get(&self.node_vars[0]) {
4132 nodes.push(n.clone());
4133 } else {
4134 abort = true;
4135 }
4136 if !abort {
4137 for (i, ev) in self.edge_vars.iter().enumerate() {
4138 match row.get(ev) {
4139 Some(Value::Edge(e)) => {
4140 edges.push(e.clone());
4141 match row.get(&self.node_vars[i + 1]) {
4142 Some(Value::Node(n)) => nodes.push(n.clone()),
4143 _ => {
4144 abort = true;
4145 break;
4146 }
4147 }
4148 }
4149 Some(Value::Path {
4150 nodes: sub_nodes,
4151 edges: sub_edges,
4152 }) => {
4153 edges.extend(sub_edges.iter().cloned());
4159 if sub_nodes.len() > 1 {
4160 nodes.extend(sub_nodes[1..].iter().cloned());
4161 }
4162 }
4163 _ => {
4164 abort = true;
4165 break;
4166 }
4167 }
4168 }
4169 }
4170 if abort {
4171 row.insert(self.path_var.clone(), Value::Null);
4172 } else {
4173 row.insert(self.path_var.clone(), Value::Path { nodes, edges });
4174 }
4175 Ok(Some(row))
4176 }
4177}
4178
4179struct ShortestPathOp {
4198 input: Box<dyn Operator>,
4199 src_var: String,
4200 dst_var: String,
4201 path_var: String,
4202 edge_types: Vec<String>,
4203 direction: meshdb_cypher::Direction,
4204 max_hops: u64,
4205 kind: meshdb_cypher::ShortestKind,
4206 pending: std::collections::VecDeque<(Row, Value)>,
4213}
4214
4215impl ShortestPathOp {
4216 #[allow(clippy::too_many_arguments)]
4217 fn new(
4218 input: Box<dyn Operator>,
4219 src_var: String,
4220 dst_var: String,
4221 path_var: String,
4222 edge_types: Vec<String>,
4223 direction: meshdb_cypher::Direction,
4224 max_hops: u64,
4225 kind: meshdb_cypher::ShortestKind,
4226 ) -> Self {
4227 Self {
4228 input,
4229 src_var,
4230 dst_var,
4231 path_var,
4232 edge_types,
4233 direction,
4234 max_hops,
4235 kind,
4236 pending: std::collections::VecDeque::new(),
4237 }
4238 }
4239}
4240
4241impl Operator for ShortestPathOp {
4242 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4243 loop {
4244 if let Some((mut row, path)) = self.pending.pop_front() {
4249 row.insert(self.path_var.clone(), path);
4250 return Ok(Some(row));
4251 }
4252 let Some(row) = self.input.next(ctx)? else {
4253 return Ok(None);
4254 };
4255 let src = match row.get(&self.src_var) {
4256 Some(Value::Node(n)) => n.clone(),
4257 _ => continue,
4258 };
4259 let dst = match row.get(&self.dst_var) {
4260 Some(Value::Node(n)) => n.clone(),
4261 _ => continue,
4262 };
4263 let paths = bfs_shortest_paths(
4264 &src,
4265 &dst,
4266 &self.edge_types,
4267 self.direction,
4268 self.max_hops,
4269 self.kind,
4270 ctx.store,
4271 )?;
4272 if paths.is_empty() {
4273 continue;
4275 }
4276 for path in paths {
4277 self.pending.push_back((row.clone(), path));
4278 }
4279 }
4280 }
4281}
4282
4283fn bfs_shortest_paths(
4302 src: &Node,
4303 dst: &Node,
4304 edge_types: &[String],
4305 direction: meshdb_cypher::Direction,
4306 max_hops: u64,
4307 kind: meshdb_cypher::ShortestKind,
4308 reader: &dyn crate::reader::GraphReader,
4309) -> Result<Vec<Value>> {
4310 use meshdb_cypher::Direction;
4311
4312 if src.id == dst.id {
4313 return Ok(vec![Value::Path {
4314 nodes: vec![src.clone()],
4315 edges: vec![],
4316 }]);
4317 }
4318
4319 let mut dist: HashMap<NodeId, u64> = HashMap::new();
4325 dist.insert(src.id, 0);
4326 let mut parents: HashMap<NodeId, Vec<(NodeId, EdgeId)>> = HashMap::new();
4327
4328 let mut frontier: Vec<NodeId> = vec![src.id];
4329 let mut depth: u64 = 0;
4330 let mut found = false;
4331
4332 while !frontier.is_empty() && depth < max_hops && !found {
4333 let mut next_frontier: Vec<NodeId> = Vec::new();
4334 for node_id in &frontier {
4335 let neighbors = match direction {
4336 Direction::Outgoing => reader.outgoing(*node_id)?,
4337 Direction::Incoming => reader.incoming(*node_id)?,
4338 Direction::Both => {
4339 let mut out = reader.outgoing(*node_id)?;
4340 out.extend(reader.incoming(*node_id)?);
4341 out
4342 }
4343 };
4344 for (edge_id, neighbor_id) in neighbors {
4345 if !edge_types.is_empty() {
4348 let edge = match reader.get_edge(edge_id)? {
4349 Some(e) => e,
4350 None => continue,
4351 };
4352 if !edge_types.iter().any(|t| t == &edge.edge_type) {
4353 continue;
4354 }
4355 }
4356 match dist.get(&neighbor_id) {
4357 Some(&d) if d == depth + 1 => {
4358 parents
4364 .entry(neighbor_id)
4365 .or_default()
4366 .push((*node_id, edge_id));
4367 }
4368 Some(_) => {
4369 }
4373 None => {
4374 dist.insert(neighbor_id, depth + 1);
4375 parents
4376 .entry(neighbor_id)
4377 .or_default()
4378 .push((*node_id, edge_id));
4379 if neighbor_id == dst.id {
4380 found = true;
4381 } else {
4382 next_frontier.push(neighbor_id);
4383 }
4384 }
4385 }
4386 }
4387 }
4388 depth += 1;
4389 if !found {
4390 frontier = next_frontier;
4391 }
4392 }
4393
4394 if !found {
4395 return Ok(Vec::new());
4396 }
4397
4398 let mut out: Vec<Value> = Vec::new();
4402 let mut nodes_rev: Vec<Node> = Vec::new();
4403 let mut edges_rev: Vec<Edge> = Vec::new();
4404 let only_first = matches!(kind, meshdb_cypher::ShortestKind::Shortest);
4405 collect_shortest_paths(
4406 src,
4407 dst,
4408 &parents,
4409 reader,
4410 &mut nodes_rev,
4411 &mut edges_rev,
4412 &mut out,
4413 only_first,
4414 )?;
4415 Ok(out)
4416}
4417
4418#[allow(clippy::too_many_arguments)]
4430fn collect_shortest_paths(
4431 src: &Node,
4432 current: &Node,
4433 parents: &HashMap<NodeId, Vec<(NodeId, EdgeId)>>,
4434 reader: &dyn crate::reader::GraphReader,
4435 nodes_rev: &mut Vec<Node>,
4436 edges_rev: &mut Vec<Edge>,
4437 out: &mut Vec<Value>,
4438 only_first: bool,
4439) -> Result<()> {
4440 if current.id == src.id {
4441 let mut nodes: Vec<Node> = Vec::with_capacity(nodes_rev.len() + 1);
4446 nodes.push(src.clone());
4447 nodes.extend(nodes_rev.iter().rev().cloned());
4448 let edges: Vec<Edge> = edges_rev.iter().rev().cloned().collect();
4449 out.push(Value::Path { nodes, edges });
4450 return Ok(());
4451 }
4452 let Some(parent_edges) = parents.get(¤t.id) else {
4453 return Ok(());
4457 };
4458 for (parent_id, edge_id) in parent_edges {
4459 if only_first && !out.is_empty() {
4460 return Ok(());
4461 }
4462 let edge = reader
4463 .get_edge(*edge_id)?
4464 .expect("BFS inserted this edge id; it must still exist");
4465 let parent_node = reader
4466 .get_node(*parent_id)?
4467 .expect("BFS visited this node id; it must still exist");
4468 nodes_rev.push(current.clone());
4469 edges_rev.push(edge);
4470 collect_shortest_paths(
4471 src,
4472 &parent_node,
4473 parents,
4474 reader,
4475 nodes_rev,
4476 edges_rev,
4477 out,
4478 only_first,
4479 )?;
4480 nodes_rev.pop();
4481 edges_rev.pop();
4482 }
4483 Ok(())
4484}
4485
4486struct UnionOp {
4495 branches: Vec<Box<dyn Operator>>,
4496 current: usize,
4497 seen: Option<HashSet<String>>,
4498}
4499
4500impl UnionOp {
4501 fn new(branches: Vec<Box<dyn Operator>>, all: bool) -> Self {
4502 Self {
4503 branches,
4504 current: 0,
4505 seen: if all { None } else { Some(HashSet::new()) },
4506 }
4507 }
4508}
4509
4510impl Operator for UnionOp {
4511 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4512 while self.current < self.branches.len() {
4513 match self.branches[self.current].next(ctx)? {
4514 Some(row) => {
4515 if let Some(seen) = self.seen.as_mut() {
4516 let key = row_key(&row);
4517 if !seen.insert(key) {
4518 continue;
4519 }
4520 }
4521 return Ok(Some(row));
4522 }
4523 None => {
4524 self.current += 1;
4525 }
4526 }
4527 }
4528 Ok(None)
4529 }
4530}
4531
4532struct OrderByOp {
4533 input: Box<dyn Operator>,
4534 sort_items: Vec<SortItem>,
4535 sorted: Option<Vec<Row>>,
4536 cursor: usize,
4537}
4538
4539impl OrderByOp {
4540 fn new(input: Box<dyn Operator>, sort_items: Vec<SortItem>) -> Self {
4541 Self {
4542 input,
4543 sort_items,
4544 sorted: None,
4545 cursor: 0,
4546 }
4547 }
4548}
4549
4550impl Operator for OrderByOp {
4551 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4552 if self.sorted.is_none() {
4553 let mut rows: Vec<Row> = Vec::new();
4554 while let Some(row) = self.input.next(ctx)? {
4555 rows.push(row);
4556 }
4557 let mut keyed: Vec<(Vec<Value>, Row)> = Vec::with_capacity(rows.len());
4558 for row in rows {
4559 let mut keys = Vec::with_capacity(self.sort_items.len());
4560 for item in &self.sort_items {
4561 keys.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
4562 }
4563 keyed.push((keys, row));
4564 }
4565 let descs: Vec<bool> = self.sort_items.iter().map(|s| s.descending).collect();
4566 keyed.sort_by(|a, b| {
4567 for (i, (va, vb)) in a.0.iter().zip(b.0.iter()).enumerate() {
4568 let ord = compare_values(va, vb);
4569 let ord = if descs[i] { ord.reverse() } else { ord };
4570 if ord != Ordering::Equal {
4571 return ord;
4572 }
4573 }
4574 Ordering::Equal
4575 });
4576 self.sorted = Some(keyed.into_iter().map(|(_, r)| r).collect());
4577 }
4578 let rows = self.sorted.as_ref().unwrap();
4579 if self.cursor < rows.len() {
4580 let row = rows[self.cursor].clone();
4581 self.cursor += 1;
4582 Ok(Some(row))
4583 } else {
4584 Ok(None)
4585 }
4586 }
4587}
4588
4589struct AggregateOp {
4590 input: Box<dyn Operator>,
4591 group_keys: Vec<ReturnItem>,
4592 aggregates: Vec<AggregateSpec>,
4593 results: Option<Vec<Row>>,
4594 cursor: usize,
4595}
4596
4597impl AggregateOp {
4598 fn new(
4599 input: Box<dyn Operator>,
4600 group_keys: Vec<ReturnItem>,
4601 aggregates: Vec<AggregateSpec>,
4602 ) -> Self {
4603 Self {
4604 input,
4605 group_keys,
4606 aggregates,
4607 results: None,
4608 cursor: 0,
4609 }
4610 }
4611
4612 fn compute(&mut self, ctx: &ExecCtx) -> Result<()> {
4613 let mut groups: HashMap<String, GroupState> = HashMap::new();
4614 let mut order: Vec<String> = Vec::new();
4615
4616 let mut saw_any = false;
4619
4620 while let Some(row) = self.input.next(ctx)? {
4621 saw_any = true;
4622 let mut key_values = Vec::with_capacity(self.group_keys.len());
4623 for item in &self.group_keys {
4624 key_values.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
4625 }
4626 let mut hash_key = String::new();
4627 for v in &key_values {
4628 hash_key.push_str(&value_key(v));
4629 hash_key.push('|');
4630 }
4631 let entry = groups.entry(hash_key.clone()).or_insert_with(|| {
4632 order.push(hash_key.clone());
4633 GroupState {
4634 key_values: key_values.clone(),
4635 agg_states: self
4636 .aggregates
4637 .iter()
4638 .map(|a| AggState::initial(a.function))
4639 .collect(),
4640 distinct_seen: self.aggregates.iter().map(|_| None).collect(),
4641 }
4642 });
4643 for (i, spec) in self.aggregates.iter().enumerate() {
4644 if let AggregateArg::DistinctExpr(expr) = &spec.arg {
4645 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
4646 if matches!(v, Value::Null) {
4647 continue;
4648 }
4649 let key = value_key(&v);
4650 let seen = entry.distinct_seen[i].get_or_insert_with(HashSet::new);
4651 if !seen.insert(key) {
4652 continue;
4653 }
4654 }
4655 entry.agg_states[i].update(&spec.arg, &ctx.eval_ctx(&row))?;
4656 if let Some(extra_expr) = &spec.extra_arg {
4660 let need_resolve = matches!(
4661 &entry.agg_states[i],
4662 AggState::PercentileDisc {
4663 percentile: None,
4664 ..
4665 } | AggState::PercentileCont {
4666 percentile: None,
4667 ..
4668 }
4669 );
4670 if need_resolve {
4671 let pv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
4672 let p = match pv {
4673 Value::Property(Property::Float64(f)) => f,
4674 Value::Property(Property::Int64(i)) => i as f64,
4675 _ => 0.0,
4676 };
4677 if !(0.0..=1.0).contains(&p) || p.is_nan() {
4681 return Err(Error::Procedure(format!("percentile out of range: {p}")));
4682 }
4683 match &mut entry.agg_states[i] {
4684 AggState::PercentileDisc { percentile, .. }
4685 | AggState::PercentileCont { percentile, .. } => {
4686 *percentile = Some(p);
4687 }
4688 _ => {}
4689 }
4690 }
4691 }
4692 }
4693 }
4694
4695 let mut out = Vec::new();
4696 if !saw_any && self.group_keys.is_empty() && !self.aggregates.is_empty() {
4697 let mut row = Row::new();
4699 for spec in &self.aggregates {
4700 row.insert(
4701 spec.alias.clone(),
4702 AggState::initial(spec.function).finalize(),
4703 );
4704 }
4705 out.push(row);
4706 } else {
4707 for key in order {
4708 let state = groups.remove(&key).unwrap();
4709 let mut row = Row::new();
4710 for (i, item) in self.group_keys.iter().enumerate() {
4711 let name = item
4712 .alias
4713 .clone()
4714 .unwrap_or_else(|| default_name(&item.expr, i));
4715 row.insert(name, state.key_values[i].clone());
4716 }
4717 for (i, spec) in self.aggregates.iter().enumerate() {
4718 row.insert(spec.alias.clone(), state.agg_states[i].finalize());
4719 }
4720 out.push(row);
4721 }
4722 }
4723 self.results = Some(out);
4724 Ok(())
4725 }
4726}
4727
4728impl Operator for AggregateOp {
4729 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4730 if self.results.is_none() {
4731 self.compute(ctx)?;
4732 }
4733 let rows = self.results.as_ref().unwrap();
4734 if self.cursor < rows.len() {
4735 let row = rows[self.cursor].clone();
4736 self.cursor += 1;
4737 Ok(Some(row))
4738 } else {
4739 Ok(None)
4740 }
4741 }
4742}
4743
4744struct GroupState {
4745 key_values: Vec<Value>,
4746 agg_states: Vec<AggState>,
4747 distinct_seen: Vec<Option<HashSet<String>>>,
4748}
4749
4750enum AggState {
4751 Count(i64),
4752 Sum {
4753 int_part: i64,
4754 float_part: f64,
4755 is_float: bool,
4756 },
4757 Avg {
4758 total: f64,
4759 count: i64,
4760 },
4761 Min(Option<Value>),
4762 Max(Option<Value>),
4763 Collect(Vec<Value>),
4764 StDev {
4765 sum: f64,
4766 sum_sq: f64,
4767 count: i64,
4768 },
4769 StDevP {
4770 sum: f64,
4771 sum_sq: f64,
4772 count: i64,
4773 },
4774 PercentileDisc {
4775 items: Vec<Value>,
4776 percentile: Option<f64>,
4777 },
4778 PercentileCont {
4779 items: Vec<Value>,
4780 percentile: Option<f64>,
4781 },
4782}
4783
4784impl AggState {
4785 fn initial(func: AggregateFn) -> Self {
4786 match func {
4787 AggregateFn::Count => AggState::Count(0),
4788 AggregateFn::Sum => AggState::Sum {
4789 int_part: 0,
4790 float_part: 0.0,
4791 is_float: false,
4792 },
4793 AggregateFn::Avg => AggState::Avg {
4794 total: 0.0,
4795 count: 0,
4796 },
4797 AggregateFn::Min => AggState::Min(None),
4798 AggregateFn::Max => AggState::Max(None),
4799 AggregateFn::Collect => AggState::Collect(Vec::new()),
4800 AggregateFn::StDev => AggState::StDev {
4801 sum: 0.0,
4802 sum_sq: 0.0,
4803 count: 0,
4804 },
4805 AggregateFn::StDevP => AggState::StDevP {
4806 sum: 0.0,
4807 sum_sq: 0.0,
4808 count: 0,
4809 },
4810 AggregateFn::PercentileDisc => AggState::PercentileDisc {
4811 items: Vec::new(),
4812 percentile: None,
4813 },
4814 AggregateFn::PercentileCont => AggState::PercentileCont {
4815 items: Vec::new(),
4816 percentile: None,
4817 },
4818 }
4819 }
4820
4821 fn update(&mut self, arg: &AggregateArg, ctx: &EvalCtx) -> Result<()> {
4822 match self {
4823 AggState::Count(c) => match arg {
4824 AggregateArg::Star => *c += 1,
4825 AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => {
4826 if !matches!(eval_expr(e, ctx)?, Value::Null) {
4827 *c += 1;
4828 }
4829 }
4830 },
4831 AggState::Sum {
4832 int_part,
4833 float_part,
4834 is_float,
4835 } => {
4836 let v = expr_arg_value(arg, ctx)?;
4837 match v {
4838 Value::Null => {}
4839 Value::Property(Property::Int64(i)) => *int_part += i,
4840 Value::Property(Property::Float64(f)) => {
4841 *float_part += f;
4842 *is_float = true;
4843 }
4844 _ => return Err(Error::AggregateTypeError),
4845 }
4846 }
4847 AggState::Avg { total, count } => {
4848 let v = expr_arg_value(arg, ctx)?;
4849 match v {
4850 Value::Null => {}
4851 Value::Property(Property::Int64(i)) => {
4852 *total += i as f64;
4853 *count += 1;
4854 }
4855 Value::Property(Property::Float64(f)) => {
4856 *total += f;
4857 *count += 1;
4858 }
4859 _ => return Err(Error::AggregateTypeError),
4860 }
4861 }
4862 AggState::Min(slot) => {
4863 let v = expr_arg_value(arg, ctx)?;
4870 if matches!(v, Value::Null | Value::Property(Property::Null)) {
4871 } else {
4873 match slot {
4874 None => *slot = Some(v),
4875 Some(cur) => {
4876 if compare_values(&v, cur) == Ordering::Less {
4877 *cur = v;
4878 }
4879 }
4880 }
4881 }
4882 }
4883 AggState::Max(slot) => {
4884 let v = expr_arg_value(arg, ctx)?;
4885 if matches!(v, Value::Null | Value::Property(Property::Null)) {
4886 } else {
4888 match slot {
4889 None => *slot = Some(v),
4890 Some(cur) => {
4891 if compare_values(&v, cur) == Ordering::Greater {
4892 *cur = v;
4893 }
4894 }
4895 }
4896 }
4897 }
4898 AggState::Collect(items) => {
4899 let v = expr_arg_value(arg, ctx)?;
4900 if !matches!(v, Value::Null) {
4901 items.push(v);
4902 }
4903 }
4904 AggState::PercentileDisc { items, .. } | AggState::PercentileCont { items, .. } => {
4905 let v = expr_arg_value(arg, ctx)?;
4906 if !matches!(v, Value::Null) {
4907 items.push(v);
4908 }
4909 }
4910 AggState::StDev { sum, sum_sq, count } | AggState::StDevP { sum, sum_sq, count } => {
4911 let v = expr_arg_value(arg, ctx)?;
4912 match v {
4913 Value::Null => {}
4914 Value::Property(Property::Int64(i)) => {
4915 let f = i as f64;
4916 *sum += f;
4917 *sum_sq += f * f;
4918 *count += 1;
4919 }
4920 Value::Property(Property::Float64(f)) => {
4921 *sum += f;
4922 *sum_sq += f * f;
4923 *count += 1;
4924 }
4925 _ => return Err(Error::AggregateTypeError),
4926 }
4927 }
4928 }
4929 Ok(())
4930 }
4931
4932 fn finalize(&self) -> Value {
4933 match self {
4934 AggState::Count(c) => Value::Property(Property::Int64(*c)),
4935 AggState::Sum {
4936 int_part,
4937 float_part,
4938 is_float,
4939 } => {
4940 if *is_float {
4941 Value::Property(Property::Float64(*float_part + *int_part as f64))
4942 } else {
4943 Value::Property(Property::Int64(*int_part))
4944 }
4945 }
4946 AggState::Avg { total, count } => {
4947 if *count == 0 {
4948 Value::Null
4949 } else {
4950 Value::Property(Property::Float64(*total / *count as f64))
4951 }
4952 }
4953 AggState::Min(slot) | AggState::Max(slot) => match slot {
4954 Some(v) => v.clone(),
4955 None => Value::Null,
4956 },
4957 AggState::Collect(items) => Value::List(items.clone()),
4958 AggState::StDevP { sum, sum_sq, count } => {
4959 if *count == 0 {
4960 Value::Property(Property::Float64(0.0))
4961 } else {
4962 let n = *count as f64;
4963 let variance = *sum_sq / n - (*sum / n).powi(2);
4964 Value::Property(Property::Float64(variance.max(0.0).sqrt()))
4965 }
4966 }
4967 AggState::StDev { sum, sum_sq, count } => {
4968 if *count < 2 {
4969 Value::Property(Property::Float64(0.0))
4970 } else {
4971 let n = *count as f64;
4972 let variance = (*sum_sq - *sum * *sum / n) / (n - 1.0);
4973 Value::Property(Property::Float64(variance.max(0.0).sqrt()))
4974 }
4975 }
4976 AggState::PercentileDisc { items, percentile } => {
4977 percentile_disc(items, percentile.unwrap_or(0.0))
4978 }
4979 AggState::PercentileCont { items, percentile } => {
4980 percentile_cont(items, percentile.unwrap_or(0.0))
4981 }
4982 }
4983 }
4984}
4985
4986fn expr_arg_value(arg: &AggregateArg, ctx: &EvalCtx) -> Result<Value> {
4987 match arg {
4988 AggregateArg::Star => Err(Error::AggregateTypeError),
4989 AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => eval_expr(e, ctx),
4990 }
4991}
4992
4993fn value_to_f64(v: &Value) -> f64 {
4997 match v {
4998 Value::Property(Property::Int64(i)) => *i as f64,
4999 Value::Property(Property::Float64(f)) => *f,
5000 _ => f64::NAN,
5001 }
5002}
5003
5004fn percentile_disc(items: &[Value], p: f64) -> Value {
5009 let mut nums: Vec<(f64, Value)> = items
5010 .iter()
5011 .map(|v| (value_to_f64(v), v.clone()))
5012 .filter(|(f, _)| !f.is_nan())
5013 .collect();
5014 if nums.is_empty() {
5015 return Value::Null;
5016 }
5017 nums.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
5018 let p = p.clamp(0.0, 1.0);
5019 let n = nums.len();
5020 let idx = ((p * n as f64).ceil() as isize - 1).max(0) as usize;
5022 nums[idx.min(n - 1)].1.clone()
5023}
5024
5025fn percentile_cont(items: &[Value], p: f64) -> Value {
5029 let mut nums: Vec<f64> = items
5030 .iter()
5031 .map(value_to_f64)
5032 .filter(|f| !f.is_nan())
5033 .collect();
5034 if nums.is_empty() {
5035 return Value::Null;
5036 }
5037 nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
5038 let p = p.clamp(0.0, 1.0);
5039 let n = nums.len();
5040 if n == 1 {
5041 return Value::Property(Property::Float64(nums[0]));
5042 }
5043 let pos = p * (n as f64 - 1.0);
5044 let lo = pos.floor() as usize;
5045 let hi = pos.ceil() as usize;
5046 let frac = pos - lo as f64;
5047 let v = nums[lo] + (nums[hi] - nums[lo]) * frac;
5048 Value::Property(Property::Float64(v))
5049}
5050
5051struct SkipOp {
5052 input: Box<dyn Operator>,
5053 count_expr: Expr,
5054 remaining: Option<i64>,
5055}
5056
5057impl SkipOp {
5058 fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5059 Self {
5060 input,
5061 count_expr,
5062 remaining: None,
5063 }
5064 }
5065}
5066
5067impl Operator for SkipOp {
5068 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5069 if self.remaining.is_none() {
5070 let empty = Row::new();
5071 let ectx = ctx.eval_ctx(&empty);
5072 let val = eval_expr(&self.count_expr, &ectx)?;
5073 self.remaining = Some(expr_to_count(val)?);
5074 }
5075 let rem = self.remaining.as_mut().unwrap();
5076 while *rem > 0 {
5077 if self.input.next(ctx)?.is_none() {
5078 return Ok(None);
5079 }
5080 *rem -= 1;
5081 }
5082 self.input.next(ctx)
5083 }
5084}
5085
5086struct LimitOp {
5087 input: Box<dyn Operator>,
5088 count_expr: Expr,
5089 remaining: Option<i64>,
5090}
5091
5092impl LimitOp {
5093 fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5094 Self {
5095 input,
5096 count_expr,
5097 remaining: None,
5098 }
5099 }
5100}
5101
5102impl Operator for LimitOp {
5103 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5104 if self.remaining.is_none() {
5105 let empty = Row::new();
5106 let ectx = ctx.eval_ctx(&empty);
5107 let val = eval_expr(&self.count_expr, &ectx)?;
5108 self.remaining = Some(expr_to_count(val)?);
5109 }
5110 let rem = self.remaining.as_mut().unwrap();
5111 if *rem <= 0 {
5112 return Ok(None);
5113 }
5114 match self.input.next(ctx)? {
5115 Some(row) => {
5116 *rem -= 1;
5117 Ok(Some(row))
5118 }
5119 None => Ok(None),
5120 }
5121 }
5122}
5123
5124fn expr_to_count(val: Value) -> Result<i64> {
5125 match val {
5126 Value::Null | Value::Property(Property::Null) => Ok(0),
5127 Value::Property(Property::Int64(n)) if n >= 0 => Ok(n),
5128 _ => Err(Error::TypeMismatch),
5133 }
5134}