1use crate::{
2 error::{Error, Result},
3 eval::{compare_values, eval_expr, row_key, to_bool, value_key, values_equal, EvalCtx},
4 procedures::ProcedureRegistry,
5 reader::GraphReader,
6 value::{ParamMap, Row, Value},
7 writer::GraphWriter,
8};
9use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
10use meshdb_cypher::{
11 AggregateArg, AggregateFn, AggregateSpec, BinaryOp, CallArgs, CompareOp, ConstraintKind,
12 ConstraintScope as CypherConstraintScope, CreateEdgeSpec, CreateNodeSpec, Direction, Expr,
13 Literal, LogicalPlan, PropertyType as CypherPropertyType, RemoveSpec, ReturnItem,
14 SetAssignment, SortItem, UnaryOp, YieldSpec,
15};
16use meshdb_storage::{
17 ConstraintScope as StorageConstraintScope, PropertyConstraintKind,
18 PropertyType as StoragePropertyType, RocksDbStorageEngine,
19};
20use std::cell::RefCell;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23
24#[derive(Default)]
30pub struct Tombstones {
31 pub nodes: RefCell<HashSet<meshdb_core::NodeId>>,
32 pub edges: RefCell<HashSet<meshdb_core::EdgeId>>,
33}
34
35pub struct ExecCtx<'a> {
36 pub store: &'a dyn GraphReader,
37 pub writer: &'a dyn GraphWriter,
38 pub params: &'a ParamMap,
43 pub procedures: &'a ProcedureRegistry,
48 pub outer_rows: &'a [&'a Row],
56 pub tombstones: &'a Tombstones,
60}
61
62pub(crate) struct NoOpWriter;
63impl GraphWriter for NoOpWriter {
64 fn put_node(&self, _: &Node) -> Result<()> {
65 Ok(())
66 }
67 fn put_edge(&self, _: &Edge) -> Result<()> {
68 Ok(())
69 }
70 fn delete_edge(&self, _: EdgeId) -> Result<()> {
71 Ok(())
72 }
73 fn detach_delete_node(&self, _: NodeId) -> Result<()> {
74 Ok(())
75 }
76}
77
78impl<'a> ExecCtx<'a> {
79 pub(crate) fn eval_ctx<'b>(&self, row: &'b Row) -> EvalCtx<'b>
85 where
86 'a: 'b,
87 {
88 EvalCtx {
89 row,
90 params: self.params,
91 reader: self.store,
92 procedures: self.procedures,
93 outer_rows: self.outer_rows,
94 tombstones: self.tombstones,
95 }
96 }
97
98 pub(crate) fn lookup_binding<'r>(&'r self, row: &'r Row, name: &str) -> Option<&'r Value> {
104 if let Some(v) = row.get(name) {
105 return Some(v);
106 }
107 for outer in self.outer_rows {
108 if let Some(v) = outer.get(name) {
109 return Some(v);
110 }
111 }
112 None
113 }
114}
115
116pub trait Operator {
117 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>>;
118}
119
120pub fn execute(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
124 let params = ParamMap::new();
125 execute_with_reader(
126 plan,
127 store as &dyn GraphReader,
128 store as &dyn GraphWriter,
129 ¶ms,
130 )
131}
132
133pub fn execute_with_writer(
138 plan: &LogicalPlan,
139 store: &RocksDbStorageEngine,
140 writer: &dyn GraphWriter,
141) -> Result<Vec<Row>> {
142 let params = ParamMap::new();
143 execute_with_reader(plan, store as &dyn GraphReader, writer, ¶ms)
144}
145
146pub fn explain(plan: &LogicalPlan) -> Vec<Row> {
151 let text = meshdb_cypher::format_plan(plan);
152 let mut row = Row::new();
153 row.insert("plan".to_string(), Value::Property(Property::String(text)));
154 vec![row]
155}
156
157pub fn profile(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
158 let result_rows = execute(plan, store)?;
159 let row_count = result_rows.len() as i64;
160 let plan_text = meshdb_cypher::format_plan(plan);
161 let summary = format!("{plan_text}\nRows: {row_count}");
162 let mut row = Row::new();
163 row.insert(
164 "profile".to_string(),
165 Value::Property(Property::String(summary)),
166 );
167 row.insert(
168 "rows".to_string(),
169 Value::Property(Property::Int64(row_count)),
170 );
171 Ok(vec![row])
172}
173
174pub fn execute_with_reader(
175 plan: &LogicalPlan,
176 reader: &dyn GraphReader,
177 writer: &dyn GraphWriter,
178 params: &ParamMap,
179) -> Result<Vec<Row>> {
180 let empty_procs = ProcedureRegistry::new();
181 execute_with_reader_and_procs(plan, reader, writer, params, &empty_procs)
182}
183
184pub fn execute_with_reader_and_procs(
190 plan: &LogicalPlan,
191 reader: &dyn GraphReader,
192 writer: &dyn GraphWriter,
193 params: &ParamMap,
194 procedures: &ProcedureRegistry,
195) -> Result<Vec<Row>> {
196 crate::eval::reset_statement_time();
201 if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
206 return Ok(rows);
207 }
208 let suppress_output = is_write_only_plan(plan);
209 let mut op = build_op(plan);
210 let tombstones = Tombstones::default();
211 let ctx = ExecCtx {
212 store: reader,
213 writer,
214 params,
215 procedures,
216 outer_rows: &[],
217 tombstones: &tombstones,
218 };
219 let mut rows = Vec::new();
220 while let Some(row) = op.next(&ctx)? {
221 rows.push(row);
222 }
223 if suppress_output {
224 Ok(Vec::new())
225 } else {
226 Ok(rows)
227 }
228}
229
230fn is_write_only_plan(plan: &LogicalPlan) -> bool {
231 match plan {
235 LogicalPlan::CreatePath { .. }
236 | LogicalPlan::Delete { .. }
237 | LogicalPlan::SetProperty { .. }
238 | LogicalPlan::Remove { .. }
239 | LogicalPlan::Foreach { .. }
240 | LogicalPlan::MergeNode { .. }
241 | LogicalPlan::MergeEdge { .. } => true,
242 _ => false,
243 }
244}
245
246fn try_execute_ddl(
256 plan: &LogicalPlan,
257 reader: &dyn GraphReader,
258 writer: &dyn GraphWriter,
259) -> Result<Option<Vec<Row>>> {
260 match plan {
261 LogicalPlan::CreatePropertyIndex { label, property } => {
262 writer.create_property_index(label, property)?;
263 Ok(Some(vec![node_index_ddl_ack_row(
264 "created", label, property,
265 )]))
266 }
267 LogicalPlan::DropPropertyIndex { label, property } => {
268 writer.drop_property_index(label, property)?;
269 Ok(Some(vec![node_index_ddl_ack_row(
270 "dropped", label, property,
271 )]))
272 }
273 LogicalPlan::CreateEdgePropertyIndex {
274 edge_type,
275 property,
276 } => {
277 writer.create_edge_property_index(edge_type, property)?;
278 Ok(Some(vec![edge_index_ddl_ack_row(
279 "created", edge_type, property,
280 )]))
281 }
282 LogicalPlan::DropEdgePropertyIndex {
283 edge_type,
284 property,
285 } => {
286 writer.drop_edge_property_index(edge_type, property)?;
287 Ok(Some(vec![edge_index_ddl_ack_row(
288 "dropped", edge_type, property,
289 )]))
290 }
291 LogicalPlan::ShowPropertyIndexes => {
292 let mut rows: Vec<Row> = Vec::new();
298 for (label, property) in reader.list_property_indexes()? {
299 rows.push(show_index_row("NODE", label, property));
300 }
301 for (edge_type, property) in reader.list_edge_property_indexes()? {
302 rows.push(show_index_row("RELATIONSHIP", edge_type, property));
303 }
304 Ok(Some(rows))
305 }
306 LogicalPlan::CreatePropertyConstraint {
307 name,
308 scope,
309 properties,
310 kind,
311 if_not_exists,
312 } => {
313 let storage_kind = match kind {
314 ConstraintKind::Unique => PropertyConstraintKind::Unique,
315 ConstraintKind::NotNull => PropertyConstraintKind::NotNull,
316 ConstraintKind::NodeKey => PropertyConstraintKind::NodeKey,
317 ConstraintKind::PropertyType(t) => {
318 PropertyConstraintKind::PropertyType(cypher_to_storage_property_type(*t))
319 }
320 };
321 let storage_scope = cypher_to_storage_scope(scope);
322 let spec = writer.create_property_constraint(
323 name.as_deref(),
324 &storage_scope,
325 properties,
326 storage_kind,
327 *if_not_exists,
328 )?;
329 Ok(Some(vec![constraint_ack_row("created", &spec)]))
330 }
331 LogicalPlan::DropPropertyConstraint { name, if_exists } => {
332 writer.drop_property_constraint(name, *if_exists)?;
333 let mut row = Row::default();
334 row.insert(
335 "state".into(),
336 Value::Property(Property::String("dropped".into())),
337 );
338 row.insert(
339 "name".into(),
340 Value::Property(Property::String(name.clone())),
341 );
342 Ok(Some(vec![row]))
343 }
344 LogicalPlan::ShowPropertyConstraints => {
345 let specs = reader.list_property_constraints()?;
346 let rows = specs.into_iter().map(constraint_show_row).collect();
347 Ok(Some(rows))
348 }
349 _ => Ok(None),
350 }
351}
352
353fn constraint_ack_row(state: &str, spec: &meshdb_storage::PropertyConstraintSpec) -> Row {
358 let mut row = constraint_show_row(spec.clone());
359 row.insert(
360 "state".into(),
361 Value::Property(Property::String(state.into())),
362 );
363 row
364}
365
366fn constraint_show_row(spec: meshdb_storage::PropertyConstraintSpec) -> Row {
372 let mut row = Row::default();
373 row.insert("name".into(), Value::Property(Property::String(spec.name)));
374 let (scope_tag, target) = match spec.scope {
375 meshdb_storage::ConstraintScope::Node(l) => ("NODE", l),
376 meshdb_storage::ConstraintScope::Relationship(t) => ("RELATIONSHIP", t),
377 };
378 row.insert(
379 "scope".into(),
380 Value::Property(Property::String(scope_tag.into())),
381 );
382 row.insert("label".into(), Value::Property(Property::String(target)));
387 let props: Vec<Property> = spec.properties.into_iter().map(Property::String).collect();
388 row.insert("properties".into(), Value::Property(Property::List(props)));
389 row.insert(
390 "type".into(),
391 Value::Property(Property::String(spec.kind.as_string())),
392 );
393 row
394}
395
396fn cypher_to_storage_scope(scope: &CypherConstraintScope) -> StorageConstraintScope {
400 match scope {
401 CypherConstraintScope::Node(l) => StorageConstraintScope::Node(l.clone()),
402 CypherConstraintScope::Relationship(t) => StorageConstraintScope::Relationship(t.clone()),
403 }
404}
405
406fn cypher_to_storage_property_type(t: CypherPropertyType) -> StoragePropertyType {
411 match t {
412 CypherPropertyType::String => StoragePropertyType::String,
413 CypherPropertyType::Integer => StoragePropertyType::Integer,
414 CypherPropertyType::Float => StoragePropertyType::Float,
415 CypherPropertyType::Boolean => StoragePropertyType::Boolean,
416 }
417}
418
419fn node_index_ddl_ack_row(state: &str, label: &str, property: &str) -> Row {
426 let mut row = Row::default();
427 row.insert(
428 "state".into(),
429 Value::Property(Property::String(state.into())),
430 );
431 row.insert(
432 "scope".into(),
433 Value::Property(Property::String("NODE".into())),
434 );
435 row.insert(
436 "label".into(),
437 Value::Property(Property::String(label.into())),
438 );
439 row.insert(
440 "property".into(),
441 Value::Property(Property::String(property.into())),
442 );
443 row
444}
445
446fn edge_index_ddl_ack_row(state: &str, edge_type: &str, property: &str) -> Row {
452 let mut row = Row::default();
453 row.insert(
454 "state".into(),
455 Value::Property(Property::String(state.into())),
456 );
457 row.insert(
458 "scope".into(),
459 Value::Property(Property::String("RELATIONSHIP".into())),
460 );
461 row.insert(
462 "label".into(),
463 Value::Property(Property::String(edge_type.into())),
464 );
465 row.insert(
466 "edge_type".into(),
467 Value::Property(Property::String(edge_type.into())),
468 );
469 row.insert(
470 "property".into(),
471 Value::Property(Property::String(property.into())),
472 );
473 row
474}
475
476fn show_index_row(scope: &str, target: String, property: String) -> Row {
483 let mut row = Row::default();
484 row.insert(
485 "scope".into(),
486 Value::Property(Property::String(scope.into())),
487 );
488 row.insert(
489 "label".into(),
490 Value::Property(Property::String(target.clone())),
491 );
492 if scope == "RELATIONSHIP" {
493 row.insert(
494 "edge_type".into(),
495 Value::Property(Property::String(target)),
496 );
497 }
498 row.insert(
499 "property".into(),
500 Value::Property(Property::String(property)),
501 );
502 row.insert(
503 "state".into(),
504 Value::Property(Property::String("online".into())),
505 );
506 row
507}
508
509fn build_op(plan: &LogicalPlan) -> Box<dyn Operator> {
510 build_op_inner(plan, None)
511}
512
513pub(crate) fn build_op_inner(plan: &LogicalPlan, seed: Option<&Row>) -> Box<dyn Operator> {
514 macro_rules! child {
515 ($p:expr) => {
516 build_op_inner($p, seed)
517 };
518 }
519 match plan {
520 LogicalPlan::CreatePath {
521 input,
522 nodes,
523 edges,
524 } => Box::new(CreatePathOp::new(
525 input.as_ref().map(|p| child!(p)),
526 nodes.clone(),
527 edges.clone(),
528 )),
529 LogicalPlan::CartesianProduct { left, right } => {
530 Box::new(CartesianProductOp::new(child!(left), (**right).clone()))
531 }
532 LogicalPlan::Delete {
533 input,
534 detach,
535 vars,
536 exprs,
537 } => Box::new(DeleteOp::new(
538 child!(input),
539 *detach,
540 vars.clone(),
541 exprs.clone(),
542 )),
543 LogicalPlan::SetProperty { input, assignments } => {
544 Box::new(SetPropertyOp::new(child!(input), assignments.clone()))
545 }
546 LogicalPlan::Remove { input, items } => {
547 Box::new(RemoveOp::new(child!(input), items.clone()))
548 }
549 LogicalPlan::LoadCsv {
550 input,
551 path_expr,
552 var,
553 with_headers,
554 } => Box::new(LoadCsvOp::new(
555 input.as_ref().map(|p| child!(p)),
556 path_expr.clone(),
557 var.clone(),
558 *with_headers,
559 )),
560 LogicalPlan::Foreach {
561 input,
562 var,
563 list_expr,
564 set_assignments,
565 remove_items,
566 } => Box::new(ForeachOp::new(
567 child!(input),
568 var.clone(),
569 list_expr.clone(),
570 set_assignments.clone(),
571 remove_items.clone(),
572 )),
573 LogicalPlan::CallSubquery { input, body } => {
574 Box::new(CallSubqueryOp::new(child!(input), (**body).clone()))
575 }
576 LogicalPlan::OptionalApply {
577 input,
578 body,
579 null_vars,
580 } => Box::new(OptionalApplyOp::new(
581 child!(input),
582 (**body).clone(),
583 null_vars.clone(),
584 )),
585 LogicalPlan::ProcedureCall {
586 input,
587 qualified_name,
588 args,
589 yield_spec,
590 standalone,
591 } => Box::new(ProcedureCallOp::new(
592 input.as_ref().map(|p| child!(p)),
593 qualified_name.clone(),
594 args.clone(),
595 yield_spec.clone(),
596 *standalone,
597 )),
598 LogicalPlan::SeedRow => match seed {
599 Some(r) => Box::new(SeededRowOp {
600 row: Some(r.clone()),
601 }),
602 None => Box::new(SeedRowOp { done: false }),
603 },
604 LogicalPlan::NodeScanAll { var } => Box::new(NodeScanAllOp::new(var.clone())),
605 LogicalPlan::NodeScanByLabels { var, labels } => {
606 Box::new(NodeScanByLabelsOp::new(var.clone(), labels.clone()))
607 }
608 LogicalPlan::EdgeExpand {
609 input,
610 src_var,
611 edge_var,
612 dst_var,
613 dst_labels,
614 edge_properties,
615 edge_types,
616 direction,
617 edge_constraint_var,
618 } => Box::new(EdgeExpandOp::new(
619 child!(input),
620 src_var.clone(),
621 edge_var.clone(),
622 dst_var.clone(),
623 dst_labels.clone(),
624 edge_properties.clone(),
625 edge_types.clone(),
626 *direction,
627 edge_constraint_var.clone(),
628 )),
629 LogicalPlan::OptionalEdgeExpand {
630 input,
631 src_var,
632 edge_var,
633 dst_var,
634 dst_labels,
635 dst_properties,
636 edge_types,
637 direction,
638 dst_constraint_var,
639 edge_constraint_var,
640 } => Box::new(OptionalEdgeExpandOp::new(
641 child!(input),
642 src_var.clone(),
643 edge_var.clone(),
644 dst_var.clone(),
645 dst_labels.clone(),
646 dst_properties.clone(),
647 edge_types.clone(),
648 *direction,
649 dst_constraint_var.clone(),
650 edge_constraint_var.clone(),
651 )),
652 LogicalPlan::VarLengthExpand {
653 input,
654 src_var,
655 edge_var,
656 dst_var,
657 dst_labels,
658 edge_types,
659 edge_properties,
660 direction,
661 min_hops,
662 max_hops,
663 path_var,
664 optional,
665 dst_constraint_var,
666 bound_edge_list_var,
667 excluded_edge_vars,
668 } => Box::new(VarLengthExpandOp::new(
669 child!(input),
670 src_var.clone(),
671 edge_var.clone(),
672 dst_var.clone(),
673 dst_labels.clone(),
674 edge_types.clone(),
675 edge_properties.clone(),
676 *direction,
677 *min_hops,
678 *max_hops,
679 path_var.clone(),
680 *optional,
681 dst_constraint_var.clone(),
682 bound_edge_list_var.clone(),
683 excluded_edge_vars.clone(),
684 )),
685 LogicalPlan::Filter { input, predicate } => {
686 Box::new(FilterOp::new(child!(input), predicate.clone()))
687 }
688 LogicalPlan::Project { input, items } => {
689 Box::new(ProjectOp::new(child!(input), items.clone()))
690 }
691 LogicalPlan::Aggregate {
692 input,
693 group_keys,
694 aggregates,
695 } => Box::new(AggregateOp::new(
696 child!(input),
697 group_keys.clone(),
698 aggregates.clone(),
699 )),
700 LogicalPlan::Identity { input } => Box::new(IdentityOp::new(child!(input))),
701 LogicalPlan::CoalesceNullRow { input, null_vars } => {
702 Box::new(CoalesceNullRowOp::new(child!(input), null_vars.clone()))
703 }
704 LogicalPlan::Distinct { input } => Box::new(DistinctOp::new(child!(input))),
705 LogicalPlan::OrderBy { input, sort_items } => {
706 Box::new(OrderByOp::new(child!(input), sort_items.clone()))
707 }
708 LogicalPlan::Skip { input, count } => Box::new(SkipOp::new(child!(input), count.clone())),
709 LogicalPlan::Limit { input, count } => Box::new(LimitOp::new(child!(input), count.clone())),
710 LogicalPlan::MergeNode {
711 input,
712 var,
713 labels,
714 properties,
715 on_create,
716 on_match,
717 } => Box::new(MergeNodeOp::new(
718 input.as_ref().map(|p| child!(p)),
719 var.clone(),
720 labels.clone(),
721 properties.clone(),
722 on_create.clone(),
723 on_match.clone(),
724 )),
725 LogicalPlan::MergeEdge {
726 input,
727 edge_var,
728 src_var,
729 dst_var,
730 edge_type,
731 undirected,
732 properties,
733 on_create,
734 on_match,
735 } => Box::new(MergeEdgeOp::new(
736 child!(input),
737 edge_var.clone(),
738 src_var.clone(),
739 dst_var.clone(),
740 edge_type.clone(),
741 *undirected,
742 properties.clone(),
743 on_create.clone(),
744 on_match.clone(),
745 )),
746 LogicalPlan::Unwind { var, expr } => Box::new(UnwindOp::new(var.clone(), expr.clone())),
747 LogicalPlan::UnwindChain { input, var, expr } => {
748 Box::new(UnwindChainOp::new(child!(input), var.clone(), expr.clone()))
749 }
750 LogicalPlan::IndexSeek {
751 var,
752 label,
753 property,
754 value,
755 } => Box::new(IndexSeekOp::new(
756 var.clone(),
757 label.clone(),
758 property.clone(),
759 value.clone(),
760 )),
761 LogicalPlan::Union { branches, all } => {
766 let branch_ops: Vec<Box<dyn Operator>> = branches.iter().map(|b| child!(b)).collect();
767 Box::new(UnionOp::new(branch_ops, *all))
768 }
769 LogicalPlan::BindPath {
770 input,
771 path_var,
772 node_vars,
773 edge_vars,
774 } => Box::new(BindPathOp::new(
775 child!(input),
776 path_var.clone(),
777 node_vars.clone(),
778 edge_vars.clone(),
779 )),
780 LogicalPlan::ShortestPath {
781 input,
782 src_var,
783 dst_var,
784 path_var,
785 edge_types,
786 direction,
787 max_hops,
788 kind,
789 } => Box::new(ShortestPathOp::new(
790 child!(input),
791 src_var.clone(),
792 dst_var.clone(),
793 path_var.clone(),
794 edge_types.clone(),
795 *direction,
796 *max_hops,
797 *kind,
798 )),
799 LogicalPlan::CreatePropertyIndex { .. }
800 | LogicalPlan::DropPropertyIndex { .. }
801 | LogicalPlan::CreateEdgePropertyIndex { .. }
802 | LogicalPlan::DropEdgePropertyIndex { .. }
803 | LogicalPlan::ShowPropertyIndexes
804 | LogicalPlan::CreatePropertyConstraint { .. }
805 | LogicalPlan::DropPropertyConstraint { .. }
806 | LogicalPlan::ShowPropertyConstraints => {
807 panic!("schema DDL must be dispatched via try_execute_ddl before build_op")
808 }
809 }
810}
811
812struct UnwindOp {
813 var: String,
814 expr: Expr,
815 items: Option<Vec<Value>>,
816 cursor: usize,
817}
818
819impl UnwindOp {
820 fn new(var: String, expr: Expr) -> Self {
821 Self {
822 var,
823 expr,
824 items: None,
825 cursor: 0,
826 }
827 }
828}
829
830impl Operator for UnwindOp {
831 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
832 if self.items.is_none() {
833 let empty = Row::new();
834 let ectx = EvalCtx {
835 row: &empty,
836 params: ctx.params,
837 reader: ctx.store,
838 procedures: ctx.procedures,
839 outer_rows: ctx.outer_rows,
840 tombstones: ctx.tombstones,
841 };
842 let val = eval_expr(&self.expr, &ectx)?;
843 self.items = Some(coerce_unwind_list(val)?);
844 }
845 let items = self.items.as_ref().unwrap();
846 if self.cursor < items.len() {
847 let v = items[self.cursor].clone();
848 self.cursor += 1;
849 let mut row = Row::new();
850 row.insert(self.var.clone(), v);
851 Ok(Some(row))
852 } else {
853 Ok(None)
854 }
855 }
856}
857
858struct UnwindChainOp {
864 input: Box<dyn Operator>,
865 var: String,
866 expr: Expr,
867 current_row: Option<Row>,
868 items: Vec<Value>,
869 cursor: usize,
870}
871
872impl UnwindChainOp {
873 fn new(input: Box<dyn Operator>, var: String, expr: Expr) -> Self {
874 Self {
875 input,
876 var,
877 expr,
878 current_row: None,
879 items: Vec::new(),
880 cursor: 0,
881 }
882 }
883}
884
885impl Operator for UnwindChainOp {
886 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
887 loop {
888 if let Some(base) = &self.current_row {
889 if self.cursor < self.items.len() {
890 let v = self.items[self.cursor].clone();
891 self.cursor += 1;
892 let mut row = base.clone();
893 row.insert(self.var.clone(), v);
894 return Ok(Some(row));
895 }
896 self.current_row = None;
897 self.items.clear();
898 self.cursor = 0;
899 }
900 let base = match self.input.next(ctx)? {
901 Some(r) => r,
902 None => return Ok(None),
903 };
904 let ectx = EvalCtx {
905 row: &base,
906 params: ctx.params,
907 reader: ctx.store,
908 procedures: ctx.procedures,
909 outer_rows: ctx.outer_rows,
910 tombstones: ctx.tombstones,
911 };
912 let val = eval_expr(&self.expr, &ectx)?;
913 self.items = coerce_unwind_list(val)?;
914 self.current_row = Some(base);
915 }
916 }
917}
918
919fn coerce_unwind_list(val: Value) -> Result<Vec<Value>> {
924 match val {
925 Value::List(items) => Ok(items),
926 Value::Property(Property::List(props)) => {
927 Ok(props.into_iter().map(Value::Property).collect())
928 }
929 Value::Null => Ok(Vec::new()),
930 _ => Err(Error::TypeMismatch),
931 }
932}
933
934struct CreatePathOp {
935 input: Option<Box<dyn Operator>>,
936 nodes: Vec<CreateNodeSpec>,
937 edges: Vec<CreateEdgeSpec>,
938 done: bool,
939 buffered: Option<Vec<Row>>,
940 cursor: usize,
941}
942
943impl CreatePathOp {
944 fn new(
945 input: Option<Box<dyn Operator>>,
946 nodes: Vec<CreateNodeSpec>,
947 edges: Vec<CreateEdgeSpec>,
948 ) -> Self {
949 Self {
950 input,
951 nodes,
952 edges,
953 done: false,
954 buffered: None,
955 cursor: 0,
956 }
957 }
958
959 fn apply(&self, ctx: &ExecCtx, row: &Row) -> Result<Row> {
960 let mut out = row.clone();
961 let mut node_ids: Vec<NodeId> = Vec::with_capacity(self.nodes.len());
962 for spec in &self.nodes {
963 match spec {
964 CreateNodeSpec::New {
965 var,
966 labels,
967 properties,
968 } => {
969 let mut node = Node::new();
970 for label in labels {
971 node.labels.push(label.clone());
972 }
973 for (k, expr) in properties {
981 let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
982 let prop = value_to_property(value)?;
983 if matches!(prop, Property::Null) {
984 continue;
985 }
986 node.properties.insert(k.clone(), prop);
987 }
988 ctx.writer.put_node(&node)?;
989 node_ids.push(node.id);
990 if let Some(v) = var {
991 out.insert(v.clone(), Value::Node(node));
992 }
993 }
994 CreateNodeSpec::Reference(name) => {
995 let id = match out.get(name) {
996 Some(Value::Node(n)) => n.id,
997 _ => return Err(Error::UnboundVariable(name.clone())),
998 };
999 node_ids.push(id);
1000 }
1001 }
1002 }
1003 for spec in &self.edges {
1004 let src = node_ids[spec.src_idx];
1005 let dst = node_ids[spec.dst_idx];
1006 let mut edge = Edge::new(spec.edge_type.clone(), src, dst);
1007 for (k, expr) in &spec.properties {
1008 let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1009 let prop = value_to_property(value)?;
1010 if matches!(prop, Property::Null) {
1011 continue;
1012 }
1013 edge.properties.insert(k.clone(), prop);
1014 }
1015 ctx.writer.put_edge(&edge)?;
1016 if let Some(v) = &spec.var {
1017 out.insert(v.clone(), Value::Edge(edge));
1018 }
1019 }
1020 Ok(out)
1021 }
1022}
1023
1024impl Operator for CreatePathOp {
1025 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1026 if self.input.is_some() {
1027 if let Some(buffered) = self.buffered.as_mut() {
1031 if self.cursor < buffered.len() {
1032 let row = buffered[self.cursor].clone();
1033 self.cursor += 1;
1034 return Ok(Some(self.apply(ctx, &row)?));
1035 }
1036 return Ok(None);
1037 }
1038 let mut rows: Vec<Row> = Vec::new();
1039 {
1040 let input = self.input.as_mut().unwrap();
1041 while let Some(row) = input.next(ctx)? {
1042 rows.push(row);
1043 }
1044 }
1045 self.buffered = Some(rows);
1046 self.cursor = 0;
1047 self.next(ctx)
1049 } else {
1050 if self.done {
1051 return Ok(None);
1052 }
1053 self.done = true;
1054 let empty = Row::new();
1055 Ok(Some(self.apply(ctx, &empty)?))
1056 }
1057 }
1058}
1059
1060struct CartesianProductOp {
1061 left: Box<dyn Operator>,
1062 right_plan: LogicalPlan,
1063 left_row: Option<Row>,
1064 right_op: Option<Box<dyn Operator>>,
1065}
1066
1067impl CartesianProductOp {
1068 fn new(left: Box<dyn Operator>, right_plan: LogicalPlan) -> Self {
1069 Self {
1070 left,
1071 right_plan,
1072 left_row: None,
1073 right_op: None,
1074 }
1075 }
1076}
1077
1078impl Operator for CartesianProductOp {
1079 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1080 loop {
1081 if self.left_row.is_none() {
1082 match self.left.next(ctx)? {
1083 None => return Ok(None),
1084 Some(row) => {
1085 self.left_row = Some(row);
1086 self.right_op = Some(build_op(&self.right_plan));
1087 }
1088 }
1089 }
1090 let right_op = self.right_op.as_mut().expect("right_op set");
1091 let left_ref = self.left_row.as_ref().unwrap();
1096 let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1097 stacked.push(left_ref);
1098 stacked.extend_from_slice(ctx.outer_rows);
1099 let inner_ctx = ExecCtx {
1100 store: ctx.store,
1101 writer: ctx.writer,
1102 params: ctx.params,
1103 procedures: ctx.procedures,
1104 outer_rows: &stacked,
1105 tombstones: ctx.tombstones,
1106 };
1107 match right_op.next(&inner_ctx)? {
1108 Some(right_row) => {
1109 let mut combined = left_ref.clone();
1110 for (k, v) in right_row {
1111 combined.insert(k, v);
1112 }
1113 return Ok(Some(combined));
1114 }
1115 None => {
1116 self.left_row = None;
1117 self.right_op = None;
1118 }
1119 }
1120 }
1121 }
1122}
1123
1124struct DeleteOp {
1125 input: Box<dyn Operator>,
1126 detach: bool,
1127 #[allow(dead_code)]
1128 vars: Vec<String>,
1129 exprs: Vec<Expr>,
1130 buffered: Option<Vec<Row>>,
1138 cursor: usize,
1139}
1140
1141impl DeleteOp {
1142 fn new(input: Box<dyn Operator>, detach: bool, vars: Vec<String>, exprs: Vec<Expr>) -> Self {
1143 Self {
1144 input,
1145 detach,
1146 vars,
1147 exprs,
1148 buffered: None,
1149 cursor: 0,
1150 }
1151 }
1152
1153 fn apply_deletes(
1163 &self,
1164 ctx: &ExecCtx,
1165 row: &Row,
1166 seen_edges: &mut HashSet<meshdb_core::EdgeId>,
1167 seen_nodes: &mut HashSet<meshdb_core::NodeId>,
1168 ) -> Result<()> {
1169 let mut edge_ids: Vec<meshdb_core::EdgeId> = Vec::new();
1170 let mut node_ids: Vec<meshdb_core::NodeId> = Vec::new();
1171 for expr in &self.exprs {
1172 let v = eval_expr(expr, &ctx.eval_ctx(row))?;
1173 match v {
1174 Value::Node(n) => node_ids.push(n.id),
1175 Value::Edge(e) => edge_ids.push(e.id),
1176 Value::Path { nodes, edges } => {
1177 for e in edges {
1178 edge_ids.push(e.id);
1179 }
1180 for n in nodes {
1181 node_ids.push(n.id);
1182 }
1183 }
1184 Value::Null | Value::Property(Property::Null) => continue,
1185 _ => return Err(Error::TypeMismatch),
1186 }
1187 }
1188 for eid in &edge_ids {
1189 if seen_edges.insert(*eid) {
1190 ctx.writer.delete_edge(*eid)?;
1191 ctx.tombstones.edges.borrow_mut().insert(*eid);
1192 }
1193 }
1194 for nid in &node_ids {
1195 if !seen_nodes.insert(*nid) {
1196 continue;
1197 }
1198 if self.detach {
1199 for (eid, _) in ctx.store.outgoing(*nid)? {
1204 ctx.tombstones.edges.borrow_mut().insert(eid);
1205 }
1206 for (eid, _) in ctx.store.incoming(*nid)? {
1207 ctx.tombstones.edges.borrow_mut().insert(eid);
1208 }
1209 ctx.writer.detach_delete_node(*nid)?;
1210 } else {
1211 let out = ctx.store.outgoing(*nid)?;
1212 let inc = ctx.store.incoming(*nid)?;
1213 let still_attached = out
1214 .iter()
1215 .chain(inc.iter())
1216 .any(|(eid, _)| !seen_edges.contains(eid));
1217 if still_attached {
1218 return Err(Error::CannotDeleteAttachedNode);
1219 }
1220 ctx.writer.detach_delete_node(*nid)?;
1221 }
1222 ctx.tombstones.nodes.borrow_mut().insert(*nid);
1223 }
1224 Ok(())
1225 }
1226}
1227
1228impl Operator for DeleteOp {
1229 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1230 if self.buffered.is_none() {
1238 let mut rows: Vec<Row> = Vec::new();
1239 while let Some(row) = self.input.next(ctx)? {
1240 rows.push(row);
1241 }
1242 let mut seen_edges: HashSet<meshdb_core::EdgeId> = HashSet::new();
1243 let mut seen_nodes: HashSet<meshdb_core::NodeId> = HashSet::new();
1244 for row in &rows {
1245 self.apply_deletes(ctx, row, &mut seen_edges, &mut seen_nodes)?;
1246 }
1247 self.buffered = Some(rows);
1248 self.cursor = 0;
1249 }
1250 let rows = self.buffered.as_ref().unwrap();
1251 if self.cursor < rows.len() {
1252 let row = rows[self.cursor].clone();
1253 self.cursor += 1;
1254 return Ok(Some(row));
1255 }
1256 Ok(None)
1257 }
1258}
1259
1260struct SetPropertyOp {
1261 input: Box<dyn Operator>,
1262 assignments: Vec<SetAssignment>,
1263}
1264
1265impl SetPropertyOp {
1266 fn new(input: Box<dyn Operator>, assignments: Vec<SetAssignment>) -> Self {
1267 Self { input, assignments }
1268 }
1269}
1270
1271impl Operator for SetPropertyOp {
1272 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1273 match self.input.next(ctx)? {
1274 None => Ok(None),
1275 Some(mut row) => {
1276 enum Action {
1278 SetKey {
1279 var: String,
1280 key: String,
1281 prop: Property,
1282 },
1283 AddLabels {
1284 var: String,
1285 labels: Vec<String>,
1286 },
1287 Replace {
1288 var: String,
1289 props: Vec<(String, Property)>,
1290 },
1291 Merge {
1292 var: String,
1293 props: Vec<(String, Property)>,
1294 },
1295 }
1296 let mut actions: Vec<Action> = Vec::with_capacity(self.assignments.len());
1297 for a in &self.assignments {
1298 match a {
1299 SetAssignment::Property { var, key, value } => {
1300 let evaluated = eval_expr(value, &ctx.eval_ctx(&row))?;
1301 let prop = value_to_property(evaluated)?;
1302 actions.push(Action::SetKey {
1303 var: var.clone(),
1304 key: key.clone(),
1305 prop,
1306 });
1307 }
1308 SetAssignment::Labels { var, labels } => {
1309 actions.push(Action::AddLabels {
1310 var: var.clone(),
1311 labels: labels.clone(),
1312 });
1313 }
1314 SetAssignment::Replace { var, properties } => {
1315 let props = properties
1320 .iter()
1321 .map(|(k, expr)| {
1322 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1323 Ok((k.clone(), value_to_property(v)?))
1324 })
1325 .collect::<Result<Vec<(String, Property)>>>()?;
1326 actions.push(Action::Replace {
1327 var: var.clone(),
1328 props,
1329 });
1330 }
1331 SetAssignment::Merge { var, properties } => {
1332 let props = properties
1333 .iter()
1334 .map(|(k, expr)| {
1335 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1336 Ok((k.clone(), value_to_property(v)?))
1337 })
1338 .collect::<Result<Vec<(String, Property)>>>()?;
1339 actions.push(Action::Merge {
1340 var: var.clone(),
1341 props,
1342 });
1343 }
1344 SetAssignment::ReplaceFromExpr {
1345 var,
1346 source,
1347 replace,
1348 } => {
1349 let v = eval_expr(source, &ctx.eval_ctx(&row))?;
1350 let props = extract_property_map(&v)?;
1351 if *replace {
1352 actions.push(Action::Replace {
1353 var: var.clone(),
1354 props,
1355 });
1356 } else {
1357 actions.push(Action::Merge {
1358 var: var.clone(),
1359 props,
1360 });
1361 }
1362 }
1363 }
1364 }
1365
1366 let mut updated_nodes: HashSet<String> = HashSet::new();
1368 let mut updated_edges: HashSet<String> = HashSet::new();
1369 for action in actions {
1370 match action {
1371 Action::SetKey { var, key, prop } => match row.get_mut(&var) {
1372 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1376 continue
1377 }
1378 Some(Value::Node(n)) => {
1381 if matches!(prop, Property::Null) {
1382 n.properties.remove(&key);
1383 } else {
1384 n.properties.insert(key, prop);
1385 }
1386 updated_nodes.insert(var);
1387 }
1388 Some(Value::Edge(e)) => {
1389 if matches!(prop, Property::Null) {
1390 e.properties.remove(&key);
1391 } else {
1392 e.properties.insert(key, prop);
1393 }
1394 updated_edges.insert(var);
1395 }
1396 _ => return Err(Error::UnboundVariable(var)),
1397 },
1398 Action::AddLabels { var, labels } => match row.get_mut(&var) {
1399 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1400 continue
1401 }
1402 Some(Value::Node(n)) => {
1403 for label in labels {
1404 if !n.labels.contains(&label) {
1405 n.labels.push(label);
1406 }
1407 }
1408 updated_nodes.insert(var);
1409 }
1410 _ => return Err(Error::UnboundVariable(var)),
1411 },
1412 Action::Replace { var, props } => match row.get_mut(&var) {
1413 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1414 continue
1415 }
1416 Some(Value::Node(n)) => {
1417 n.properties.clear();
1418 for (k, v) in props {
1419 if !matches!(v, Property::Null) {
1420 n.properties.insert(k, v);
1421 }
1422 }
1423 updated_nodes.insert(var);
1424 }
1425 Some(Value::Edge(e)) => {
1426 e.properties.clear();
1427 for (k, v) in props {
1428 if !matches!(v, Property::Null) {
1429 e.properties.insert(k, v);
1430 }
1431 }
1432 updated_edges.insert(var);
1433 }
1434 _ => return Err(Error::UnboundVariable(var)),
1435 },
1436 Action::Merge { var, props } => match row.get_mut(&var) {
1437 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1438 continue
1439 }
1440 Some(Value::Node(n)) => {
1441 for (k, v) in props {
1442 if matches!(v, Property::Null) {
1443 n.properties.remove(&k);
1444 } else {
1445 n.properties.insert(k, v);
1446 }
1447 }
1448 updated_nodes.insert(var);
1449 }
1450 Some(Value::Edge(e)) => {
1451 for (k, v) in props {
1452 if matches!(v, Property::Null) {
1453 e.properties.remove(&k);
1454 } else {
1455 e.properties.insert(k, v);
1456 }
1457 }
1458 updated_edges.insert(var);
1459 }
1460 _ => return Err(Error::UnboundVariable(var)),
1461 },
1462 }
1463 }
1464
1465 for var in &updated_nodes {
1467 if let Some(Value::Node(n)) = row.get(var) {
1468 ctx.writer.put_node(n)?;
1469 }
1470 }
1471 for var in &updated_edges {
1472 if let Some(Value::Edge(e)) = row.get(var) {
1473 ctx.writer.put_edge(e)?;
1474 }
1475 }
1476
1477 Ok(Some(row))
1478 }
1479 }
1480 }
1481}
1482
1483struct RemoveOp {
1484 input: Box<dyn Operator>,
1485 items: Vec<RemoveSpec>,
1486}
1487
1488impl RemoveOp {
1489 fn new(input: Box<dyn Operator>, items: Vec<RemoveSpec>) -> Self {
1490 Self { input, items }
1491 }
1492}
1493
1494impl Operator for RemoveOp {
1495 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1496 match self.input.next(ctx)? {
1497 None => Ok(None),
1498 Some(mut row) => {
1499 let mut updated_nodes: HashSet<String> = HashSet::new();
1500 let mut updated_edges: HashSet<String> = HashSet::new();
1501 for item in &self.items {
1502 match item {
1503 RemoveSpec::Property { var, key } => match row.get_mut(var) {
1504 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1507 continue
1508 }
1509 Some(Value::Node(n)) => {
1510 n.properties.remove(key);
1511 updated_nodes.insert(var.clone());
1512 }
1513 Some(Value::Edge(e)) => {
1514 e.properties.remove(key);
1515 updated_edges.insert(var.clone());
1516 }
1517 _ => return Err(Error::UnboundVariable(var.clone())),
1518 },
1519 RemoveSpec::Labels { var, labels } => match row.get_mut(var) {
1520 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1521 continue
1522 }
1523 Some(Value::Node(n)) => {
1524 n.labels.retain(|l| !labels.contains(l));
1525 updated_nodes.insert(var.clone());
1526 }
1527 _ => return Err(Error::UnboundVariable(var.clone())),
1528 },
1529 }
1530 }
1531 for var in &updated_nodes {
1532 if let Some(Value::Node(n)) = row.get(var) {
1533 ctx.writer.put_node(n)?;
1534 }
1535 }
1536 for var in &updated_edges {
1537 if let Some(Value::Edge(e)) = row.get(var) {
1538 ctx.writer.put_edge(e)?;
1539 }
1540 }
1541 Ok(Some(row))
1542 }
1543 }
1544 }
1545}
1546
1547struct LoadCsvOp {
1548 input: Option<Box<dyn Operator>>,
1549 path_expr: Expr,
1550 var: String,
1551 with_headers: bool,
1552 rows: Option<Vec<Value>>,
1553 cursor: usize,
1554}
1555
1556impl LoadCsvOp {
1557 fn new(
1558 input: Option<Box<dyn Operator>>,
1559 path_expr: Expr,
1560 var: String,
1561 with_headers: bool,
1562 ) -> Self {
1563 Self {
1564 input,
1565 path_expr,
1566 var,
1567 with_headers,
1568 rows: None,
1569 cursor: 0,
1570 }
1571 }
1572
1573 fn load(&mut self, ctx: &ExecCtx, base_row: &Row) -> Result<()> {
1574 let ectx = ctx.eval_ctx(base_row);
1575 let path_val = eval_expr(&self.path_expr, &ectx)?;
1576 let path = match path_val {
1577 Value::Property(Property::String(s)) => s,
1578 _ => return Err(Error::TypeMismatch),
1579 };
1580 let content = std::fs::read_to_string(&path).map_err(|e| {
1581 Error::Unsupported(format!("LOAD CSV: cannot read file '{}': {}", path, e))
1582 })?;
1583 let mut lines = content.lines();
1584 let headers: Option<Vec<String>> = if self.with_headers {
1585 lines
1586 .next()
1587 .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
1588 } else {
1589 None
1590 };
1591 let mut csv_rows = Vec::new();
1592 for line in lines {
1593 if line.trim().is_empty() {
1594 continue;
1595 }
1596 let fields: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
1597 if let Some(hdrs) = &headers {
1598 let mut map = std::collections::HashMap::new();
1599 for (i, h) in hdrs.iter().enumerate() {
1600 let val = fields.get(i).cloned().unwrap_or_default();
1601 map.insert(h.clone(), Property::String(val));
1602 }
1603 csv_rows.push(Value::Property(Property::Map(map)));
1604 } else {
1605 let list: Vec<Value> = fields
1606 .into_iter()
1607 .map(|f| Value::Property(Property::String(f)))
1608 .collect();
1609 csv_rows.push(Value::List(list));
1610 }
1611 }
1612 self.rows = Some(csv_rows);
1613 self.cursor = 0;
1614 Ok(())
1615 }
1616}
1617
1618impl Operator for LoadCsvOp {
1619 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1620 if self.rows.is_none() {
1621 let base = if let Some(input) = &mut self.input {
1622 match input.next(ctx)? {
1623 Some(r) => r,
1624 None => return Ok(None),
1625 }
1626 } else {
1627 Row::new()
1628 };
1629 self.load(ctx, &base)?;
1630 }
1631 let rows = self.rows.as_ref().unwrap();
1632 if self.cursor < rows.len() {
1633 let val = rows[self.cursor].clone();
1634 self.cursor += 1;
1635 let mut row = Row::new();
1636 row.insert(self.var.clone(), val);
1637 Ok(Some(row))
1638 } else {
1639 Ok(None)
1640 }
1641 }
1642}
1643
1644struct ForeachOp {
1645 input: Box<dyn Operator>,
1646 var: String,
1647 list_expr: Expr,
1648 set_assignments: Vec<SetAssignment>,
1649 remove_items: Vec<RemoveSpec>,
1650}
1651
1652impl ForeachOp {
1653 fn new(
1654 input: Box<dyn Operator>,
1655 var: String,
1656 list_expr: Expr,
1657 set_assignments: Vec<SetAssignment>,
1658 remove_items: Vec<RemoveSpec>,
1659 ) -> Self {
1660 Self {
1661 input,
1662 var,
1663 list_expr,
1664 set_assignments,
1665 remove_items,
1666 }
1667 }
1668}
1669
1670impl Operator for ForeachOp {
1671 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1672 let Some(row) = self.input.next(ctx)? else {
1673 return Ok(None);
1674 };
1675 let ectx = ctx.eval_ctx(&row);
1676 let list_val = eval_expr(&self.list_expr, &ectx)?;
1677 let items = match list_val {
1678 Value::List(items) => items,
1679 Value::Property(Property::List(props)) => {
1680 props.into_iter().map(Value::Property).collect()
1681 }
1682 Value::Null | Value::Property(Property::Null) => Vec::new(),
1683 _ => return Err(Error::TypeMismatch),
1684 };
1685 for item in items {
1686 let mut scratch = row.clone();
1687 scratch.insert(self.var.clone(), item);
1688 for a in &self.set_assignments {
1689 match a {
1690 SetAssignment::Property { var, key, value } => {
1691 let evaluated = eval_expr(value, &ctx.eval_ctx(&scratch))?;
1692 let prop = value_to_property(evaluated)?;
1693 match scratch.get_mut(var) {
1694 Some(Value::Node(n)) => {
1695 n.properties.insert(key.clone(), prop);
1696 }
1697 Some(Value::Edge(e)) => {
1698 e.properties.insert(key.clone(), prop);
1699 }
1700 _ => return Err(Error::UnboundVariable(var.clone())),
1701 }
1702 }
1703 SetAssignment::Labels { var, labels } => {
1704 if let Some(Value::Node(n)) = scratch.get_mut(var) {
1705 for l in labels {
1706 if !n.labels.contains(l) {
1707 n.labels.push(l.clone());
1708 }
1709 }
1710 }
1711 }
1712 _ => {}
1713 }
1714 }
1715 for ri in &self.remove_items {
1716 match ri {
1717 RemoveSpec::Property { var, key } => {
1718 if let Some(Value::Node(n)) = scratch.get_mut(var) {
1719 n.properties.remove(key);
1720 } else if let Some(Value::Edge(e)) = scratch.get_mut(var) {
1721 e.properties.remove(key);
1722 }
1723 }
1724 RemoveSpec::Labels { var, labels } => {
1725 if let Some(Value::Node(n)) = scratch.get_mut(var) {
1726 n.labels.retain(|l| !labels.contains(l));
1727 }
1728 }
1729 }
1730 }
1731 for (_, val) in scratch.iter() {
1733 match val {
1734 Value::Node(n) => ctx.writer.put_node(n)?,
1735 Value::Edge(e) => ctx.writer.put_edge(e)?,
1736 _ => {}
1737 }
1738 }
1739 }
1740 Ok(Some(row))
1741 }
1742}
1743
1744struct SeedRowOp {
1745 done: bool,
1746}
1747
1748impl Operator for SeedRowOp {
1749 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1750 if self.done {
1751 return Ok(None);
1752 }
1753 self.done = true;
1754 Ok(Some(Row::new()))
1755 }
1756}
1757
1758struct SeededRowOp {
1759 row: Option<Row>,
1760}
1761
1762impl Operator for SeededRowOp {
1763 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1764 Ok(self.row.take())
1765 }
1766}
1767
1768struct CallSubqueryOp {
1769 input: Box<dyn Operator>,
1770 body_plan: LogicalPlan,
1771 pending: Vec<Row>,
1772 pending_idx: usize,
1773}
1774
1775impl CallSubqueryOp {
1776 fn new(input: Box<dyn Operator>, body_plan: LogicalPlan) -> Self {
1777 Self {
1778 input,
1779 body_plan,
1780 pending: Vec::new(),
1781 pending_idx: 0,
1782 }
1783 }
1784}
1785
1786impl Operator for CallSubqueryOp {
1787 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1788 loop {
1789 if self.pending_idx < self.pending.len() {
1790 let row = self.pending[self.pending_idx].clone();
1791 self.pending_idx += 1;
1792 return Ok(Some(row));
1793 }
1794 let outer_row = match self.input.next(ctx)? {
1795 Some(r) => r,
1796 None => return Ok(None),
1797 };
1798 let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1799 let mut results = Vec::new();
1800 while let Some(body_row) = body_op.next(ctx)? {
1801 let mut merged = outer_row.clone();
1802 for (k, v) in body_row {
1803 merged.insert(k, v);
1804 }
1805 results.push(merged);
1806 }
1807 if results.is_empty() {
1808 continue;
1809 }
1810 self.pending = results;
1811 self.pending_idx = 0;
1812 }
1813 }
1814}
1815
1816struct OptionalApplyOp {
1823 input: Box<dyn Operator>,
1824 body_plan: LogicalPlan,
1825 null_vars: Vec<String>,
1826 pending: Vec<Row>,
1827 pending_idx: usize,
1828}
1829
1830impl OptionalApplyOp {
1831 fn new(input: Box<dyn Operator>, body_plan: LogicalPlan, null_vars: Vec<String>) -> Self {
1832 Self {
1833 input,
1834 body_plan,
1835 null_vars,
1836 pending: Vec::new(),
1837 pending_idx: 0,
1838 }
1839 }
1840}
1841
1842impl Operator for OptionalApplyOp {
1843 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1844 loop {
1845 if self.pending_idx < self.pending.len() {
1846 let row = self.pending[self.pending_idx].clone();
1847 self.pending_idx += 1;
1848 return Ok(Some(row));
1849 }
1850 let outer_row = match self.input.next(ctx)? {
1851 Some(r) => r,
1852 None => return Ok(None),
1853 };
1854 let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1855 let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1859 stacked.push(&outer_row);
1860 stacked.extend_from_slice(ctx.outer_rows);
1861 let inner_ctx = ExecCtx {
1862 store: ctx.store,
1863 writer: ctx.writer,
1864 params: ctx.params,
1865 procedures: ctx.procedures,
1866 outer_rows: &stacked,
1867 tombstones: ctx.tombstones,
1868 };
1869 let mut results = Vec::new();
1870 while let Some(body_row) = body_op.next(&inner_ctx)? {
1871 let mut merged = outer_row.clone();
1872 for (k, v) in body_row {
1873 merged.insert(k, v);
1874 }
1875 results.push(merged);
1876 }
1877 if results.is_empty() {
1878 let mut fallback = outer_row;
1879 for v in &self.null_vars {
1880 fallback.insert(v.clone(), Value::Null);
1881 }
1882 return Ok(Some(fallback));
1883 }
1884 self.pending = results;
1885 self.pending_idx = 0;
1886 }
1887 }
1888}
1889
1890struct ProcedureCallOp {
1909 input: Option<Box<dyn Operator>>,
1910 qualified_name: Vec<String>,
1911 args: Option<Vec<Expr>>,
1912 yield_spec: Option<YieldSpec>,
1913 standalone: bool,
1914 buffered: Vec<Row>,
1915 buffered_idx: usize,
1916 done: bool,
1919}
1920
1921impl ProcedureCallOp {
1922 fn new(
1923 input: Option<Box<dyn Operator>>,
1924 qualified_name: Vec<String>,
1925 args: Option<Vec<Expr>>,
1926 yield_spec: Option<YieldSpec>,
1927 standalone: bool,
1928 ) -> Self {
1929 Self {
1930 input,
1931 qualified_name,
1932 args,
1933 yield_spec,
1934 standalone,
1935 buffered: Vec::new(),
1936 buffered_idx: 0,
1937 done: false,
1938 }
1939 }
1940
1941 fn resolve_projection(
1947 &self,
1948 proc: &crate::procedures::Procedure,
1949 ) -> Result<Vec<(String, String)>> {
1950 match &self.yield_spec {
1951 None => {
1952 if !self.standalone {
1953 if proc.outputs.is_empty() {
1962 return Ok(Vec::new());
1963 }
1964 return Err(Error::Procedure(format!(
1965 "procedure '{}' has outputs but no YIELD clause",
1966 self.qualified_name.join(".")
1967 )));
1968 }
1969 Ok(proc
1970 .outputs
1971 .iter()
1972 .map(|o| (o.name.clone(), o.name.clone()))
1973 .collect())
1974 }
1975 Some(YieldSpec::Star) => {
1976 if !self.standalone {
1977 return Err(Error::Procedure(
1978 "YIELD * is only allowed on standalone CALL".into(),
1979 ));
1980 }
1981 Ok(proc
1982 .outputs
1983 .iter()
1984 .map(|o| (o.name.clone(), o.name.clone()))
1985 .collect())
1986 }
1987 Some(YieldSpec::Items(items)) => {
1988 let mut projection = Vec::with_capacity(items.len());
1989 let mut seen_aliases: std::collections::HashSet<String> =
1990 std::collections::HashSet::new();
1991 for yi in items {
1992 if !proc.outputs.iter().any(|o| o.name == yi.column) {
1993 return Err(Error::Procedure(format!(
1994 "procedure '{}' has no output column '{}'",
1995 self.qualified_name.join("."),
1996 yi.column
1997 )));
1998 }
1999 let alias = yi.alias.clone().unwrap_or_else(|| yi.column.clone());
2000 if !seen_aliases.insert(alias.clone()) {
2001 return Err(Error::Procedure(format!(
2002 "variable '{alias}' already bound by YIELD"
2003 )));
2004 }
2005 projection.push((yi.column.clone(), alias));
2006 }
2007 Ok(projection)
2008 }
2009 }
2010 }
2011
2012 fn evaluate_args(
2019 &self,
2020 ctx: &ExecCtx,
2021 row: &Row,
2022 proc: &crate::procedures::Procedure,
2023 ) -> Result<Vec<Value>> {
2024 match &self.args {
2025 Some(exprs) => {
2026 if exprs.len() != proc.inputs.len() {
2027 return Err(Error::Procedure(format!(
2028 "procedure '{}' expects {} argument(s), got {}",
2029 self.qualified_name.join("."),
2030 proc.inputs.len(),
2031 exprs.len()
2032 )));
2033 }
2034 let eval_ctx = ctx.eval_ctx(row);
2035 let mut values = Vec::with_capacity(exprs.len());
2036 for (expr, spec) in exprs.iter().zip(proc.inputs.iter()) {
2037 let v = eval_expr(expr, &eval_ctx)?;
2038 if !spec.ty.accepts(&v) {
2039 return Err(Error::Procedure(format!(
2040 "argument '{}' has wrong type for procedure '{}'",
2041 spec.name,
2042 self.qualified_name.join(".")
2043 )));
2044 }
2045 values.push(coerce_arg(v, spec.ty));
2046 }
2047 Ok(values)
2048 }
2049 None => {
2050 if !self.standalone {
2052 return Err(Error::Procedure(
2053 "in-query CALL requires explicit argument list".into(),
2054 ));
2055 }
2056 let mut values = Vec::with_capacity(proc.inputs.len());
2057 for spec in &proc.inputs {
2058 let v = ctx.params.get(&spec.name).cloned().ok_or_else(|| {
2059 Error::Procedure(format!(
2060 "missing parameter ${} for procedure '{}'",
2061 spec.name,
2062 self.qualified_name.join(".")
2063 ))
2064 })?;
2065 if !spec.ty.accepts(&v) {
2066 return Err(Error::Procedure(format!(
2067 "parameter '{}' has wrong type",
2068 spec.name
2069 )));
2070 }
2071 values.push(coerce_arg(v, spec.ty));
2072 }
2073 Ok(values)
2074 }
2075 }
2076 }
2077
2078 fn invoke_once(
2083 &self,
2084 ctx: &ExecCtx,
2085 input_row: &Row,
2086 proc: &crate::procedures::Procedure,
2087 projection: &[(String, String)],
2088 out: &mut Vec<Row>,
2089 ) -> Result<()> {
2090 if proc.outputs.is_empty() {
2094 if !self.standalone {
2095 out.push(input_row.clone());
2096 }
2097 return Ok(());
2098 }
2099 let args = self.evaluate_args(ctx, input_row, proc)?;
2100 let rows = proc.resolve_rows(ctx.store)?;
2101 for proc_row in &rows {
2102 if !proc.row_matches(proc_row, &args) {
2103 continue;
2104 }
2105 let mut merged = if self.standalone {
2106 Row::new()
2107 } else {
2108 input_row.clone()
2109 };
2110 for (src, alias) in projection {
2111 let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
2112 merged.insert(alias.clone(), v);
2113 }
2114 out.push(merged);
2115 }
2116 Ok(())
2117 }
2118}
2119
2120fn coerce_arg(v: Value, ty: crate::procedures::ProcType) -> Value {
2125 use crate::procedures::ProcType;
2126 if matches!(ty, ProcType::Float) {
2127 if let Value::Property(Property::Int64(n)) = v {
2128 return Value::Property(Property::Float64(n as f64));
2129 }
2130 }
2131 v
2132}
2133
2134impl Operator for ProcedureCallOp {
2135 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2136 loop {
2137 if self.buffered_idx < self.buffered.len() {
2138 let row = self.buffered[self.buffered_idx].clone();
2139 self.buffered_idx += 1;
2140 return Ok(Some(row));
2141 }
2142 self.buffered.clear();
2143 self.buffered_idx = 0;
2144
2145 let proc = match ctx.procedures.get(&self.qualified_name) {
2146 Some(p) => p,
2147 None => {
2148 return Err(Error::Procedure(format!(
2149 "procedure '{}' not found",
2150 self.qualified_name.join(".")
2151 )));
2152 }
2153 };
2154 let projection = self.resolve_projection(proc)?;
2155
2156 let input_row = match &mut self.input {
2157 Some(inp) => match inp.next(ctx)? {
2158 Some(r) => r,
2159 None => return Ok(None),
2160 },
2161 None => {
2162 if self.done {
2163 return Ok(None);
2164 }
2165 self.done = true;
2166 Row::new()
2167 }
2168 };
2169
2170 let mut produced = Vec::new();
2171 self.invoke_once(ctx, &input_row, proc, &projection, &mut produced)?;
2172 if produced.is_empty() {
2173 if self.input.is_some() {
2174 continue;
2175 }
2176 return Ok(None);
2177 }
2178 self.buffered = produced;
2179 }
2180 }
2181}
2182
2183fn extract_property_map(v: &Value) -> Result<Vec<(String, Property)>> {
2189 match v {
2190 Value::Node(n) => Ok(n.properties.clone().into_iter().collect()),
2191 Value::Edge(e) => Ok(e.properties.clone().into_iter().collect()),
2192 Value::Map(pairs) => pairs
2193 .iter()
2194 .map(|(k, vv)| Ok((k.clone(), value_to_property(vv.clone())?)))
2195 .collect(),
2196 Value::Property(Property::Map(entries)) => Ok(entries
2197 .iter()
2198 .map(|(k, p)| (k.clone(), p.clone()))
2199 .collect()),
2200 Value::Null | Value::Property(Property::Null) => Ok(Vec::new()),
2201 _ => Err(Error::InvalidSetValue),
2202 }
2203}
2204
2205fn value_to_property(v: Value) -> Result<Property> {
2206 match v {
2207 Value::Property(Property::Map(_)) => Err(Error::InvalidSetValue),
2208 Value::Property(p) => Ok(p),
2209 Value::Null => Ok(Property::Null),
2210 Value::List(items) => {
2211 let props: Vec<Property> = items
2212 .into_iter()
2213 .map(value_to_property)
2214 .collect::<Result<_>>()?;
2215 Ok(Property::List(props))
2216 }
2217 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
2221 Err(Error::InvalidSetValue)
2222 }
2223 }
2224}
2225
2226struct NodeScanAllOp {
2227 var: String,
2228 ids: Option<Vec<NodeId>>,
2229 cursor: usize,
2230}
2231
2232impl NodeScanAllOp {
2233 fn new(var: String) -> Self {
2234 Self {
2235 var,
2236 ids: None,
2237 cursor: 0,
2238 }
2239 }
2240}
2241
2242impl Operator for NodeScanAllOp {
2243 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2244 if self.ids.is_none() {
2245 self.ids = Some(ctx.store.all_node_ids()?);
2246 }
2247 let ids = self.ids.as_ref().unwrap();
2248 while self.cursor < ids.len() {
2249 let id = ids[self.cursor];
2250 self.cursor += 1;
2251 if let Some(node) = ctx.store.get_node(id)? {
2252 let mut row = Row::new();
2253 row.insert(self.var.clone(), Value::Node(node));
2254 return Ok(Some(row));
2255 }
2256 }
2257 Ok(None)
2258 }
2259}
2260
2261struct NodeScanByLabelsOp {
2262 var: String,
2263 labels: Vec<String>,
2264 ids: Option<Vec<NodeId>>,
2265 cursor: usize,
2266}
2267
2268impl NodeScanByLabelsOp {
2269 fn new(var: String, labels: Vec<String>) -> Self {
2270 Self {
2271 var,
2272 labels,
2273 ids: None,
2274 cursor: 0,
2275 }
2276 }
2277}
2278
2279impl Operator for NodeScanByLabelsOp {
2280 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2281 if self.ids.is_none() {
2282 let primary = self
2284 .labels
2285 .first()
2286 .expect("NodeScanByLabels must have at least one label");
2287 self.ids = Some(ctx.store.nodes_by_label(primary)?);
2288 }
2289 let ids = self.ids.as_ref().unwrap();
2290 while self.cursor < ids.len() {
2291 let id = ids[self.cursor];
2292 self.cursor += 1;
2293 if let Some(node) = ctx.store.get_node(id)? {
2294 if has_all_labels(&node, &self.labels) {
2295 let mut row = Row::new();
2296 row.insert(self.var.clone(), Value::Node(node));
2297 return Ok(Some(row));
2298 }
2299 }
2300 }
2301 Ok(None)
2302 }
2303}
2304
2305fn has_all_labels(node: &Node, labels: &[String]) -> bool {
2306 labels.iter().all(|l| node.labels.contains(l))
2307}
2308
2309struct IndexSeekOp {
2321 var: String,
2322 label: String,
2323 property: String,
2324 value_expr: Expr,
2325 results: Option<Vec<NodeId>>,
2326 cursor: usize,
2327}
2328
2329impl IndexSeekOp {
2330 fn new(var: String, label: String, property: String, value_expr: Expr) -> Self {
2331 Self {
2332 var,
2333 label,
2334 property,
2335 value_expr,
2336 results: None,
2337 cursor: 0,
2338 }
2339 }
2340}
2341
2342impl Operator for IndexSeekOp {
2343 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2344 if self.results.is_none() {
2345 let empty = Row::new();
2346 let value = eval_expr(&self.value_expr, &ctx.eval_ctx(&empty))?;
2347 let property = match value {
2348 Value::Property(p) => p,
2349 Value::Null => Property::Null,
2350 Value::Node(_)
2351 | Value::Edge(_)
2352 | Value::List(_)
2353 | Value::Map(_)
2354 | Value::Path { .. } => {
2355 return Err(Error::InvalidSetValue);
2356 }
2357 };
2358 let ids = ctx
2359 .store
2360 .nodes_by_property(&self.label, &self.property, &property)?;
2361 self.results = Some(ids);
2362 }
2363 let ids = self.results.as_ref().unwrap();
2364 while self.cursor < ids.len() {
2365 let id = ids[self.cursor];
2366 self.cursor += 1;
2367 if let Some(node) = ctx.store.get_node(id)? {
2368 let mut row = Row::new();
2369 row.insert(self.var.clone(), Value::Node(node));
2370 return Ok(Some(row));
2371 }
2372 }
2373 Ok(None)
2374 }
2375}
2376
2377fn matches_pattern_props(node: &Node, props: &[(String, Property)]) -> bool {
2378 props.iter().all(|(k, v)| {
2379 node.properties
2380 .get(k)
2381 .map(|stored| stored == v)
2382 .unwrap_or(false)
2383 })
2384}
2385
2386struct MergeNodeOp {
2387 var: String,
2388 labels: Vec<String>,
2389 properties: Vec<(String, Expr)>,
2393 on_create: Vec<SetAssignment>,
2398 on_match: Vec<SetAssignment>,
2402 input: Option<Box<dyn Operator>>,
2409 merged_nodes: Vec<Node>,
2416 merge_done: bool,
2420 current_input_row: Option<Row>,
2424 cursor: usize,
2425}
2426
2427impl MergeNodeOp {
2428 fn new(
2429 input: Option<Box<dyn Operator>>,
2430 var: String,
2431 labels: Vec<String>,
2432 properties: Vec<(String, Expr)>,
2433 on_create: Vec<SetAssignment>,
2434 on_match: Vec<SetAssignment>,
2435 ) -> Self {
2436 Self {
2437 var,
2438 labels,
2439 properties,
2440 on_create,
2441 on_match,
2442 input,
2443 merged_nodes: Vec::new(),
2444 merge_done: false,
2445 current_input_row: None,
2446 cursor: 0,
2447 }
2448 }
2449
2450 fn run_merge_for(&mut self, ctx: &ExecCtx, base: &Row) -> Result<Vec<Node>> {
2462 let resolved_props: Vec<(String, Property)> = self
2463 .properties
2464 .iter()
2465 .map(|(k, expr)| {
2466 let v = eval_expr(expr, &ctx.eval_ctx(base))?;
2467 Ok((k.clone(), value_to_property(v)?))
2468 })
2469 .collect::<Result<Vec<_>>>()?;
2470
2471 let candidate_ids: Vec<NodeId> = if let Some(primary) = self.labels.first() {
2472 ctx.store.nodes_by_label(primary)?
2473 } else {
2474 ctx.store.all_node_ids()?
2475 };
2476 let mut merged_nodes: Vec<Node> = Vec::new();
2477 for id in candidate_ids {
2478 if let Some(node) = ctx.store.get_node(id)? {
2479 if has_all_labels(&node, &self.labels)
2480 && matches_pattern_props(&node, &resolved_props)
2481 {
2482 merged_nodes.push(node);
2483 }
2484 }
2485 }
2486
2487 if merged_nodes.is_empty() {
2488 let mut node = Node::new();
2489 for label in &self.labels {
2490 node.labels.push(label.clone());
2491 }
2492 for (k, prop) in resolved_props {
2493 node.properties.insert(k, prop);
2494 }
2495 apply_merge_actions(&mut node, &self.on_create, &self.var, ctx, base)?;
2496 ctx.writer.put_node(&node)?;
2497 merged_nodes.push(node);
2498 } else if !self.on_match.is_empty() {
2499 for node in merged_nodes.iter_mut() {
2500 apply_merge_actions(node, &self.on_match, &self.var, ctx, base)?;
2501 ctx.writer.put_node(node)?;
2502 }
2503 }
2504 Ok(merged_nodes)
2505 }
2506}
2507
2508impl Operator for MergeNodeOp {
2509 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2510 if self.input.is_none() {
2515 if !self.merge_done {
2516 let empty = Row::new();
2517 let nodes = self.run_merge_for(ctx, &empty)?;
2518 self.merged_nodes = nodes;
2519 self.merge_done = true;
2520 }
2521 if self.cursor < self.merged_nodes.len() {
2522 let node = self.merged_nodes[self.cursor].clone();
2523 self.cursor += 1;
2524 let mut row = Row::new();
2525 row.insert(self.var.clone(), Value::Node(node));
2526 return Ok(Some(row));
2527 }
2528 return Ok(None);
2529 }
2530
2531 loop {
2538 if let Some(base) = self.current_input_row.as_ref() {
2539 if self.cursor < self.merged_nodes.len() {
2540 let node = self.merged_nodes[self.cursor].clone();
2541 self.cursor += 1;
2542 let mut row = base.clone();
2543 row.insert(self.var.clone(), Value::Node(node));
2544 return Ok(Some(row));
2545 }
2546 }
2547 match self.input.as_mut().unwrap().next(ctx)? {
2548 None => return Ok(None),
2549 Some(row) => {
2550 let nodes = self.run_merge_for(ctx, &row)?;
2551 self.merged_nodes = nodes;
2552 self.cursor = 0;
2553 self.current_input_row = Some(row);
2554 }
2555 }
2556 }
2557 }
2558}
2559
2560struct MergeEdgeOp {
2579 input: Box<dyn Operator>,
2580 edge_var: String,
2581 src_var: String,
2582 dst_var: String,
2583 edge_type: String,
2584 undirected: bool,
2585 properties: Vec<(String, Expr)>,
2589 on_create: Vec<SetAssignment>,
2590 on_match: Vec<SetAssignment>,
2591 pending: std::collections::VecDeque<Row>,
2598}
2599
2600impl MergeEdgeOp {
2601 #[allow(clippy::too_many_arguments)]
2602 fn new(
2603 input: Box<dyn Operator>,
2604 edge_var: String,
2605 src_var: String,
2606 dst_var: String,
2607 edge_type: String,
2608 undirected: bool,
2609 properties: Vec<(String, Expr)>,
2610 on_create: Vec<SetAssignment>,
2611 on_match: Vec<SetAssignment>,
2612 ) -> Self {
2613 Self {
2614 input,
2615 edge_var,
2616 src_var,
2617 dst_var,
2618 edge_type,
2619 undirected,
2620 properties,
2621 on_create,
2622 on_match,
2623 pending: std::collections::VecDeque::new(),
2624 }
2625 }
2626}
2627
2628impl Operator for MergeEdgeOp {
2629 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2630 loop {
2631 if let Some(row) = self.pending.pop_front() {
2632 return Ok(Some(row));
2633 }
2634 let Some(row) = self.input.next(ctx)? else {
2635 return Ok(None);
2636 };
2637 let src_node = match row.get(&self.src_var) {
2642 Some(Value::Node(n)) => n.clone(),
2643 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
2644 };
2645 let dst_node = match row.get(&self.dst_var) {
2646 Some(Value::Node(n)) => n.clone(),
2647 _ => return Err(Error::UnboundVariable(self.dst_var.clone())),
2648 };
2649
2650 let required_props: Vec<(String, Property)> = self
2654 .properties
2655 .iter()
2656 .map(|(k, expr)| {
2657 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
2658 Ok((k.clone(), value_to_property(v)?))
2659 })
2660 .collect::<Result<Vec<_>>>()?;
2661 let edge_matches = |edge: &Edge| -> bool {
2662 required_props.iter().all(|(k, want)| {
2663 edge.properties
2664 .get(k)
2665 .map(|have| have == want)
2666 .unwrap_or(false)
2667 })
2668 };
2669
2670 let mut matched: Vec<Edge> = Vec::new();
2677 for (edge_id, neighbor_id) in ctx.store.outgoing(src_node.id)? {
2678 if neighbor_id != dst_node.id {
2679 continue;
2680 }
2681 if let Some(edge) = ctx.store.get_edge(edge_id)? {
2682 if edge.edge_type == self.edge_type && edge_matches(&edge) {
2683 matched.push(edge);
2684 }
2685 }
2686 }
2687 if self.undirected {
2688 for (edge_id, neighbor_id) in ctx.store.incoming(src_node.id)? {
2689 if neighbor_id != dst_node.id {
2690 continue;
2691 }
2692 if let Some(edge) = ctx.store.get_edge(edge_id)? {
2693 if edge.edge_type == self.edge_type && edge_matches(&edge) {
2694 matched.push(edge);
2695 }
2696 }
2697 }
2698 }
2699
2700 if matched.is_empty() {
2701 let mut new_edge = Edge::new(&self.edge_type, src_node.id, dst_node.id);
2702 for (k, p) in &required_props {
2703 new_edge.properties.insert(k.clone(), p.clone());
2704 }
2705 let mut row_out = row.clone();
2706 apply_merge_edge_actions(
2707 &mut new_edge,
2708 &self.on_create,
2709 &self.edge_var,
2710 ctx,
2711 &mut row_out,
2712 )?;
2713 ctx.writer.put_edge(&new_edge)?;
2714 row_out.insert(self.edge_var.clone(), Value::Edge(new_edge));
2715 self.pending.push_back(row_out);
2716 } else {
2717 for mut existing in matched {
2718 let mut row_out = row.clone();
2719 if !self.on_match.is_empty() {
2720 apply_merge_edge_actions(
2721 &mut existing,
2722 &self.on_match,
2723 &self.edge_var,
2724 ctx,
2725 &mut row_out,
2726 )?;
2727 ctx.writer.put_edge(&existing)?;
2728 }
2729 row_out.insert(self.edge_var.clone(), Value::Edge(existing));
2730 self.pending.push_back(row_out);
2731 }
2732 }
2733 }
2734 }
2735}
2736
2737fn apply_merge_edge_actions(
2747 edge: &mut Edge,
2748 actions: &[SetAssignment],
2749 var: &str,
2750 exec_ctx: &ExecCtx,
2751 outer: &mut Row,
2752) -> Result<()> {
2753 if actions.is_empty() {
2754 return Ok(());
2755 }
2756 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2759 for action in actions {
2760 match action {
2761 SetAssignment::Property {
2762 var: target,
2763 key,
2764 value,
2765 } => {
2766 let sub_ctx = exec_ctx.eval_ctx(outer);
2767 let evaluated = eval_expr(value, &sub_ctx)?;
2768 let prop = value_to_property(evaluated)?;
2769 if target == var {
2770 if matches!(prop, Property::Null) {
2771 edge.properties.remove(key);
2772 } else {
2773 edge.properties.insert(key.clone(), prop);
2774 }
2775 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2776 } else {
2777 apply_set_prop_to_outer(outer, exec_ctx, target, key, prop)?;
2778 }
2779 }
2780 SetAssignment::Merge {
2781 var: target,
2782 properties,
2783 } => {
2784 let sub_ctx = exec_ctx.eval_ctx(outer);
2785 let resolved: Vec<(String, Property)> = properties
2786 .iter()
2787 .map(|(k, expr)| {
2788 let v = eval_expr(expr, &sub_ctx)?;
2789 Ok((k.clone(), value_to_property(v)?))
2790 })
2791 .collect::<Result<Vec<_>>>()?;
2792 if target == var {
2793 for (k, p) in resolved {
2794 edge.properties.insert(k, p);
2795 }
2796 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2797 } else {
2798 apply_set_map_to_outer(outer, exec_ctx, target, resolved, false)?;
2799 }
2800 }
2801 SetAssignment::Replace {
2802 var: target,
2803 properties,
2804 } => {
2805 let sub_ctx = exec_ctx.eval_ctx(outer);
2806 let resolved: Vec<(String, Property)> = properties
2807 .iter()
2808 .map(|(k, expr)| {
2809 let v = eval_expr(expr, &sub_ctx)?;
2810 Ok((k.clone(), value_to_property(v)?))
2811 })
2812 .collect::<Result<Vec<_>>>()?;
2813 if target == var {
2814 edge.properties.clear();
2815 for (k, p) in resolved {
2816 edge.properties.insert(k, p);
2817 }
2818 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2819 } else {
2820 apply_set_map_to_outer(outer, exec_ctx, target, resolved, true)?;
2821 }
2822 }
2823 SetAssignment::Labels {
2824 var: target,
2825 labels,
2826 } => {
2827 if target == var {
2828 return Err(Error::UnboundVariable(target.clone()));
2830 }
2831 apply_set_labels_to_outer(outer, exec_ctx, target, labels)?;
2832 }
2833 SetAssignment::ReplaceFromExpr {
2834 var: target,
2835 source,
2836 replace,
2837 } => {
2838 let sub_ctx = exec_ctx.eval_ctx(outer);
2839 let v = eval_expr(source, &sub_ctx)?;
2840 let props = extract_property_map(&v)?;
2841 if target == var {
2842 if *replace {
2843 edge.properties.clear();
2844 }
2845 for (k, p) in props {
2846 edge.properties.insert(k, p);
2847 }
2848 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2849 } else {
2850 apply_set_map_to_outer(outer, exec_ctx, target, props, *replace)?;
2851 }
2852 }
2853 }
2854 }
2855 Ok(())
2856}
2857
2858fn apply_set_prop_to_outer(
2863 outer: &mut Row,
2864 exec_ctx: &ExecCtx,
2865 target: &str,
2866 key: &str,
2867 prop: Property,
2868) -> Result<()> {
2869 match outer.get_mut(target) {
2870 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
2871 return Ok(());
2874 }
2875 Some(Value::Node(n)) => {
2876 if matches!(prop, Property::Null) {
2877 n.properties.remove(key);
2878 } else {
2879 n.properties.insert(key.to_string(), prop);
2880 }
2881 exec_ctx.writer.put_node(n)?;
2882 }
2883 Some(Value::Edge(e)) => {
2884 if matches!(prop, Property::Null) {
2885 e.properties.remove(key);
2886 } else {
2887 e.properties.insert(key.to_string(), prop);
2888 }
2889 exec_ctx.writer.put_edge(e)?;
2890 }
2891 _ => return Err(Error::UnboundVariable(target.to_string())),
2892 }
2893 Ok(())
2894}
2895
2896fn apply_set_map_to_outer(
2900 outer: &mut Row,
2901 exec_ctx: &ExecCtx,
2902 target: &str,
2903 props: Vec<(String, Property)>,
2904 replace: bool,
2905) -> Result<()> {
2906 match outer.get_mut(target) {
2907 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
2908 Some(Value::Node(n)) => {
2909 if replace {
2910 n.properties.clear();
2911 }
2912 for (k, p) in props {
2913 if replace || !matches!(p, Property::Null) {
2914 n.properties.insert(k, p);
2915 } else {
2916 n.properties.remove(&k);
2917 }
2918 }
2919 exec_ctx.writer.put_node(n)?;
2920 Ok(())
2921 }
2922 Some(Value::Edge(e)) => {
2923 if replace {
2924 e.properties.clear();
2925 }
2926 for (k, p) in props {
2927 if replace || !matches!(p, Property::Null) {
2928 e.properties.insert(k, p);
2929 } else {
2930 e.properties.remove(&k);
2931 }
2932 }
2933 exec_ctx.writer.put_edge(e)?;
2934 Ok(())
2935 }
2936 _ => Err(Error::UnboundVariable(target.to_string())),
2937 }
2938}
2939
2940fn apply_set_labels_to_outer(
2942 outer: &mut Row,
2943 exec_ctx: &ExecCtx,
2944 target: &str,
2945 labels: &[String],
2946) -> Result<()> {
2947 match outer.get_mut(target) {
2948 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
2949 Some(Value::Node(n)) => {
2950 for label in labels {
2951 if !n.labels.contains(label) {
2952 n.labels.push(label.clone());
2953 }
2954 }
2955 exec_ctx.writer.put_node(n)?;
2956 Ok(())
2957 }
2958 _ => Err(Error::UnboundVariable(target.to_string())),
2959 }
2960}
2961
2962fn apply_merge_actions(
2971 node: &mut Node,
2972 actions: &[SetAssignment],
2973 var: &str,
2974 exec_ctx: &ExecCtx,
2975 base_row: &Row,
2976) -> Result<()> {
2977 if actions.is_empty() {
2978 return Ok(());
2979 }
2980 let mut row = base_row.clone();
2983 row.insert(var.to_string(), Value::Node(node.clone()));
2984 for action in actions {
2985 let sub_ctx = exec_ctx.eval_ctx(&row);
2986 match action {
2987 SetAssignment::Property {
2988 var: target,
2989 key,
2990 value,
2991 } => {
2992 if target != var {
2993 return Err(Error::UnboundVariable(target.clone()));
2994 }
2995 let evaluated = eval_expr(value, &sub_ctx)?;
2996 let prop = value_to_property(evaluated)?;
2997 node.properties.insert(key.clone(), prop);
2998 row.insert(var.to_string(), Value::Node(node.clone()));
2999 }
3000 SetAssignment::Labels {
3001 var: target,
3002 labels,
3003 } => {
3004 if target != var {
3005 return Err(Error::UnboundVariable(target.clone()));
3006 }
3007 for label in labels {
3008 if !node.labels.contains(label) {
3009 node.labels.push(label.clone());
3010 }
3011 }
3012 row.insert(var.to_string(), Value::Node(node.clone()));
3013 }
3014 SetAssignment::Replace {
3015 var: target,
3016 properties,
3017 } => {
3018 if target != var {
3019 return Err(Error::UnboundVariable(target.clone()));
3020 }
3021 let resolved: Vec<(String, Property)> = properties
3022 .iter()
3023 .map(|(k, expr)| {
3024 let v = eval_expr(expr, &sub_ctx)?;
3025 Ok((k.clone(), value_to_property(v)?))
3026 })
3027 .collect::<Result<Vec<_>>>()?;
3028 node.properties.clear();
3029 for (k, p) in resolved {
3030 node.properties.insert(k, p);
3031 }
3032 row.insert(var.to_string(), Value::Node(node.clone()));
3033 }
3034 SetAssignment::Merge {
3035 var: target,
3036 properties,
3037 } => {
3038 if target != var {
3039 return Err(Error::UnboundVariable(target.clone()));
3040 }
3041 let resolved: Vec<(String, Property)> = properties
3042 .iter()
3043 .map(|(k, expr)| {
3044 let v = eval_expr(expr, &sub_ctx)?;
3045 Ok((k.clone(), value_to_property(v)?))
3046 })
3047 .collect::<Result<Vec<_>>>()?;
3048 for (k, p) in resolved {
3049 node.properties.insert(k, p);
3050 }
3051 row.insert(var.to_string(), Value::Node(node.clone()));
3052 }
3053 SetAssignment::ReplaceFromExpr {
3054 var: target,
3055 source,
3056 replace,
3057 } => {
3058 if target != var {
3059 return Err(Error::UnboundVariable(target.clone()));
3060 }
3061 let v = eval_expr(source, &sub_ctx)?;
3062 let props = extract_property_map(&v)?;
3063 if *replace {
3064 node.properties.clear();
3065 }
3066 for (k, p) in props {
3067 node.properties.insert(k, p);
3068 }
3069 row.insert(var.to_string(), Value::Node(node.clone()));
3070 }
3071 }
3072 }
3073 Ok(())
3074}
3075
3076struct EdgeExpandOp {
3077 input: Box<dyn Operator>,
3078 src_var: String,
3079 edge_var: Option<String>,
3080 dst_var: String,
3081 dst_labels: Vec<String>,
3082 edge_properties: Vec<(String, Expr)>,
3083 edge_types: Vec<String>,
3084 direction: Direction,
3085 edge_constraint_var: Option<String>,
3091 current_row: Option<Row>,
3092 pending: Vec<(EdgeId, NodeId)>,
3093 pending_idx: usize,
3094}
3095
3096impl EdgeExpandOp {
3097 #[allow(clippy::too_many_arguments)]
3098 fn new(
3099 input: Box<dyn Operator>,
3100 src_var: String,
3101 edge_var: Option<String>,
3102 dst_var: String,
3103 dst_labels: Vec<String>,
3104 edge_properties: Vec<(String, Expr)>,
3105 edge_types: Vec<String>,
3106 direction: Direction,
3107 edge_constraint_var: Option<String>,
3108 ) -> Self {
3109 Self {
3110 input,
3111 src_var,
3112 edge_var,
3113 dst_var,
3114 dst_labels,
3115 edge_properties,
3116 edge_types,
3117 direction,
3118 edge_constraint_var,
3119 current_row: None,
3120 pending: Vec::new(),
3121 pending_idx: 0,
3122 }
3123 }
3124}
3125
3126impl Operator for EdgeExpandOp {
3127 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3128 loop {
3129 while self.pending_idx < self.pending.len() {
3130 let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3131 self.pending_idx += 1;
3132
3133 let edge = match ctx.store.get_edge(edge_id)? {
3134 Some(e) => e,
3135 None => continue,
3136 };
3137 if !self.edge_types.is_empty()
3138 && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3139 {
3140 continue;
3141 }
3142 if let Some(constraint_var) = &self.edge_constraint_var {
3150 let base = self
3151 .current_row
3152 .as_ref()
3153 .expect("pending edges without source row");
3154 let expected = match ctx.lookup_binding(base, constraint_var) {
3155 Some(Value::Edge(e)) => Some(e.id),
3156 _ => None,
3157 };
3158 match expected {
3159 Some(id) if id != edge.id => continue,
3160 None => continue,
3161 _ => {}
3162 }
3163 }
3164 if !self.edge_properties.is_empty() {
3169 let base = self
3170 .current_row
3171 .as_ref()
3172 .expect("pending edges without source row");
3173 let ectx = ctx.eval_ctx(base);
3174 let mut ok = true;
3175 for (key, expr) in &self.edge_properties {
3176 let expected = eval_expr(expr, &ectx)?;
3177 let actual = match edge.properties.get(key) {
3178 Some(v) => Value::Property(v.clone()),
3179 None => {
3180 ok = false;
3181 break;
3182 }
3183 };
3184 if !values_equal(&actual, &expected) {
3185 ok = false;
3186 break;
3187 }
3188 }
3189 if !ok {
3190 continue;
3191 }
3192 }
3193
3194 let neighbor = match ctx.store.get_node(neighbor_id)? {
3195 Some(n) => n,
3196 None => continue,
3197 };
3198 if !has_all_labels(&neighbor, &self.dst_labels) {
3199 continue;
3200 }
3201
3202 let base = self
3203 .current_row
3204 .as_ref()
3205 .expect("pending edges without source row");
3206 let mut out = base.clone();
3207 if let Some(ev) = &self.edge_var {
3208 out.insert(ev.clone(), Value::Edge(edge));
3209 }
3210 out.insert(self.dst_var.clone(), Value::Node(neighbor));
3211 return Ok(Some(out));
3212 }
3213
3214 match self.input.next(ctx)? {
3215 None => return Ok(None),
3216 Some(row) => {
3217 let src_id = match row.get(&self.src_var) {
3218 Some(Value::Node(n)) => n.id,
3219 Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
3224 continue
3225 }
3226 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3227 };
3228 self.pending = match self.direction {
3229 Direction::Outgoing => ctx.store.outgoing(src_id)?,
3230 Direction::Incoming => ctx.store.incoming(src_id)?,
3231 Direction::Both => {
3232 let mut all = ctx.store.outgoing(src_id)?;
3238 let mut seen: std::collections::HashSet<EdgeId> =
3239 all.iter().map(|(e, _)| *e).collect();
3240 for (e, n) in ctx.store.incoming(src_id)? {
3241 if seen.insert(e) {
3242 all.push((e, n));
3243 }
3244 }
3245 all
3246 }
3247 };
3248 self.pending_idx = 0;
3249 self.current_row = Some(row);
3250 }
3251 }
3252 }
3253 }
3254}
3255
3256struct OptionalEdgeExpandOp {
3271 input: Box<dyn Operator>,
3272 src_var: String,
3273 edge_var: Option<String>,
3274 dst_var: String,
3275 dst_labels: Vec<String>,
3276 dst_properties: Vec<(String, Expr)>,
3277 edge_types: Vec<String>,
3278 direction: Direction,
3279 dst_constraint_var: Option<String>,
3285 edge_constraint_var: Option<String>,
3290 current_row: Option<Row>,
3291 pending: Vec<(EdgeId, NodeId)>,
3292 pending_idx: usize,
3293 yielded_for_current: bool,
3294}
3295
3296impl OptionalEdgeExpandOp {
3297 #[allow(clippy::too_many_arguments)]
3298 fn new(
3299 input: Box<dyn Operator>,
3300 src_var: String,
3301 edge_var: Option<String>,
3302 dst_var: String,
3303 dst_labels: Vec<String>,
3304 dst_properties: Vec<(String, Expr)>,
3305 edge_types: Vec<String>,
3306 direction: Direction,
3307 dst_constraint_var: Option<String>,
3308 edge_constraint_var: Option<String>,
3309 ) -> Self {
3310 Self {
3311 input,
3312 src_var,
3313 edge_var,
3314 dst_var,
3315 dst_labels,
3316 dst_properties,
3317 edge_types,
3318 direction,
3319 dst_constraint_var,
3320 edge_constraint_var,
3321 current_row: None,
3322 pending: Vec::new(),
3323 pending_idx: 0,
3324 yielded_for_current: false,
3325 }
3326 }
3327}
3328
3329impl Operator for OptionalEdgeExpandOp {
3330 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3331 loop {
3332 while self.pending_idx < self.pending.len() {
3333 let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3334 self.pending_idx += 1;
3335
3336 let edge = match ctx.store.get_edge(edge_id)? {
3337 Some(e) => e,
3338 None => continue,
3339 };
3340 if !self.edge_types.is_empty()
3341 && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3342 {
3343 continue;
3344 }
3345 if let Some(constraint_var) = &self.edge_constraint_var {
3351 let base = self
3352 .current_row
3353 .as_ref()
3354 .expect("pending without source row");
3355 let expected = match ctx.lookup_binding(base, constraint_var) {
3356 Some(Value::Edge(e)) => Some(e.id),
3357 _ => None,
3358 };
3359 match expected {
3360 Some(id) if id != edge.id => continue,
3361 None => continue,
3362 _ => {}
3363 }
3364 }
3365
3366 let neighbor = match ctx.store.get_node(neighbor_id)? {
3367 Some(n) => n,
3368 None => continue,
3369 };
3370 if !has_all_labels(&neighbor, &self.dst_labels) {
3371 continue;
3372 }
3373 if let Some(constraint_var) = &self.dst_constraint_var {
3380 let base = self
3381 .current_row
3382 .as_ref()
3383 .expect("pending without source row");
3384 let bound_id = match base.get(constraint_var) {
3385 Some(Value::Node(n)) => Some(n.id),
3386 Some(Value::Null)
3387 | Some(Value::Property(meshdb_core::Property::Null))
3388 | None => None,
3389 _ => None,
3390 };
3391 match bound_id {
3392 Some(id) if id != neighbor.id => continue,
3393 None => continue,
3394 _ => {}
3395 }
3396 }
3397 if !self.dst_properties.is_empty() {
3398 let base = self
3399 .current_row
3400 .as_ref()
3401 .expect("pending without source row");
3402 let ectx = ctx.eval_ctx(base);
3403 let mut props_ok = true;
3404 for (key, expr) in &self.dst_properties {
3405 let expected = eval_expr(expr, &ectx)?;
3406 let actual = neighbor
3407 .properties
3408 .get(key)
3409 .cloned()
3410 .map(Value::Property)
3411 .unwrap_or(Value::Null);
3412 if !values_equal(&expected, &actual) {
3413 props_ok = false;
3414 break;
3415 }
3416 }
3417 if !props_ok {
3418 continue;
3419 }
3420 }
3421
3422 let base = self
3423 .current_row
3424 .as_ref()
3425 .expect("pending edges without source row");
3426 let mut out = base.clone();
3427 if let Some(ev) = &self.edge_var {
3428 out.insert(ev.clone(), Value::Edge(edge));
3429 }
3430 out.insert(self.dst_var.clone(), Value::Node(neighbor));
3431 self.yielded_for_current = true;
3432 return Ok(Some(out));
3433 }
3434
3435 if let Some(base) = self.current_row.take() {
3445 if !self.yielded_for_current {
3446 let mut out = base;
3447 if let Some(ev) = &self.edge_var {
3448 let preserve = self
3449 .edge_constraint_var
3450 .as_ref()
3451 .map(|c| c == ev)
3452 .unwrap_or(false);
3453 if !preserve {
3454 out.insert(ev.clone(), Value::Null);
3455 }
3456 }
3457 let preserve_dst = self
3458 .dst_constraint_var
3459 .as_ref()
3460 .map(|c| c == &self.dst_var)
3461 .unwrap_or(false);
3462 if !preserve_dst {
3463 out.insert(self.dst_var.clone(), Value::Null);
3464 }
3465 self.yielded_for_current = true;
3466 return Ok(Some(out));
3467 }
3468 }
3469
3470 match self.input.next(ctx)? {
3471 None => return Ok(None),
3472 Some(row) => {
3473 let src_id = match row.get(&self.src_var) {
3474 Some(Value::Node(n)) => n.id,
3475 Some(Value::Null) => {
3482 self.pending = Vec::new();
3483 self.pending_idx = 0;
3484 self.yielded_for_current = false;
3485 self.current_row = Some(row);
3486 continue;
3487 }
3488 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3489 };
3490 self.pending = match self.direction {
3491 Direction::Outgoing => ctx.store.outgoing(src_id)?,
3492 Direction::Incoming => ctx.store.incoming(src_id)?,
3493 Direction::Both => {
3494 let mut all = ctx.store.outgoing(src_id)?;
3500 let mut seen: std::collections::HashSet<EdgeId> =
3501 all.iter().map(|(e, _)| *e).collect();
3502 for (e, n) in ctx.store.incoming(src_id)? {
3503 if seen.insert(e) {
3504 all.push((e, n));
3505 }
3506 }
3507 all
3508 }
3509 };
3510 self.pending_idx = 0;
3511 self.yielded_for_current = false;
3512 self.current_row = Some(row);
3513 }
3514 }
3515 }
3516 }
3517}
3518
3519struct VarLengthExpandOp {
3520 input: Box<dyn Operator>,
3521 src_var: String,
3522 edge_var: Option<String>,
3523 dst_var: String,
3524 dst_labels: Vec<String>,
3525 edge_types: Vec<String>,
3526 edge_properties: Vec<(String, Expr)>,
3532 direction: Direction,
3533 min_hops: u64,
3534 max_hops: u64,
3535 path_var: Option<String>,
3536 optional: bool,
3542 dst_constraint_var: Option<String>,
3549 bound_edge_list_var: Option<String>,
3553 excluded_edge_vars: Vec<String>,
3561 current_row: Option<Row>,
3562 pending_paths: Vec<Vec<Edge>>,
3563 pending_node_paths: Vec<Vec<NodeId>>,
3564 pending_targets: Vec<NodeId>,
3565 pending_idx: usize,
3566}
3567
3568impl VarLengthExpandOp {
3569 #[allow(clippy::too_many_arguments)]
3570 fn new(
3571 input: Box<dyn Operator>,
3572 src_var: String,
3573 edge_var: Option<String>,
3574 dst_var: String,
3575 dst_labels: Vec<String>,
3576 edge_types: Vec<String>,
3577 edge_properties: Vec<(String, Expr)>,
3578 direction: Direction,
3579 min_hops: u64,
3580 max_hops: u64,
3581 path_var: Option<String>,
3582 optional: bool,
3583 dst_constraint_var: Option<String>,
3584 bound_edge_list_var: Option<String>,
3585 excluded_edge_vars: Vec<String>,
3586 ) -> Self {
3587 Self {
3588 input,
3589 src_var,
3590 edge_var,
3591 dst_var,
3592 dst_labels,
3593 edge_types,
3594 edge_properties,
3595 direction,
3596 min_hops,
3597 max_hops,
3598 path_var,
3599 optional,
3600 dst_constraint_var,
3601 bound_edge_list_var,
3602 excluded_edge_vars,
3603 current_row: None,
3604 pending_paths: Vec::new(),
3605 pending_node_paths: Vec::new(),
3606 pending_targets: Vec::new(),
3607 pending_idx: 0,
3608 }
3609 }
3610
3611 fn enumerate(
3612 &self,
3613 ctx: &ExecCtx,
3614 start: NodeId,
3615 input_row: &Row,
3616 ) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
3617 let mut paths: Vec<Vec<Edge>> = Vec::new();
3618 let mut node_paths: Vec<Vec<NodeId>> = Vec::new();
3619 let mut targets: Vec<NodeId> = Vec::new();
3620 let mut edge_buf: Vec<Edge> = Vec::new();
3621 let mut node_buf: Vec<NodeId> = vec![start];
3622 let mut used: HashSet<EdgeId> = HashSet::new();
3629 for var in &self.excluded_edge_vars {
3630 match ctx.lookup_binding(input_row, var) {
3631 Some(Value::Edge(e)) => {
3632 used.insert(e.id);
3633 }
3634 Some(Value::List(items)) => {
3635 for item in items {
3636 if let Value::Edge(e) = item {
3637 used.insert(e.id);
3638 }
3639 }
3640 }
3641 _ => {}
3642 }
3643 }
3644 let expected_edge_props: Vec<(String, Value)> = if self.edge_properties.is_empty() {
3648 Vec::new()
3649 } else {
3650 let ectx = ctx.eval_ctx(input_row);
3651 self.edge_properties
3652 .iter()
3653 .map(|(k, expr)| eval_expr(expr, &ectx).map(|v| (k.clone(), v)))
3654 .collect::<Result<Vec<_>>>()?
3655 };
3656 self.dfs(
3657 ctx,
3658 start,
3659 &expected_edge_props,
3660 &mut edge_buf,
3661 &mut node_buf,
3662 &mut used,
3663 &mut paths,
3664 &mut node_paths,
3665 &mut targets,
3666 )?;
3667 Ok((paths, node_paths, targets))
3668 }
3669
3670 #[allow(clippy::too_many_arguments)]
3671 fn dfs(
3672 &self,
3673 ctx: &ExecCtx,
3674 current_node: NodeId,
3675 expected_edge_props: &[(String, Value)],
3676 edge_buf: &mut Vec<Edge>,
3677 node_buf: &mut Vec<NodeId>,
3678 used: &mut HashSet<EdgeId>,
3679 out_paths: &mut Vec<Vec<Edge>>,
3680 out_node_paths: &mut Vec<Vec<NodeId>>,
3681 out_targets: &mut Vec<NodeId>,
3682 ) -> Result<()> {
3683 let depth = edge_buf.len() as u64;
3684
3685 if depth >= self.min_hops && depth <= self.max_hops {
3686 let terminal_ok = match ctx.store.get_node(current_node)? {
3687 Some(node) => has_all_labels(&node, &self.dst_labels),
3688 None => false,
3689 };
3690 if terminal_ok {
3691 out_paths.push(edge_buf.clone());
3692 out_node_paths.push(node_buf.clone());
3693 out_targets.push(current_node);
3694 }
3695 }
3696
3697 if depth >= self.max_hops {
3698 return Ok(());
3699 }
3700
3701 let neighbors = match self.direction {
3702 Direction::Outgoing => ctx.store.outgoing(current_node)?,
3703 Direction::Incoming => ctx.store.incoming(current_node)?,
3704 Direction::Both => {
3705 let mut all = ctx.store.outgoing(current_node)?;
3706 all.extend(ctx.store.incoming(current_node)?);
3707 all
3708 }
3709 };
3710
3711 for (eid, neighbor_id) in neighbors {
3712 if used.contains(&eid) {
3713 continue;
3714 }
3715 let edge = match ctx.store.get_edge(eid)? {
3716 Some(e) => e,
3717 None => continue,
3718 };
3719 if !self.edge_types.is_empty() && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3720 {
3721 continue;
3722 }
3723 if !expected_edge_props.is_empty() {
3728 let mut ok = true;
3729 for (key, expected) in expected_edge_props {
3730 let actual = match edge.properties.get(key) {
3731 Some(v) => Value::Property(v.clone()),
3732 None => {
3733 ok = false;
3734 break;
3735 }
3736 };
3737 if !values_equal(&actual, expected) {
3738 ok = false;
3739 break;
3740 }
3741 }
3742 if !ok {
3743 continue;
3744 }
3745 }
3746 used.insert(eid);
3747 edge_buf.push(edge);
3748 node_buf.push(neighbor_id);
3749 self.dfs(
3750 ctx,
3751 neighbor_id,
3752 expected_edge_props,
3753 edge_buf,
3754 node_buf,
3755 used,
3756 out_paths,
3757 out_node_paths,
3758 out_targets,
3759 )?;
3760 edge_buf.pop();
3761 node_buf.pop();
3762 used.remove(&eid);
3763 }
3764
3765 Ok(())
3766 }
3767}
3768
3769fn replay_edge_list(
3787 ctx: &ExecCtx,
3788 row: &Row,
3789 list_var: &str,
3790 src_id: Option<NodeId>,
3791 direction: Direction,
3792 edge_types: &[String],
3793) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
3794 let start = match src_id {
3795 Some(id) => id,
3796 None => return Ok((Vec::new(), Vec::new(), Vec::new())),
3797 };
3798 let list = match ctx.lookup_binding(row, list_var) {
3799 Some(Value::List(items)) => items.clone(),
3800 Some(Value::Property(meshdb_core::Property::List(items))) => items
3801 .iter()
3802 .cloned()
3803 .map(Value::Property)
3804 .collect::<Vec<_>>(),
3805 _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
3806 };
3807 let mut edge_buf: Vec<Edge> = Vec::with_capacity(list.len());
3808 let mut node_buf: Vec<NodeId> = Vec::with_capacity(list.len() + 1);
3809 node_buf.push(start);
3810 let mut current = start;
3811 for item in list {
3812 let edge = match item {
3813 Value::Edge(e) => e,
3814 _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
3815 };
3816 if !edge_types.is_empty() && !edge_types.iter().any(|t| t == &edge.edge_type) {
3817 return Ok((Vec::new(), Vec::new(), Vec::new()));
3818 }
3819 let next_node = match direction {
3820 Direction::Outgoing => {
3821 if edge.source != current {
3822 return Ok((Vec::new(), Vec::new(), Vec::new()));
3823 }
3824 edge.target
3825 }
3826 Direction::Incoming => {
3827 if edge.target != current {
3828 return Ok((Vec::new(), Vec::new(), Vec::new()));
3829 }
3830 edge.source
3831 }
3832 Direction::Both => {
3833 if edge.source == current {
3834 edge.target
3835 } else if edge.target == current {
3836 edge.source
3837 } else {
3838 return Ok((Vec::new(), Vec::new(), Vec::new()));
3839 }
3840 }
3841 };
3842 if ctx.store.get_node(next_node)?.is_none() {
3846 return Ok((Vec::new(), Vec::new(), Vec::new()));
3847 }
3848 edge_buf.push(edge);
3849 node_buf.push(next_node);
3850 current = next_node;
3851 }
3852 Ok((vec![edge_buf], vec![node_buf], vec![current]))
3853}
3854
3855impl Operator for VarLengthExpandOp {
3856 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3857 loop {
3858 while self.pending_idx < self.pending_paths.len() {
3859 let i = self.pending_idx;
3860 self.pending_idx += 1;
3861
3862 let target_id = self.pending_targets[i];
3863 let target = match ctx.store.get_node(target_id)? {
3864 Some(n) => n,
3865 None => continue,
3866 };
3867
3868 let base = self
3869 .current_row
3870 .as_ref()
3871 .expect("pending without source row");
3872 let mut out = base.clone();
3873 out.insert(self.dst_var.clone(), Value::Node(target.clone()));
3874 if let Some(ev) = &self.edge_var {
3875 let edges: Vec<Value> = self.pending_paths[i]
3876 .iter()
3877 .cloned()
3878 .map(Value::Edge)
3879 .collect();
3880 out.insert(ev.clone(), Value::List(edges));
3881 }
3882 if let Some(pv) = &self.path_var {
3883 let mut nodes = Vec::with_capacity(self.pending_node_paths[i].len());
3884 for nid in &self.pending_node_paths[i] {
3885 match ctx.store.get_node(*nid)? {
3886 Some(n) => nodes.push(n),
3887 None => continue,
3888 }
3889 }
3890 let edges = self.pending_paths[i].clone();
3891 out.insert(pv.clone(), Value::Path { nodes, edges });
3892 }
3893 return Ok(Some(out));
3894 }
3895
3896 match self.input.next(ctx)? {
3897 None => return Ok(None),
3898 Some(row) => {
3899 let src_id = match row.get(&self.src_var) {
3900 Some(Value::Node(n)) => Some(n.id),
3901 Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
3907 None
3908 }
3909 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3910 };
3911 let (mut paths, mut node_paths, mut targets) =
3919 if let Some(list_var) = &self.bound_edge_list_var {
3920 replay_edge_list(
3921 ctx,
3922 &row,
3923 list_var,
3924 src_id,
3925 self.direction,
3926 &self.edge_types,
3927 )?
3928 } else {
3929 match src_id {
3930 Some(id) => self.enumerate(ctx, id, &row)?,
3931 None => (Vec::new(), Vec::new(), Vec::new()),
3932 }
3933 };
3934 if let Some(constraint_var) = &self.dst_constraint_var {
3941 let target_id = match row.get(constraint_var) {
3942 Some(Value::Node(n)) => Some(n.id),
3943 _ => None,
3944 };
3945 match target_id {
3946 Some(id) => {
3947 let mut kept_paths = Vec::new();
3948 let mut kept_node_paths = Vec::new();
3949 let mut kept_targets = Vec::new();
3950 for ((p, np), t) in paths
3951 .drain(..)
3952 .zip(node_paths.drain(..))
3953 .zip(targets.drain(..))
3954 {
3955 if t == id {
3956 kept_paths.push(p);
3957 kept_node_paths.push(np);
3958 kept_targets.push(t);
3959 }
3960 }
3961 paths = kept_paths;
3962 node_paths = kept_node_paths;
3963 targets = kept_targets;
3964 }
3965 None => {
3966 paths.clear();
3967 node_paths.clear();
3968 targets.clear();
3969 }
3970 }
3971 }
3972 if paths.is_empty() && self.optional {
3973 let mut out = row;
3978 if let Some(ev) = &self.edge_var {
3979 out.insert(ev.clone(), Value::Null);
3980 }
3981 out.insert(self.dst_var.clone(), Value::Null);
3982 if let Some(pv) = &self.path_var {
3983 out.insert(pv.clone(), Value::Null);
3984 }
3985 return Ok(Some(out));
3986 }
3987 self.pending_paths = paths;
3988 self.pending_node_paths = node_paths;
3989 self.pending_targets = targets;
3990 self.pending_idx = 0;
3991 self.current_row = Some(row);
3992 }
3993 }
3994 }
3995 }
3996}
3997
3998struct FilterOp {
3999 input: Box<dyn Operator>,
4000 predicate: Expr,
4001}
4002
4003impl FilterOp {
4004 fn new(input: Box<dyn Operator>, predicate: Expr) -> Self {
4005 Self { input, predicate }
4006 }
4007}
4008
4009impl Operator for FilterOp {
4010 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4011 while let Some(row) = self.input.next(ctx)? {
4012 let v = match eval_expr(&self.predicate, &ctx.eval_ctx(&row)) {
4013 Ok(v) => v,
4014 Err(Error::TypeMismatch) | Err(Error::NotBoolean) => Value::Null,
4017 Err(e) => return Err(e),
4018 };
4019 if to_bool(&v).unwrap_or(false) {
4020 return Ok(Some(row));
4021 }
4022 }
4023 Ok(None)
4024 }
4025}
4026
4027struct IdentityOp {
4030 input: Box<dyn Operator>,
4031}
4032
4033impl IdentityOp {
4034 fn new(input: Box<dyn Operator>) -> Self {
4035 Self { input }
4036 }
4037}
4038
4039impl Operator for IdentityOp {
4040 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4041 self.input.next(ctx)
4042 }
4043}
4044
4045struct CoalesceNullRowOp {
4051 input: Box<dyn Operator>,
4052 null_vars: Vec<String>,
4053 produced_any: bool,
4054 done: bool,
4055}
4056
4057impl CoalesceNullRowOp {
4058 fn new(input: Box<dyn Operator>, null_vars: Vec<String>) -> Self {
4059 Self {
4060 input,
4061 null_vars,
4062 produced_any: false,
4063 done: false,
4064 }
4065 }
4066}
4067
4068impl Operator for CoalesceNullRowOp {
4069 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4070 if self.done {
4071 return Ok(None);
4072 }
4073 match self.input.next(ctx)? {
4074 Some(row) => {
4075 self.produced_any = true;
4076 Ok(Some(row))
4077 }
4078 None => {
4079 self.done = true;
4080 if self.produced_any {
4081 Ok(None)
4082 } else {
4083 let mut row = Row::new();
4084 for v in &self.null_vars {
4085 row.insert(v.clone(), Value::Null);
4086 }
4087 Ok(Some(row))
4088 }
4089 }
4090 }
4091 }
4092}
4093
4094struct ProjectOp {
4095 input: Box<dyn Operator>,
4096 items: Vec<ReturnItem>,
4097}
4098
4099impl ProjectOp {
4100 fn new(input: Box<dyn Operator>, items: Vec<ReturnItem>) -> Self {
4101 Self { input, items }
4102 }
4103}
4104
4105impl Operator for ProjectOp {
4106 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4107 match self.input.next(ctx)? {
4108 Some(row) => {
4109 let mut out = Row::new();
4110 for (i, item) in self.items.iter().enumerate() {
4111 let name = item.alias.clone().unwrap_or_else(|| {
4112 item.raw_text
4113 .clone()
4114 .unwrap_or_else(|| default_name(&item.expr, i))
4115 });
4116 let value = eval_expr(&item.expr, &ctx.eval_ctx(&row))?;
4117 out.insert(name, value);
4118 }
4119 Ok(Some(out))
4120 }
4121 None => Ok(None),
4122 }
4123 }
4124}
4125
4126fn default_name(expr: &Expr, idx: usize) -> String {
4127 render_expr_name(expr).unwrap_or_else(|| format!("col{}", idx))
4128}
4129
4130fn render_expr_name(expr: &Expr) -> Option<String> {
4131 Some(match expr {
4132 Expr::Identifier(s) => s.clone(),
4133 Expr::Property { var, key } => format!("{var}.{key}"),
4134 Expr::PropertyAccess { base, key } => {
4135 if matches!(
4139 base.as_ref(),
4140 Expr::IndexAccess { .. } | Expr::SliceAccess { .. }
4141 ) {
4142 format!("({}).{key}", render_expr_name(base)?)
4143 } else {
4144 format!("{}.{key}", render_expr_name(base)?)
4145 }
4146 }
4147 Expr::Parameter(name) => format!("${name}"),
4148 Expr::Literal(Literal::String(s)) => format!("'{s}'"),
4149 Expr::Literal(Literal::Integer(i)) => i.to_string(),
4150 Expr::Literal(Literal::Float(f)) => f.to_string(),
4151 Expr::Literal(Literal::Boolean(b)) => b.to_string(),
4152 Expr::Literal(Literal::Null) => "NULL".into(),
4153 Expr::Call { name, args } => {
4154 let arg_str = match args {
4155 CallArgs::Star => "*".into(),
4156 CallArgs::Exprs(es) | CallArgs::DistinctExprs(es) => {
4157 let prefix = if matches!(args, CallArgs::DistinctExprs(_)) {
4158 "DISTINCT "
4159 } else {
4160 ""
4161 };
4162 let inner: Vec<String> = es.iter().filter_map(render_expr_name).collect();
4163 if inner.len() != es.len() {
4164 return None;
4165 }
4166 format!("{prefix}{}", inner.join(", "))
4167 }
4168 };
4169 format!("{name}({arg_str})")
4170 }
4171 Expr::BinaryOp { op, left, right } => {
4172 let op_str = match op {
4173 BinaryOp::Add => " + ",
4174 BinaryOp::Sub => " - ",
4175 BinaryOp::Mul => " * ",
4176 BinaryOp::Div => " / ",
4177 BinaryOp::Mod => " % ",
4178 BinaryOp::Pow => " ^ ",
4179 };
4180 format!(
4181 "{}{op_str}{}",
4182 render_expr_name(left)?,
4183 render_expr_name(right)?
4184 )
4185 }
4186 Expr::UnaryOp { op, operand } => {
4187 let op_str = match op {
4188 UnaryOp::Neg => "-",
4189 };
4190 format!("{op_str}{}", render_expr_name(operand)?)
4191 }
4192 Expr::Not(inner) => format!("NOT {}", render_expr_name(inner)?),
4193 Expr::IsNull { negated, inner } => {
4194 if *negated {
4195 format!("{} IS NOT NULL", render_expr_name(inner)?)
4196 } else {
4197 format!("{} IS NULL", render_expr_name(inner)?)
4198 }
4199 }
4200 Expr::Compare { op, left, right } => {
4201 let op_str = match op {
4202 CompareOp::Eq => " = ",
4203 CompareOp::Ne => " <> ",
4204 CompareOp::Lt => " < ",
4205 CompareOp::Le => " <= ",
4206 CompareOp::Gt => " > ",
4207 CompareOp::Ge => " >= ",
4208 CompareOp::StartsWith => " STARTS WITH ",
4209 CompareOp::EndsWith => " ENDS WITH ",
4210 CompareOp::Contains => " CONTAINS ",
4211 CompareOp::RegexMatch => " =~ ",
4212 };
4213 format!(
4214 "{}{op_str}{}",
4215 render_expr_name(left)?,
4216 render_expr_name(right)?
4217 )
4218 }
4219 Expr::List(items) => {
4220 let inner: Vec<String> = items.iter().filter_map(render_expr_name).collect();
4221 if inner.len() != items.len() {
4222 return None;
4223 }
4224 format!("[{}]", inner.join(", "))
4225 }
4226 Expr::Map(entries) => {
4227 let inner: Vec<String> = entries
4228 .iter()
4229 .map(|(k, v)| render_expr_name(v).map(|vn| format!("{k}: {vn}")))
4230 .collect::<Option<Vec<_>>>()?;
4231 format!("{{{}}}", inner.join(", "))
4232 }
4233 Expr::IndexAccess { base, index } => {
4234 format!("{}[{}]", render_expr_name(base)?, render_expr_name(index)?)
4235 }
4236 Expr::InList { element, list } => {
4237 format!(
4238 "{} IN {}",
4239 render_expr_name(element)?,
4240 render_expr_name(list)?
4241 )
4242 }
4243 Expr::HasLabels { expr, labels } => {
4244 let mut s = format!("({}", render_expr_name(expr)?);
4245 for l in labels {
4246 s.push(':');
4247 s.push_str(l);
4248 }
4249 s.push(')');
4250 s
4251 }
4252 _ => return None,
4253 })
4254}
4255
4256struct DistinctOp {
4257 input: Box<dyn Operator>,
4258 seen: HashSet<String>,
4259}
4260
4261impl DistinctOp {
4262 fn new(input: Box<dyn Operator>) -> Self {
4263 Self {
4264 input,
4265 seen: HashSet::new(),
4266 }
4267 }
4268}
4269
4270impl Operator for DistinctOp {
4271 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4272 while let Some(row) = self.input.next(ctx)? {
4273 let key = row_key(&row);
4274 if self.seen.insert(key) {
4275 return Ok(Some(row));
4276 }
4277 }
4278 Ok(None)
4279 }
4280}
4281
4282struct BindPathOp {
4298 input: Box<dyn Operator>,
4299 path_var: String,
4300 node_vars: Vec<String>,
4301 edge_vars: Vec<String>,
4302}
4303
4304impl BindPathOp {
4305 fn new(
4306 input: Box<dyn Operator>,
4307 path_var: String,
4308 node_vars: Vec<String>,
4309 edge_vars: Vec<String>,
4310 ) -> Self {
4311 Self {
4312 input,
4313 path_var,
4314 node_vars,
4315 edge_vars,
4316 }
4317 }
4318}
4319
4320impl Operator for BindPathOp {
4321 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4322 let Some(mut row) = self.input.next(ctx)? else {
4323 return Ok(None);
4324 };
4325 let mut nodes: Vec<meshdb_core::Node> = Vec::new();
4329 let mut edges: Vec<meshdb_core::Edge> = Vec::new();
4330 let mut abort = false;
4331 if let Some(Value::Node(n)) = row.get(&self.node_vars[0]) {
4338 nodes.push(n.clone());
4339 } else {
4340 abort = true;
4341 }
4342 if !abort {
4343 for (i, ev) in self.edge_vars.iter().enumerate() {
4344 match row.get(ev) {
4345 Some(Value::Edge(e)) => {
4346 edges.push(e.clone());
4347 match row.get(&self.node_vars[i + 1]) {
4348 Some(Value::Node(n)) => nodes.push(n.clone()),
4349 _ => {
4350 abort = true;
4351 break;
4352 }
4353 }
4354 }
4355 Some(Value::Path {
4356 nodes: sub_nodes,
4357 edges: sub_edges,
4358 }) => {
4359 edges.extend(sub_edges.iter().cloned());
4365 if sub_nodes.len() > 1 {
4366 nodes.extend(sub_nodes[1..].iter().cloned());
4367 }
4368 }
4369 _ => {
4370 abort = true;
4371 break;
4372 }
4373 }
4374 }
4375 }
4376 if abort {
4377 row.insert(self.path_var.clone(), Value::Null);
4378 } else {
4379 row.insert(self.path_var.clone(), Value::Path { nodes, edges });
4380 }
4381 Ok(Some(row))
4382 }
4383}
4384
4385struct ShortestPathOp {
4404 input: Box<dyn Operator>,
4405 src_var: String,
4406 dst_var: String,
4407 path_var: String,
4408 edge_types: Vec<String>,
4409 direction: meshdb_cypher::Direction,
4410 max_hops: u64,
4411 kind: meshdb_cypher::ShortestKind,
4412 pending: std::collections::VecDeque<(Row, Value)>,
4419}
4420
4421impl ShortestPathOp {
4422 #[allow(clippy::too_many_arguments)]
4423 fn new(
4424 input: Box<dyn Operator>,
4425 src_var: String,
4426 dst_var: String,
4427 path_var: String,
4428 edge_types: Vec<String>,
4429 direction: meshdb_cypher::Direction,
4430 max_hops: u64,
4431 kind: meshdb_cypher::ShortestKind,
4432 ) -> Self {
4433 Self {
4434 input,
4435 src_var,
4436 dst_var,
4437 path_var,
4438 edge_types,
4439 direction,
4440 max_hops,
4441 kind,
4442 pending: std::collections::VecDeque::new(),
4443 }
4444 }
4445}
4446
4447impl Operator for ShortestPathOp {
4448 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4449 loop {
4450 if let Some((mut row, path)) = self.pending.pop_front() {
4455 row.insert(self.path_var.clone(), path);
4456 return Ok(Some(row));
4457 }
4458 let Some(row) = self.input.next(ctx)? else {
4459 return Ok(None);
4460 };
4461 let src = match row.get(&self.src_var) {
4462 Some(Value::Node(n)) => n.clone(),
4463 _ => continue,
4464 };
4465 let dst = match row.get(&self.dst_var) {
4466 Some(Value::Node(n)) => n.clone(),
4467 _ => continue,
4468 };
4469 let paths = bfs_shortest_paths(
4470 &src,
4471 &dst,
4472 &self.edge_types,
4473 self.direction,
4474 self.max_hops,
4475 self.kind,
4476 ctx.store,
4477 )?;
4478 if paths.is_empty() {
4479 continue;
4481 }
4482 for path in paths {
4483 self.pending.push_back((row.clone(), path));
4484 }
4485 }
4486 }
4487}
4488
4489fn bfs_shortest_paths(
4508 src: &Node,
4509 dst: &Node,
4510 edge_types: &[String],
4511 direction: meshdb_cypher::Direction,
4512 max_hops: u64,
4513 kind: meshdb_cypher::ShortestKind,
4514 reader: &dyn crate::reader::GraphReader,
4515) -> Result<Vec<Value>> {
4516 use meshdb_cypher::Direction;
4517
4518 if src.id == dst.id {
4519 return Ok(vec![Value::Path {
4520 nodes: vec![src.clone()],
4521 edges: vec![],
4522 }]);
4523 }
4524
4525 let mut dist: HashMap<NodeId, u64> = HashMap::new();
4531 dist.insert(src.id, 0);
4532 let mut parents: HashMap<NodeId, Vec<(NodeId, EdgeId)>> = HashMap::new();
4533
4534 let mut frontier: Vec<NodeId> = vec![src.id];
4535 let mut depth: u64 = 0;
4536 let mut found = false;
4537
4538 while !frontier.is_empty() && depth < max_hops && !found {
4539 let mut next_frontier: Vec<NodeId> = Vec::new();
4540 for node_id in &frontier {
4541 let neighbors = match direction {
4542 Direction::Outgoing => reader.outgoing(*node_id)?,
4543 Direction::Incoming => reader.incoming(*node_id)?,
4544 Direction::Both => {
4545 let mut out = reader.outgoing(*node_id)?;
4546 out.extend(reader.incoming(*node_id)?);
4547 out
4548 }
4549 };
4550 for (edge_id, neighbor_id) in neighbors {
4551 if !edge_types.is_empty() {
4554 let edge = match reader.get_edge(edge_id)? {
4555 Some(e) => e,
4556 None => continue,
4557 };
4558 if !edge_types.iter().any(|t| t == &edge.edge_type) {
4559 continue;
4560 }
4561 }
4562 match dist.get(&neighbor_id) {
4563 Some(&d) if d == depth + 1 => {
4564 parents
4570 .entry(neighbor_id)
4571 .or_default()
4572 .push((*node_id, edge_id));
4573 }
4574 Some(_) => {
4575 }
4579 None => {
4580 dist.insert(neighbor_id, depth + 1);
4581 parents
4582 .entry(neighbor_id)
4583 .or_default()
4584 .push((*node_id, edge_id));
4585 if neighbor_id == dst.id {
4586 found = true;
4587 } else {
4588 next_frontier.push(neighbor_id);
4589 }
4590 }
4591 }
4592 }
4593 }
4594 depth += 1;
4595 if !found {
4596 frontier = next_frontier;
4597 }
4598 }
4599
4600 if !found {
4601 return Ok(Vec::new());
4602 }
4603
4604 let mut out: Vec<Value> = Vec::new();
4608 let mut nodes_rev: Vec<Node> = Vec::new();
4609 let mut edges_rev: Vec<Edge> = Vec::new();
4610 let only_first = matches!(kind, meshdb_cypher::ShortestKind::Shortest);
4611 collect_shortest_paths(
4612 src,
4613 dst,
4614 &parents,
4615 reader,
4616 &mut nodes_rev,
4617 &mut edges_rev,
4618 &mut out,
4619 only_first,
4620 )?;
4621 Ok(out)
4622}
4623
4624#[allow(clippy::too_many_arguments)]
4636fn collect_shortest_paths(
4637 src: &Node,
4638 current: &Node,
4639 parents: &HashMap<NodeId, Vec<(NodeId, EdgeId)>>,
4640 reader: &dyn crate::reader::GraphReader,
4641 nodes_rev: &mut Vec<Node>,
4642 edges_rev: &mut Vec<Edge>,
4643 out: &mut Vec<Value>,
4644 only_first: bool,
4645) -> Result<()> {
4646 if current.id == src.id {
4647 let mut nodes: Vec<Node> = Vec::with_capacity(nodes_rev.len() + 1);
4652 nodes.push(src.clone());
4653 nodes.extend(nodes_rev.iter().rev().cloned());
4654 let edges: Vec<Edge> = edges_rev.iter().rev().cloned().collect();
4655 out.push(Value::Path { nodes, edges });
4656 return Ok(());
4657 }
4658 let Some(parent_edges) = parents.get(¤t.id) else {
4659 return Ok(());
4663 };
4664 for (parent_id, edge_id) in parent_edges {
4665 if only_first && !out.is_empty() {
4666 return Ok(());
4667 }
4668 let edge = reader
4669 .get_edge(*edge_id)?
4670 .expect("BFS inserted this edge id; it must still exist");
4671 let parent_node = reader
4672 .get_node(*parent_id)?
4673 .expect("BFS visited this node id; it must still exist");
4674 nodes_rev.push(current.clone());
4675 edges_rev.push(edge);
4676 collect_shortest_paths(
4677 src,
4678 &parent_node,
4679 parents,
4680 reader,
4681 nodes_rev,
4682 edges_rev,
4683 out,
4684 only_first,
4685 )?;
4686 nodes_rev.pop();
4687 edges_rev.pop();
4688 }
4689 Ok(())
4690}
4691
4692struct UnionOp {
4701 branches: Vec<Box<dyn Operator>>,
4702 current: usize,
4703 seen: Option<HashSet<String>>,
4704}
4705
4706impl UnionOp {
4707 fn new(branches: Vec<Box<dyn Operator>>, all: bool) -> Self {
4708 Self {
4709 branches,
4710 current: 0,
4711 seen: if all { None } else { Some(HashSet::new()) },
4712 }
4713 }
4714}
4715
4716impl Operator for UnionOp {
4717 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4718 while self.current < self.branches.len() {
4719 match self.branches[self.current].next(ctx)? {
4720 Some(row) => {
4721 if let Some(seen) = self.seen.as_mut() {
4722 let key = row_key(&row);
4723 if !seen.insert(key) {
4724 continue;
4725 }
4726 }
4727 return Ok(Some(row));
4728 }
4729 None => {
4730 self.current += 1;
4731 }
4732 }
4733 }
4734 Ok(None)
4735 }
4736}
4737
4738struct OrderByOp {
4739 input: Box<dyn Operator>,
4740 sort_items: Vec<SortItem>,
4741 sorted: Option<Vec<Row>>,
4742 cursor: usize,
4743}
4744
4745impl OrderByOp {
4746 fn new(input: Box<dyn Operator>, sort_items: Vec<SortItem>) -> Self {
4747 Self {
4748 input,
4749 sort_items,
4750 sorted: None,
4751 cursor: 0,
4752 }
4753 }
4754}
4755
4756impl Operator for OrderByOp {
4757 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4758 if self.sorted.is_none() {
4759 let mut rows: Vec<Row> = Vec::new();
4760 while let Some(row) = self.input.next(ctx)? {
4761 rows.push(row);
4762 }
4763 let mut keyed: Vec<(Vec<Value>, Row)> = Vec::with_capacity(rows.len());
4764 for row in rows {
4765 let mut keys = Vec::with_capacity(self.sort_items.len());
4766 for item in &self.sort_items {
4767 keys.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
4768 }
4769 keyed.push((keys, row));
4770 }
4771 let descs: Vec<bool> = self.sort_items.iter().map(|s| s.descending).collect();
4772 keyed.sort_by(|a, b| {
4773 for (i, (va, vb)) in a.0.iter().zip(b.0.iter()).enumerate() {
4774 let ord = compare_values(va, vb);
4775 let ord = if descs[i] { ord.reverse() } else { ord };
4776 if ord != Ordering::Equal {
4777 return ord;
4778 }
4779 }
4780 Ordering::Equal
4781 });
4782 self.sorted = Some(keyed.into_iter().map(|(_, r)| r).collect());
4783 }
4784 let rows = self.sorted.as_ref().unwrap();
4785 if self.cursor < rows.len() {
4786 let row = rows[self.cursor].clone();
4787 self.cursor += 1;
4788 Ok(Some(row))
4789 } else {
4790 Ok(None)
4791 }
4792 }
4793}
4794
4795struct AggregateOp {
4796 input: Box<dyn Operator>,
4797 group_keys: Vec<ReturnItem>,
4798 aggregates: Vec<AggregateSpec>,
4799 results: Option<Vec<Row>>,
4800 cursor: usize,
4801}
4802
4803impl AggregateOp {
4804 fn new(
4805 input: Box<dyn Operator>,
4806 group_keys: Vec<ReturnItem>,
4807 aggregates: Vec<AggregateSpec>,
4808 ) -> Self {
4809 Self {
4810 input,
4811 group_keys,
4812 aggregates,
4813 results: None,
4814 cursor: 0,
4815 }
4816 }
4817
4818 fn compute(&mut self, ctx: &ExecCtx) -> Result<()> {
4819 let mut groups: HashMap<String, GroupState> = HashMap::new();
4820 let mut order: Vec<String> = Vec::new();
4821
4822 let mut saw_any = false;
4825
4826 while let Some(row) = self.input.next(ctx)? {
4827 saw_any = true;
4828 let mut key_values = Vec::with_capacity(self.group_keys.len());
4829 for item in &self.group_keys {
4830 key_values.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
4831 }
4832 let mut hash_key = String::new();
4833 for v in &key_values {
4834 hash_key.push_str(&value_key(v));
4835 hash_key.push('|');
4836 }
4837 let entry = groups.entry(hash_key.clone()).or_insert_with(|| {
4838 order.push(hash_key.clone());
4839 GroupState {
4840 key_values: key_values.clone(),
4841 agg_states: self
4842 .aggregates
4843 .iter()
4844 .map(|a| AggState::initial(a.function))
4845 .collect(),
4846 distinct_seen: self.aggregates.iter().map(|_| None).collect(),
4847 }
4848 });
4849 for (i, spec) in self.aggregates.iter().enumerate() {
4850 if let AggregateArg::DistinctExpr(expr) = &spec.arg {
4851 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
4852 if matches!(v, Value::Null) {
4853 continue;
4854 }
4855 let key = value_key(&v);
4856 let seen = entry.distinct_seen[i].get_or_insert_with(HashSet::new);
4857 if !seen.insert(key) {
4858 continue;
4859 }
4860 }
4861 entry.agg_states[i].update(&spec.arg, &ctx.eval_ctx(&row))?;
4862 if let Some(extra_expr) = &spec.extra_arg {
4866 let need_resolve = matches!(
4867 &entry.agg_states[i],
4868 AggState::PercentileDisc {
4869 percentile: None,
4870 ..
4871 } | AggState::PercentileCont {
4872 percentile: None,
4873 ..
4874 }
4875 );
4876 if need_resolve {
4877 let pv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
4878 let p = match pv {
4879 Value::Property(Property::Float64(f)) => f,
4880 Value::Property(Property::Int64(i)) => i as f64,
4881 _ => 0.0,
4882 };
4883 if !(0.0..=1.0).contains(&p) || p.is_nan() {
4887 return Err(Error::Procedure(format!("percentile out of range: {p}")));
4888 }
4889 match &mut entry.agg_states[i] {
4890 AggState::PercentileDisc { percentile, .. }
4891 | AggState::PercentileCont { percentile, .. } => {
4892 *percentile = Some(p);
4893 }
4894 _ => {}
4895 }
4896 }
4897 }
4898 }
4899 }
4900
4901 let mut out = Vec::new();
4902 if !saw_any && self.group_keys.is_empty() && !self.aggregates.is_empty() {
4903 let mut row = Row::new();
4905 for spec in &self.aggregates {
4906 row.insert(
4907 spec.alias.clone(),
4908 AggState::initial(spec.function).finalize(),
4909 );
4910 }
4911 out.push(row);
4912 } else {
4913 for key in order {
4914 let state = groups.remove(&key).unwrap();
4915 let mut row = Row::new();
4916 for (i, item) in self.group_keys.iter().enumerate() {
4917 let name = item
4918 .alias
4919 .clone()
4920 .unwrap_or_else(|| default_name(&item.expr, i));
4921 row.insert(name, state.key_values[i].clone());
4922 }
4923 for (i, spec) in self.aggregates.iter().enumerate() {
4924 row.insert(spec.alias.clone(), state.agg_states[i].finalize());
4925 }
4926 out.push(row);
4927 }
4928 }
4929 self.results = Some(out);
4930 Ok(())
4931 }
4932}
4933
4934impl Operator for AggregateOp {
4935 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4936 if self.results.is_none() {
4937 self.compute(ctx)?;
4938 }
4939 let rows = self.results.as_ref().unwrap();
4940 if self.cursor < rows.len() {
4941 let row = rows[self.cursor].clone();
4942 self.cursor += 1;
4943 Ok(Some(row))
4944 } else {
4945 Ok(None)
4946 }
4947 }
4948}
4949
4950struct GroupState {
4951 key_values: Vec<Value>,
4952 agg_states: Vec<AggState>,
4953 distinct_seen: Vec<Option<HashSet<String>>>,
4954}
4955
4956enum AggState {
4957 Count(i64),
4958 Sum {
4959 int_part: i64,
4960 float_part: f64,
4961 is_float: bool,
4962 },
4963 Avg {
4964 total: f64,
4965 count: i64,
4966 },
4967 Min(Option<Value>),
4968 Max(Option<Value>),
4969 Collect(Vec<Value>),
4970 StDev {
4971 sum: f64,
4972 sum_sq: f64,
4973 count: i64,
4974 },
4975 StDevP {
4976 sum: f64,
4977 sum_sq: f64,
4978 count: i64,
4979 },
4980 PercentileDisc {
4981 items: Vec<Value>,
4982 percentile: Option<f64>,
4983 },
4984 PercentileCont {
4985 items: Vec<Value>,
4986 percentile: Option<f64>,
4987 },
4988}
4989
4990impl AggState {
4991 fn initial(func: AggregateFn) -> Self {
4992 match func {
4993 AggregateFn::Count => AggState::Count(0),
4994 AggregateFn::Sum => AggState::Sum {
4995 int_part: 0,
4996 float_part: 0.0,
4997 is_float: false,
4998 },
4999 AggregateFn::Avg => AggState::Avg {
5000 total: 0.0,
5001 count: 0,
5002 },
5003 AggregateFn::Min => AggState::Min(None),
5004 AggregateFn::Max => AggState::Max(None),
5005 AggregateFn::Collect => AggState::Collect(Vec::new()),
5006 AggregateFn::StDev => AggState::StDev {
5007 sum: 0.0,
5008 sum_sq: 0.0,
5009 count: 0,
5010 },
5011 AggregateFn::StDevP => AggState::StDevP {
5012 sum: 0.0,
5013 sum_sq: 0.0,
5014 count: 0,
5015 },
5016 AggregateFn::PercentileDisc => AggState::PercentileDisc {
5017 items: Vec::new(),
5018 percentile: None,
5019 },
5020 AggregateFn::PercentileCont => AggState::PercentileCont {
5021 items: Vec::new(),
5022 percentile: None,
5023 },
5024 }
5025 }
5026
5027 fn update(&mut self, arg: &AggregateArg, ctx: &EvalCtx) -> Result<()> {
5028 match self {
5029 AggState::Count(c) => match arg {
5030 AggregateArg::Star => *c += 1,
5031 AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => {
5032 if !matches!(eval_expr(e, ctx)?, Value::Null) {
5033 *c += 1;
5034 }
5035 }
5036 },
5037 AggState::Sum {
5038 int_part,
5039 float_part,
5040 is_float,
5041 } => {
5042 let v = expr_arg_value(arg, ctx)?;
5043 match v {
5044 Value::Null => {}
5045 Value::Property(Property::Int64(i)) => *int_part += i,
5046 Value::Property(Property::Float64(f)) => {
5047 *float_part += f;
5048 *is_float = true;
5049 }
5050 _ => return Err(Error::AggregateTypeError),
5051 }
5052 }
5053 AggState::Avg { total, count } => {
5054 let v = expr_arg_value(arg, ctx)?;
5055 match v {
5056 Value::Null => {}
5057 Value::Property(Property::Int64(i)) => {
5058 *total += i as f64;
5059 *count += 1;
5060 }
5061 Value::Property(Property::Float64(f)) => {
5062 *total += f;
5063 *count += 1;
5064 }
5065 _ => return Err(Error::AggregateTypeError),
5066 }
5067 }
5068 AggState::Min(slot) => {
5069 let v = expr_arg_value(arg, ctx)?;
5076 if matches!(v, Value::Null | Value::Property(Property::Null)) {
5077 } else {
5079 match slot {
5080 None => *slot = Some(v),
5081 Some(cur) => {
5082 if compare_values(&v, cur) == Ordering::Less {
5083 *cur = v;
5084 }
5085 }
5086 }
5087 }
5088 }
5089 AggState::Max(slot) => {
5090 let v = expr_arg_value(arg, ctx)?;
5091 if matches!(v, Value::Null | Value::Property(Property::Null)) {
5092 } else {
5094 match slot {
5095 None => *slot = Some(v),
5096 Some(cur) => {
5097 if compare_values(&v, cur) == Ordering::Greater {
5098 *cur = v;
5099 }
5100 }
5101 }
5102 }
5103 }
5104 AggState::Collect(items) => {
5105 let v = expr_arg_value(arg, ctx)?;
5106 if !matches!(v, Value::Null) {
5107 items.push(v);
5108 }
5109 }
5110 AggState::PercentileDisc { items, .. } | AggState::PercentileCont { items, .. } => {
5111 let v = expr_arg_value(arg, ctx)?;
5112 if !matches!(v, Value::Null) {
5113 items.push(v);
5114 }
5115 }
5116 AggState::StDev { sum, sum_sq, count } | AggState::StDevP { sum, sum_sq, count } => {
5117 let v = expr_arg_value(arg, ctx)?;
5118 match v {
5119 Value::Null => {}
5120 Value::Property(Property::Int64(i)) => {
5121 let f = i as f64;
5122 *sum += f;
5123 *sum_sq += f * f;
5124 *count += 1;
5125 }
5126 Value::Property(Property::Float64(f)) => {
5127 *sum += f;
5128 *sum_sq += f * f;
5129 *count += 1;
5130 }
5131 _ => return Err(Error::AggregateTypeError),
5132 }
5133 }
5134 }
5135 Ok(())
5136 }
5137
5138 fn finalize(&self) -> Value {
5139 match self {
5140 AggState::Count(c) => Value::Property(Property::Int64(*c)),
5141 AggState::Sum {
5142 int_part,
5143 float_part,
5144 is_float,
5145 } => {
5146 if *is_float {
5147 Value::Property(Property::Float64(*float_part + *int_part as f64))
5148 } else {
5149 Value::Property(Property::Int64(*int_part))
5150 }
5151 }
5152 AggState::Avg { total, count } => {
5153 if *count == 0 {
5154 Value::Null
5155 } else {
5156 Value::Property(Property::Float64(*total / *count as f64))
5157 }
5158 }
5159 AggState::Min(slot) | AggState::Max(slot) => match slot {
5160 Some(v) => v.clone(),
5161 None => Value::Null,
5162 },
5163 AggState::Collect(items) => Value::List(items.clone()),
5164 AggState::StDevP { sum, sum_sq, count } => {
5165 if *count == 0 {
5166 Value::Property(Property::Float64(0.0))
5167 } else {
5168 let n = *count as f64;
5169 let variance = *sum_sq / n - (*sum / n).powi(2);
5170 Value::Property(Property::Float64(variance.max(0.0).sqrt()))
5171 }
5172 }
5173 AggState::StDev { sum, sum_sq, count } => {
5174 if *count < 2 {
5175 Value::Property(Property::Float64(0.0))
5176 } else {
5177 let n = *count as f64;
5178 let variance = (*sum_sq - *sum * *sum / n) / (n - 1.0);
5179 Value::Property(Property::Float64(variance.max(0.0).sqrt()))
5180 }
5181 }
5182 AggState::PercentileDisc { items, percentile } => {
5183 percentile_disc(items, percentile.unwrap_or(0.0))
5184 }
5185 AggState::PercentileCont { items, percentile } => {
5186 percentile_cont(items, percentile.unwrap_or(0.0))
5187 }
5188 }
5189 }
5190}
5191
5192fn expr_arg_value(arg: &AggregateArg, ctx: &EvalCtx) -> Result<Value> {
5193 match arg {
5194 AggregateArg::Star => Err(Error::AggregateTypeError),
5195 AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => eval_expr(e, ctx),
5196 }
5197}
5198
5199fn value_to_f64(v: &Value) -> f64 {
5203 match v {
5204 Value::Property(Property::Int64(i)) => *i as f64,
5205 Value::Property(Property::Float64(f)) => *f,
5206 _ => f64::NAN,
5207 }
5208}
5209
5210fn percentile_disc(items: &[Value], p: f64) -> Value {
5215 let mut nums: Vec<(f64, Value)> = items
5216 .iter()
5217 .map(|v| (value_to_f64(v), v.clone()))
5218 .filter(|(f, _)| !f.is_nan())
5219 .collect();
5220 if nums.is_empty() {
5221 return Value::Null;
5222 }
5223 nums.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
5224 let p = p.clamp(0.0, 1.0);
5225 let n = nums.len();
5226 let idx = ((p * n as f64).ceil() as isize - 1).max(0) as usize;
5228 nums[idx.min(n - 1)].1.clone()
5229}
5230
5231fn percentile_cont(items: &[Value], p: f64) -> Value {
5235 let mut nums: Vec<f64> = items
5236 .iter()
5237 .map(value_to_f64)
5238 .filter(|f| !f.is_nan())
5239 .collect();
5240 if nums.is_empty() {
5241 return Value::Null;
5242 }
5243 nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
5244 let p = p.clamp(0.0, 1.0);
5245 let n = nums.len();
5246 if n == 1 {
5247 return Value::Property(Property::Float64(nums[0]));
5248 }
5249 let pos = p * (n as f64 - 1.0);
5250 let lo = pos.floor() as usize;
5251 let hi = pos.ceil() as usize;
5252 let frac = pos - lo as f64;
5253 let v = nums[lo] + (nums[hi] - nums[lo]) * frac;
5254 Value::Property(Property::Float64(v))
5255}
5256
5257struct SkipOp {
5258 input: Box<dyn Operator>,
5259 count_expr: Expr,
5260 remaining: Option<i64>,
5261}
5262
5263impl SkipOp {
5264 fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5265 Self {
5266 input,
5267 count_expr,
5268 remaining: None,
5269 }
5270 }
5271}
5272
5273impl Operator for SkipOp {
5274 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5275 if self.remaining.is_none() {
5276 let empty = Row::new();
5277 let ectx = ctx.eval_ctx(&empty);
5278 let val = eval_expr(&self.count_expr, &ectx)?;
5279 self.remaining = Some(expr_to_count(val)?);
5280 }
5281 let rem = self.remaining.as_mut().unwrap();
5282 while *rem > 0 {
5283 if self.input.next(ctx)?.is_none() {
5284 return Ok(None);
5285 }
5286 *rem -= 1;
5287 }
5288 self.input.next(ctx)
5289 }
5290}
5291
5292struct LimitOp {
5293 input: Box<dyn Operator>,
5294 count_expr: Expr,
5295 remaining: Option<i64>,
5296}
5297
5298impl LimitOp {
5299 fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5300 Self {
5301 input,
5302 count_expr,
5303 remaining: None,
5304 }
5305 }
5306}
5307
5308impl Operator for LimitOp {
5309 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5310 if self.remaining.is_none() {
5311 let empty = Row::new();
5312 let ectx = ctx.eval_ctx(&empty);
5313 let val = eval_expr(&self.count_expr, &ectx)?;
5314 self.remaining = Some(expr_to_count(val)?);
5315 }
5316 let rem = self.remaining.as_mut().unwrap();
5317 if *rem <= 0 {
5318 return Ok(None);
5319 }
5320 match self.input.next(ctx)? {
5321 Some(row) => {
5322 *rem -= 1;
5323 Ok(Some(row))
5324 }
5325 None => Ok(None),
5326 }
5327 }
5328}
5329
5330fn expr_to_count(val: Value) -> Result<i64> {
5331 match val {
5332 Value::Null | Value::Property(Property::Null) => Ok(0),
5333 Value::Property(Property::Int64(n)) if n >= 0 => Ok(n),
5334 _ => Err(Error::TypeMismatch),
5339 }
5340}