1use crate::{
2 error::{Error, Result},
3 eval::{compare_values, eval_expr, row_key, to_bool, value_key, values_equal, EvalCtx},
4 procedures::ProcedureRegistry,
5 reader::GraphReader,
6 value::{ParamMap, Row, Value},
7 writer::GraphWriter,
8};
9use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
10use meshdb_cypher::{
11 AggregateArg, AggregateFn, AggregateSpec, BinaryOp, CallArgs, CompareOp, ConstraintKind,
12 ConstraintScope as CypherConstraintScope, CreateEdgeSpec, CreateNodeSpec, Direction, Expr,
13 Literal, LogicalPlan, PointSeekBounds, PropertyType as CypherPropertyType, RemoveSpec,
14 ReturnItem, SetAssignment, SortItem, UnaryOp, YieldSpec,
15};
16use meshdb_storage::{
17 ConstraintScope as StorageConstraintScope, PropertyConstraintKind,
18 PropertyType as StoragePropertyType, RocksDbStorageEngine,
19};
20use std::cell::RefCell;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23
/// Ids of graph entities that have been deleted while a plan is executing.
///
/// Both sets sit behind a `RefCell`, so operators can record deletions through
/// the shared `&Tombstones` reference carried by `ExecCtx`.
#[derive(Default)]
pub struct Tombstones {
    /// Ids of nodes recorded as deleted.
    pub nodes: RefCell<HashSet<meshdb_core::NodeId>>,
    /// Ids of edges recorded as deleted.
    pub edges: RefCell<HashSet<meshdb_core::EdgeId>>,
}
34
/// Borrowed state handed to every `Operator::next` call.
pub struct ExecCtx<'a> {
    /// Read access to the graph.
    pub store: &'a dyn GraphReader,
    /// Sink for the graph mutations issued by write operators.
    pub writer: &'a dyn GraphWriter,
    /// Parameter map forwarded to expression evaluation.
    pub params: &'a ParamMap,
    /// Procedure registry forwarded to expression evaluation.
    pub procedures: &'a ProcedureRegistry,
    /// Rows of enclosing scopes; `lookup_binding` falls back to these, in
    /// slice order, when a name is absent from the current row.
    pub outer_rows: &'a [&'a Row],
    /// Ids of nodes and edges that delete operators have removed so far.
    pub tombstones: &'a Tombstones,
}
61
62pub(crate) struct NoOpWriter;
63impl GraphWriter for NoOpWriter {
64 fn put_node(&self, _: &Node) -> Result<()> {
65 Ok(())
66 }
67 fn put_edge(&self, _: &Edge) -> Result<()> {
68 Ok(())
69 }
70 fn delete_edge(&self, _: EdgeId) -> Result<()> {
71 Ok(())
72 }
73 fn detach_delete_node(&self, _: NodeId) -> Result<()> {
74 Ok(())
75 }
76}
77
78impl<'a> ExecCtx<'a> {
79 pub(crate) fn eval_ctx<'b>(&self, row: &'b Row) -> EvalCtx<'b>
85 where
86 'a: 'b,
87 {
88 EvalCtx {
89 row,
90 params: self.params,
91 reader: self.store,
92 procedures: self.procedures,
93 outer_rows: self.outer_rows,
94 tombstones: self.tombstones,
95 }
96 }
97
98 pub(crate) fn lookup_binding<'r>(&'r self, row: &'r Row, name: &str) -> Option<&'r Value> {
104 if let Some(v) = row.get(name) {
105 return Some(v);
106 }
107 for outer in self.outer_rows {
108 if let Some(v) = outer.get(name) {
109 return Some(v);
110 }
111 }
112 None
113 }
114}
115
/// Pull-based execution operator.
pub trait Operator {
    /// Produces the next output row, `Ok(None)` once the operator is
    /// exhausted, or an error if evaluation fails.
    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>>;
}
119
120pub fn execute(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
124 let params = ParamMap::new();
125 execute_with_reader(
126 plan,
127 store as &dyn GraphReader,
128 store as &dyn GraphWriter,
129 ¶ms,
130 )
131}
132
133pub fn execute_with_writer(
138 plan: &LogicalPlan,
139 store: &RocksDbStorageEngine,
140 writer: &dyn GraphWriter,
141) -> Result<Vec<Row>> {
142 let params = ParamMap::new();
143 execute_with_reader(plan, store as &dyn GraphReader, writer, ¶ms)
144}
145
146pub fn explain(plan: &LogicalPlan) -> Vec<Row> {
151 let text = meshdb_cypher::format_plan(plan);
152 let mut row = Row::new();
153 row.insert("plan".to_string(), Value::Property(Property::String(text)));
154 vec![row]
155}
156
157pub fn profile(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
158 let result_rows = execute(plan, store)?;
159 let row_count = result_rows.len() as i64;
160 let plan_text = meshdb_cypher::format_plan(plan);
161 let summary = format!("{plan_text}\nRows: {row_count}");
162 let mut row = Row::new();
163 row.insert(
164 "profile".to_string(),
165 Value::Property(Property::String(summary)),
166 );
167 row.insert(
168 "rows".to_string(),
169 Value::Property(Property::Int64(row_count)),
170 );
171 Ok(vec![row])
172}
173
174pub fn execute_with_reader(
175 plan: &LogicalPlan,
176 reader: &dyn GraphReader,
177 writer: &dyn GraphWriter,
178 params: &ParamMap,
179) -> Result<Vec<Row>> {
180 let empty_procs = ProcedureRegistry::new();
181 execute_with_reader_and_procs(plan, reader, writer, params, &empty_procs)
182}
183
184pub fn execute_with_reader_and_procs(
190 plan: &LogicalPlan,
191 reader: &dyn GraphReader,
192 writer: &dyn GraphWriter,
193 params: &ParamMap,
194 procedures: &ProcedureRegistry,
195) -> Result<Vec<Row>> {
196 crate::eval::reset_statement_time();
201 if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
206 return Ok(rows);
207 }
208 let suppress_output = is_write_only_plan(plan);
209 let mut op = build_op(plan);
210 let tombstones = Tombstones::default();
211 let ctx = ExecCtx {
212 store: reader,
213 writer,
214 params,
215 procedures,
216 outer_rows: &[],
217 tombstones: &tombstones,
218 };
219 let mut rows = Vec::new();
220 while let Some(row) = op.next(&ctx)? {
221 rows.push(row);
222 }
223 if suppress_output {
224 Ok(Vec::new())
225 } else {
226 Ok(rows)
227 }
228}
229
230fn is_write_only_plan(plan: &LogicalPlan) -> bool {
231 match plan {
235 LogicalPlan::CreatePath { .. }
236 | LogicalPlan::Delete { .. }
237 | LogicalPlan::SetProperty { .. }
238 | LogicalPlan::Remove { .. }
239 | LogicalPlan::Foreach { .. }
240 | LogicalPlan::MergeNode { .. }
241 | LogicalPlan::MergeEdge { .. } => true,
242 _ => false,
243 }
244}
245
246fn try_execute_ddl(
256 plan: &LogicalPlan,
257 reader: &dyn GraphReader,
258 writer: &dyn GraphWriter,
259) -> Result<Option<Vec<Row>>> {
260 match plan {
261 LogicalPlan::CreatePropertyIndex { label, properties } => {
262 writer.create_property_index(label, properties)?;
263 Ok(Some(vec![node_index_ddl_ack_row(
264 "created", label, properties,
265 )]))
266 }
267 LogicalPlan::DropPropertyIndex { label, properties } => {
268 writer.drop_property_index(label, properties)?;
269 Ok(Some(vec![node_index_ddl_ack_row(
270 "dropped", label, properties,
271 )]))
272 }
273 LogicalPlan::CreateEdgePropertyIndex {
274 edge_type,
275 properties,
276 } => {
277 writer.create_edge_property_index(edge_type, properties)?;
278 Ok(Some(vec![edge_index_ddl_ack_row(
279 "created", edge_type, properties,
280 )]))
281 }
282 LogicalPlan::DropEdgePropertyIndex {
283 edge_type,
284 properties,
285 } => {
286 writer.drop_edge_property_index(edge_type, properties)?;
287 Ok(Some(vec![edge_index_ddl_ack_row(
288 "dropped", edge_type, properties,
289 )]))
290 }
291 LogicalPlan::ShowPropertyIndexes => {
292 let mut rows: Vec<Row> = Vec::new();
298 for (label, properties) in reader.list_property_indexes()? {
299 rows.push(show_index_row("NODE", label, properties));
300 }
301 for (edge_type, properties) in reader.list_edge_property_indexes()? {
302 rows.push(show_index_row("RELATIONSHIP", edge_type, properties));
303 }
304 Ok(Some(rows))
305 }
306 LogicalPlan::CreatePointIndex { label, property } => {
307 writer.create_point_index(label, property)?;
308 Ok(Some(vec![point_index_ddl_ack_row(
309 "created", "NODE", label, property,
310 )]))
311 }
312 LogicalPlan::DropPointIndex { label, property } => {
313 writer.drop_point_index(label, property)?;
314 Ok(Some(vec![point_index_ddl_ack_row(
315 "dropped", "NODE", label, property,
316 )]))
317 }
318 LogicalPlan::CreateEdgePointIndex {
319 edge_type,
320 property,
321 } => {
322 writer.create_edge_point_index(edge_type, property)?;
323 Ok(Some(vec![point_index_ddl_ack_row(
324 "created",
325 "RELATIONSHIP",
326 edge_type,
327 property,
328 )]))
329 }
330 LogicalPlan::DropEdgePointIndex {
331 edge_type,
332 property,
333 } => {
334 writer.drop_edge_point_index(edge_type, property)?;
335 Ok(Some(vec![point_index_ddl_ack_row(
336 "dropped",
337 "RELATIONSHIP",
338 edge_type,
339 property,
340 )]))
341 }
342 LogicalPlan::ShowPointIndexes => {
343 let mut rows: Vec<Row> = Vec::new();
350 for (label, property) in reader.list_point_indexes()? {
351 rows.push(show_point_index_row("NODE", label, property));
352 }
353 for (edge_type, property) in reader.list_edge_point_indexes()? {
354 rows.push(show_point_index_row("RELATIONSHIP", edge_type, property));
355 }
356 Ok(Some(rows))
357 }
358 LogicalPlan::CreatePropertyConstraint {
359 name,
360 scope,
361 properties,
362 kind,
363 if_not_exists,
364 } => {
365 let storage_kind = match kind {
366 ConstraintKind::Unique => PropertyConstraintKind::Unique,
367 ConstraintKind::NotNull => PropertyConstraintKind::NotNull,
368 ConstraintKind::NodeKey => PropertyConstraintKind::NodeKey,
369 ConstraintKind::PropertyType(t) => {
370 PropertyConstraintKind::PropertyType(cypher_to_storage_property_type(*t))
371 }
372 };
373 let storage_scope = cypher_to_storage_scope(scope);
374 let spec = writer.create_property_constraint(
375 name.as_deref(),
376 &storage_scope,
377 properties,
378 storage_kind,
379 *if_not_exists,
380 )?;
381 Ok(Some(vec![constraint_ack_row("created", &spec)]))
382 }
383 LogicalPlan::DropPropertyConstraint { name, if_exists } => {
384 writer.drop_property_constraint(name, *if_exists)?;
385 let mut row = Row::default();
386 row.insert(
387 "state".into(),
388 Value::Property(Property::String("dropped".into())),
389 );
390 row.insert(
391 "name".into(),
392 Value::Property(Property::String(name.clone())),
393 );
394 Ok(Some(vec![row]))
395 }
396 LogicalPlan::ShowPropertyConstraints => {
397 let specs = reader.list_property_constraints()?;
398 let rows = specs.into_iter().map(constraint_show_row).collect();
399 Ok(Some(rows))
400 }
401 _ => Ok(None),
402 }
403}
404
405fn constraint_ack_row(state: &str, spec: &meshdb_storage::PropertyConstraintSpec) -> Row {
410 let mut row = constraint_show_row(spec.clone());
411 row.insert(
412 "state".into(),
413 Value::Property(Property::String(state.into())),
414 );
415 row
416}
417
418fn constraint_show_row(spec: meshdb_storage::PropertyConstraintSpec) -> Row {
424 let mut row = Row::default();
425 row.insert("name".into(), Value::Property(Property::String(spec.name)));
426 let (scope_tag, target) = match spec.scope {
427 meshdb_storage::ConstraintScope::Node(l) => ("NODE", l),
428 meshdb_storage::ConstraintScope::Relationship(t) => ("RELATIONSHIP", t),
429 };
430 row.insert(
431 "scope".into(),
432 Value::Property(Property::String(scope_tag.into())),
433 );
434 row.insert("label".into(), Value::Property(Property::String(target)));
439 let props: Vec<Property> = spec.properties.into_iter().map(Property::String).collect();
440 row.insert("properties".into(), Value::Property(Property::List(props)));
441 row.insert(
442 "type".into(),
443 Value::Property(Property::String(spec.kind.as_string())),
444 );
445 row
446}
447
448fn cypher_to_storage_scope(scope: &CypherConstraintScope) -> StorageConstraintScope {
452 match scope {
453 CypherConstraintScope::Node(l) => StorageConstraintScope::Node(l.clone()),
454 CypherConstraintScope::Relationship(t) => StorageConstraintScope::Relationship(t.clone()),
455 }
456}
457
458fn cypher_to_storage_property_type(t: CypherPropertyType) -> StoragePropertyType {
463 match t {
464 CypherPropertyType::String => StoragePropertyType::String,
465 CypherPropertyType::Integer => StoragePropertyType::Integer,
466 CypherPropertyType::Float => StoragePropertyType::Float,
467 CypherPropertyType::Boolean => StoragePropertyType::Boolean,
468 }
469}
470
471fn node_index_ddl_ack_row(state: &str, label: &str, properties: &[String]) -> Row {
478 let mut row = Row::default();
479 row.insert(
480 "state".into(),
481 Value::Property(Property::String(state.into())),
482 );
483 row.insert(
484 "scope".into(),
485 Value::Property(Property::String("NODE".into())),
486 );
487 row.insert(
488 "label".into(),
489 Value::Property(Property::String(label.into())),
490 );
491 row.insert(
496 "property".into(),
497 Value::Property(Property::String(properties.join(","))),
498 );
499 row.insert("properties".into(), properties_list_value(properties));
500 row
501}
502
503fn edge_index_ddl_ack_row(state: &str, edge_type: &str, properties: &[String]) -> Row {
509 let mut row = Row::default();
510 row.insert(
511 "state".into(),
512 Value::Property(Property::String(state.into())),
513 );
514 row.insert(
515 "scope".into(),
516 Value::Property(Property::String("RELATIONSHIP".into())),
517 );
518 row.insert(
519 "label".into(),
520 Value::Property(Property::String(edge_type.into())),
521 );
522 row.insert(
523 "edge_type".into(),
524 Value::Property(Property::String(edge_type.into())),
525 );
526 row.insert(
527 "property".into(),
528 Value::Property(Property::String(properties.join(","))),
529 );
530 row.insert("properties".into(), properties_list_value(properties));
531 row
532}
533
534fn properties_list_value(properties: &[String]) -> Value {
535 Value::List(
536 properties
537 .iter()
538 .map(|p| Value::Property(Property::String(p.clone())))
539 .collect(),
540 )
541}
542
543fn point_index_ddl_ack_row(state: &str, scope: &str, target: &str, property: &str) -> Row {
550 let mut row = Row::default();
551 row.insert(
552 "state".into(),
553 Value::Property(Property::String(state.into())),
554 );
555 row.insert(
556 "scope".into(),
557 Value::Property(Property::String(scope.into())),
558 );
559 row.insert(
560 "type".into(),
561 Value::Property(Property::String("POINT".into())),
562 );
563 row.insert(
564 "label".into(),
565 Value::Property(Property::String(target.into())),
566 );
567 if scope == "RELATIONSHIP" {
568 row.insert(
569 "edge_type".into(),
570 Value::Property(Property::String(target.into())),
571 );
572 }
573 row.insert(
574 "property".into(),
575 Value::Property(Property::String(property.into())),
576 );
577 row
578}
579
580fn show_point_index_row(scope: &str, target: String, property: String) -> Row {
585 let mut row = Row::default();
586 row.insert(
587 "scope".into(),
588 Value::Property(Property::String(scope.into())),
589 );
590 row.insert(
591 "type".into(),
592 Value::Property(Property::String("POINT".into())),
593 );
594 row.insert(
595 "label".into(),
596 Value::Property(Property::String(target.clone())),
597 );
598 if scope == "RELATIONSHIP" {
599 row.insert(
600 "edge_type".into(),
601 Value::Property(Property::String(target)),
602 );
603 }
604 row.insert(
605 "property".into(),
606 Value::Property(Property::String(property)),
607 );
608 row.insert(
609 "state".into(),
610 Value::Property(Property::String("online".into())),
611 );
612 row
613}
614
615fn show_index_row(scope: &str, target: String, properties: Vec<String>) -> Row {
623 let mut row = Row::default();
624 row.insert(
625 "scope".into(),
626 Value::Property(Property::String(scope.into())),
627 );
628 row.insert(
629 "label".into(),
630 Value::Property(Property::String(target.clone())),
631 );
632 if scope == "RELATIONSHIP" {
633 row.insert(
634 "edge_type".into(),
635 Value::Property(Property::String(target)),
636 );
637 }
638 row.insert(
639 "property".into(),
640 Value::Property(Property::String(properties.join(","))),
641 );
642 row.insert("properties".into(), properties_list_value(&properties));
643 row.insert(
644 "state".into(),
645 Value::Property(Property::String("online".into())),
646 );
647 row
648}
649
/// Builds the operator tree for `plan` with no seed row.
fn build_op(plan: &LogicalPlan) -> Box<dyn Operator> {
    build_op_inner(plan, None)
}
653
/// Recursively translates a [`LogicalPlan`] into a tree of boxed operators.
///
/// `seed` is threaded unchanged to every child built here. It only matters at
/// `LogicalPlan::SeedRow` leaves: with a seed the leaf becomes a `SeededRowOp`
/// holding a clone of that row, without one it becomes a plain `SeedRowOp`.
///
/// # Panics
/// Panics when handed a schema DDL / SHOW plan; those are expected to have
/// been handled by `try_execute_ddl` before operators are built.
pub(crate) fn build_op_inner(plan: &LogicalPlan, seed: Option<&Row>) -> Box<dyn Operator> {
    // Builds a child operator, forwarding the same seed.
    macro_rules! child {
        ($p:expr) => {
            build_op_inner($p, seed)
        };
    }
    match plan {
        LogicalPlan::CreatePath {
            input,
            nodes,
            edges,
        } => Box::new(CreatePathOp::new(
            input.as_ref().map(|p| child!(p)),
            nodes.clone(),
            edges.clone(),
        )),
        // Only the left side is built eagerly; the right side is handed over
        // as a cloned plan.
        LogicalPlan::CartesianProduct { left, right } => {
            Box::new(CartesianProductOp::new(child!(left), (**right).clone()))
        }
        LogicalPlan::Delete {
            input,
            detach,
            vars,
            exprs,
        } => Box::new(DeleteOp::new(
            child!(input),
            *detach,
            vars.clone(),
            exprs.clone(),
        )),
        LogicalPlan::SetProperty { input, assignments } => {
            Box::new(SetPropertyOp::new(child!(input), assignments.clone()))
        }
        LogicalPlan::Remove { input, items } => {
            Box::new(RemoveOp::new(child!(input), items.clone()))
        }
        LogicalPlan::LoadCsv {
            input,
            path_expr,
            var,
            with_headers,
        } => Box::new(LoadCsvOp::new(
            input.as_ref().map(|p| child!(p)),
            path_expr.clone(),
            var.clone(),
            *with_headers,
        )),
        LogicalPlan::Foreach {
            input,
            var,
            list_expr,
            set_assignments,
            remove_items,
        } => Box::new(ForeachOp::new(
            child!(input),
            var.clone(),
            list_expr.clone(),
            set_assignments.clone(),
            remove_items.clone(),
        )),
        // The subquery body is handed over as a cloned plan rather than a
        // pre-built operator.
        LogicalPlan::CallSubquery { input, body } => {
            Box::new(CallSubqueryOp::new(child!(input), (**body).clone()))
        }
        // Same as above: the body is passed as a cloned plan.
        LogicalPlan::OptionalApply {
            input,
            body,
            null_vars,
        } => Box::new(OptionalApplyOp::new(
            child!(input),
            (**body).clone(),
            null_vars.clone(),
        )),
        LogicalPlan::ProcedureCall {
            input,
            qualified_name,
            args,
            yield_spec,
            standalone,
        } => Box::new(ProcedureCallOp::new(
            input.as_ref().map(|p| child!(p)),
            qualified_name.clone(),
            args.clone(),
            yield_spec.clone(),
            *standalone,
        )),
        // The only place where `seed` changes which operator is built.
        LogicalPlan::SeedRow => match seed {
            Some(r) => Box::new(SeededRowOp {
                row: Some(r.clone()),
            }),
            None => Box::new(SeedRowOp { done: false }),
        },
        LogicalPlan::NodeScanAll { var } => Box::new(NodeScanAllOp::new(var.clone())),
        LogicalPlan::NodeScanByLabels { var, labels } => {
            Box::new(NodeScanByLabelsOp::new(var.clone(), labels.clone()))
        }
        LogicalPlan::EdgeExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            edge_properties,
            edge_types,
            direction,
            edge_constraint_var,
        } => Box::new(EdgeExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            edge_properties.clone(),
            edge_types.clone(),
            *direction,
            edge_constraint_var.clone(),
        )),
        LogicalPlan::OptionalEdgeExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            dst_properties,
            edge_types,
            direction,
            dst_constraint_var,
            edge_constraint_var,
        } => Box::new(OptionalEdgeExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            dst_properties.clone(),
            edge_types.clone(),
            *direction,
            dst_constraint_var.clone(),
            edge_constraint_var.clone(),
        )),
        LogicalPlan::VarLengthExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            edge_types,
            edge_properties,
            direction,
            min_hops,
            max_hops,
            path_var,
            optional,
            dst_constraint_var,
            bound_edge_list_var,
            excluded_edge_vars,
        } => Box::new(VarLengthExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            edge_types.clone(),
            edge_properties.clone(),
            *direction,
            *min_hops,
            *max_hops,
            path_var.clone(),
            *optional,
            dst_constraint_var.clone(),
            bound_edge_list_var.clone(),
            excluded_edge_vars.clone(),
        )),
        LogicalPlan::Filter { input, predicate } => {
            Box::new(FilterOp::new(child!(input), predicate.clone()))
        }
        LogicalPlan::Project { input, items } => {
            Box::new(ProjectOp::new(child!(input), items.clone()))
        }
        LogicalPlan::Aggregate {
            input,
            group_keys,
            aggregates,
        } => Box::new(AggregateOp::new(
            child!(input),
            group_keys.clone(),
            aggregates.clone(),
        )),
        LogicalPlan::Identity { input } => Box::new(IdentityOp::new(child!(input))),
        LogicalPlan::CoalesceNullRow { input, null_vars } => {
            Box::new(CoalesceNullRowOp::new(child!(input), null_vars.clone()))
        }
        LogicalPlan::Distinct { input } => Box::new(DistinctOp::new(child!(input))),
        LogicalPlan::OrderBy { input, sort_items } => {
            Box::new(OrderByOp::new(child!(input), sort_items.clone()))
        }
        LogicalPlan::Skip { input, count } => Box::new(SkipOp::new(child!(input), count.clone())),
        LogicalPlan::Limit { input, count } => Box::new(LimitOp::new(child!(input), count.clone())),
        LogicalPlan::MergeNode {
            input,
            var,
            labels,
            properties,
            on_create,
            on_match,
        } => Box::new(MergeNodeOp::new(
            input.as_ref().map(|p| child!(p)),
            var.clone(),
            labels.clone(),
            properties.clone(),
            on_create.clone(),
            on_match.clone(),
        )),
        LogicalPlan::MergeEdge {
            input,
            edge_var,
            src_var,
            dst_var,
            edge_type,
            undirected,
            properties,
            on_create,
            on_match,
        } => Box::new(MergeEdgeOp::new(
            child!(input),
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            *undirected,
            properties.clone(),
            on_create.clone(),
            on_match.clone(),
        )),
        // Leaf UNWIND: no input operator.
        LogicalPlan::Unwind { var, expr } => Box::new(UnwindOp::new(var.clone(), expr.clone())),
        // UNWIND applied to each row of an input operator.
        LogicalPlan::UnwindChain { input, var, expr } => {
            Box::new(UnwindChainOp::new(child!(input), var.clone(), expr.clone()))
        }
        LogicalPlan::IndexSeek {
            var,
            label,
            properties,
            values,
        } => Box::new(IndexSeekOp::new(
            var.clone(),
            label.clone(),
            properties.clone(),
            values.clone(),
        )),
        LogicalPlan::PointIndexSeek {
            var,
            label,
            property,
            bounds,
        } => Box::new(PointIndexSeekOp::new(
            var.clone(),
            label.clone(),
            property.clone(),
            bounds.clone(),
        )),
        LogicalPlan::EdgeSeek {
            edge_var,
            src_var,
            dst_var,
            edge_type,
            property,
            value,
            direction,
            residual_properties,
        } => Box::new(EdgeSeekOp::new(
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            property.clone(),
            value.clone(),
            *direction,
            residual_properties.clone(),
        )),
        LogicalPlan::EdgePointIndexSeek {
            edge_var,
            src_var,
            dst_var,
            edge_type,
            property,
            direction,
            bounds,
        } => Box::new(EdgePointIndexSeekOp::new(
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            property.clone(),
            *direction,
            bounds.clone(),
        )),
        // Every branch is built with the same seed.
        LogicalPlan::Union { branches, all } => {
            let branch_ops: Vec<Box<dyn Operator>> = branches.iter().map(|b| child!(b)).collect();
            Box::new(UnionOp::new(branch_ops, *all))
        }
        LogicalPlan::BindPath {
            input,
            path_var,
            node_vars,
            edge_vars,
        } => Box::new(BindPathOp::new(
            child!(input),
            path_var.clone(),
            node_vars.clone(),
            edge_vars.clone(),
        )),
        LogicalPlan::ShortestPath {
            input,
            src_var,
            dst_var,
            path_var,
            edge_types,
            direction,
            max_hops,
            kind,
        } => Box::new(ShortestPathOp::new(
            child!(input),
            src_var.clone(),
            dst_var.clone(),
            path_var.clone(),
            edge_types.clone(),
            *direction,
            *max_hops,
            *kind,
        )),
        // Schema statements have no operator; reaching this arm means the
        // caller skipped the DDL dispatch step.
        LogicalPlan::CreatePropertyIndex { .. }
        | LogicalPlan::DropPropertyIndex { .. }
        | LogicalPlan::CreateEdgePropertyIndex { .. }
        | LogicalPlan::DropEdgePropertyIndex { .. }
        | LogicalPlan::ShowPropertyIndexes
        | LogicalPlan::CreatePointIndex { .. }
        | LogicalPlan::DropPointIndex { .. }
        | LogicalPlan::CreateEdgePointIndex { .. }
        | LogicalPlan::DropEdgePointIndex { .. }
        | LogicalPlan::ShowPointIndexes
        | LogicalPlan::CreatePropertyConstraint { .. }
        | LogicalPlan::DropPropertyConstraint { .. }
        | LogicalPlan::ShowPropertyConstraints => {
            panic!("schema DDL must be dispatched via try_execute_ddl before build_op")
        }
    }
}
1004
/// Leaf operator that evaluates a list expression once and emits one
/// single-column row per element.
struct UnwindOp {
    /// Name of the column each element is bound to.
    var: String,
    /// Expression producing the list to unwind.
    expr: Expr,
    /// Evaluated list; `None` until the first `next` call has run.
    items: Option<Vec<Value>>,
    /// Index of the next element of `items` to emit.
    cursor: usize,
}

impl UnwindOp {
    /// Creates the operator with nothing evaluated yet.
    fn new(var: String, expr: Expr) -> Self {
        Self {
            var,
            expr,
            items: None,
            cursor: 0,
        }
    }
}
1022
1023impl Operator for UnwindOp {
1024 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1025 if self.items.is_none() {
1026 let empty = Row::new();
1027 let ectx = EvalCtx {
1028 row: &empty,
1029 params: ctx.params,
1030 reader: ctx.store,
1031 procedures: ctx.procedures,
1032 outer_rows: ctx.outer_rows,
1033 tombstones: ctx.tombstones,
1034 };
1035 let val = eval_expr(&self.expr, &ectx)?;
1036 self.items = Some(coerce_unwind_list(val)?);
1037 }
1038 let items = self.items.as_ref().unwrap();
1039 if self.cursor < items.len() {
1040 let v = items[self.cursor].clone();
1041 self.cursor += 1;
1042 let mut row = Row::new();
1043 row.insert(self.var.clone(), v);
1044 Ok(Some(row))
1045 } else {
1046 Ok(None)
1047 }
1048 }
1049}
1050
/// Operator that unwinds a list expression once per input row, emitting the
/// input row extended with one list element at a time.
struct UnwindChainOp {
    /// Upstream operator supplying the base rows.
    input: Box<dyn Operator>,
    /// Name of the column each element is bound to.
    var: String,
    /// Expression producing the list; evaluated against each base row.
    expr: Expr,
    /// Base row currently being expanded, if any.
    current_row: Option<Row>,
    /// List evaluated for `current_row`.
    items: Vec<Value>,
    /// Index of the next element of `items` to emit.
    cursor: usize,
}

impl UnwindChainOp {
    /// Creates the operator with no base row loaded yet.
    fn new(input: Box<dyn Operator>, var: String, expr: Expr) -> Self {
        Self {
            input,
            var,
            expr,
            current_row: None,
            items: Vec::new(),
            cursor: 0,
        }
    }
}
1077
1078impl Operator for UnwindChainOp {
1079 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1080 loop {
1081 if let Some(base) = &self.current_row {
1082 if self.cursor < self.items.len() {
1083 let v = self.items[self.cursor].clone();
1084 self.cursor += 1;
1085 let mut row = base.clone();
1086 row.insert(self.var.clone(), v);
1087 return Ok(Some(row));
1088 }
1089 self.current_row = None;
1090 self.items.clear();
1091 self.cursor = 0;
1092 }
1093 let base = match self.input.next(ctx)? {
1094 Some(r) => r,
1095 None => return Ok(None),
1096 };
1097 let ectx = EvalCtx {
1098 row: &base,
1099 params: ctx.params,
1100 reader: ctx.store,
1101 procedures: ctx.procedures,
1102 outer_rows: ctx.outer_rows,
1103 tombstones: ctx.tombstones,
1104 };
1105 let val = eval_expr(&self.expr, &ectx)?;
1106 self.items = coerce_unwind_list(val)?;
1107 self.current_row = Some(base);
1108 }
1109 }
1110}
1111
1112fn coerce_unwind_list(val: Value) -> Result<Vec<Value>> {
1117 match val {
1118 Value::List(items) => Ok(items),
1119 Value::Property(Property::List(props)) => {
1120 Ok(props.into_iter().map(Value::Property).collect())
1121 }
1122 Value::Null => Ok(Vec::new()),
1123 _ => Err(Error::TypeMismatch),
1124 }
1125}
1126
1127struct CreatePathOp {
1128 input: Option<Box<dyn Operator>>,
1129 nodes: Vec<CreateNodeSpec>,
1130 edges: Vec<CreateEdgeSpec>,
1131 done: bool,
1132 buffered: Option<Vec<Row>>,
1133 cursor: usize,
1134}
1135
1136impl CreatePathOp {
1137 fn new(
1138 input: Option<Box<dyn Operator>>,
1139 nodes: Vec<CreateNodeSpec>,
1140 edges: Vec<CreateEdgeSpec>,
1141 ) -> Self {
1142 Self {
1143 input,
1144 nodes,
1145 edges,
1146 done: false,
1147 buffered: None,
1148 cursor: 0,
1149 }
1150 }
1151
1152 fn apply(&self, ctx: &ExecCtx, row: &Row) -> Result<Row> {
1153 let mut out = row.clone();
1154 let mut node_ids: Vec<NodeId> = Vec::with_capacity(self.nodes.len());
1155 for spec in &self.nodes {
1156 match spec {
1157 CreateNodeSpec::New {
1158 var,
1159 labels,
1160 properties,
1161 } => {
1162 let mut node = Node::new();
1163 for label in labels {
1164 node.labels.push(label.clone());
1165 }
1166 for (k, expr) in properties {
1174 let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1175 let prop = value_to_property(value)?;
1176 if matches!(prop, Property::Null) {
1177 continue;
1178 }
1179 node.properties.insert(k.clone(), prop);
1180 }
1181 ctx.writer.put_node(&node)?;
1182 node_ids.push(node.id);
1183 if let Some(v) = var {
1184 out.insert(v.clone(), Value::Node(node));
1185 }
1186 }
1187 CreateNodeSpec::Reference(name) => {
1188 let id = match out.get(name) {
1189 Some(Value::Node(n)) => n.id,
1190 _ => return Err(Error::UnboundVariable(name.clone())),
1191 };
1192 node_ids.push(id);
1193 }
1194 }
1195 }
1196 for spec in &self.edges {
1197 let src = node_ids[spec.src_idx];
1198 let dst = node_ids[spec.dst_idx];
1199 let mut edge = Edge::new(spec.edge_type.clone(), src, dst);
1200 for (k, expr) in &spec.properties {
1201 let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1202 let prop = value_to_property(value)?;
1203 if matches!(prop, Property::Null) {
1204 continue;
1205 }
1206 edge.properties.insert(k.clone(), prop);
1207 }
1208 ctx.writer.put_edge(&edge)?;
1209 if let Some(v) = &spec.var {
1210 out.insert(v.clone(), Value::Edge(edge));
1211 }
1212 }
1213 Ok(out)
1214 }
1215}
1216
1217impl Operator for CreatePathOp {
1218 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1219 if self.input.is_some() {
1220 if let Some(buffered) = self.buffered.as_mut() {
1224 if self.cursor < buffered.len() {
1225 let row = buffered[self.cursor].clone();
1226 self.cursor += 1;
1227 return Ok(Some(self.apply(ctx, &row)?));
1228 }
1229 return Ok(None);
1230 }
1231 let mut rows: Vec<Row> = Vec::new();
1232 {
1233 let input = self.input.as_mut().unwrap();
1234 while let Some(row) = input.next(ctx)? {
1235 rows.push(row);
1236 }
1237 }
1238 self.buffered = Some(rows);
1239 self.cursor = 0;
1240 self.next(ctx)
1242 } else {
1243 if self.done {
1244 return Ok(None);
1245 }
1246 self.done = true;
1247 let empty = Row::new();
1248 Ok(Some(self.apply(ctx, &empty)?))
1249 }
1250 }
1251}
1252
/// Operator that pairs every row of the left input with every row produced
/// by the right-hand plan.
struct CartesianProductOp {
    /// Operator producing the left-hand rows.
    left: Box<dyn Operator>,
    /// Right-hand plan; an operator is built from it for each left row.
    right_plan: LogicalPlan,
    /// Left row currently being combined, if any.
    left_row: Option<Row>,
    /// Right-hand operator built for `left_row`, if any.
    right_op: Option<Box<dyn Operator>>,
}

impl CartesianProductOp {
    /// Creates the operator with no left row loaded yet.
    fn new(left: Box<dyn Operator>, right_plan: LogicalPlan) -> Self {
        Self {
            left,
            right_plan,
            left_row: None,
            right_op: None,
        }
    }
}
1270
1271impl Operator for CartesianProductOp {
1272 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1273 loop {
1274 if self.left_row.is_none() {
1275 match self.left.next(ctx)? {
1276 None => return Ok(None),
1277 Some(row) => {
1278 self.left_row = Some(row);
1279 self.right_op = Some(build_op(&self.right_plan));
1280 }
1281 }
1282 }
1283 let right_op = self.right_op.as_mut().expect("right_op set");
1284 let left_ref = self.left_row.as_ref().unwrap();
1289 let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1290 stacked.push(left_ref);
1291 stacked.extend_from_slice(ctx.outer_rows);
1292 let inner_ctx = ExecCtx {
1293 store: ctx.store,
1294 writer: ctx.writer,
1295 params: ctx.params,
1296 procedures: ctx.procedures,
1297 outer_rows: &stacked,
1298 tombstones: ctx.tombstones,
1299 };
1300 match right_op.next(&inner_ctx)? {
1301 Some(right_row) => {
1302 let mut combined = left_ref.clone();
1303 for (k, v) in right_row {
1304 combined.insert(k, v);
1305 }
1306 return Ok(Some(combined));
1307 }
1308 None => {
1309 self.left_row = None;
1310 self.right_op = None;
1311 }
1312 }
1313 }
1314 }
1315}
1316
1317struct DeleteOp {
1318 input: Box<dyn Operator>,
1319 detach: bool,
1320 #[allow(dead_code)]
1321 vars: Vec<String>,
1322 exprs: Vec<Expr>,
1323 buffered: Option<Vec<Row>>,
1331 cursor: usize,
1332}
1333
1334impl DeleteOp {
1335 fn new(input: Box<dyn Operator>, detach: bool, vars: Vec<String>, exprs: Vec<Expr>) -> Self {
1336 Self {
1337 input,
1338 detach,
1339 vars,
1340 exprs,
1341 buffered: None,
1342 cursor: 0,
1343 }
1344 }
1345
1346 fn apply_deletes(
1356 &self,
1357 ctx: &ExecCtx,
1358 row: &Row,
1359 seen_edges: &mut HashSet<meshdb_core::EdgeId>,
1360 seen_nodes: &mut HashSet<meshdb_core::NodeId>,
1361 ) -> Result<()> {
1362 let mut edge_ids: Vec<meshdb_core::EdgeId> = Vec::new();
1363 let mut node_ids: Vec<meshdb_core::NodeId> = Vec::new();
1364 for expr in &self.exprs {
1365 let v = eval_expr(expr, &ctx.eval_ctx(row))?;
1366 match v {
1367 Value::Node(n) => node_ids.push(n.id),
1368 Value::Edge(e) => edge_ids.push(e.id),
1369 Value::Path { nodes, edges } => {
1370 for e in edges {
1371 edge_ids.push(e.id);
1372 }
1373 for n in nodes {
1374 node_ids.push(n.id);
1375 }
1376 }
1377 Value::Null | Value::Property(Property::Null) => continue,
1378 _ => return Err(Error::TypeMismatch),
1379 }
1380 }
1381 for eid in &edge_ids {
1382 if seen_edges.insert(*eid) {
1383 ctx.writer.delete_edge(*eid)?;
1384 ctx.tombstones.edges.borrow_mut().insert(*eid);
1385 }
1386 }
1387 for nid in &node_ids {
1388 if !seen_nodes.insert(*nid) {
1389 continue;
1390 }
1391 if self.detach {
1392 for (eid, _) in ctx.store.outgoing(*nid)? {
1397 ctx.tombstones.edges.borrow_mut().insert(eid);
1398 }
1399 for (eid, _) in ctx.store.incoming(*nid)? {
1400 ctx.tombstones.edges.borrow_mut().insert(eid);
1401 }
1402 ctx.writer.detach_delete_node(*nid)?;
1403 } else {
1404 let out = ctx.store.outgoing(*nid)?;
1405 let inc = ctx.store.incoming(*nid)?;
1406 let still_attached = out
1407 .iter()
1408 .chain(inc.iter())
1409 .any(|(eid, _)| !seen_edges.contains(eid));
1410 if still_attached {
1411 return Err(Error::CannotDeleteAttachedNode);
1412 }
1413 ctx.writer.detach_delete_node(*nid)?;
1414 }
1415 ctx.tombstones.nodes.borrow_mut().insert(*nid);
1416 }
1417 Ok(())
1418 }
1419}
1420
1421impl Operator for DeleteOp {
1422 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1423 if self.buffered.is_none() {
1431 let mut rows: Vec<Row> = Vec::new();
1432 while let Some(row) = self.input.next(ctx)? {
1433 rows.push(row);
1434 }
1435 let mut seen_edges: HashSet<meshdb_core::EdgeId> = HashSet::new();
1436 let mut seen_nodes: HashSet<meshdb_core::NodeId> = HashSet::new();
1437 for row in &rows {
1438 self.apply_deletes(ctx, row, &mut seen_edges, &mut seen_nodes)?;
1439 }
1440 self.buffered = Some(rows);
1441 self.cursor = 0;
1442 }
1443 let rows = self.buffered.as_ref().unwrap();
1444 if self.cursor < rows.len() {
1445 let row = rows[self.cursor].clone();
1446 self.cursor += 1;
1447 return Ok(Some(row));
1448 }
1449 Ok(None)
1450 }
1451}
1452
1453struct SetPropertyOp {
1454 input: Box<dyn Operator>,
1455 assignments: Vec<SetAssignment>,
1456}
1457
1458impl SetPropertyOp {
1459 fn new(input: Box<dyn Operator>, assignments: Vec<SetAssignment>) -> Self {
1460 Self { input, assignments }
1461 }
1462}
1463
1464impl Operator for SetPropertyOp {
1465 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1466 match self.input.next(ctx)? {
1467 None => Ok(None),
1468 Some(mut row) => {
1469 enum Action {
1471 SetKey {
1472 var: String,
1473 key: String,
1474 prop: Property,
1475 },
1476 AddLabels {
1477 var: String,
1478 labels: Vec<String>,
1479 },
1480 Replace {
1481 var: String,
1482 props: Vec<(String, Property)>,
1483 },
1484 Merge {
1485 var: String,
1486 props: Vec<(String, Property)>,
1487 },
1488 }
1489 let mut actions: Vec<Action> = Vec::with_capacity(self.assignments.len());
1490 for a in &self.assignments {
1491 match a {
1492 SetAssignment::Property { var, key, value } => {
1493 let evaluated = eval_expr(value, &ctx.eval_ctx(&row))?;
1494 let prop = value_to_property(evaluated)?;
1495 actions.push(Action::SetKey {
1496 var: var.clone(),
1497 key: key.clone(),
1498 prop,
1499 });
1500 }
1501 SetAssignment::Labels { var, labels } => {
1502 actions.push(Action::AddLabels {
1503 var: var.clone(),
1504 labels: labels.clone(),
1505 });
1506 }
1507 SetAssignment::Replace { var, properties } => {
1508 let props = properties
1513 .iter()
1514 .map(|(k, expr)| {
1515 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1516 Ok((k.clone(), value_to_property(v)?))
1517 })
1518 .collect::<Result<Vec<(String, Property)>>>()?;
1519 actions.push(Action::Replace {
1520 var: var.clone(),
1521 props,
1522 });
1523 }
1524 SetAssignment::Merge { var, properties } => {
1525 let props = properties
1526 .iter()
1527 .map(|(k, expr)| {
1528 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1529 Ok((k.clone(), value_to_property(v)?))
1530 })
1531 .collect::<Result<Vec<(String, Property)>>>()?;
1532 actions.push(Action::Merge {
1533 var: var.clone(),
1534 props,
1535 });
1536 }
1537 SetAssignment::ReplaceFromExpr {
1538 var,
1539 source,
1540 replace,
1541 } => {
1542 let v = eval_expr(source, &ctx.eval_ctx(&row))?;
1543 let props = extract_property_map(&v)?;
1544 if *replace {
1545 actions.push(Action::Replace {
1546 var: var.clone(),
1547 props,
1548 });
1549 } else {
1550 actions.push(Action::Merge {
1551 var: var.clone(),
1552 props,
1553 });
1554 }
1555 }
1556 }
1557 }
1558
1559 let mut updated_nodes: HashSet<String> = HashSet::new();
1561 let mut updated_edges: HashSet<String> = HashSet::new();
1562 for action in actions {
1563 match action {
1564 Action::SetKey { var, key, prop } => match row.get_mut(&var) {
1565 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1569 continue
1570 }
1571 Some(Value::Node(n)) => {
1574 if matches!(prop, Property::Null) {
1575 n.properties.remove(&key);
1576 } else {
1577 n.properties.insert(key, prop);
1578 }
1579 updated_nodes.insert(var);
1580 }
1581 Some(Value::Edge(e)) => {
1582 if matches!(prop, Property::Null) {
1583 e.properties.remove(&key);
1584 } else {
1585 e.properties.insert(key, prop);
1586 }
1587 updated_edges.insert(var);
1588 }
1589 _ => return Err(Error::UnboundVariable(var)),
1590 },
1591 Action::AddLabels { var, labels } => match row.get_mut(&var) {
1592 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1593 continue
1594 }
1595 Some(Value::Node(n)) => {
1596 for label in labels {
1597 if !n.labels.contains(&label) {
1598 n.labels.push(label);
1599 }
1600 }
1601 updated_nodes.insert(var);
1602 }
1603 _ => return Err(Error::UnboundVariable(var)),
1604 },
1605 Action::Replace { var, props } => match row.get_mut(&var) {
1606 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1607 continue
1608 }
1609 Some(Value::Node(n)) => {
1610 n.properties.clear();
1611 for (k, v) in props {
1612 if !matches!(v, Property::Null) {
1613 n.properties.insert(k, v);
1614 }
1615 }
1616 updated_nodes.insert(var);
1617 }
1618 Some(Value::Edge(e)) => {
1619 e.properties.clear();
1620 for (k, v) in props {
1621 if !matches!(v, Property::Null) {
1622 e.properties.insert(k, v);
1623 }
1624 }
1625 updated_edges.insert(var);
1626 }
1627 _ => return Err(Error::UnboundVariable(var)),
1628 },
1629 Action::Merge { var, props } => match row.get_mut(&var) {
1630 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1631 continue
1632 }
1633 Some(Value::Node(n)) => {
1634 for (k, v) in props {
1635 if matches!(v, Property::Null) {
1636 n.properties.remove(&k);
1637 } else {
1638 n.properties.insert(k, v);
1639 }
1640 }
1641 updated_nodes.insert(var);
1642 }
1643 Some(Value::Edge(e)) => {
1644 for (k, v) in props {
1645 if matches!(v, Property::Null) {
1646 e.properties.remove(&k);
1647 } else {
1648 e.properties.insert(k, v);
1649 }
1650 }
1651 updated_edges.insert(var);
1652 }
1653 _ => return Err(Error::UnboundVariable(var)),
1654 },
1655 }
1656 }
1657
1658 for var in &updated_nodes {
1660 if let Some(Value::Node(n)) = row.get(var) {
1661 ctx.writer.put_node(n)?;
1662 }
1663 }
1664 for var in &updated_edges {
1665 if let Some(Value::Edge(e)) = row.get(var) {
1666 ctx.writer.put_edge(e)?;
1667 }
1668 }
1669
1670 Ok(Some(row))
1671 }
1672 }
1673 }
1674}
1675
1676struct RemoveOp {
1677 input: Box<dyn Operator>,
1678 items: Vec<RemoveSpec>,
1679}
1680
1681impl RemoveOp {
1682 fn new(input: Box<dyn Operator>, items: Vec<RemoveSpec>) -> Self {
1683 Self { input, items }
1684 }
1685}
1686
1687impl Operator for RemoveOp {
1688 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1689 match self.input.next(ctx)? {
1690 None => Ok(None),
1691 Some(mut row) => {
1692 let mut updated_nodes: HashSet<String> = HashSet::new();
1693 let mut updated_edges: HashSet<String> = HashSet::new();
1694 for item in &self.items {
1695 match item {
1696 RemoveSpec::Property { var, key } => match row.get_mut(var) {
1697 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1700 continue
1701 }
1702 Some(Value::Node(n)) => {
1703 n.properties.remove(key);
1704 updated_nodes.insert(var.clone());
1705 }
1706 Some(Value::Edge(e)) => {
1707 e.properties.remove(key);
1708 updated_edges.insert(var.clone());
1709 }
1710 _ => return Err(Error::UnboundVariable(var.clone())),
1711 },
1712 RemoveSpec::Labels { var, labels } => match row.get_mut(var) {
1713 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1714 continue
1715 }
1716 Some(Value::Node(n)) => {
1717 n.labels.retain(|l| !labels.contains(l));
1718 updated_nodes.insert(var.clone());
1719 }
1720 _ => return Err(Error::UnboundVariable(var.clone())),
1721 },
1722 }
1723 }
1724 for var in &updated_nodes {
1725 if let Some(Value::Node(n)) = row.get(var) {
1726 ctx.writer.put_node(n)?;
1727 }
1728 }
1729 for var in &updated_edges {
1730 if let Some(Value::Edge(e)) = row.get(var) {
1731 ctx.writer.put_edge(e)?;
1732 }
1733 }
1734 Ok(Some(row))
1735 }
1736 }
1737 }
1738}
1739
1740struct LoadCsvOp {
1741 input: Option<Box<dyn Operator>>,
1742 path_expr: Expr,
1743 var: String,
1744 with_headers: bool,
1745 rows: Option<Vec<Value>>,
1746 cursor: usize,
1747}
1748
1749impl LoadCsvOp {
1750 fn new(
1751 input: Option<Box<dyn Operator>>,
1752 path_expr: Expr,
1753 var: String,
1754 with_headers: bool,
1755 ) -> Self {
1756 Self {
1757 input,
1758 path_expr,
1759 var,
1760 with_headers,
1761 rows: None,
1762 cursor: 0,
1763 }
1764 }
1765
1766 fn load(&mut self, ctx: &ExecCtx, base_row: &Row) -> Result<()> {
1767 let ectx = ctx.eval_ctx(base_row);
1768 let path_val = eval_expr(&self.path_expr, &ectx)?;
1769 let path = match path_val {
1770 Value::Property(Property::String(s)) => s,
1771 _ => return Err(Error::TypeMismatch),
1772 };
1773 let content = std::fs::read_to_string(&path).map_err(|e| {
1774 Error::Unsupported(format!("LOAD CSV: cannot read file '{}': {}", path, e))
1775 })?;
1776 let mut lines = content.lines();
1777 let headers: Option<Vec<String>> = if self.with_headers {
1778 lines
1779 .next()
1780 .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
1781 } else {
1782 None
1783 };
1784 let mut csv_rows = Vec::new();
1785 for line in lines {
1786 if line.trim().is_empty() {
1787 continue;
1788 }
1789 let fields: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
1790 if let Some(hdrs) = &headers {
1791 let mut map = std::collections::HashMap::new();
1792 for (i, h) in hdrs.iter().enumerate() {
1793 let val = fields.get(i).cloned().unwrap_or_default();
1794 map.insert(h.clone(), Property::String(val));
1795 }
1796 csv_rows.push(Value::Property(Property::Map(map)));
1797 } else {
1798 let list: Vec<Value> = fields
1799 .into_iter()
1800 .map(|f| Value::Property(Property::String(f)))
1801 .collect();
1802 csv_rows.push(Value::List(list));
1803 }
1804 }
1805 self.rows = Some(csv_rows);
1806 self.cursor = 0;
1807 Ok(())
1808 }
1809}
1810
1811impl Operator for LoadCsvOp {
1812 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1813 if self.rows.is_none() {
1814 let base = if let Some(input) = &mut self.input {
1815 match input.next(ctx)? {
1816 Some(r) => r,
1817 None => return Ok(None),
1818 }
1819 } else {
1820 Row::new()
1821 };
1822 self.load(ctx, &base)?;
1823 }
1824 let rows = self.rows.as_ref().unwrap();
1825 if self.cursor < rows.len() {
1826 let val = rows[self.cursor].clone();
1827 self.cursor += 1;
1828 let mut row = Row::new();
1829 row.insert(self.var.clone(), val);
1830 Ok(Some(row))
1831 } else {
1832 Ok(None)
1833 }
1834 }
1835}
1836
1837struct ForeachOp {
1838 input: Box<dyn Operator>,
1839 var: String,
1840 list_expr: Expr,
1841 set_assignments: Vec<SetAssignment>,
1842 remove_items: Vec<RemoveSpec>,
1843}
1844
1845impl ForeachOp {
1846 fn new(
1847 input: Box<dyn Operator>,
1848 var: String,
1849 list_expr: Expr,
1850 set_assignments: Vec<SetAssignment>,
1851 remove_items: Vec<RemoveSpec>,
1852 ) -> Self {
1853 Self {
1854 input,
1855 var,
1856 list_expr,
1857 set_assignments,
1858 remove_items,
1859 }
1860 }
1861}
1862
1863impl Operator for ForeachOp {
1864 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1865 let Some(row) = self.input.next(ctx)? else {
1866 return Ok(None);
1867 };
1868 let ectx = ctx.eval_ctx(&row);
1869 let list_val = eval_expr(&self.list_expr, &ectx)?;
1870 let items = match list_val {
1871 Value::List(items) => items,
1872 Value::Property(Property::List(props)) => {
1873 props.into_iter().map(Value::Property).collect()
1874 }
1875 Value::Null | Value::Property(Property::Null) => Vec::new(),
1876 _ => return Err(Error::TypeMismatch),
1877 };
1878 for item in items {
1879 let mut scratch = row.clone();
1880 scratch.insert(self.var.clone(), item);
1881 for a in &self.set_assignments {
1882 match a {
1883 SetAssignment::Property { var, key, value } => {
1884 let evaluated = eval_expr(value, &ctx.eval_ctx(&scratch))?;
1885 let prop = value_to_property(evaluated)?;
1886 match scratch.get_mut(var) {
1887 Some(Value::Node(n)) => {
1888 n.properties.insert(key.clone(), prop);
1889 }
1890 Some(Value::Edge(e)) => {
1891 e.properties.insert(key.clone(), prop);
1892 }
1893 _ => return Err(Error::UnboundVariable(var.clone())),
1894 }
1895 }
1896 SetAssignment::Labels { var, labels } => {
1897 if let Some(Value::Node(n)) = scratch.get_mut(var) {
1898 for l in labels {
1899 if !n.labels.contains(l) {
1900 n.labels.push(l.clone());
1901 }
1902 }
1903 }
1904 }
1905 _ => {}
1906 }
1907 }
1908 for ri in &self.remove_items {
1909 match ri {
1910 RemoveSpec::Property { var, key } => {
1911 if let Some(Value::Node(n)) = scratch.get_mut(var) {
1912 n.properties.remove(key);
1913 } else if let Some(Value::Edge(e)) = scratch.get_mut(var) {
1914 e.properties.remove(key);
1915 }
1916 }
1917 RemoveSpec::Labels { var, labels } => {
1918 if let Some(Value::Node(n)) = scratch.get_mut(var) {
1919 n.labels.retain(|l| !labels.contains(l));
1920 }
1921 }
1922 }
1923 }
1924 for (_, val) in scratch.iter() {
1926 match val {
1927 Value::Node(n) => ctx.writer.put_node(n)?,
1928 Value::Edge(e) => ctx.writer.put_edge(e)?,
1929 _ => {}
1930 }
1931 }
1932 }
1933 Ok(Some(row))
1934 }
1935}
1936
1937struct SeedRowOp {
1938 done: bool,
1939}
1940
1941impl Operator for SeedRowOp {
1942 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1943 if self.done {
1944 return Ok(None);
1945 }
1946 self.done = true;
1947 Ok(Some(Row::new()))
1948 }
1949}
1950
1951struct SeededRowOp {
1952 row: Option<Row>,
1953}
1954
1955impl Operator for SeededRowOp {
1956 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1957 Ok(self.row.take())
1958 }
1959}
1960
1961struct CallSubqueryOp {
1962 input: Box<dyn Operator>,
1963 body_plan: LogicalPlan,
1964 pending: Vec<Row>,
1965 pending_idx: usize,
1966}
1967
1968impl CallSubqueryOp {
1969 fn new(input: Box<dyn Operator>, body_plan: LogicalPlan) -> Self {
1970 Self {
1971 input,
1972 body_plan,
1973 pending: Vec::new(),
1974 pending_idx: 0,
1975 }
1976 }
1977}
1978
1979impl Operator for CallSubqueryOp {
1980 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1981 loop {
1982 if self.pending_idx < self.pending.len() {
1983 let row = self.pending[self.pending_idx].clone();
1984 self.pending_idx += 1;
1985 return Ok(Some(row));
1986 }
1987 let outer_row = match self.input.next(ctx)? {
1988 Some(r) => r,
1989 None => return Ok(None),
1990 };
1991 let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1992 let mut results = Vec::new();
1993 while let Some(body_row) = body_op.next(ctx)? {
1994 let mut merged = outer_row.clone();
1995 for (k, v) in body_row {
1996 merged.insert(k, v);
1997 }
1998 results.push(merged);
1999 }
2000 if results.is_empty() {
2001 continue;
2002 }
2003 self.pending = results;
2004 self.pending_idx = 0;
2005 }
2006 }
2007}
2008
2009struct OptionalApplyOp {
2016 input: Box<dyn Operator>,
2017 body_plan: LogicalPlan,
2018 null_vars: Vec<String>,
2019 pending: Vec<Row>,
2020 pending_idx: usize,
2021}
2022
2023impl OptionalApplyOp {
2024 fn new(input: Box<dyn Operator>, body_plan: LogicalPlan, null_vars: Vec<String>) -> Self {
2025 Self {
2026 input,
2027 body_plan,
2028 null_vars,
2029 pending: Vec::new(),
2030 pending_idx: 0,
2031 }
2032 }
2033}
2034
2035impl Operator for OptionalApplyOp {
2036 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2037 loop {
2038 if self.pending_idx < self.pending.len() {
2039 let row = self.pending[self.pending_idx].clone();
2040 self.pending_idx += 1;
2041 return Ok(Some(row));
2042 }
2043 let outer_row = match self.input.next(ctx)? {
2044 Some(r) => r,
2045 None => return Ok(None),
2046 };
2047 let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
2048 let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
2052 stacked.push(&outer_row);
2053 stacked.extend_from_slice(ctx.outer_rows);
2054 let inner_ctx = ExecCtx {
2055 store: ctx.store,
2056 writer: ctx.writer,
2057 params: ctx.params,
2058 procedures: ctx.procedures,
2059 outer_rows: &stacked,
2060 tombstones: ctx.tombstones,
2061 };
2062 let mut results = Vec::new();
2063 while let Some(body_row) = body_op.next(&inner_ctx)? {
2064 let mut merged = outer_row.clone();
2065 for (k, v) in body_row {
2066 merged.insert(k, v);
2067 }
2068 results.push(merged);
2069 }
2070 if results.is_empty() {
2071 let mut fallback = outer_row;
2072 for v in &self.null_vars {
2073 fallback.insert(v.clone(), Value::Null);
2074 }
2075 return Ok(Some(fallback));
2076 }
2077 self.pending = results;
2078 self.pending_idx = 0;
2079 }
2080 }
2081}
2082
2083struct ProcedureCallOp {
2102 input: Option<Box<dyn Operator>>,
2103 qualified_name: Vec<String>,
2104 args: Option<Vec<Expr>>,
2105 yield_spec: Option<YieldSpec>,
2106 standalone: bool,
2107 buffered: Vec<Row>,
2108 buffered_idx: usize,
2109 done: bool,
2112}
2113
2114impl ProcedureCallOp {
2115 fn new(
2116 input: Option<Box<dyn Operator>>,
2117 qualified_name: Vec<String>,
2118 args: Option<Vec<Expr>>,
2119 yield_spec: Option<YieldSpec>,
2120 standalone: bool,
2121 ) -> Self {
2122 Self {
2123 input,
2124 qualified_name,
2125 args,
2126 yield_spec,
2127 standalone,
2128 buffered: Vec::new(),
2129 buffered_idx: 0,
2130 done: false,
2131 }
2132 }
2133
2134 fn resolve_projection(
2140 &self,
2141 proc: &crate::procedures::Procedure,
2142 ) -> Result<Vec<(String, String)>> {
2143 match &self.yield_spec {
2144 None => {
2145 if !self.standalone {
2146 if proc.outputs.is_empty() {
2155 return Ok(Vec::new());
2156 }
2157 return Err(Error::Procedure(format!(
2158 "procedure '{}' has outputs but no YIELD clause",
2159 self.qualified_name.join(".")
2160 )));
2161 }
2162 Ok(proc
2163 .outputs
2164 .iter()
2165 .map(|o| (o.name.clone(), o.name.clone()))
2166 .collect())
2167 }
2168 Some(YieldSpec::Star) => {
2169 if !self.standalone {
2170 return Err(Error::Procedure(
2171 "YIELD * is only allowed on standalone CALL".into(),
2172 ));
2173 }
2174 Ok(proc
2175 .outputs
2176 .iter()
2177 .map(|o| (o.name.clone(), o.name.clone()))
2178 .collect())
2179 }
2180 Some(YieldSpec::Items(items)) => {
2181 let mut projection = Vec::with_capacity(items.len());
2182 let mut seen_aliases: std::collections::HashSet<String> =
2183 std::collections::HashSet::new();
2184 for yi in items {
2185 if !proc.outputs.iter().any(|o| o.name == yi.column) {
2186 return Err(Error::Procedure(format!(
2187 "procedure '{}' has no output column '{}'",
2188 self.qualified_name.join("."),
2189 yi.column
2190 )));
2191 }
2192 let alias = yi.alias.clone().unwrap_or_else(|| yi.column.clone());
2193 if !seen_aliases.insert(alias.clone()) {
2194 return Err(Error::Procedure(format!(
2195 "variable '{alias}' already bound by YIELD"
2196 )));
2197 }
2198 projection.push((yi.column.clone(), alias));
2199 }
2200 Ok(projection)
2201 }
2202 }
2203 }
2204
2205 fn evaluate_args(
2212 &self,
2213 ctx: &ExecCtx,
2214 row: &Row,
2215 proc: &crate::procedures::Procedure,
2216 ) -> Result<Vec<Value>> {
2217 match &self.args {
2218 Some(exprs) => {
2219 if exprs.len() != proc.inputs.len() {
2220 return Err(Error::Procedure(format!(
2221 "procedure '{}' expects {} argument(s), got {}",
2222 self.qualified_name.join("."),
2223 proc.inputs.len(),
2224 exprs.len()
2225 )));
2226 }
2227 let eval_ctx = ctx.eval_ctx(row);
2228 let mut values = Vec::with_capacity(exprs.len());
2229 for (expr, spec) in exprs.iter().zip(proc.inputs.iter()) {
2230 let v = eval_expr(expr, &eval_ctx)?;
2231 if !spec.ty.accepts(&v) {
2232 return Err(Error::Procedure(format!(
2233 "argument '{}' has wrong type for procedure '{}'",
2234 spec.name,
2235 self.qualified_name.join(".")
2236 )));
2237 }
2238 values.push(coerce_arg(v, spec.ty));
2239 }
2240 Ok(values)
2241 }
2242 None => {
2243 if !self.standalone {
2245 return Err(Error::Procedure(
2246 "in-query CALL requires explicit argument list".into(),
2247 ));
2248 }
2249 let mut values = Vec::with_capacity(proc.inputs.len());
2250 for spec in &proc.inputs {
2251 let v = ctx.params.get(&spec.name).cloned().ok_or_else(|| {
2252 Error::Procedure(format!(
2253 "missing parameter ${} for procedure '{}'",
2254 spec.name,
2255 self.qualified_name.join(".")
2256 ))
2257 })?;
2258 if !spec.ty.accepts(&v) {
2259 return Err(Error::Procedure(format!(
2260 "parameter '{}' has wrong type",
2261 spec.name
2262 )));
2263 }
2264 values.push(coerce_arg(v, spec.ty));
2265 }
2266 Ok(values)
2267 }
2268 }
2269 }
2270
2271 fn invoke_once(
2276 &self,
2277 ctx: &ExecCtx,
2278 input_row: &Row,
2279 proc: &crate::procedures::Procedure,
2280 projection: &[(String, String)],
2281 out: &mut Vec<Row>,
2282 ) -> Result<()> {
2283 if proc.outputs.is_empty() {
2287 if !self.standalone {
2288 out.push(input_row.clone());
2289 }
2290 return Ok(());
2291 }
2292 let args = self.evaluate_args(ctx, input_row, proc)?;
2293 let rows = proc.resolve_rows(ctx.store)?;
2294 for proc_row in &rows {
2295 if !proc.row_matches(proc_row, &args) {
2296 continue;
2297 }
2298 let mut merged = if self.standalone {
2299 Row::new()
2300 } else {
2301 input_row.clone()
2302 };
2303 for (src, alias) in projection {
2304 let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
2305 merged.insert(alias.clone(), v);
2306 }
2307 out.push(merged);
2308 }
2309 Ok(())
2310 }
2311}
2312
2313fn coerce_arg(v: Value, ty: crate::procedures::ProcType) -> Value {
2318 use crate::procedures::ProcType;
2319 if matches!(ty, ProcType::Float) {
2320 if let Value::Property(Property::Int64(n)) = v {
2321 return Value::Property(Property::Float64(n as f64));
2322 }
2323 }
2324 v
2325}
2326
2327impl Operator for ProcedureCallOp {
2328 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2329 loop {
2330 if self.buffered_idx < self.buffered.len() {
2331 let row = self.buffered[self.buffered_idx].clone();
2332 self.buffered_idx += 1;
2333 return Ok(Some(row));
2334 }
2335 self.buffered.clear();
2336 self.buffered_idx = 0;
2337
2338 let proc = match ctx.procedures.get(&self.qualified_name) {
2339 Some(p) => p,
2340 None => {
2341 return Err(Error::Procedure(format!(
2342 "procedure '{}' not found",
2343 self.qualified_name.join(".")
2344 )));
2345 }
2346 };
2347 let projection = self.resolve_projection(proc)?;
2348
2349 let input_row = match &mut self.input {
2350 Some(inp) => match inp.next(ctx)? {
2351 Some(r) => r,
2352 None => return Ok(None),
2353 },
2354 None => {
2355 if self.done {
2356 return Ok(None);
2357 }
2358 self.done = true;
2359 Row::new()
2360 }
2361 };
2362
2363 let mut produced = Vec::new();
2364 self.invoke_once(ctx, &input_row, proc, &projection, &mut produced)?;
2365 if produced.is_empty() {
2366 if self.input.is_some() {
2367 continue;
2368 }
2369 return Ok(None);
2370 }
2371 self.buffered = produced;
2372 }
2373 }
2374}
2375
2376fn extract_property_map(v: &Value) -> Result<Vec<(String, Property)>> {
2382 match v {
2383 Value::Node(n) => Ok(n.properties.clone().into_iter().collect()),
2384 Value::Edge(e) => Ok(e.properties.clone().into_iter().collect()),
2385 Value::Map(pairs) => pairs
2386 .iter()
2387 .map(|(k, vv)| Ok((k.clone(), value_to_property(vv.clone())?)))
2388 .collect(),
2389 Value::Property(Property::Map(entries)) => Ok(entries
2390 .iter()
2391 .map(|(k, p)| (k.clone(), p.clone()))
2392 .collect()),
2393 Value::Null | Value::Property(Property::Null) => Ok(Vec::new()),
2394 _ => Err(Error::InvalidSetValue),
2395 }
2396}
2397
2398fn value_to_property(v: Value) -> Result<Property> {
2399 match v {
2400 Value::Property(Property::Map(_)) => Err(Error::InvalidSetValue),
2401 Value::Property(p) => Ok(p),
2402 Value::Null => Ok(Property::Null),
2403 Value::List(items) => {
2404 let props: Vec<Property> = items
2405 .into_iter()
2406 .map(value_to_property)
2407 .collect::<Result<_>>()?;
2408 Ok(Property::List(props))
2409 }
2410 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
2414 Err(Error::InvalidSetValue)
2415 }
2416 }
2417}
2418
2419struct NodeScanAllOp {
2420 var: String,
2421 ids: Option<Vec<NodeId>>,
2422 cursor: usize,
2423}
2424
2425impl NodeScanAllOp {
2426 fn new(var: String) -> Self {
2427 Self {
2428 var,
2429 ids: None,
2430 cursor: 0,
2431 }
2432 }
2433}
2434
2435impl Operator for NodeScanAllOp {
2436 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2437 if self.ids.is_none() {
2438 self.ids = Some(ctx.store.all_node_ids()?);
2439 }
2440 let ids = self.ids.as_ref().unwrap();
2441 while self.cursor < ids.len() {
2442 let id = ids[self.cursor];
2443 self.cursor += 1;
2444 if let Some(node) = ctx.store.get_node(id)? {
2445 let mut row = Row::new();
2446 row.insert(self.var.clone(), Value::Node(node));
2447 return Ok(Some(row));
2448 }
2449 }
2450 Ok(None)
2451 }
2452}
2453
2454struct NodeScanByLabelsOp {
2455 var: String,
2456 labels: Vec<String>,
2457 ids: Option<Vec<NodeId>>,
2458 cursor: usize,
2459}
2460
2461impl NodeScanByLabelsOp {
2462 fn new(var: String, labels: Vec<String>) -> Self {
2463 Self {
2464 var,
2465 labels,
2466 ids: None,
2467 cursor: 0,
2468 }
2469 }
2470}
2471
2472impl Operator for NodeScanByLabelsOp {
2473 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2474 if self.ids.is_none() {
2475 let primary = self
2477 .labels
2478 .first()
2479 .expect("NodeScanByLabels must have at least one label");
2480 self.ids = Some(ctx.store.nodes_by_label(primary)?);
2481 }
2482 let ids = self.ids.as_ref().unwrap();
2483 while self.cursor < ids.len() {
2484 let id = ids[self.cursor];
2485 self.cursor += 1;
2486 if let Some(node) = ctx.store.get_node(id)? {
2487 if has_all_labels(&node, &self.labels) {
2488 let mut row = Row::new();
2489 row.insert(self.var.clone(), Value::Node(node));
2490 return Ok(Some(row));
2491 }
2492 }
2493 }
2494 Ok(None)
2495 }
2496}
2497
2498fn has_all_labels(node: &Node, labels: &[String]) -> bool {
2499 labels.iter().all(|l| node.labels.contains(l))
2500}
2501
2502struct IndexSeekOp {
2514 var: String,
2515 label: String,
2516 properties: Vec<String>,
2517 value_exprs: Vec<Expr>,
2518 results: Option<Vec<NodeId>>,
2519 cursor: usize,
2520}
2521
2522impl IndexSeekOp {
2523 fn new(var: String, label: String, properties: Vec<String>, value_exprs: Vec<Expr>) -> Self {
2524 assert_eq!(
2525 properties.len(),
2526 value_exprs.len(),
2527 "IndexSeekOp: properties and values must have equal length"
2528 );
2529 Self {
2530 var,
2531 label,
2532 properties,
2533 value_exprs,
2534 results: None,
2535 cursor: 0,
2536 }
2537 }
2538}
2539
2540impl Operator for IndexSeekOp {
2541 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2542 if self.results.is_none() {
2543 let empty = Row::new();
2544 let mut values: Vec<Property> = Vec::with_capacity(self.value_exprs.len());
2545 for expr in &self.value_exprs {
2546 let value = eval_expr(expr, &ctx.eval_ctx(&empty))?;
2547 let property = match value {
2548 Value::Property(p) => p,
2549 Value::Null => Property::Null,
2550 Value::Node(_)
2551 | Value::Edge(_)
2552 | Value::List(_)
2553 | Value::Map(_)
2554 | Value::Path { .. } => {
2555 return Err(Error::InvalidSetValue);
2556 }
2557 };
2558 values.push(property);
2559 }
2560 let ids = ctx
2561 .store
2562 .nodes_by_properties(&self.label, &self.properties, &values)?;
2563 self.results = Some(ids);
2564 }
2565 let ids = self.results.as_ref().unwrap();
2566 while self.cursor < ids.len() {
2567 let id = ids[self.cursor];
2568 self.cursor += 1;
2569 if let Some(node) = ctx.store.get_node(id)? {
2570 let mut row = Row::new();
2571 row.insert(self.var.clone(), Value::Node(node));
2572 return Ok(Some(row));
2573 }
2574 }
2575 Ok(None)
2576 }
2577}
2578
2579struct PointIndexSeekOp {
2598 var: String,
2599 label: String,
2600 property: String,
2601 bounds: PointSeekBounds,
2602 results: Option<Vec<NodeId>>,
2603 cursor: usize,
2604}
2605
2606impl PointIndexSeekOp {
2607 fn new(var: String, label: String, property: String, bounds: PointSeekBounds) -> Self {
2608 Self {
2609 var,
2610 label,
2611 property,
2612 bounds,
2613 results: None,
2614 cursor: 0,
2615 }
2616 }
2617}
2618
2619impl Operator for PointIndexSeekOp {
2620 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2621 if self.results.is_none() {
2622 let empty = Row::new();
2623 let ectx = ctx.eval_ctx(&empty);
2624 let ids = match &self.bounds {
2625 PointSeekBounds::Corners { lo, hi } => {
2626 let lo_pt = extract_point(&eval_expr(lo, &ectx)?);
2627 let hi_pt = extract_point(&eval_expr(hi, &ectx)?);
2628 match (lo_pt, hi_pt) {
2629 (Some(lo), Some(hi)) if lo.srid == hi.srid => ctx.store.nodes_in_bbox(
2630 &self.label,
2631 &self.property,
2632 lo.srid,
2633 lo.x,
2634 lo.y,
2635 hi.x,
2636 hi.y,
2637 )?,
2638 _ => Vec::new(),
2639 }
2640 }
2641 PointSeekBounds::Radius { center, radius } => {
2642 let center_pt = extract_point(&eval_expr(center, &ectx)?);
2643 let radius_val = extract_f64(&eval_expr(radius, &ectx)?);
2644 match (center_pt, radius_val) {
2645 (Some(c), Some(r)) if r.is_finite() && r >= 0.0 => {
2646 let (xlo, ylo, xhi, yhi) = enclosing_bbox(&c, r);
2647 ctx.store.nodes_in_bbox(
2648 &self.label,
2649 &self.property,
2650 c.srid,
2651 xlo,
2652 ylo,
2653 xhi,
2654 yhi,
2655 )?
2656 }
2657 _ => Vec::new(),
2659 }
2660 }
2661 };
2662 self.results = Some(ids);
2663 }
2664 let ids = self.results.as_ref().unwrap();
2665 while self.cursor < ids.len() {
2666 let id = ids[self.cursor];
2667 self.cursor += 1;
2668 if let Some(node) = ctx.store.get_node(id)? {
2669 let mut row = Row::new();
2670 row.insert(self.var.clone(), Value::Node(node));
2671 return Ok(Some(row));
2672 }
2673 }
2674 Ok(None)
2675 }
2676}
2677
2678fn extract_point(v: &Value) -> Option<meshdb_core::Point> {
2679 match v {
2680 Value::Property(Property::Point(p)) => Some(*p),
2681 _ => None,
2682 }
2683}
2684
2685fn extract_f64(v: &Value) -> Option<f64> {
2686 match v {
2687 Value::Property(Property::Float64(f)) => Some(*f),
2688 Value::Property(Property::Int64(i)) => Some(*i as f64),
2689 _ => None,
2690 }
2691}
2692
2693fn enclosing_bbox(center: &meshdb_core::Point, r: f64) -> (f64, f64, f64, f64) {
2703 if center.is_geographic() {
2704 const METRES_PER_DEG: f64 = 111_320.0;
2707 let dlat = r / METRES_PER_DEG;
2708 let cos_lat = center.y.to_radians().cos().abs();
2709 let cos_lat_floor = cos_lat.max(1.0e-6);
2713 let dlon = r / (METRES_PER_DEG * cos_lat_floor);
2714 (
2715 center.x - dlon,
2716 center.y - dlat,
2717 center.x + dlon,
2718 center.y + dlat,
2719 )
2720 } else {
2721 (center.x - r, center.y - r, center.x + r, center.y + r)
2722 }
2723}
2724
2725struct EdgeSeekOp {
2734 edge_var: String,
2735 src_var: String,
2736 dst_var: String,
2737 edge_type: String,
2738 property: String,
2739 value_expr: Expr,
2740 direction: Direction,
2741 residual_properties: Vec<(String, Expr)>,
2742 results: Option<Vec<Row>>,
2745 cursor: usize,
2746}
2747
2748impl EdgeSeekOp {
2749 #[allow(clippy::too_many_arguments)]
2750 fn new(
2751 edge_var: String,
2752 src_var: String,
2753 dst_var: String,
2754 edge_type: String,
2755 property: String,
2756 value_expr: Expr,
2757 direction: Direction,
2758 residual_properties: Vec<(String, Expr)>,
2759 ) -> Self {
2760 Self {
2761 edge_var,
2762 src_var,
2763 dst_var,
2764 edge_type,
2765 property,
2766 value_expr,
2767 direction,
2768 residual_properties,
2769 results: None,
2770 cursor: 0,
2771 }
2772 }
2773}
2774
2775impl Operator for EdgeSeekOp {
2776 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2777 if self.results.is_none() {
2778 let empty = Row::new();
2779 let seek_value = eval_expr(&self.value_expr, &ctx.eval_ctx(&empty))?;
2780 let property = match seek_value {
2781 Value::Property(p) => p,
2782 Value::Null => Property::Null,
2783 Value::Node(_)
2784 | Value::Edge(_)
2785 | Value::List(_)
2786 | Value::Map(_)
2787 | Value::Path { .. } => {
2788 return Err(Error::InvalidSetValue);
2789 }
2790 };
2791 let ids = ctx
2792 .store
2793 .edges_by_property(&self.edge_type, &self.property, &property)?;
2794 let mut rows: Vec<Row> = Vec::with_capacity(ids.len());
2795 for id in ids {
2796 let Some(edge) = ctx.store.get_edge(id)? else {
2797 continue;
2798 };
2799 let mut residuals_ok = true;
2805 for (key, expr) in &self.residual_properties {
2806 let wanted = eval_expr(expr, &ctx.eval_ctx(&empty))?;
2807 let Some(stored) = edge.properties.get(key) else {
2808 residuals_ok = false;
2809 break;
2810 };
2811 if !values_equal(&Value::Property(stored.clone()), &wanted) {
2812 residuals_ok = false;
2813 break;
2814 }
2815 }
2816 if !residuals_ok {
2817 continue;
2818 }
2819 let Some(src_node) = ctx.store.get_node(edge.source)? else {
2824 continue;
2825 };
2826 let Some(dst_node) = ctx.store.get_node(edge.target)? else {
2827 continue;
2828 };
2829 match self.direction {
2834 Direction::Outgoing => {
2835 rows.push(self.make_row(&edge, &src_node, &dst_node));
2836 }
2837 Direction::Incoming => {
2838 rows.push(self.make_row(&edge, &dst_node, &src_node));
2839 }
2840 Direction::Both => {
2841 rows.push(self.make_row(&edge, &src_node, &dst_node));
2842 if edge.source != edge.target {
2845 rows.push(self.make_row(&edge, &dst_node, &src_node));
2846 }
2847 }
2848 }
2849 }
2850 self.results = Some(rows);
2851 }
2852 let rows = self.results.as_ref().unwrap();
2853 if self.cursor < rows.len() {
2854 let row = rows[self.cursor].clone();
2855 self.cursor += 1;
2856 return Ok(Some(row));
2857 }
2858 Ok(None)
2859 }
2860}
2861
2862impl EdgeSeekOp {
2863 fn make_row(&self, edge: &Edge, src: &Node, dst: &Node) -> Row {
2864 let mut row = Row::new();
2865 row.insert(self.edge_var.clone(), Value::Edge(edge.clone()));
2866 row.insert(self.src_var.clone(), Value::Node(src.clone()));
2867 row.insert(self.dst_var.clone(), Value::Node(dst.clone()));
2868 row
2869 }
2870}
2871
/// Operator that finds edges whose point-valued property falls inside a
/// bounding box (queried through `GraphReader::edges_in_bbox`) and emits one
/// row per match, binding the edge and both of its endpoint nodes.
struct EdgePointIndexSeekOp {
    /// Row key that receives the matched edge.
    edge_var: String,
    /// Row key for the first pattern node: the edge's source for
    /// `Outgoing`, its target for `Incoming`.
    src_var: String,
    /// Row key for the second pattern node (the opposite endpoint).
    dst_var: String,
    /// Edge type passed to the bounding-box lookup.
    edge_type: String,
    /// Property key passed to the bounding-box lookup.
    property: String,
    /// Pattern direction; `Both` emits each non-loop edge in both orientations.
    direction: Direction,
    /// Search window: either explicit corner expressions or a centre plus radius.
    bounds: PointSeekBounds,
    /// Materialized output rows; `None` until the first `next` call.
    results: Option<Vec<Row>>,
    /// Index into `results` of the next row to emit.
    cursor: usize,
}
2889
2890impl EdgePointIndexSeekOp {
2891 #[allow(clippy::too_many_arguments)]
2892 fn new(
2893 edge_var: String,
2894 src_var: String,
2895 dst_var: String,
2896 edge_type: String,
2897 property: String,
2898 direction: Direction,
2899 bounds: PointSeekBounds,
2900 ) -> Self {
2901 Self {
2902 edge_var,
2903 src_var,
2904 dst_var,
2905 edge_type,
2906 property,
2907 direction,
2908 bounds,
2909 results: None,
2910 cursor: 0,
2911 }
2912 }
2913
2914 fn make_row(&self, edge: &Edge, src: &Node, dst: &Node) -> Row {
2915 let mut row = Row::new();
2916 row.insert(self.edge_var.clone(), Value::Edge(edge.clone()));
2917 row.insert(self.src_var.clone(), Value::Node(src.clone()));
2918 row.insert(self.dst_var.clone(), Value::Node(dst.clone()));
2919 row
2920 }
2921}
2922
2923impl Operator for EdgePointIndexSeekOp {
2924 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2925 if self.results.is_none() {
2926 let empty = Row::new();
2927 let ectx = ctx.eval_ctx(&empty);
2928 let ids = match &self.bounds {
2929 PointSeekBounds::Corners { lo, hi } => {
2930 let lo_pt = extract_point(&eval_expr(lo, &ectx)?);
2931 let hi_pt = extract_point(&eval_expr(hi, &ectx)?);
2932 match (lo_pt, hi_pt) {
2933 (Some(lo), Some(hi)) if lo.srid == hi.srid => ctx.store.edges_in_bbox(
2934 &self.edge_type,
2935 &self.property,
2936 lo.srid,
2937 lo.x,
2938 lo.y,
2939 hi.x,
2940 hi.y,
2941 )?,
2942 _ => Vec::new(),
2943 }
2944 }
2945 PointSeekBounds::Radius { center, radius } => {
2946 let center_pt = extract_point(&eval_expr(center, &ectx)?);
2947 let radius_val = extract_f64(&eval_expr(radius, &ectx)?);
2948 match (center_pt, radius_val) {
2949 (Some(c), Some(r)) if r.is_finite() && r >= 0.0 => {
2950 let (xlo, ylo, xhi, yhi) = enclosing_bbox(&c, r);
2951 ctx.store.edges_in_bbox(
2952 &self.edge_type,
2953 &self.property,
2954 c.srid,
2955 xlo,
2956 ylo,
2957 xhi,
2958 yhi,
2959 )?
2960 }
2961 _ => Vec::new(),
2962 }
2963 }
2964 };
2965
2966 let mut rows: Vec<Row> = Vec::with_capacity(ids.len());
2967 for id in ids {
2968 let Some(edge) = ctx.store.get_edge(id)? else {
2969 continue;
2970 };
2971 let Some(src_node) = ctx.store.get_node(edge.source)? else {
2972 continue;
2973 };
2974 let Some(dst_node) = ctx.store.get_node(edge.target)? else {
2975 continue;
2976 };
2977 match self.direction {
2978 Direction::Outgoing => rows.push(self.make_row(&edge, &src_node, &dst_node)),
2979 Direction::Incoming => rows.push(self.make_row(&edge, &dst_node, &src_node)),
2980 Direction::Both => {
2981 rows.push(self.make_row(&edge, &src_node, &dst_node));
2982 if edge.source != edge.target {
2983 rows.push(self.make_row(&edge, &dst_node, &src_node));
2984 }
2985 }
2986 }
2987 }
2988 self.results = Some(rows);
2989 }
2990 let rows = self.results.as_ref().unwrap();
2991 if self.cursor < rows.len() {
2992 let row = rows[self.cursor].clone();
2993 self.cursor += 1;
2994 return Ok(Some(row));
2995 }
2996 Ok(None)
2997 }
2998}
2999
3000fn matches_pattern_props(node: &Node, props: &[(String, Property)]) -> bool {
3001 props.iter().all(|(k, v)| {
3002 node.properties
3003 .get(k)
3004 .map(|stored| stored == v)
3005 .unwrap_or(false)
3006 })
3007}
3008
/// Operator implementing node `MERGE`: binds every existing node matching
/// the pattern, or creates one node when nothing matches.
struct MergeNodeOp {
    /// Row key that receives the merged node.
    var: String,
    /// Labels the node must carry; the first one narrows the candidate scan.
    labels: Vec<String>,
    /// Pattern properties as `(key, expr)`; evaluated per input row.
    properties: Vec<(String, Expr)>,
    /// Assignments applied to a node this operator creates.
    on_create: Vec<SetAssignment>,
    /// Assignments applied to every node that already existed.
    on_match: Vec<SetAssignment>,
    /// Upstream operator; `None` means the merge runs once against an empty row.
    input: Option<Box<dyn Operator>>,
    /// Nodes produced by the most recent merge, emitted one per `next` call.
    merged_nodes: Vec<Node>,
    /// Whether the one-shot merge (no `input`) has already run.
    merge_done: bool,
    /// Input row whose merge results are currently being emitted.
    current_input_row: Option<Row>,
    /// Index into `merged_nodes` of the next node to emit.
    cursor: usize,
}
3049
3050impl MergeNodeOp {
3051 fn new(
3052 input: Option<Box<dyn Operator>>,
3053 var: String,
3054 labels: Vec<String>,
3055 properties: Vec<(String, Expr)>,
3056 on_create: Vec<SetAssignment>,
3057 on_match: Vec<SetAssignment>,
3058 ) -> Self {
3059 Self {
3060 var,
3061 labels,
3062 properties,
3063 on_create,
3064 on_match,
3065 input,
3066 merged_nodes: Vec::new(),
3067 merge_done: false,
3068 current_input_row: None,
3069 cursor: 0,
3070 }
3071 }
3072
3073 fn run_merge_for(&mut self, ctx: &ExecCtx, base: &Row) -> Result<Vec<Node>> {
3085 let resolved_props: Vec<(String, Property)> = self
3086 .properties
3087 .iter()
3088 .map(|(k, expr)| {
3089 let v = eval_expr(expr, &ctx.eval_ctx(base))?;
3090 Ok((k.clone(), value_to_property(v)?))
3091 })
3092 .collect::<Result<Vec<_>>>()?;
3093
3094 let candidate_ids: Vec<NodeId> = if let Some(primary) = self.labels.first() {
3095 ctx.store.nodes_by_label(primary)?
3096 } else {
3097 ctx.store.all_node_ids()?
3098 };
3099 let mut merged_nodes: Vec<Node> = Vec::new();
3100 for id in candidate_ids {
3101 if let Some(node) = ctx.store.get_node(id)? {
3102 if has_all_labels(&node, &self.labels)
3103 && matches_pattern_props(&node, &resolved_props)
3104 {
3105 merged_nodes.push(node);
3106 }
3107 }
3108 }
3109
3110 if merged_nodes.is_empty() {
3111 let mut node = Node::new();
3112 for label in &self.labels {
3113 node.labels.push(label.clone());
3114 }
3115 for (k, prop) in resolved_props {
3116 node.properties.insert(k, prop);
3117 }
3118 apply_merge_actions(&mut node, &self.on_create, &self.var, ctx, base)?;
3119 ctx.writer.put_node(&node)?;
3120 merged_nodes.push(node);
3121 } else if !self.on_match.is_empty() {
3122 for node in merged_nodes.iter_mut() {
3123 apply_merge_actions(node, &self.on_match, &self.var, ctx, base)?;
3124 ctx.writer.put_node(node)?;
3125 }
3126 }
3127 Ok(merged_nodes)
3128 }
3129}
3130
3131impl Operator for MergeNodeOp {
3132 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3133 if self.input.is_none() {
3138 if !self.merge_done {
3139 let empty = Row::new();
3140 let nodes = self.run_merge_for(ctx, &empty)?;
3141 self.merged_nodes = nodes;
3142 self.merge_done = true;
3143 }
3144 if self.cursor < self.merged_nodes.len() {
3145 let node = self.merged_nodes[self.cursor].clone();
3146 self.cursor += 1;
3147 let mut row = Row::new();
3148 row.insert(self.var.clone(), Value::Node(node));
3149 return Ok(Some(row));
3150 }
3151 return Ok(None);
3152 }
3153
3154 loop {
3161 if let Some(base) = self.current_input_row.as_ref() {
3162 if self.cursor < self.merged_nodes.len() {
3163 let node = self.merged_nodes[self.cursor].clone();
3164 self.cursor += 1;
3165 let mut row = base.clone();
3166 row.insert(self.var.clone(), Value::Node(node));
3167 return Ok(Some(row));
3168 }
3169 }
3170 match self.input.as_mut().unwrap().next(ctx)? {
3171 None => return Ok(None),
3172 Some(row) => {
3173 let nodes = self.run_merge_for(ctx, &row)?;
3174 self.merged_nodes = nodes;
3175 self.cursor = 0;
3176 self.current_input_row = Some(row);
3177 }
3178 }
3179 }
3180 }
3181}
3182
/// Operator implementing edge `MERGE` between two nodes that are already
/// bound in each input row: binds every matching existing edge, or creates
/// one when nothing matches.
struct MergeEdgeOp {
    /// Upstream operator supplying rows with the endpoint nodes bound.
    input: Box<dyn Operator>,
    /// Row key that receives the merged edge.
    edge_var: String,
    /// Row key holding the source node.
    src_var: String,
    /// Row key holding the destination node.
    dst_var: String,
    /// Type an existing edge must have, and the type given to a created edge.
    edge_type: String,
    /// When set, edges running from the destination to the source also match.
    undirected: bool,
    /// Pattern properties as `(key, expr)`; evaluated per input row.
    properties: Vec<(String, Expr)>,
    /// Assignments applied to an edge this operator creates.
    on_create: Vec<SetAssignment>,
    /// Assignments applied to every edge that already existed.
    on_match: Vec<SetAssignment>,
    /// Output rows already produced for the current input row, awaiting emission.
    pending: std::collections::VecDeque<Row>,
}
3222
impl MergeEdgeOp {
    /// Creates an edge-merge operator with an empty output queue.
    #[allow(clippy::too_many_arguments)]
    fn new(
        input: Box<dyn Operator>,
        edge_var: String,
        src_var: String,
        dst_var: String,
        edge_type: String,
        undirected: bool,
        properties: Vec<(String, Expr)>,
        on_create: Vec<SetAssignment>,
        on_match: Vec<SetAssignment>,
    ) -> Self {
        Self {
            input,
            edge_var,
            src_var,
            dst_var,
            edge_type,
            undirected,
            properties,
            on_create,
            on_match,
            // Rows are queued here by `next` and handed out one at a time.
            pending: std::collections::VecDeque::new(),
        }
    }
}
3250
3251impl Operator for MergeEdgeOp {
3252 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3253 loop {
3254 if let Some(row) = self.pending.pop_front() {
3255 return Ok(Some(row));
3256 }
3257 let Some(row) = self.input.next(ctx)? else {
3258 return Ok(None);
3259 };
3260 let src_node = match row.get(&self.src_var) {
3265 Some(Value::Node(n)) => n.clone(),
3266 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3267 };
3268 let dst_node = match row.get(&self.dst_var) {
3269 Some(Value::Node(n)) => n.clone(),
3270 _ => return Err(Error::UnboundVariable(self.dst_var.clone())),
3271 };
3272
3273 let required_props: Vec<(String, Property)> = self
3277 .properties
3278 .iter()
3279 .map(|(k, expr)| {
3280 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
3281 Ok((k.clone(), value_to_property(v)?))
3282 })
3283 .collect::<Result<Vec<_>>>()?;
3284 let edge_matches = |edge: &Edge| -> bool {
3285 required_props.iter().all(|(k, want)| {
3286 edge.properties
3287 .get(k)
3288 .map(|have| have == want)
3289 .unwrap_or(false)
3290 })
3291 };
3292
3293 let mut matched: Vec<Edge> = Vec::new();
3300 for (edge_id, neighbor_id) in ctx.store.outgoing(src_node.id)? {
3301 if neighbor_id != dst_node.id {
3302 continue;
3303 }
3304 if let Some(edge) = ctx.store.get_edge(edge_id)? {
3305 if edge.edge_type == self.edge_type && edge_matches(&edge) {
3306 matched.push(edge);
3307 }
3308 }
3309 }
3310 if self.undirected {
3311 for (edge_id, neighbor_id) in ctx.store.incoming(src_node.id)? {
3312 if neighbor_id != dst_node.id {
3313 continue;
3314 }
3315 if let Some(edge) = ctx.store.get_edge(edge_id)? {
3316 if edge.edge_type == self.edge_type && edge_matches(&edge) {
3317 matched.push(edge);
3318 }
3319 }
3320 }
3321 }
3322
3323 if matched.is_empty() {
3324 let mut new_edge = Edge::new(&self.edge_type, src_node.id, dst_node.id);
3325 for (k, p) in &required_props {
3326 new_edge.properties.insert(k.clone(), p.clone());
3327 }
3328 let mut row_out = row.clone();
3329 apply_merge_edge_actions(
3330 &mut new_edge,
3331 &self.on_create,
3332 &self.edge_var,
3333 ctx,
3334 &mut row_out,
3335 )?;
3336 ctx.writer.put_edge(&new_edge)?;
3337 row_out.insert(self.edge_var.clone(), Value::Edge(new_edge));
3338 self.pending.push_back(row_out);
3339 } else {
3340 for mut existing in matched {
3341 let mut row_out = row.clone();
3342 if !self.on_match.is_empty() {
3343 apply_merge_edge_actions(
3344 &mut existing,
3345 &self.on_match,
3346 &self.edge_var,
3347 ctx,
3348 &mut row_out,
3349 )?;
3350 ctx.writer.put_edge(&existing)?;
3351 }
3352 row_out.insert(self.edge_var.clone(), Value::Edge(existing));
3353 self.pending.push_back(row_out);
3354 }
3355 }
3356 }
3357 }
3358}
3359
3360fn apply_merge_edge_actions(
3370 edge: &mut Edge,
3371 actions: &[SetAssignment],
3372 var: &str,
3373 exec_ctx: &ExecCtx,
3374 outer: &mut Row,
3375) -> Result<()> {
3376 if actions.is_empty() {
3377 return Ok(());
3378 }
3379 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3382 for action in actions {
3383 match action {
3384 SetAssignment::Property {
3385 var: target,
3386 key,
3387 value,
3388 } => {
3389 let sub_ctx = exec_ctx.eval_ctx(outer);
3390 let evaluated = eval_expr(value, &sub_ctx)?;
3391 let prop = value_to_property(evaluated)?;
3392 if target == var {
3393 if matches!(prop, Property::Null) {
3394 edge.properties.remove(key);
3395 } else {
3396 edge.properties.insert(key.clone(), prop);
3397 }
3398 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3399 } else {
3400 apply_set_prop_to_outer(outer, exec_ctx, target, key, prop)?;
3401 }
3402 }
3403 SetAssignment::Merge {
3404 var: target,
3405 properties,
3406 } => {
3407 let sub_ctx = exec_ctx.eval_ctx(outer);
3408 let resolved: Vec<(String, Property)> = properties
3409 .iter()
3410 .map(|(k, expr)| {
3411 let v = eval_expr(expr, &sub_ctx)?;
3412 Ok((k.clone(), value_to_property(v)?))
3413 })
3414 .collect::<Result<Vec<_>>>()?;
3415 if target == var {
3416 for (k, p) in resolved {
3417 edge.properties.insert(k, p);
3418 }
3419 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3420 } else {
3421 apply_set_map_to_outer(outer, exec_ctx, target, resolved, false)?;
3422 }
3423 }
3424 SetAssignment::Replace {
3425 var: target,
3426 properties,
3427 } => {
3428 let sub_ctx = exec_ctx.eval_ctx(outer);
3429 let resolved: Vec<(String, Property)> = properties
3430 .iter()
3431 .map(|(k, expr)| {
3432 let v = eval_expr(expr, &sub_ctx)?;
3433 Ok((k.clone(), value_to_property(v)?))
3434 })
3435 .collect::<Result<Vec<_>>>()?;
3436 if target == var {
3437 edge.properties.clear();
3438 for (k, p) in resolved {
3439 edge.properties.insert(k, p);
3440 }
3441 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3442 } else {
3443 apply_set_map_to_outer(outer, exec_ctx, target, resolved, true)?;
3444 }
3445 }
3446 SetAssignment::Labels {
3447 var: target,
3448 labels,
3449 } => {
3450 if target == var {
3451 return Err(Error::UnboundVariable(target.clone()));
3453 }
3454 apply_set_labels_to_outer(outer, exec_ctx, target, labels)?;
3455 }
3456 SetAssignment::ReplaceFromExpr {
3457 var: target,
3458 source,
3459 replace,
3460 } => {
3461 let sub_ctx = exec_ctx.eval_ctx(outer);
3462 let v = eval_expr(source, &sub_ctx)?;
3463 let props = extract_property_map(&v)?;
3464 if target == var {
3465 if *replace {
3466 edge.properties.clear();
3467 }
3468 for (k, p) in props {
3469 edge.properties.insert(k, p);
3470 }
3471 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3472 } else {
3473 apply_set_map_to_outer(outer, exec_ctx, target, props, *replace)?;
3474 }
3475 }
3476 }
3477 }
3478 Ok(())
3479}
3480
3481fn apply_set_prop_to_outer(
3486 outer: &mut Row,
3487 exec_ctx: &ExecCtx,
3488 target: &str,
3489 key: &str,
3490 prop: Property,
3491) -> Result<()> {
3492 match outer.get_mut(target) {
3493 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
3494 return Ok(());
3497 }
3498 Some(Value::Node(n)) => {
3499 if matches!(prop, Property::Null) {
3500 n.properties.remove(key);
3501 } else {
3502 n.properties.insert(key.to_string(), prop);
3503 }
3504 exec_ctx.writer.put_node(n)?;
3505 }
3506 Some(Value::Edge(e)) => {
3507 if matches!(prop, Property::Null) {
3508 e.properties.remove(key);
3509 } else {
3510 e.properties.insert(key.to_string(), prop);
3511 }
3512 exec_ctx.writer.put_edge(e)?;
3513 }
3514 _ => return Err(Error::UnboundVariable(target.to_string())),
3515 }
3516 Ok(())
3517}
3518
3519fn apply_set_map_to_outer(
3523 outer: &mut Row,
3524 exec_ctx: &ExecCtx,
3525 target: &str,
3526 props: Vec<(String, Property)>,
3527 replace: bool,
3528) -> Result<()> {
3529 match outer.get_mut(target) {
3530 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
3531 Some(Value::Node(n)) => {
3532 if replace {
3533 n.properties.clear();
3534 }
3535 for (k, p) in props {
3536 if replace || !matches!(p, Property::Null) {
3537 n.properties.insert(k, p);
3538 } else {
3539 n.properties.remove(&k);
3540 }
3541 }
3542 exec_ctx.writer.put_node(n)?;
3543 Ok(())
3544 }
3545 Some(Value::Edge(e)) => {
3546 if replace {
3547 e.properties.clear();
3548 }
3549 for (k, p) in props {
3550 if replace || !matches!(p, Property::Null) {
3551 e.properties.insert(k, p);
3552 } else {
3553 e.properties.remove(&k);
3554 }
3555 }
3556 exec_ctx.writer.put_edge(e)?;
3557 Ok(())
3558 }
3559 _ => Err(Error::UnboundVariable(target.to_string())),
3560 }
3561}
3562
3563fn apply_set_labels_to_outer(
3565 outer: &mut Row,
3566 exec_ctx: &ExecCtx,
3567 target: &str,
3568 labels: &[String],
3569) -> Result<()> {
3570 match outer.get_mut(target) {
3571 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
3572 Some(Value::Node(n)) => {
3573 for label in labels {
3574 if !n.labels.contains(label) {
3575 n.labels.push(label.clone());
3576 }
3577 }
3578 exec_ctx.writer.put_node(n)?;
3579 Ok(())
3580 }
3581 _ => Err(Error::UnboundVariable(target.to_string())),
3582 }
3583}
3584
3585fn apply_merge_actions(
3594 node: &mut Node,
3595 actions: &[SetAssignment],
3596 var: &str,
3597 exec_ctx: &ExecCtx,
3598 base_row: &Row,
3599) -> Result<()> {
3600 if actions.is_empty() {
3601 return Ok(());
3602 }
3603 let mut row = base_row.clone();
3606 row.insert(var.to_string(), Value::Node(node.clone()));
3607 for action in actions {
3608 let sub_ctx = exec_ctx.eval_ctx(&row);
3609 match action {
3610 SetAssignment::Property {
3611 var: target,
3612 key,
3613 value,
3614 } => {
3615 if target != var {
3616 return Err(Error::UnboundVariable(target.clone()));
3617 }
3618 let evaluated = eval_expr(value, &sub_ctx)?;
3619 let prop = value_to_property(evaluated)?;
3620 node.properties.insert(key.clone(), prop);
3621 row.insert(var.to_string(), Value::Node(node.clone()));
3622 }
3623 SetAssignment::Labels {
3624 var: target,
3625 labels,
3626 } => {
3627 if target != var {
3628 return Err(Error::UnboundVariable(target.clone()));
3629 }
3630 for label in labels {
3631 if !node.labels.contains(label) {
3632 node.labels.push(label.clone());
3633 }
3634 }
3635 row.insert(var.to_string(), Value::Node(node.clone()));
3636 }
3637 SetAssignment::Replace {
3638 var: target,
3639 properties,
3640 } => {
3641 if target != var {
3642 return Err(Error::UnboundVariable(target.clone()));
3643 }
3644 let resolved: Vec<(String, Property)> = properties
3645 .iter()
3646 .map(|(k, expr)| {
3647 let v = eval_expr(expr, &sub_ctx)?;
3648 Ok((k.clone(), value_to_property(v)?))
3649 })
3650 .collect::<Result<Vec<_>>>()?;
3651 node.properties.clear();
3652 for (k, p) in resolved {
3653 node.properties.insert(k, p);
3654 }
3655 row.insert(var.to_string(), Value::Node(node.clone()));
3656 }
3657 SetAssignment::Merge {
3658 var: target,
3659 properties,
3660 } => {
3661 if target != var {
3662 return Err(Error::UnboundVariable(target.clone()));
3663 }
3664 let resolved: Vec<(String, Property)> = properties
3665 .iter()
3666 .map(|(k, expr)| {
3667 let v = eval_expr(expr, &sub_ctx)?;
3668 Ok((k.clone(), value_to_property(v)?))
3669 })
3670 .collect::<Result<Vec<_>>>()?;
3671 for (k, p) in resolved {
3672 node.properties.insert(k, p);
3673 }
3674 row.insert(var.to_string(), Value::Node(node.clone()));
3675 }
3676 SetAssignment::ReplaceFromExpr {
3677 var: target,
3678 source,
3679 replace,
3680 } => {
3681 if target != var {
3682 return Err(Error::UnboundVariable(target.clone()));
3683 }
3684 let v = eval_expr(source, &sub_ctx)?;
3685 let props = extract_property_map(&v)?;
3686 if *replace {
3687 node.properties.clear();
3688 }
3689 for (k, p) in props {
3690 node.properties.insert(k, p);
3691 }
3692 row.insert(var.to_string(), Value::Node(node.clone()));
3693 }
3694 }
3695 }
3696 Ok(())
3697}
3698
/// Operator that expands each input row along the edges of its source node,
/// emitting one row per edge/neighbor pair that passes every filter.
struct EdgeExpandOp {
    /// Upstream operator supplying rows with the source node bound.
    input: Box<dyn Operator>,
    /// Row key holding the node to expand from.
    src_var: String,
    /// Row key that receives the traversed edge, if the pattern names it.
    edge_var: Option<String>,
    /// Row key that receives the neighbor node.
    dst_var: String,
    /// Labels the neighbor node must carry.
    dst_labels: Vec<String>,
    /// `(key, expr)` equality filters on the edge, evaluated against the input row.
    edge_properties: Vec<(String, Expr)>,
    /// Accepted edge types; empty accepts every type.
    edge_types: Vec<String>,
    /// Which adjacency lists to follow; `Both` de-duplicates by edge id.
    direction: Direction,
    /// When set, only the edge already bound under this name is accepted.
    edge_constraint_var: Option<String>,
    /// Input row currently being expanded.
    current_row: Option<Row>,
    /// Candidate `(edge, neighbor)` pairs for `current_row`.
    pending: Vec<(EdgeId, NodeId)>,
    /// Index into `pending` of the next candidate to examine.
    pending_idx: usize,
}
3718
impl EdgeExpandOp {
    /// Creates an expand operator with no input row loaded and no pending
    /// candidates.
    #[allow(clippy::too_many_arguments)]
    fn new(
        input: Box<dyn Operator>,
        src_var: String,
        edge_var: Option<String>,
        dst_var: String,
        dst_labels: Vec<String>,
        edge_properties: Vec<(String, Expr)>,
        edge_types: Vec<String>,
        direction: Direction,
        edge_constraint_var: Option<String>,
    ) -> Self {
        Self {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            edge_properties,
            edge_types,
            direction,
            edge_constraint_var,
            // Iteration state, populated when the first input row is pulled.
            current_row: None,
            pending: Vec::new(),
            pending_idx: 0,
        }
    }
}
3748
3749impl Operator for EdgeExpandOp {
3750 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3751 loop {
3752 while self.pending_idx < self.pending.len() {
3753 let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3754 self.pending_idx += 1;
3755
3756 let edge = match ctx.store.get_edge(edge_id)? {
3757 Some(e) => e,
3758 None => continue,
3759 };
3760 if !self.edge_types.is_empty()
3761 && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3762 {
3763 continue;
3764 }
3765 if let Some(constraint_var) = &self.edge_constraint_var {
3773 let base = self
3774 .current_row
3775 .as_ref()
3776 .expect("pending edges without source row");
3777 let expected = match ctx.lookup_binding(base, constraint_var) {
3778 Some(Value::Edge(e)) => Some(e.id),
3779 _ => None,
3780 };
3781 match expected {
3782 Some(id) if id != edge.id => continue,
3783 None => continue,
3784 _ => {}
3785 }
3786 }
3787 if !self.edge_properties.is_empty() {
3792 let base = self
3793 .current_row
3794 .as_ref()
3795 .expect("pending edges without source row");
3796 let ectx = ctx.eval_ctx(base);
3797 let mut ok = true;
3798 for (key, expr) in &self.edge_properties {
3799 let expected = eval_expr(expr, &ectx)?;
3800 let actual = match edge.properties.get(key) {
3801 Some(v) => Value::Property(v.clone()),
3802 None => {
3803 ok = false;
3804 break;
3805 }
3806 };
3807 if !values_equal(&actual, &expected) {
3808 ok = false;
3809 break;
3810 }
3811 }
3812 if !ok {
3813 continue;
3814 }
3815 }
3816
3817 let neighbor = match ctx.store.get_node(neighbor_id)? {
3818 Some(n) => n,
3819 None => continue,
3820 };
3821 if !has_all_labels(&neighbor, &self.dst_labels) {
3822 continue;
3823 }
3824
3825 let base = self
3826 .current_row
3827 .as_ref()
3828 .expect("pending edges without source row");
3829 let mut out = base.clone();
3830 if let Some(ev) = &self.edge_var {
3831 out.insert(ev.clone(), Value::Edge(edge));
3832 }
3833 out.insert(self.dst_var.clone(), Value::Node(neighbor));
3834 return Ok(Some(out));
3835 }
3836
3837 match self.input.next(ctx)? {
3838 None => return Ok(None),
3839 Some(row) => {
3840 let src_id = match row.get(&self.src_var) {
3841 Some(Value::Node(n)) => n.id,
3842 Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
3847 continue
3848 }
3849 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3850 };
3851 self.pending = match self.direction {
3852 Direction::Outgoing => ctx.store.outgoing(src_id)?,
3853 Direction::Incoming => ctx.store.incoming(src_id)?,
3854 Direction::Both => {
3855 let mut all = ctx.store.outgoing(src_id)?;
3861 let mut seen: std::collections::HashSet<EdgeId> =
3862 all.iter().map(|(e, _)| *e).collect();
3863 for (e, n) in ctx.store.incoming(src_id)? {
3864 if seen.insert(e) {
3865 all.push((e, n));
3866 }
3867 }
3868 all
3869 }
3870 };
3871 self.pending_idx = 0;
3872 self.current_row = Some(row);
3873 }
3874 }
3875 }
3876 }
3877}
3878
/// Optional variant of single-hop expansion: behaves like an expand, but an
/// input row that produces no match is still emitted once, with the edge and
/// destination bindings set to null.
struct OptionalEdgeExpandOp {
    /// Upstream operator supplying rows with the source binding.
    input: Box<dyn Operator>,
    /// Row key holding the node to expand from (may be null).
    src_var: String,
    /// Row key that receives the traversed edge, if the pattern names it.
    edge_var: Option<String>,
    /// Row key that receives the neighbor node.
    dst_var: String,
    /// Labels the neighbor node must carry.
    dst_labels: Vec<String>,
    /// `(key, expr)` equality filters on the neighbor, evaluated against the input row.
    dst_properties: Vec<(String, Expr)>,
    /// Accepted edge types; empty accepts every type.
    edge_types: Vec<String>,
    /// Which adjacency lists to follow; `Both` de-duplicates by edge id.
    direction: Direction,
    /// When set, only the node already bound under this name is accepted as neighbor.
    dst_constraint_var: Option<String>,
    /// When set, only the edge already bound under this name is accepted.
    edge_constraint_var: Option<String>,
    /// Input row currently being expanded.
    current_row: Option<Row>,
    /// Candidate `(edge, neighbor)` pairs for `current_row`.
    pending: Vec<(EdgeId, NodeId)>,
    /// Index into `pending` of the next candidate to examine.
    pending_idx: usize,
    /// Whether any row (match or null fallback) was emitted for `current_row`.
    yielded_for_current: bool,
}
3918
impl OptionalEdgeExpandOp {
    /// Creates an optional-expand operator with no input row loaded and no
    /// pending candidates.
    #[allow(clippy::too_many_arguments)]
    fn new(
        input: Box<dyn Operator>,
        src_var: String,
        edge_var: Option<String>,
        dst_var: String,
        dst_labels: Vec<String>,
        dst_properties: Vec<(String, Expr)>,
        edge_types: Vec<String>,
        direction: Direction,
        dst_constraint_var: Option<String>,
        edge_constraint_var: Option<String>,
    ) -> Self {
        Self {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            dst_properties,
            edge_types,
            direction,
            dst_constraint_var,
            edge_constraint_var,
            // Iteration state, populated when the first input row is pulled.
            current_row: None,
            pending: Vec::new(),
            pending_idx: 0,
            yielded_for_current: false,
        }
    }
}
3951
3952impl Operator for OptionalEdgeExpandOp {
3953 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3954 loop {
3955 while self.pending_idx < self.pending.len() {
3956 let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3957 self.pending_idx += 1;
3958
3959 let edge = match ctx.store.get_edge(edge_id)? {
3960 Some(e) => e,
3961 None => continue,
3962 };
3963 if !self.edge_types.is_empty()
3964 && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3965 {
3966 continue;
3967 }
3968 if let Some(constraint_var) = &self.edge_constraint_var {
3974 let base = self
3975 .current_row
3976 .as_ref()
3977 .expect("pending without source row");
3978 let expected = match ctx.lookup_binding(base, constraint_var) {
3979 Some(Value::Edge(e)) => Some(e.id),
3980 _ => None,
3981 };
3982 match expected {
3983 Some(id) if id != edge.id => continue,
3984 None => continue,
3985 _ => {}
3986 }
3987 }
3988
3989 let neighbor = match ctx.store.get_node(neighbor_id)? {
3990 Some(n) => n,
3991 None => continue,
3992 };
3993 if !has_all_labels(&neighbor, &self.dst_labels) {
3994 continue;
3995 }
3996 if let Some(constraint_var) = &self.dst_constraint_var {
4003 let base = self
4004 .current_row
4005 .as_ref()
4006 .expect("pending without source row");
4007 let bound_id = match base.get(constraint_var) {
4008 Some(Value::Node(n)) => Some(n.id),
4009 Some(Value::Null)
4010 | Some(Value::Property(meshdb_core::Property::Null))
4011 | None => None,
4012 _ => None,
4013 };
4014 match bound_id {
4015 Some(id) if id != neighbor.id => continue,
4016 None => continue,
4017 _ => {}
4018 }
4019 }
4020 if !self.dst_properties.is_empty() {
4021 let base = self
4022 .current_row
4023 .as_ref()
4024 .expect("pending without source row");
4025 let ectx = ctx.eval_ctx(base);
4026 let mut props_ok = true;
4027 for (key, expr) in &self.dst_properties {
4028 let expected = eval_expr(expr, &ectx)?;
4029 let actual = neighbor
4030 .properties
4031 .get(key)
4032 .cloned()
4033 .map(Value::Property)
4034 .unwrap_or(Value::Null);
4035 if !values_equal(&expected, &actual) {
4036 props_ok = false;
4037 break;
4038 }
4039 }
4040 if !props_ok {
4041 continue;
4042 }
4043 }
4044
4045 let base = self
4046 .current_row
4047 .as_ref()
4048 .expect("pending edges without source row");
4049 let mut out = base.clone();
4050 if let Some(ev) = &self.edge_var {
4051 out.insert(ev.clone(), Value::Edge(edge));
4052 }
4053 out.insert(self.dst_var.clone(), Value::Node(neighbor));
4054 self.yielded_for_current = true;
4055 return Ok(Some(out));
4056 }
4057
4058 if let Some(base) = self.current_row.take() {
4068 if !self.yielded_for_current {
4069 let mut out = base;
4070 if let Some(ev) = &self.edge_var {
4071 let preserve = self
4072 .edge_constraint_var
4073 .as_ref()
4074 .map(|c| c == ev)
4075 .unwrap_or(false);
4076 if !preserve {
4077 out.insert(ev.clone(), Value::Null);
4078 }
4079 }
4080 let preserve_dst = self
4081 .dst_constraint_var
4082 .as_ref()
4083 .map(|c| c == &self.dst_var)
4084 .unwrap_or(false);
4085 if !preserve_dst {
4086 out.insert(self.dst_var.clone(), Value::Null);
4087 }
4088 self.yielded_for_current = true;
4089 return Ok(Some(out));
4090 }
4091 }
4092
4093 match self.input.next(ctx)? {
4094 None => return Ok(None),
4095 Some(row) => {
4096 let src_id = match row.get(&self.src_var) {
4097 Some(Value::Node(n)) => n.id,
4098 Some(Value::Null) => {
4105 self.pending = Vec::new();
4106 self.pending_idx = 0;
4107 self.yielded_for_current = false;
4108 self.current_row = Some(row);
4109 continue;
4110 }
4111 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4112 };
4113 self.pending = match self.direction {
4114 Direction::Outgoing => ctx.store.outgoing(src_id)?,
4115 Direction::Incoming => ctx.store.incoming(src_id)?,
4116 Direction::Both => {
4117 let mut all = ctx.store.outgoing(src_id)?;
4123 let mut seen: std::collections::HashSet<EdgeId> =
4124 all.iter().map(|(e, _)| *e).collect();
4125 for (e, n) in ctx.store.incoming(src_id)? {
4126 if seen.insert(e) {
4127 all.push((e, n));
4128 }
4129 }
4130 all
4131 }
4132 };
4133 self.pending_idx = 0;
4134 self.yielded_for_current = false;
4135 self.current_row = Some(row);
4136 }
4137 }
4138 }
4139 }
4140}
4141
/// Operator for variable-length path expansion: enumerates edge paths of
/// between `min_hops` and `max_hops` edges from the source node by
/// depth-first search.
struct VarLengthExpandOp {
    /// Upstream operator supplying input rows.
    input: Box<dyn Operator>,
    /// Row key holding the start node.
    src_var: String,
    /// Row key for the traversed edges, if the pattern names them.
    edge_var: Option<String>,
    /// Row key for the terminal node.
    dst_var: String,
    /// Labels a path's terminal node must carry for the path to be recorded.
    dst_labels: Vec<String>,
    /// Accepted edge types.
    edge_types: Vec<String>,
    /// `(key, expr)` edge filters; the expressions are evaluated once per
    /// input row before the search starts.
    edge_properties: Vec<(String, Expr)>,
    /// Which adjacency lists the search follows at each step.
    direction: Direction,
    /// Minimum number of edges in a recorded path.
    min_hops: u64,
    /// Maximum number of edges in a path; the search stops descending here.
    max_hops: u64,
    /// Row key for the whole path, if the pattern names it.
    path_var: Option<String>,
    /// NOTE(review): presumably selects OPTIONAL MATCH behavior; its use is
    /// outside the visible code.
    optional: bool,
    /// NOTE(review): presumably names an already-bound node the terminal must
    /// equal; its use is outside the visible code.
    dst_constraint_var: Option<String>,
    /// NOTE(review): presumably names an already-bound edge list to follow;
    /// its use is outside the visible code.
    bound_edge_list_var: Option<String>,
    /// Variables whose bound edges (single edges or lists of edges) are
    /// marked as used before the search begins.
    excluded_edge_vars: Vec<String>,
    /// Input row currently being expanded.
    current_row: Option<Row>,
    /// Edge sequences of the enumerated paths (presumably for `current_row`).
    pending_paths: Vec<Vec<Edge>>,
    /// Node id sequences parallel to `pending_paths`.
    pending_node_paths: Vec<Vec<NodeId>>,
    /// Terminal node ids parallel to `pending_paths`.
    pending_targets: Vec<NodeId>,
    /// Index of the next pending path to emit.
    pending_idx: usize,
}
4190
4191impl VarLengthExpandOp {
4192 #[allow(clippy::too_many_arguments)]
4193 fn new(
4194 input: Box<dyn Operator>,
4195 src_var: String,
4196 edge_var: Option<String>,
4197 dst_var: String,
4198 dst_labels: Vec<String>,
4199 edge_types: Vec<String>,
4200 edge_properties: Vec<(String, Expr)>,
4201 direction: Direction,
4202 min_hops: u64,
4203 max_hops: u64,
4204 path_var: Option<String>,
4205 optional: bool,
4206 dst_constraint_var: Option<String>,
4207 bound_edge_list_var: Option<String>,
4208 excluded_edge_vars: Vec<String>,
4209 ) -> Self {
4210 Self {
4211 input,
4212 src_var,
4213 edge_var,
4214 dst_var,
4215 dst_labels,
4216 edge_types,
4217 edge_properties,
4218 direction,
4219 min_hops,
4220 max_hops,
4221 path_var,
4222 optional,
4223 dst_constraint_var,
4224 bound_edge_list_var,
4225 excluded_edge_vars,
4226 current_row: None,
4227 pending_paths: Vec::new(),
4228 pending_node_paths: Vec::new(),
4229 pending_targets: Vec::new(),
4230 pending_idx: 0,
4231 }
4232 }
4233
4234 fn enumerate(
4235 &self,
4236 ctx: &ExecCtx,
4237 start: NodeId,
4238 input_row: &Row,
4239 ) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
4240 let mut paths: Vec<Vec<Edge>> = Vec::new();
4241 let mut node_paths: Vec<Vec<NodeId>> = Vec::new();
4242 let mut targets: Vec<NodeId> = Vec::new();
4243 let mut edge_buf: Vec<Edge> = Vec::new();
4244 let mut node_buf: Vec<NodeId> = vec![start];
4245 let mut used: HashSet<EdgeId> = HashSet::new();
4252 for var in &self.excluded_edge_vars {
4253 match ctx.lookup_binding(input_row, var) {
4254 Some(Value::Edge(e)) => {
4255 used.insert(e.id);
4256 }
4257 Some(Value::List(items)) => {
4258 for item in items {
4259 if let Value::Edge(e) = item {
4260 used.insert(e.id);
4261 }
4262 }
4263 }
4264 _ => {}
4265 }
4266 }
4267 let expected_edge_props: Vec<(String, Value)> = if self.edge_properties.is_empty() {
4271 Vec::new()
4272 } else {
4273 let ectx = ctx.eval_ctx(input_row);
4274 self.edge_properties
4275 .iter()
4276 .map(|(k, expr)| eval_expr(expr, &ectx).map(|v| (k.clone(), v)))
4277 .collect::<Result<Vec<_>>>()?
4278 };
4279 self.dfs(
4280 ctx,
4281 start,
4282 &expected_edge_props,
4283 &mut edge_buf,
4284 &mut node_buf,
4285 &mut used,
4286 &mut paths,
4287 &mut node_paths,
4288 &mut targets,
4289 )?;
4290 Ok((paths, node_paths, targets))
4291 }
4292
4293 #[allow(clippy::too_many_arguments)]
4294 fn dfs(
4295 &self,
4296 ctx: &ExecCtx,
4297 current_node: NodeId,
4298 expected_edge_props: &[(String, Value)],
4299 edge_buf: &mut Vec<Edge>,
4300 node_buf: &mut Vec<NodeId>,
4301 used: &mut HashSet<EdgeId>,
4302 out_paths: &mut Vec<Vec<Edge>>,
4303 out_node_paths: &mut Vec<Vec<NodeId>>,
4304 out_targets: &mut Vec<NodeId>,
4305 ) -> Result<()> {
4306 let depth = edge_buf.len() as u64;
4307
4308 if depth >= self.min_hops && depth <= self.max_hops {
4309 let terminal_ok = match ctx.store.get_node(current_node)? {
4310 Some(node) => has_all_labels(&node, &self.dst_labels),
4311 None => false,
4312 };
4313 if terminal_ok {
4314 out_paths.push(edge_buf.clone());
4315 out_node_paths.push(node_buf.clone());
4316 out_targets.push(current_node);
4317 }
4318 }
4319
4320 if depth >= self.max_hops {
4321 return Ok(());
4322 }
4323
4324 let neighbors = match self.direction {
4325 Direction::Outgoing => ctx.store.outgoing(current_node)?,
4326 Direction::Incoming => ctx.store.incoming(current_node)?,
4327 Direction::Both => {
4328 let mut all = ctx.store.outgoing(current_node)?;
4329 all.extend(ctx.store.incoming(current_node)?);
4330 all
4331 }
4332 };
4333
4334 for (eid, neighbor_id) in neighbors {
4335 if used.contains(&eid) {
4336 continue;
4337 }
4338 let edge = match ctx.store.get_edge(eid)? {
4339 Some(e) => e,
4340 None => continue,
4341 };
4342 if !self.edge_types.is_empty() && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4343 {
4344 continue;
4345 }
4346 if !expected_edge_props.is_empty() {
4351 let mut ok = true;
4352 for (key, expected) in expected_edge_props {
4353 let actual = match edge.properties.get(key) {
4354 Some(v) => Value::Property(v.clone()),
4355 None => {
4356 ok = false;
4357 break;
4358 }
4359 };
4360 if !values_equal(&actual, expected) {
4361 ok = false;
4362 break;
4363 }
4364 }
4365 if !ok {
4366 continue;
4367 }
4368 }
4369 used.insert(eid);
4370 edge_buf.push(edge);
4371 node_buf.push(neighbor_id);
4372 self.dfs(
4373 ctx,
4374 neighbor_id,
4375 expected_edge_props,
4376 edge_buf,
4377 node_buf,
4378 used,
4379 out_paths,
4380 out_node_paths,
4381 out_targets,
4382 )?;
4383 edge_buf.pop();
4384 node_buf.pop();
4385 used.remove(&eid);
4386 }
4387
4388 Ok(())
4389 }
4390}
4391
4392fn replay_edge_list(
4410 ctx: &ExecCtx,
4411 row: &Row,
4412 list_var: &str,
4413 src_id: Option<NodeId>,
4414 direction: Direction,
4415 edge_types: &[String],
4416) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
4417 let start = match src_id {
4418 Some(id) => id,
4419 None => return Ok((Vec::new(), Vec::new(), Vec::new())),
4420 };
4421 let list = match ctx.lookup_binding(row, list_var) {
4422 Some(Value::List(items)) => items.clone(),
4423 Some(Value::Property(meshdb_core::Property::List(items))) => items
4424 .iter()
4425 .cloned()
4426 .map(Value::Property)
4427 .collect::<Vec<_>>(),
4428 _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
4429 };
4430 let mut edge_buf: Vec<Edge> = Vec::with_capacity(list.len());
4431 let mut node_buf: Vec<NodeId> = Vec::with_capacity(list.len() + 1);
4432 node_buf.push(start);
4433 let mut current = start;
4434 for item in list {
4435 let edge = match item {
4436 Value::Edge(e) => e,
4437 _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
4438 };
4439 if !edge_types.is_empty() && !edge_types.iter().any(|t| t == &edge.edge_type) {
4440 return Ok((Vec::new(), Vec::new(), Vec::new()));
4441 }
4442 let next_node = match direction {
4443 Direction::Outgoing => {
4444 if edge.source != current {
4445 return Ok((Vec::new(), Vec::new(), Vec::new()));
4446 }
4447 edge.target
4448 }
4449 Direction::Incoming => {
4450 if edge.target != current {
4451 return Ok((Vec::new(), Vec::new(), Vec::new()));
4452 }
4453 edge.source
4454 }
4455 Direction::Both => {
4456 if edge.source == current {
4457 edge.target
4458 } else if edge.target == current {
4459 edge.source
4460 } else {
4461 return Ok((Vec::new(), Vec::new(), Vec::new()));
4462 }
4463 }
4464 };
4465 if ctx.store.get_node(next_node)?.is_none() {
4469 return Ok((Vec::new(), Vec::new(), Vec::new()));
4470 }
4471 edge_buf.push(edge);
4472 node_buf.push(next_node);
4473 current = next_node;
4474 }
4475 Ok((vec![edge_buf], vec![node_buf], vec![current]))
4476}
4477
4478impl Operator for VarLengthExpandOp {
4479 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4480 loop {
4481 while self.pending_idx < self.pending_paths.len() {
4482 let i = self.pending_idx;
4483 self.pending_idx += 1;
4484
4485 let target_id = self.pending_targets[i];
4486 let target = match ctx.store.get_node(target_id)? {
4487 Some(n) => n,
4488 None => continue,
4489 };
4490
4491 let base = self
4492 .current_row
4493 .as_ref()
4494 .expect("pending without source row");
4495 let mut out = base.clone();
4496 out.insert(self.dst_var.clone(), Value::Node(target.clone()));
4497 if let Some(ev) = &self.edge_var {
4498 let edges: Vec<Value> = self.pending_paths[i]
4499 .iter()
4500 .cloned()
4501 .map(Value::Edge)
4502 .collect();
4503 out.insert(ev.clone(), Value::List(edges));
4504 }
4505 if let Some(pv) = &self.path_var {
4506 let mut nodes = Vec::with_capacity(self.pending_node_paths[i].len());
4507 for nid in &self.pending_node_paths[i] {
4508 match ctx.store.get_node(*nid)? {
4509 Some(n) => nodes.push(n),
4510 None => continue,
4511 }
4512 }
4513 let edges = self.pending_paths[i].clone();
4514 out.insert(pv.clone(), Value::Path { nodes, edges });
4515 }
4516 return Ok(Some(out));
4517 }
4518
4519 match self.input.next(ctx)? {
4520 None => return Ok(None),
4521 Some(row) => {
4522 let src_id = match row.get(&self.src_var) {
4523 Some(Value::Node(n)) => Some(n.id),
4524 Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
4530 None
4531 }
4532 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4533 };
4534 let (mut paths, mut node_paths, mut targets) =
4542 if let Some(list_var) = &self.bound_edge_list_var {
4543 replay_edge_list(
4544 ctx,
4545 &row,
4546 list_var,
4547 src_id,
4548 self.direction,
4549 &self.edge_types,
4550 )?
4551 } else {
4552 match src_id {
4553 Some(id) => self.enumerate(ctx, id, &row)?,
4554 None => (Vec::new(), Vec::new(), Vec::new()),
4555 }
4556 };
4557 if let Some(constraint_var) = &self.dst_constraint_var {
4564 let target_id = match row.get(constraint_var) {
4565 Some(Value::Node(n)) => Some(n.id),
4566 _ => None,
4567 };
4568 match target_id {
4569 Some(id) => {
4570 let mut kept_paths = Vec::new();
4571 let mut kept_node_paths = Vec::new();
4572 let mut kept_targets = Vec::new();
4573 for ((p, np), t) in paths
4574 .drain(..)
4575 .zip(node_paths.drain(..))
4576 .zip(targets.drain(..))
4577 {
4578 if t == id {
4579 kept_paths.push(p);
4580 kept_node_paths.push(np);
4581 kept_targets.push(t);
4582 }
4583 }
4584 paths = kept_paths;
4585 node_paths = kept_node_paths;
4586 targets = kept_targets;
4587 }
4588 None => {
4589 paths.clear();
4590 node_paths.clear();
4591 targets.clear();
4592 }
4593 }
4594 }
4595 if paths.is_empty() && self.optional {
4596 let mut out = row;
4601 if let Some(ev) = &self.edge_var {
4602 out.insert(ev.clone(), Value::Null);
4603 }
4604 out.insert(self.dst_var.clone(), Value::Null);
4605 if let Some(pv) = &self.path_var {
4606 out.insert(pv.clone(), Value::Null);
4607 }
4608 return Ok(Some(out));
4609 }
4610 self.pending_paths = paths;
4611 self.pending_node_paths = node_paths;
4612 self.pending_targets = targets;
4613 self.pending_idx = 0;
4614 self.current_row = Some(row);
4615 }
4616 }
4617 }
4618 }
4619}
4620
/// Operator that forwards only the input rows for which `predicate`
/// evaluates to a truthy value.
struct FilterOp {
    /// Upstream operator supplying candidate rows.
    input: Box<dyn Operator>,
    /// Expression evaluated against each candidate row.
    predicate: Expr,
}
4625
4626impl FilterOp {
4627 fn new(input: Box<dyn Operator>, predicate: Expr) -> Self {
4628 Self { input, predicate }
4629 }
4630}
4631
4632impl Operator for FilterOp {
4633 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4634 while let Some(row) = self.input.next(ctx)? {
4635 let v = match eval_expr(&self.predicate, &ctx.eval_ctx(&row)) {
4636 Ok(v) => v,
4637 Err(Error::TypeMismatch) | Err(Error::NotBoolean) => Value::Null,
4640 Err(e) => return Err(e),
4641 };
4642 if to_bool(&v).unwrap_or(false) {
4643 return Ok(Some(row));
4644 }
4645 }
4646 Ok(None)
4647 }
4648}
4649
/// Pass-through operator: every row produced by `input` is returned
/// unchanged.
struct IdentityOp {
    /// Upstream operator whose rows are forwarded as-is.
    input: Box<dyn Operator>,
}
4655
4656impl IdentityOp {
4657 fn new(input: Box<dyn Operator>) -> Self {
4658 Self { input }
4659 }
4660}
4661
4662impl Operator for IdentityOp {
4663 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4664 self.input.next(ctx)
4665 }
4666}
4667
/// Operator that guarantees at least one output row: if `input` produces no
/// rows at all, a single row binding every name in `null_vars` to null is
/// emitted instead.
struct CoalesceNullRowOp {
    /// Upstream operator.
    input: Box<dyn Operator>,
    /// Variable names bound to `Value::Null` in the fallback row.
    null_vars: Vec<String>,
    /// Set once the input has yielded at least one row.
    produced_any: bool,
    /// Set once the input has been exhausted; later calls return `None`.
    done: bool,
}
4679
4680impl CoalesceNullRowOp {
4681 fn new(input: Box<dyn Operator>, null_vars: Vec<String>) -> Self {
4682 Self {
4683 input,
4684 null_vars,
4685 produced_any: false,
4686 done: false,
4687 }
4688 }
4689}
4690
4691impl Operator for CoalesceNullRowOp {
4692 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4693 if self.done {
4694 return Ok(None);
4695 }
4696 match self.input.next(ctx)? {
4697 Some(row) => {
4698 self.produced_any = true;
4699 Ok(Some(row))
4700 }
4701 None => {
4702 self.done = true;
4703 if self.produced_any {
4704 Ok(None)
4705 } else {
4706 let mut row = Row::new();
4707 for v in &self.null_vars {
4708 row.insert(v.clone(), Value::Null);
4709 }
4710 Ok(Some(row))
4711 }
4712 }
4713 }
4714 }
4715}
4716
/// Operator that replaces each input row with a fresh row holding one column
/// per projection item.
struct ProjectOp {
    /// Upstream operator.
    input: Box<dyn Operator>,
    /// Projection items; each is evaluated against the input row and stored
    /// under its alias, raw text, or a derived default name.
    items: Vec<ReturnItem>,
}
4721
4722impl ProjectOp {
4723 fn new(input: Box<dyn Operator>, items: Vec<ReturnItem>) -> Self {
4724 Self { input, items }
4725 }
4726}
4727
4728impl Operator for ProjectOp {
4729 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4730 match self.input.next(ctx)? {
4731 Some(row) => {
4732 let mut out = Row::new();
4733 for (i, item) in self.items.iter().enumerate() {
4734 let name = item.alias.clone().unwrap_or_else(|| {
4735 item.raw_text
4736 .clone()
4737 .unwrap_or_else(|| default_name(&item.expr, i))
4738 });
4739 let value = eval_expr(&item.expr, &ctx.eval_ctx(&row))?;
4740 out.insert(name, value);
4741 }
4742 Ok(Some(out))
4743 }
4744 None => Ok(None),
4745 }
4746 }
4747}
4748
4749fn default_name(expr: &Expr, idx: usize) -> String {
4750 render_expr_name(expr).unwrap_or_else(|| format!("col{}", idx))
4751}
4752
4753fn render_expr_name(expr: &Expr) -> Option<String> {
4754 Some(match expr {
4755 Expr::Identifier(s) => s.clone(),
4756 Expr::Property { var, key } => format!("{var}.{key}"),
4757 Expr::PropertyAccess { base, key } => {
4758 if matches!(
4762 base.as_ref(),
4763 Expr::IndexAccess { .. } | Expr::SliceAccess { .. }
4764 ) {
4765 format!("({}).{key}", render_expr_name(base)?)
4766 } else {
4767 format!("{}.{key}", render_expr_name(base)?)
4768 }
4769 }
4770 Expr::Parameter(name) => format!("${name}"),
4771 Expr::Literal(Literal::String(s)) => format!("'{s}'"),
4772 Expr::Literal(Literal::Integer(i)) => i.to_string(),
4773 Expr::Literal(Literal::Float(f)) => f.to_string(),
4774 Expr::Literal(Literal::Boolean(b)) => b.to_string(),
4775 Expr::Literal(Literal::Null) => "NULL".into(),
4776 Expr::Call { name, args } => {
4777 let arg_str = match args {
4778 CallArgs::Star => "*".into(),
4779 CallArgs::Exprs(es) | CallArgs::DistinctExprs(es) => {
4780 let prefix = if matches!(args, CallArgs::DistinctExprs(_)) {
4781 "DISTINCT "
4782 } else {
4783 ""
4784 };
4785 let inner: Vec<String> = es.iter().filter_map(render_expr_name).collect();
4786 if inner.len() != es.len() {
4787 return None;
4788 }
4789 format!("{prefix}{}", inner.join(", "))
4790 }
4791 };
4792 format!("{name}({arg_str})")
4793 }
4794 Expr::BinaryOp { op, left, right } => {
4795 let op_str = match op {
4796 BinaryOp::Add => " + ",
4797 BinaryOp::Sub => " - ",
4798 BinaryOp::Mul => " * ",
4799 BinaryOp::Div => " / ",
4800 BinaryOp::Mod => " % ",
4801 BinaryOp::Pow => " ^ ",
4802 };
4803 format!(
4804 "{}{op_str}{}",
4805 render_expr_name(left)?,
4806 render_expr_name(right)?
4807 )
4808 }
4809 Expr::UnaryOp { op, operand } => {
4810 let op_str = match op {
4811 UnaryOp::Neg => "-",
4812 };
4813 format!("{op_str}{}", render_expr_name(operand)?)
4814 }
4815 Expr::Not(inner) => format!("NOT {}", render_expr_name(inner)?),
4816 Expr::IsNull { negated, inner } => {
4817 if *negated {
4818 format!("{} IS NOT NULL", render_expr_name(inner)?)
4819 } else {
4820 format!("{} IS NULL", render_expr_name(inner)?)
4821 }
4822 }
4823 Expr::Compare { op, left, right } => {
4824 let op_str = match op {
4825 CompareOp::Eq => " = ",
4826 CompareOp::Ne => " <> ",
4827 CompareOp::Lt => " < ",
4828 CompareOp::Le => " <= ",
4829 CompareOp::Gt => " > ",
4830 CompareOp::Ge => " >= ",
4831 CompareOp::StartsWith => " STARTS WITH ",
4832 CompareOp::EndsWith => " ENDS WITH ",
4833 CompareOp::Contains => " CONTAINS ",
4834 CompareOp::RegexMatch => " =~ ",
4835 };
4836 format!(
4837 "{}{op_str}{}",
4838 render_expr_name(left)?,
4839 render_expr_name(right)?
4840 )
4841 }
4842 Expr::List(items) => {
4843 let inner: Vec<String> = items.iter().filter_map(render_expr_name).collect();
4844 if inner.len() != items.len() {
4845 return None;
4846 }
4847 format!("[{}]", inner.join(", "))
4848 }
4849 Expr::Map(entries) => {
4850 let inner: Vec<String> = entries
4851 .iter()
4852 .map(|(k, v)| render_expr_name(v).map(|vn| format!("{k}: {vn}")))
4853 .collect::<Option<Vec<_>>>()?;
4854 format!("{{{}}}", inner.join(", "))
4855 }
4856 Expr::IndexAccess { base, index } => {
4857 format!("{}[{}]", render_expr_name(base)?, render_expr_name(index)?)
4858 }
4859 Expr::InList { element, list } => {
4860 format!(
4861 "{} IN {}",
4862 render_expr_name(element)?,
4863 render_expr_name(list)?
4864 )
4865 }
4866 Expr::HasLabels { expr, labels } => {
4867 let mut s = format!("({}", render_expr_name(expr)?);
4868 for l in labels {
4869 s.push(':');
4870 s.push_str(l);
4871 }
4872 s.push(')');
4873 s
4874 }
4875 _ => return None,
4876 })
4877}
4878
/// Operator that suppresses duplicate rows, where two rows are duplicates
/// when `row_key` gives them the same key.
struct DistinctOp {
    /// Upstream operator.
    input: Box<dyn Operator>,
    /// Keys of all rows emitted so far.
    seen: HashSet<String>,
}
4883
4884impl DistinctOp {
4885 fn new(input: Box<dyn Operator>) -> Self {
4886 Self {
4887 input,
4888 seen: HashSet::new(),
4889 }
4890 }
4891}
4892
4893impl Operator for DistinctOp {
4894 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4895 while let Some(row) = self.input.next(ctx)? {
4896 let key = row_key(&row);
4897 if self.seen.insert(key) {
4898 return Ok(Some(row));
4899 }
4900 }
4901 Ok(None)
4902 }
4903}
4904
/// Operator that assembles a path value from node and edge variables already
/// bound in each row and stores it under `path_var`.
struct BindPathOp {
    /// Upstream operator.
    input: Box<dyn Operator>,
    /// Name under which the assembled path (or null) is inserted.
    path_var: String,
    /// Node variables along the pattern; `node_vars[0]` is the start node and
    /// `node_vars[i + 1]` is read after edge variable `i`.
    node_vars: Vec<String>,
    /// Edge variables along the pattern; each may be bound to a single edge
    /// or to a nested path.
    edge_vars: Vec<String>,
}
4926
4927impl BindPathOp {
4928 fn new(
4929 input: Box<dyn Operator>,
4930 path_var: String,
4931 node_vars: Vec<String>,
4932 edge_vars: Vec<String>,
4933 ) -> Self {
4934 Self {
4935 input,
4936 path_var,
4937 node_vars,
4938 edge_vars,
4939 }
4940 }
4941}
4942
4943impl Operator for BindPathOp {
4944 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4945 let Some(mut row) = self.input.next(ctx)? else {
4946 return Ok(None);
4947 };
4948 let mut nodes: Vec<meshdb_core::Node> = Vec::new();
4952 let mut edges: Vec<meshdb_core::Edge> = Vec::new();
4953 let mut abort = false;
4954 if let Some(Value::Node(n)) = row.get(&self.node_vars[0]) {
4961 nodes.push(n.clone());
4962 } else {
4963 abort = true;
4964 }
4965 if !abort {
4966 for (i, ev) in self.edge_vars.iter().enumerate() {
4967 match row.get(ev) {
4968 Some(Value::Edge(e)) => {
4969 edges.push(e.clone());
4970 match row.get(&self.node_vars[i + 1]) {
4971 Some(Value::Node(n)) => nodes.push(n.clone()),
4972 _ => {
4973 abort = true;
4974 break;
4975 }
4976 }
4977 }
4978 Some(Value::Path {
4979 nodes: sub_nodes,
4980 edges: sub_edges,
4981 }) => {
4982 edges.extend(sub_edges.iter().cloned());
4988 if sub_nodes.len() > 1 {
4989 nodes.extend(sub_nodes[1..].iter().cloned());
4990 }
4991 }
4992 _ => {
4993 abort = true;
4994 break;
4995 }
4996 }
4997 }
4998 }
4999 if abort {
5000 row.insert(self.path_var.clone(), Value::Null);
5001 } else {
5002 row.insert(self.path_var.clone(), Value::Path { nodes, edges });
5003 }
5004 Ok(Some(row))
5005 }
5006}
5007
/// Operator that, for each input row, searches for shortest path(s) between
/// the nodes bound to `src_var` and `dst_var` and emits one output row per
/// path found, with the path bound to `path_var`.
struct ShortestPathOp {
    /// Upstream operator.
    input: Box<dyn Operator>,
    /// Row variable expected to hold the start node.
    src_var: String,
    /// Row variable expected to hold the end node.
    dst_var: String,
    /// Name under which each found path is inserted.
    path_var: String,
    /// Accepted edge types; empty means every type is accepted.
    edge_types: Vec<String>,
    /// Direction in which edges are followed during the search.
    direction: meshdb_cypher::Direction,
    /// Maximum number of BFS levels explored.
    max_hops: u64,
    /// Whether a single shortest path or all of them are returned.
    kind: meshdb_cypher::ShortestKind,
    /// Output rows still to be emitted: the source row paired with the path
    /// that will be bound on it.
    pending: std::collections::VecDeque<(Row, Value)>,
}
5043
5044impl ShortestPathOp {
5045 #[allow(clippy::too_many_arguments)]
5046 fn new(
5047 input: Box<dyn Operator>,
5048 src_var: String,
5049 dst_var: String,
5050 path_var: String,
5051 edge_types: Vec<String>,
5052 direction: meshdb_cypher::Direction,
5053 max_hops: u64,
5054 kind: meshdb_cypher::ShortestKind,
5055 ) -> Self {
5056 Self {
5057 input,
5058 src_var,
5059 dst_var,
5060 path_var,
5061 edge_types,
5062 direction,
5063 max_hops,
5064 kind,
5065 pending: std::collections::VecDeque::new(),
5066 }
5067 }
5068}
5069
5070impl Operator for ShortestPathOp {
5071 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5072 loop {
5073 if let Some((mut row, path)) = self.pending.pop_front() {
5078 row.insert(self.path_var.clone(), path);
5079 return Ok(Some(row));
5080 }
5081 let Some(row) = self.input.next(ctx)? else {
5082 return Ok(None);
5083 };
5084 let src = match row.get(&self.src_var) {
5085 Some(Value::Node(n)) => n.clone(),
5086 _ => continue,
5087 };
5088 let dst = match row.get(&self.dst_var) {
5089 Some(Value::Node(n)) => n.clone(),
5090 _ => continue,
5091 };
5092 let paths = bfs_shortest_paths(
5093 &src,
5094 &dst,
5095 &self.edge_types,
5096 self.direction,
5097 self.max_hops,
5098 self.kind,
5099 ctx.store,
5100 )?;
5101 if paths.is_empty() {
5102 continue;
5104 }
5105 for path in paths {
5106 self.pending.push_back((row.clone(), path));
5107 }
5108 }
5109 }
5110}
5111
5112fn bfs_shortest_paths(
5131 src: &Node,
5132 dst: &Node,
5133 edge_types: &[String],
5134 direction: meshdb_cypher::Direction,
5135 max_hops: u64,
5136 kind: meshdb_cypher::ShortestKind,
5137 reader: &dyn crate::reader::GraphReader,
5138) -> Result<Vec<Value>> {
5139 use meshdb_cypher::Direction;
5140
5141 if src.id == dst.id {
5142 return Ok(vec![Value::Path {
5143 nodes: vec![src.clone()],
5144 edges: vec![],
5145 }]);
5146 }
5147
5148 let mut dist: HashMap<NodeId, u64> = HashMap::new();
5154 dist.insert(src.id, 0);
5155 let mut parents: HashMap<NodeId, Vec<(NodeId, EdgeId)>> = HashMap::new();
5156
5157 let mut frontier: Vec<NodeId> = vec![src.id];
5158 let mut depth: u64 = 0;
5159 let mut found = false;
5160
5161 while !frontier.is_empty() && depth < max_hops && !found {
5162 let mut next_frontier: Vec<NodeId> = Vec::new();
5163 for node_id in &frontier {
5164 let neighbors = match direction {
5165 Direction::Outgoing => reader.outgoing(*node_id)?,
5166 Direction::Incoming => reader.incoming(*node_id)?,
5167 Direction::Both => {
5168 let mut out = reader.outgoing(*node_id)?;
5169 out.extend(reader.incoming(*node_id)?);
5170 out
5171 }
5172 };
5173 for (edge_id, neighbor_id) in neighbors {
5174 if !edge_types.is_empty() {
5177 let edge = match reader.get_edge(edge_id)? {
5178 Some(e) => e,
5179 None => continue,
5180 };
5181 if !edge_types.iter().any(|t| t == &edge.edge_type) {
5182 continue;
5183 }
5184 }
5185 match dist.get(&neighbor_id) {
5186 Some(&d) if d == depth + 1 => {
5187 parents
5193 .entry(neighbor_id)
5194 .or_default()
5195 .push((*node_id, edge_id));
5196 }
5197 Some(_) => {
5198 }
5202 None => {
5203 dist.insert(neighbor_id, depth + 1);
5204 parents
5205 .entry(neighbor_id)
5206 .or_default()
5207 .push((*node_id, edge_id));
5208 if neighbor_id == dst.id {
5209 found = true;
5210 } else {
5211 next_frontier.push(neighbor_id);
5212 }
5213 }
5214 }
5215 }
5216 }
5217 depth += 1;
5218 if !found {
5219 frontier = next_frontier;
5220 }
5221 }
5222
5223 if !found {
5224 return Ok(Vec::new());
5225 }
5226
5227 let mut out: Vec<Value> = Vec::new();
5231 let mut nodes_rev: Vec<Node> = Vec::new();
5232 let mut edges_rev: Vec<Edge> = Vec::new();
5233 let only_first = matches!(kind, meshdb_cypher::ShortestKind::Shortest);
5234 collect_shortest_paths(
5235 src,
5236 dst,
5237 &parents,
5238 reader,
5239 &mut nodes_rev,
5240 &mut edges_rev,
5241 &mut out,
5242 only_first,
5243 )?;
5244 Ok(out)
5245}
5246
5247#[allow(clippy::too_many_arguments)]
5259fn collect_shortest_paths(
5260 src: &Node,
5261 current: &Node,
5262 parents: &HashMap<NodeId, Vec<(NodeId, EdgeId)>>,
5263 reader: &dyn crate::reader::GraphReader,
5264 nodes_rev: &mut Vec<Node>,
5265 edges_rev: &mut Vec<Edge>,
5266 out: &mut Vec<Value>,
5267 only_first: bool,
5268) -> Result<()> {
5269 if current.id == src.id {
5270 let mut nodes: Vec<Node> = Vec::with_capacity(nodes_rev.len() + 1);
5275 nodes.push(src.clone());
5276 nodes.extend(nodes_rev.iter().rev().cloned());
5277 let edges: Vec<Edge> = edges_rev.iter().rev().cloned().collect();
5278 out.push(Value::Path { nodes, edges });
5279 return Ok(());
5280 }
5281 let Some(parent_edges) = parents.get(¤t.id) else {
5282 return Ok(());
5286 };
5287 for (parent_id, edge_id) in parent_edges {
5288 if only_first && !out.is_empty() {
5289 return Ok(());
5290 }
5291 let edge = reader
5292 .get_edge(*edge_id)?
5293 .expect("BFS inserted this edge id; it must still exist");
5294 let parent_node = reader
5295 .get_node(*parent_id)?
5296 .expect("BFS visited this node id; it must still exist");
5297 nodes_rev.push(current.clone());
5298 edges_rev.push(edge);
5299 collect_shortest_paths(
5300 src,
5301 &parent_node,
5302 parents,
5303 reader,
5304 nodes_rev,
5305 edges_rev,
5306 out,
5307 only_first,
5308 )?;
5309 nodes_rev.pop();
5310 edges_rev.pop();
5311 }
5312 Ok(())
5313}
5314
/// Operator that concatenates the row streams of several branches, consuming
/// them in order, optionally dropping rows whose key was already emitted.
struct UnionOp {
    /// Branch operators, exhausted one after another.
    branches: Vec<Box<dyn Operator>>,
    /// Index of the branch currently being read.
    current: usize,
    /// Row keys emitted so far; `None` when duplicates are kept (constructed
    /// with `all = true`).
    seen: Option<HashSet<String>>,
}
5328
5329impl UnionOp {
5330 fn new(branches: Vec<Box<dyn Operator>>, all: bool) -> Self {
5331 Self {
5332 branches,
5333 current: 0,
5334 seen: if all { None } else { Some(HashSet::new()) },
5335 }
5336 }
5337}
5338
5339impl Operator for UnionOp {
5340 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5341 while self.current < self.branches.len() {
5342 match self.branches[self.current].next(ctx)? {
5343 Some(row) => {
5344 if let Some(seen) = self.seen.as_mut() {
5345 let key = row_key(&row);
5346 if !seen.insert(key) {
5347 continue;
5348 }
5349 }
5350 return Ok(Some(row));
5351 }
5352 None => {
5353 self.current += 1;
5354 }
5355 }
5356 }
5357 Ok(None)
5358 }
5359}
5360
/// Operator that sorts its entire input by `sort_items` before emitting
/// anything.
struct OrderByOp {
    /// Upstream operator; drained completely on the first `next` call.
    input: Box<dyn Operator>,
    /// Sort keys in priority order, each with its own direction.
    sort_items: Vec<SortItem>,
    /// Sorted rows, populated on the first `next` call.
    sorted: Option<Vec<Row>>,
    /// Index into `sorted` of the next row to emit.
    cursor: usize,
}
5367
5368impl OrderByOp {
5369 fn new(input: Box<dyn Operator>, sort_items: Vec<SortItem>) -> Self {
5370 Self {
5371 input,
5372 sort_items,
5373 sorted: None,
5374 cursor: 0,
5375 }
5376 }
5377}
5378
5379impl Operator for OrderByOp {
5380 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5381 if self.sorted.is_none() {
5382 let mut rows: Vec<Row> = Vec::new();
5383 while let Some(row) = self.input.next(ctx)? {
5384 rows.push(row);
5385 }
5386 let mut keyed: Vec<(Vec<Value>, Row)> = Vec::with_capacity(rows.len());
5387 for row in rows {
5388 let mut keys = Vec::with_capacity(self.sort_items.len());
5389 for item in &self.sort_items {
5390 keys.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
5391 }
5392 keyed.push((keys, row));
5393 }
5394 let descs: Vec<bool> = self.sort_items.iter().map(|s| s.descending).collect();
5395 keyed.sort_by(|a, b| {
5396 for (i, (va, vb)) in a.0.iter().zip(b.0.iter()).enumerate() {
5397 let ord = compare_values(va, vb);
5398 let ord = if descs[i] { ord.reverse() } else { ord };
5399 if ord != Ordering::Equal {
5400 return ord;
5401 }
5402 }
5403 Ordering::Equal
5404 });
5405 self.sorted = Some(keyed.into_iter().map(|(_, r)| r).collect());
5406 }
5407 let rows = self.sorted.as_ref().unwrap();
5408 if self.cursor < rows.len() {
5409 let row = rows[self.cursor].clone();
5410 self.cursor += 1;
5411 Ok(Some(row))
5412 } else {
5413 Ok(None)
5414 }
5415 }
5416}
5417
/// Operator that groups its input by `group_keys` and computes `aggregates`
/// per group, emitting one row per group.
struct AggregateOp {
    /// Upstream operator; drained completely on the first `next` call.
    input: Box<dyn Operator>,
    /// Grouping expressions; their values identify a group and are copied
    /// into that group's output row.
    group_keys: Vec<ReturnItem>,
    /// Aggregate functions evaluated for every group.
    aggregates: Vec<AggregateSpec>,
    /// Finished output rows, populated by `compute`.
    results: Option<Vec<Row>>,
    /// Index into `results` of the next row to emit.
    cursor: usize,
}
5425
5426impl AggregateOp {
5427 fn new(
5428 input: Box<dyn Operator>,
5429 group_keys: Vec<ReturnItem>,
5430 aggregates: Vec<AggregateSpec>,
5431 ) -> Self {
5432 Self {
5433 input,
5434 group_keys,
5435 aggregates,
5436 results: None,
5437 cursor: 0,
5438 }
5439 }
5440
5441 fn compute(&mut self, ctx: &ExecCtx) -> Result<()> {
5442 let mut groups: HashMap<String, GroupState> = HashMap::new();
5443 let mut order: Vec<String> = Vec::new();
5444
5445 let mut saw_any = false;
5448
5449 while let Some(row) = self.input.next(ctx)? {
5450 saw_any = true;
5451 let mut key_values = Vec::with_capacity(self.group_keys.len());
5452 for item in &self.group_keys {
5453 key_values.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
5454 }
5455 let mut hash_key = String::new();
5456 for v in &key_values {
5457 hash_key.push_str(&value_key(v));
5458 hash_key.push('|');
5459 }
5460 let entry = groups.entry(hash_key.clone()).or_insert_with(|| {
5461 order.push(hash_key.clone());
5462 GroupState {
5463 key_values: key_values.clone(),
5464 agg_states: self
5465 .aggregates
5466 .iter()
5467 .map(|a| AggState::initial(a.function))
5468 .collect(),
5469 distinct_seen: self.aggregates.iter().map(|_| None).collect(),
5470 }
5471 });
5472 for (i, spec) in self.aggregates.iter().enumerate() {
5473 if let AggregateArg::DistinctExpr(expr) = &spec.arg {
5474 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
5475 if matches!(v, Value::Null) {
5476 continue;
5477 }
5478 let key = value_key(&v);
5479 let seen = entry.distinct_seen[i].get_or_insert_with(HashSet::new);
5480 if !seen.insert(key) {
5481 continue;
5482 }
5483 }
5484 entry.agg_states[i].update(&spec.arg, &ctx.eval_ctx(&row))?;
5485 if let Some(extra_expr) = &spec.extra_arg {
5489 let need_resolve = matches!(
5490 &entry.agg_states[i],
5491 AggState::PercentileDisc {
5492 percentile: None,
5493 ..
5494 } | AggState::PercentileCont {
5495 percentile: None,
5496 ..
5497 }
5498 );
5499 if need_resolve {
5500 let pv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
5501 let p = match pv {
5502 Value::Property(Property::Float64(f)) => f,
5503 Value::Property(Property::Int64(i)) => i as f64,
5504 _ => 0.0,
5505 };
5506 if !(0.0..=1.0).contains(&p) || p.is_nan() {
5510 return Err(Error::Procedure(format!("percentile out of range: {p}")));
5511 }
5512 match &mut entry.agg_states[i] {
5513 AggState::PercentileDisc { percentile, .. }
5514 | AggState::PercentileCont { percentile, .. } => {
5515 *percentile = Some(p);
5516 }
5517 _ => {}
5518 }
5519 }
5520 }
5521 }
5522 }
5523
5524 let mut out = Vec::new();
5525 if !saw_any && self.group_keys.is_empty() && !self.aggregates.is_empty() {
5526 let mut row = Row::new();
5528 for spec in &self.aggregates {
5529 row.insert(
5530 spec.alias.clone(),
5531 AggState::initial(spec.function).finalize(),
5532 );
5533 }
5534 out.push(row);
5535 } else {
5536 for key in order {
5537 let state = groups.remove(&key).unwrap();
5538 let mut row = Row::new();
5539 for (i, item) in self.group_keys.iter().enumerate() {
5540 let name = item
5541 .alias
5542 .clone()
5543 .unwrap_or_else(|| default_name(&item.expr, i));
5544 row.insert(name, state.key_values[i].clone());
5545 }
5546 for (i, spec) in self.aggregates.iter().enumerate() {
5547 row.insert(spec.alias.clone(), state.agg_states[i].finalize());
5548 }
5549 out.push(row);
5550 }
5551 }
5552 self.results = Some(out);
5553 Ok(())
5554 }
5555}
5556
5557impl Operator for AggregateOp {
5558 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5559 if self.results.is_none() {
5560 self.compute(ctx)?;
5561 }
5562 let rows = self.results.as_ref().unwrap();
5563 if self.cursor < rows.len() {
5564 let row = rows[self.cursor].clone();
5565 self.cursor += 1;
5566 Ok(Some(row))
5567 } else {
5568 Ok(None)
5569 }
5570 }
5571}
5572
/// Accumulated state of one aggregation group.
struct GroupState {
    /// Values of the grouping expressions that identify this group.
    key_values: Vec<Value>,
    /// One running state per aggregate, in the same order as the aggregates.
    agg_states: Vec<AggState>,
    /// Per aggregate: keys of the values already counted for a DISTINCT
    /// argument; stays `None` until a DISTINCT value is first recorded.
    distinct_seen: Vec<Option<HashSet<String>>>,
}
5578
/// Running state of a single aggregate function within one group.
enum AggState {
    /// Number of rows (for `*`) or of non-null argument values counted so far.
    Count(i64),
    /// Running sum; integer and float inputs are accumulated separately.
    Sum {
        /// Sum of the integer inputs.
        int_part: i64,
        /// Sum of the float inputs.
        float_part: f64,
        /// Set once any float input has been seen; the result is then a float.
        is_float: bool,
    },
    /// Running total and count of the numeric inputs for an average.
    Avg {
        /// Sum of the inputs, as a float.
        total: f64,
        /// Number of non-null numeric inputs.
        count: i64,
    },
    /// Smallest non-null value seen so far, if any.
    Min(Option<Value>),
    /// Largest non-null value seen so far, if any.
    Max(Option<Value>),
    /// All non-null values seen so far, in input order.
    Collect(Vec<Value>),
    /// Accumulators for the `StDev` aggregate.
    StDev {
        /// Sum of the inputs.
        sum: f64,
        /// Sum of the squared inputs.
        sum_sq: f64,
        /// Number of non-null numeric inputs.
        count: i64,
    },
    /// Accumulators for the `StDevP` aggregate.
    StDevP {
        /// Sum of the inputs.
        sum: f64,
        /// Sum of the squared inputs.
        sum_sq: f64,
        /// Number of non-null numeric inputs.
        count: i64,
    },
    /// Collected inputs for the `PercentileDisc` aggregate.
    PercentileDisc {
        /// Non-null values seen so far.
        items: Vec<Value>,
        /// Requested percentile; `None` until it has been resolved.
        percentile: Option<f64>,
    },
    /// Collected inputs for the `PercentileCont` aggregate.
    PercentileCont {
        /// Non-null values seen so far.
        items: Vec<Value>,
        /// Requested percentile; `None` until it has been resolved.
        percentile: Option<f64>,
    },
}
5612
5613impl AggState {
5614 fn initial(func: AggregateFn) -> Self {
5615 match func {
5616 AggregateFn::Count => AggState::Count(0),
5617 AggregateFn::Sum => AggState::Sum {
5618 int_part: 0,
5619 float_part: 0.0,
5620 is_float: false,
5621 },
5622 AggregateFn::Avg => AggState::Avg {
5623 total: 0.0,
5624 count: 0,
5625 },
5626 AggregateFn::Min => AggState::Min(None),
5627 AggregateFn::Max => AggState::Max(None),
5628 AggregateFn::Collect => AggState::Collect(Vec::new()),
5629 AggregateFn::StDev => AggState::StDev {
5630 sum: 0.0,
5631 sum_sq: 0.0,
5632 count: 0,
5633 },
5634 AggregateFn::StDevP => AggState::StDevP {
5635 sum: 0.0,
5636 sum_sq: 0.0,
5637 count: 0,
5638 },
5639 AggregateFn::PercentileDisc => AggState::PercentileDisc {
5640 items: Vec::new(),
5641 percentile: None,
5642 },
5643 AggregateFn::PercentileCont => AggState::PercentileCont {
5644 items: Vec::new(),
5645 percentile: None,
5646 },
5647 }
5648 }
5649
5650 fn update(&mut self, arg: &AggregateArg, ctx: &EvalCtx) -> Result<()> {
5651 match self {
5652 AggState::Count(c) => match arg {
5653 AggregateArg::Star => *c += 1,
5654 AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => {
5655 if !matches!(eval_expr(e, ctx)?, Value::Null) {
5656 *c += 1;
5657 }
5658 }
5659 },
5660 AggState::Sum {
5661 int_part,
5662 float_part,
5663 is_float,
5664 } => {
5665 let v = expr_arg_value(arg, ctx)?;
5666 match v {
5667 Value::Null => {}
5668 Value::Property(Property::Int64(i)) => *int_part += i,
5669 Value::Property(Property::Float64(f)) => {
5670 *float_part += f;
5671 *is_float = true;
5672 }
5673 _ => return Err(Error::AggregateTypeError),
5674 }
5675 }
5676 AggState::Avg { total, count } => {
5677 let v = expr_arg_value(arg, ctx)?;
5678 match v {
5679 Value::Null => {}
5680 Value::Property(Property::Int64(i)) => {
5681 *total += i as f64;
5682 *count += 1;
5683 }
5684 Value::Property(Property::Float64(f)) => {
5685 *total += f;
5686 *count += 1;
5687 }
5688 _ => return Err(Error::AggregateTypeError),
5689 }
5690 }
5691 AggState::Min(slot) => {
5692 let v = expr_arg_value(arg, ctx)?;
5699 if matches!(v, Value::Null | Value::Property(Property::Null)) {
5700 } else {
5702 match slot {
5703 None => *slot = Some(v),
5704 Some(cur) => {
5705 if compare_values(&v, cur) == Ordering::Less {
5706 *cur = v;
5707 }
5708 }
5709 }
5710 }
5711 }
5712 AggState::Max(slot) => {
5713 let v = expr_arg_value(arg, ctx)?;
5714 if matches!(v, Value::Null | Value::Property(Property::Null)) {
5715 } else {
5717 match slot {
5718 None => *slot = Some(v),
5719 Some(cur) => {
5720 if compare_values(&v, cur) == Ordering::Greater {
5721 *cur = v;
5722 }
5723 }
5724 }
5725 }
5726 }
5727 AggState::Collect(items) => {
5728 let v = expr_arg_value(arg, ctx)?;
5729 if !matches!(v, Value::Null) {
5730 items.push(v);
5731 }
5732 }
5733 AggState::PercentileDisc { items, .. } | AggState::PercentileCont { items, .. } => {
5734 let v = expr_arg_value(arg, ctx)?;
5735 if !matches!(v, Value::Null) {
5736 items.push(v);
5737 }
5738 }
5739 AggState::StDev { sum, sum_sq, count } | AggState::StDevP { sum, sum_sq, count } => {
5740 let v = expr_arg_value(arg, ctx)?;
5741 match v {
5742 Value::Null => {}
5743 Value::Property(Property::Int64(i)) => {
5744 let f = i as f64;
5745 *sum += f;
5746 *sum_sq += f * f;
5747 *count += 1;
5748 }
5749 Value::Property(Property::Float64(f)) => {
5750 *sum += f;
5751 *sum_sq += f * f;
5752 *count += 1;
5753 }
5754 _ => return Err(Error::AggregateTypeError),
5755 }
5756 }
5757 }
5758 Ok(())
5759 }
5760
5761 fn finalize(&self) -> Value {
5762 match self {
5763 AggState::Count(c) => Value::Property(Property::Int64(*c)),
5764 AggState::Sum {
5765 int_part,
5766 float_part,
5767 is_float,
5768 } => {
5769 if *is_float {
5770 Value::Property(Property::Float64(*float_part + *int_part as f64))
5771 } else {
5772 Value::Property(Property::Int64(*int_part))
5773 }
5774 }
5775 AggState::Avg { total, count } => {
5776 if *count == 0 {
5777 Value::Null
5778 } else {
5779 Value::Property(Property::Float64(*total / *count as f64))
5780 }
5781 }
5782 AggState::Min(slot) | AggState::Max(slot) => match slot {
5783 Some(v) => v.clone(),
5784 None => Value::Null,
5785 },
5786 AggState::Collect(items) => Value::List(items.clone()),
5787 AggState::StDevP { sum, sum_sq, count } => {
5788 if *count == 0 {
5789 Value::Property(Property::Float64(0.0))
5790 } else {
5791 let n = *count as f64;
5792 let variance = *sum_sq / n - (*sum / n).powi(2);
5793 Value::Property(Property::Float64(variance.max(0.0).sqrt()))
5794 }
5795 }
5796 AggState::StDev { sum, sum_sq, count } => {
5797 if *count < 2 {
5798 Value::Property(Property::Float64(0.0))
5799 } else {
5800 let n = *count as f64;
5801 let variance = (*sum_sq - *sum * *sum / n) / (n - 1.0);
5802 Value::Property(Property::Float64(variance.max(0.0).sqrt()))
5803 }
5804 }
5805 AggState::PercentileDisc { items, percentile } => {
5806 percentile_disc(items, percentile.unwrap_or(0.0))
5807 }
5808 AggState::PercentileCont { items, percentile } => {
5809 percentile_cont(items, percentile.unwrap_or(0.0))
5810 }
5811 }
5812 }
5813}
5814
5815fn expr_arg_value(arg: &AggregateArg, ctx: &EvalCtx) -> Result<Value> {
5816 match arg {
5817 AggregateArg::Star => Err(Error::AggregateTypeError),
5818 AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => eval_expr(e, ctx),
5819 }
5820}
5821
5822fn value_to_f64(v: &Value) -> f64 {
5826 match v {
5827 Value::Property(Property::Int64(i)) => *i as f64,
5828 Value::Property(Property::Float64(f)) => *f,
5829 _ => f64::NAN,
5830 }
5831}
5832
5833fn percentile_disc(items: &[Value], p: f64) -> Value {
5838 let mut nums: Vec<(f64, Value)> = items
5839 .iter()
5840 .map(|v| (value_to_f64(v), v.clone()))
5841 .filter(|(f, _)| !f.is_nan())
5842 .collect();
5843 if nums.is_empty() {
5844 return Value::Null;
5845 }
5846 nums.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
5847 let p = p.clamp(0.0, 1.0);
5848 let n = nums.len();
5849 let idx = ((p * n as f64).ceil() as isize - 1).max(0) as usize;
5851 nums[idx.min(n - 1)].1.clone()
5852}
5853
5854fn percentile_cont(items: &[Value], p: f64) -> Value {
5858 let mut nums: Vec<f64> = items
5859 .iter()
5860 .map(value_to_f64)
5861 .filter(|f| !f.is_nan())
5862 .collect();
5863 if nums.is_empty() {
5864 return Value::Null;
5865 }
5866 nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
5867 let p = p.clamp(0.0, 1.0);
5868 let n = nums.len();
5869 if n == 1 {
5870 return Value::Property(Property::Float64(nums[0]));
5871 }
5872 let pos = p * (n as f64 - 1.0);
5873 let lo = pos.floor() as usize;
5874 let hi = pos.ceil() as usize;
5875 let frac = pos - lo as f64;
5876 let v = nums[lo] + (nums[hi] - nums[lo]) * frac;
5877 Value::Property(Property::Float64(v))
5878}
5879
5880struct SkipOp {
5881 input: Box<dyn Operator>,
5882 count_expr: Expr,
5883 remaining: Option<i64>,
5884}
5885
5886impl SkipOp {
5887 fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5888 Self {
5889 input,
5890 count_expr,
5891 remaining: None,
5892 }
5893 }
5894}
5895
5896impl Operator for SkipOp {
5897 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5898 if self.remaining.is_none() {
5899 let empty = Row::new();
5900 let ectx = ctx.eval_ctx(&empty);
5901 let val = eval_expr(&self.count_expr, &ectx)?;
5902 self.remaining = Some(expr_to_count(val)?);
5903 }
5904 let rem = self.remaining.as_mut().unwrap();
5905 while *rem > 0 {
5906 if self.input.next(ctx)?.is_none() {
5907 return Ok(None);
5908 }
5909 *rem -= 1;
5910 }
5911 self.input.next(ctx)
5912 }
5913}
5914
5915struct LimitOp {
5916 input: Box<dyn Operator>,
5917 count_expr: Expr,
5918 remaining: Option<i64>,
5919}
5920
5921impl LimitOp {
5922 fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5923 Self {
5924 input,
5925 count_expr,
5926 remaining: None,
5927 }
5928 }
5929}
5930
5931impl Operator for LimitOp {
5932 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5933 if self.remaining.is_none() {
5934 let empty = Row::new();
5935 let ectx = ctx.eval_ctx(&empty);
5936 let val = eval_expr(&self.count_expr, &ectx)?;
5937 self.remaining = Some(expr_to_count(val)?);
5938 }
5939 let rem = self.remaining.as_mut().unwrap();
5940 if *rem <= 0 {
5941 return Ok(None);
5942 }
5943 match self.input.next(ctx)? {
5944 Some(row) => {
5945 *rem -= 1;
5946 Ok(Some(row))
5947 }
5948 None => Ok(None),
5949 }
5950 }
5951}
5952
5953fn expr_to_count(val: Value) -> Result<i64> {
5954 match val {
5955 Value::Null | Value::Property(Property::Null) => Ok(0),
5956 Value::Property(Property::Int64(n)) if n >= 0 => Ok(n),
5957 _ => Err(Error::TypeMismatch),
5962 }
5963}