1use crate::{
2 error::{Error, Result},
3 eval::{compare_values, eval_expr, row_key, to_bool, value_key, values_equal, EvalCtx},
4 procedures::ProcedureRegistry,
5 reader::GraphReader,
6 value::{ParamMap, Row, Value},
7 writer::GraphWriter,
8};
9use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
10use meshdb_cypher::{
11 AggregateArg, AggregateFn, AggregateSpec, BinaryOp, CallArgs, CompareOp, ConstraintKind,
12 ConstraintScope as CypherConstraintScope, CreateEdgeSpec, CreateNodeSpec, Direction, Expr,
13 Literal, LogicalPlan, PointSeekBounds, PropertyType as CypherPropertyType, RemoveSpec,
14 ReturnItem, SetAssignment, SortItem, UnaryOp, YieldSpec,
15};
16use meshdb_storage::{
17 ConstraintScope as StorageConstraintScope, PropertyConstraintKind,
18 PropertyType as StoragePropertyType, RocksDbStorageEngine,
19};
20use std::cell::RefCell;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23
24#[derive(Default)]
30pub struct Tombstones {
31 pub nodes: RefCell<HashSet<meshdb_core::NodeId>>,
32 pub edges: RefCell<HashSet<meshdb_core::EdgeId>>,
33}
34
35pub struct ExecCtx<'a> {
36 pub store: &'a dyn GraphReader,
37 pub writer: &'a dyn GraphWriter,
38 pub params: &'a ParamMap,
43 pub procedures: &'a ProcedureRegistry,
48 pub outer_rows: &'a [&'a Row],
56 pub tombstones: &'a Tombstones,
60}
61
62pub(crate) struct NoOpWriter;
63impl GraphWriter for NoOpWriter {
64 fn put_node(&self, _: &Node) -> Result<()> {
65 Ok(())
66 }
67 fn put_edge(&self, _: &Edge) -> Result<()> {
68 Ok(())
69 }
70 fn delete_edge(&self, _: EdgeId) -> Result<()> {
71 Ok(())
72 }
73 fn detach_delete_node(&self, _: NodeId) -> Result<()> {
74 Ok(())
75 }
76}
77
78impl<'a> ExecCtx<'a> {
79 pub(crate) fn eval_ctx<'b>(&self, row: &'b Row) -> EvalCtx<'b>
85 where
86 'a: 'b,
87 {
88 EvalCtx {
89 row,
90 params: self.params,
91 reader: self.store,
92 procedures: self.procedures,
93 outer_rows: self.outer_rows,
94 tombstones: self.tombstones,
95 }
96 }
97
98 pub(crate) fn lookup_binding<'r>(&'r self, row: &'r Row, name: &str) -> Option<&'r Value> {
104 if let Some(v) = row.get(name) {
105 return Some(v);
106 }
107 for outer in self.outer_rows {
108 if let Some(v) = outer.get(name) {
109 return Some(v);
110 }
111 }
112 None
113 }
114}
115
116pub trait Operator {
117 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>>;
118}
119
120pub fn execute(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
124 let params = ParamMap::new();
125 execute_with_reader(
126 plan,
127 store as &dyn GraphReader,
128 store as &dyn GraphWriter,
129 ¶ms,
130 )
131}
132
133pub fn execute_with_writer(
138 plan: &LogicalPlan,
139 store: &RocksDbStorageEngine,
140 writer: &dyn GraphWriter,
141) -> Result<Vec<Row>> {
142 let params = ParamMap::new();
143 execute_with_reader(plan, store as &dyn GraphReader, writer, ¶ms)
144}
145
146pub fn explain(plan: &LogicalPlan) -> Vec<Row> {
151 let text = meshdb_cypher::format_plan(plan);
152 let mut row = Row::new();
153 row.insert("plan".to_string(), Value::Property(Property::String(text)));
154 vec![row]
155}
156
157pub fn profile(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
158 let result_rows = execute(plan, store)?;
159 let row_count = result_rows.len() as i64;
160 let plan_text = meshdb_cypher::format_plan(plan);
161 let summary = format!("{plan_text}\nRows: {row_count}");
162 let mut row = Row::new();
163 row.insert(
164 "profile".to_string(),
165 Value::Property(Property::String(summary)),
166 );
167 row.insert(
168 "rows".to_string(),
169 Value::Property(Property::Int64(row_count)),
170 );
171 Ok(vec![row])
172}
173
174pub fn execute_with_reader(
175 plan: &LogicalPlan,
176 reader: &dyn GraphReader,
177 writer: &dyn GraphWriter,
178 params: &ParamMap,
179) -> Result<Vec<Row>> {
180 let empty_procs = ProcedureRegistry::new();
181 execute_with_reader_and_procs(plan, reader, writer, params, &empty_procs)
182}
183
184pub fn execute_with_seed(
202 plan: &LogicalPlan,
203 seed: Option<&Row>,
204 reader: &dyn GraphReader,
205 writer: &dyn GraphWriter,
206 params: &ParamMap,
207 procedures: &ProcedureRegistry,
208) -> Result<Vec<Row>> {
209 crate::eval::reset_statement_time();
210 if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
211 return Ok(rows);
212 }
213 let suppress_output = is_write_only_plan(plan);
214 let mut op = build_op_inner(plan, seed, &mut None);
215 let tombstones = Tombstones::default();
216 let outer_rows: Vec<&Row> = seed.into_iter().collect();
217 let ctx = ExecCtx {
218 store: reader,
219 writer,
220 params,
221 procedures,
222 outer_rows: &outer_rows,
223 tombstones: &tombstones,
224 };
225 let mut rows = Vec::new();
226 while let Some(row) = op.next(&ctx)? {
227 rows.push(row);
228 }
229 if suppress_output {
230 Ok(Vec::new())
231 } else {
232 Ok(rows)
233 }
234}
235
236pub fn execute_with_in_tx_substitute(
249 plan: &LogicalPlan,
250 in_tx_rows: Vec<Row>,
251 reader: &dyn GraphReader,
252 writer: &dyn GraphWriter,
253 params: &ParamMap,
254 procedures: &ProcedureRegistry,
255) -> Result<Vec<Row>> {
256 crate::eval::reset_statement_time();
257 if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
258 return Ok(rows);
259 }
260 let suppress_output = is_write_only_plan(plan);
261 let mut substitute = Some(in_tx_rows);
262 let mut op = build_op_inner(plan, None, &mut substitute);
263 let tombstones = Tombstones::default();
264 let ctx = ExecCtx {
265 store: reader,
266 writer,
267 params,
268 procedures,
269 outer_rows: &[],
270 tombstones: &tombstones,
271 };
272 let mut rows = Vec::new();
273 while let Some(row) = op.next(&ctx)? {
274 rows.push(row);
275 }
276 if suppress_output {
277 Ok(Vec::new())
278 } else {
279 Ok(rows)
280 }
281}
282
283pub fn execute_with_reader_and_procs(
284 plan: &LogicalPlan,
285 reader: &dyn GraphReader,
286 writer: &dyn GraphWriter,
287 params: &ParamMap,
288 procedures: &ProcedureRegistry,
289) -> Result<Vec<Row>> {
290 crate::eval::reset_statement_time();
295 if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
300 return Ok(rows);
301 }
302 let suppress_output = is_write_only_plan(plan);
303 let mut op = build_op(plan);
304 let tombstones = Tombstones::default();
305 let ctx = ExecCtx {
306 store: reader,
307 writer,
308 params,
309 procedures,
310 outer_rows: &[],
311 tombstones: &tombstones,
312 };
313 let mut rows = Vec::new();
314 while let Some(row) = op.next(&ctx)? {
315 rows.push(row);
316 }
317 if suppress_output {
318 Ok(Vec::new())
319 } else {
320 Ok(rows)
321 }
322}
323
324fn is_write_only_plan(plan: &LogicalPlan) -> bool {
325 match plan {
329 LogicalPlan::CreatePath { .. }
330 | LogicalPlan::Delete { .. }
331 | LogicalPlan::SetProperty { .. }
332 | LogicalPlan::Remove { .. }
333 | LogicalPlan::Foreach { .. }
334 | LogicalPlan::MergeNode { .. }
335 | LogicalPlan::MergeEdge { .. } => true,
336 _ => false,
337 }
338}
339
340fn plan_contains_writes(plan: &LogicalPlan) -> bool {
347 use LogicalPlan::*;
348 match plan {
349 CreatePath { .. }
351 | Delete { .. }
352 | SetProperty { .. }
353 | Remove { .. }
354 | MergeNode { .. }
355 | MergeEdge { .. }
356 | Foreach { .. } => true,
357
358 Filter { input, .. }
360 | Project { input, .. }
361 | Aggregate { input, .. }
362 | Distinct { input, .. }
363 | OrderBy { input, .. }
364 | Skip { input, .. }
365 | Limit { input, .. }
366 | EdgeExpand { input, .. }
367 | OptionalEdgeExpand { input, .. }
368 | VarLengthExpand { input, .. }
369 | Identity { input, .. }
370 | CoalesceNullRow { input, .. }
371 | UnwindChain { input, .. }
372 | BindPath { input, .. }
373 | ShortestPath { input, .. } => plan_contains_writes(input),
374
375 CartesianProduct { left, right } => {
377 plan_contains_writes(left) || plan_contains_writes(right)
378 }
379
380 Union { branches, .. } => branches.iter().any(plan_contains_writes),
382
383 CallSubquery { input, body } | OptionalApply { input, body, .. } => {
386 plan_contains_writes(input) || plan_contains_writes(body)
387 }
388 CallSubqueryInTransactions { input, .. } => plan_contains_writes(input),
394 ApocPeriodicIterate { .. } => false,
400
401 ProcedureCall { input, .. } | LoadCsv { input, .. } => {
404 input.as_ref().map_or(false, |i| plan_contains_writes(i))
405 }
406
407 _ => false,
409 }
410}
411
412fn try_execute_ddl(
422 plan: &LogicalPlan,
423 reader: &dyn GraphReader,
424 writer: &dyn GraphWriter,
425) -> Result<Option<Vec<Row>>> {
426 match plan {
427 LogicalPlan::CreatePropertyIndex { label, properties } => {
428 writer.create_property_index(label, properties)?;
429 Ok(Some(vec![node_index_ddl_ack_row(
430 "created", label, properties,
431 )]))
432 }
433 LogicalPlan::DropPropertyIndex { label, properties } => {
434 writer.drop_property_index(label, properties)?;
435 Ok(Some(vec![node_index_ddl_ack_row(
436 "dropped", label, properties,
437 )]))
438 }
439 LogicalPlan::CreateEdgePropertyIndex {
440 edge_type,
441 properties,
442 } => {
443 writer.create_edge_property_index(edge_type, properties)?;
444 Ok(Some(vec![edge_index_ddl_ack_row(
445 "created", edge_type, properties,
446 )]))
447 }
448 LogicalPlan::DropEdgePropertyIndex {
449 edge_type,
450 properties,
451 } => {
452 writer.drop_edge_property_index(edge_type, properties)?;
453 Ok(Some(vec![edge_index_ddl_ack_row(
454 "dropped", edge_type, properties,
455 )]))
456 }
457 LogicalPlan::ShowPropertyIndexes => {
458 let mut rows: Vec<Row> = Vec::new();
464 for (label, properties) in reader.list_property_indexes()? {
465 rows.push(show_index_row("NODE", label, properties));
466 }
467 for (edge_type, properties) in reader.list_edge_property_indexes()? {
468 rows.push(show_index_row("RELATIONSHIP", edge_type, properties));
469 }
470 Ok(Some(rows))
471 }
472 LogicalPlan::CreatePointIndex { label, property } => {
473 writer.create_point_index(label, property)?;
474 Ok(Some(vec![point_index_ddl_ack_row(
475 "created", "NODE", label, property,
476 )]))
477 }
478 LogicalPlan::DropPointIndex { label, property } => {
479 writer.drop_point_index(label, property)?;
480 Ok(Some(vec![point_index_ddl_ack_row(
481 "dropped", "NODE", label, property,
482 )]))
483 }
484 LogicalPlan::CreateEdgePointIndex {
485 edge_type,
486 property,
487 } => {
488 writer.create_edge_point_index(edge_type, property)?;
489 Ok(Some(vec![point_index_ddl_ack_row(
490 "created",
491 "RELATIONSHIP",
492 edge_type,
493 property,
494 )]))
495 }
496 LogicalPlan::DropEdgePointIndex {
497 edge_type,
498 property,
499 } => {
500 writer.drop_edge_point_index(edge_type, property)?;
501 Ok(Some(vec![point_index_ddl_ack_row(
502 "dropped",
503 "RELATIONSHIP",
504 edge_type,
505 property,
506 )]))
507 }
508 LogicalPlan::ShowPointIndexes => {
509 let mut rows: Vec<Row> = Vec::new();
516 for (label, property) in reader.list_point_indexes()? {
517 rows.push(show_point_index_row("NODE", label, property));
518 }
519 for (edge_type, property) in reader.list_edge_point_indexes()? {
520 rows.push(show_point_index_row("RELATIONSHIP", edge_type, property));
521 }
522 Ok(Some(rows))
523 }
524 LogicalPlan::CreatePropertyConstraint {
525 name,
526 scope,
527 properties,
528 kind,
529 if_not_exists,
530 } => {
531 let storage_kind = match kind {
532 ConstraintKind::Unique => PropertyConstraintKind::Unique,
533 ConstraintKind::NotNull => PropertyConstraintKind::NotNull,
534 ConstraintKind::NodeKey => PropertyConstraintKind::NodeKey,
535 ConstraintKind::PropertyType(t) => {
536 PropertyConstraintKind::PropertyType(cypher_to_storage_property_type(*t))
537 }
538 };
539 let storage_scope = cypher_to_storage_scope(scope);
540 let spec = writer.create_property_constraint(
541 name.as_deref(),
542 &storage_scope,
543 properties,
544 storage_kind,
545 *if_not_exists,
546 )?;
547 Ok(Some(vec![constraint_ack_row("created", &spec)]))
548 }
549 LogicalPlan::DropPropertyConstraint { name, if_exists } => {
550 writer.drop_property_constraint(name, *if_exists)?;
551 let mut row = Row::default();
552 row.insert(
553 "state".into(),
554 Value::Property(Property::String("dropped".into())),
555 );
556 row.insert(
557 "name".into(),
558 Value::Property(Property::String(name.clone())),
559 );
560 Ok(Some(vec![row]))
561 }
562 LogicalPlan::ShowPropertyConstraints => {
563 let specs = reader.list_property_constraints()?;
564 let rows = specs.into_iter().map(constraint_show_row).collect();
565 Ok(Some(rows))
566 }
567 _ => Ok(None),
568 }
569}
570
571fn constraint_ack_row(state: &str, spec: &meshdb_storage::PropertyConstraintSpec) -> Row {
576 let mut row = constraint_show_row(spec.clone());
577 row.insert(
578 "state".into(),
579 Value::Property(Property::String(state.into())),
580 );
581 row
582}
583
584fn constraint_show_row(spec: meshdb_storage::PropertyConstraintSpec) -> Row {
590 let mut row = Row::default();
591 row.insert("name".into(), Value::Property(Property::String(spec.name)));
592 let (scope_tag, target) = match spec.scope {
593 meshdb_storage::ConstraintScope::Node(l) => ("NODE", l),
594 meshdb_storage::ConstraintScope::Relationship(t) => ("RELATIONSHIP", t),
595 };
596 row.insert(
597 "scope".into(),
598 Value::Property(Property::String(scope_tag.into())),
599 );
600 row.insert("label".into(), Value::Property(Property::String(target)));
605 let props: Vec<Property> = spec.properties.into_iter().map(Property::String).collect();
606 row.insert("properties".into(), Value::Property(Property::List(props)));
607 row.insert(
608 "type".into(),
609 Value::Property(Property::String(spec.kind.as_string())),
610 );
611 row
612}
613
614fn cypher_to_storage_scope(scope: &CypherConstraintScope) -> StorageConstraintScope {
618 match scope {
619 CypherConstraintScope::Node(l) => StorageConstraintScope::Node(l.clone()),
620 CypherConstraintScope::Relationship(t) => StorageConstraintScope::Relationship(t.clone()),
621 }
622}
623
624fn cypher_to_storage_property_type(t: CypherPropertyType) -> StoragePropertyType {
629 match t {
630 CypherPropertyType::String => StoragePropertyType::String,
631 CypherPropertyType::Integer => StoragePropertyType::Integer,
632 CypherPropertyType::Float => StoragePropertyType::Float,
633 CypherPropertyType::Boolean => StoragePropertyType::Boolean,
634 }
635}
636
637fn node_index_ddl_ack_row(state: &str, label: &str, properties: &[String]) -> Row {
644 let mut row = Row::default();
645 row.insert(
646 "state".into(),
647 Value::Property(Property::String(state.into())),
648 );
649 row.insert(
650 "scope".into(),
651 Value::Property(Property::String("NODE".into())),
652 );
653 row.insert(
654 "label".into(),
655 Value::Property(Property::String(label.into())),
656 );
657 row.insert(
662 "property".into(),
663 Value::Property(Property::String(properties.join(","))),
664 );
665 row.insert("properties".into(), properties_list_value(properties));
666 row
667}
668
669fn edge_index_ddl_ack_row(state: &str, edge_type: &str, properties: &[String]) -> Row {
675 let mut row = Row::default();
676 row.insert(
677 "state".into(),
678 Value::Property(Property::String(state.into())),
679 );
680 row.insert(
681 "scope".into(),
682 Value::Property(Property::String("RELATIONSHIP".into())),
683 );
684 row.insert(
685 "label".into(),
686 Value::Property(Property::String(edge_type.into())),
687 );
688 row.insert(
689 "edge_type".into(),
690 Value::Property(Property::String(edge_type.into())),
691 );
692 row.insert(
693 "property".into(),
694 Value::Property(Property::String(properties.join(","))),
695 );
696 row.insert("properties".into(), properties_list_value(properties));
697 row
698}
699
700fn properties_list_value(properties: &[String]) -> Value {
701 Value::List(
702 properties
703 .iter()
704 .map(|p| Value::Property(Property::String(p.clone())))
705 .collect(),
706 )
707}
708
709fn point_index_ddl_ack_row(state: &str, scope: &str, target: &str, property: &str) -> Row {
716 let mut row = Row::default();
717 row.insert(
718 "state".into(),
719 Value::Property(Property::String(state.into())),
720 );
721 row.insert(
722 "scope".into(),
723 Value::Property(Property::String(scope.into())),
724 );
725 row.insert(
726 "type".into(),
727 Value::Property(Property::String("POINT".into())),
728 );
729 row.insert(
730 "label".into(),
731 Value::Property(Property::String(target.into())),
732 );
733 if scope == "RELATIONSHIP" {
734 row.insert(
735 "edge_type".into(),
736 Value::Property(Property::String(target.into())),
737 );
738 }
739 row.insert(
740 "property".into(),
741 Value::Property(Property::String(property.into())),
742 );
743 row
744}
745
746fn show_point_index_row(scope: &str, target: String, property: String) -> Row {
751 let mut row = Row::default();
752 row.insert(
753 "scope".into(),
754 Value::Property(Property::String(scope.into())),
755 );
756 row.insert(
757 "type".into(),
758 Value::Property(Property::String("POINT".into())),
759 );
760 row.insert(
761 "label".into(),
762 Value::Property(Property::String(target.clone())),
763 );
764 if scope == "RELATIONSHIP" {
765 row.insert(
766 "edge_type".into(),
767 Value::Property(Property::String(target)),
768 );
769 }
770 row.insert(
771 "property".into(),
772 Value::Property(Property::String(property)),
773 );
774 row.insert(
775 "state".into(),
776 Value::Property(Property::String("online".into())),
777 );
778 row
779}
780
781fn show_index_row(scope: &str, target: String, properties: Vec<String>) -> Row {
789 let mut row = Row::default();
790 row.insert(
791 "scope".into(),
792 Value::Property(Property::String(scope.into())),
793 );
794 row.insert(
795 "label".into(),
796 Value::Property(Property::String(target.clone())),
797 );
798 if scope == "RELATIONSHIP" {
799 row.insert(
800 "edge_type".into(),
801 Value::Property(Property::String(target)),
802 );
803 }
804 row.insert(
805 "property".into(),
806 Value::Property(Property::String(properties.join(","))),
807 );
808 row.insert("properties".into(), properties_list_value(&properties));
809 row.insert(
810 "state".into(),
811 Value::Property(Property::String("online".into())),
812 );
813 row
814}
815
816fn build_op(plan: &LogicalPlan) -> Box<dyn Operator> {
817 build_op_inner(plan, None, &mut None)
818}
819
820pub(crate) fn build_op_inner(
835 plan: &LogicalPlan,
836 seed: Option<&Row>,
837 in_tx_substitute: &mut Option<Vec<Row>>,
838) -> Box<dyn Operator> {
839 macro_rules! child {
840 ($p:expr) => {
841 build_op_inner($p, seed, in_tx_substitute)
842 };
843 }
844 match plan {
845 LogicalPlan::CreatePath {
846 input,
847 nodes,
848 edges,
849 } => Box::new(CreatePathOp::new(
850 input.as_ref().map(|p| child!(p)),
851 nodes.clone(),
852 edges.clone(),
853 )),
854 LogicalPlan::CartesianProduct { left, right } => {
855 Box::new(CartesianProductOp::new(child!(left), (**right).clone()))
856 }
857 LogicalPlan::Delete {
858 input,
859 detach,
860 vars,
861 exprs,
862 } => Box::new(DeleteOp::new(
863 child!(input),
864 *detach,
865 vars.clone(),
866 exprs.clone(),
867 )),
868 LogicalPlan::SetProperty { input, assignments } => {
869 Box::new(SetPropertyOp::new(child!(input), assignments.clone()))
870 }
871 LogicalPlan::Remove { input, items } => {
872 Box::new(RemoveOp::new(child!(input), items.clone()))
873 }
874 LogicalPlan::LoadCsv {
875 input,
876 path_expr,
877 var,
878 with_headers,
879 } => Box::new(LoadCsvOp::new(
880 input.as_ref().map(|p| child!(p)),
881 path_expr.clone(),
882 var.clone(),
883 *with_headers,
884 )),
885 LogicalPlan::Foreach {
886 input,
887 var,
888 list_expr,
889 set_assignments,
890 remove_items,
891 } => Box::new(ForeachOp::new(
892 child!(input),
893 var.clone(),
894 list_expr.clone(),
895 set_assignments.clone(),
896 remove_items.clone(),
897 )),
898 LogicalPlan::CallSubquery { input, body } => {
899 Box::new(CallSubqueryOp::new(child!(input), (**body).clone()))
900 }
901 LogicalPlan::CallSubqueryInTransactions { .. }
902 | LogicalPlan::ApocPeriodicIterate { .. } => {
903 match in_tx_substitute.take() {
920 Some(rows) => Box::new(RowsLiteralOp::new(rows)),
921 None => panic!(
922 "Batched-commit LogicalPlan variant reached `build_op` without a row \
923 substitute — the server-side dispatcher \
924 (execute_cypher_in_tx → execute_call_in_transactions / \
925 execute_apoc_periodic_iterate) must inject one before invoking the \
926 operator pipeline"
927 ),
928 }
929 }
930 LogicalPlan::OptionalApply {
931 input,
932 body,
933 null_vars,
934 } => Box::new(OptionalApplyOp::new(
935 child!(input),
936 (**body).clone(),
937 null_vars.clone(),
938 )),
939 LogicalPlan::ProcedureCall {
940 input,
941 qualified_name,
942 args,
943 yield_spec,
944 standalone,
945 } => Box::new(ProcedureCallOp::new(
946 input.as_ref().map(|p| child!(p)),
947 qualified_name.clone(),
948 args.clone(),
949 yield_spec.clone(),
950 *standalone,
951 )),
952 LogicalPlan::SeedRow => match seed {
953 Some(r) => Box::new(SeededRowOp {
954 row: Some(r.clone()),
955 }),
956 None => Box::new(SeedRowOp { done: false }),
957 },
958 LogicalPlan::NodeScanAll { var } => Box::new(NodeScanAllOp::new(var.clone())),
959 LogicalPlan::NodeScanByLabels { var, labels } => {
960 Box::new(NodeScanByLabelsOp::new(var.clone(), labels.clone()))
961 }
962 LogicalPlan::EdgeExpand {
963 input,
964 src_var,
965 edge_var,
966 dst_var,
967 dst_labels,
968 edge_properties,
969 edge_types,
970 direction,
971 edge_constraint_var,
972 } => Box::new(EdgeExpandOp::new(
973 child!(input),
974 src_var.clone(),
975 edge_var.clone(),
976 dst_var.clone(),
977 dst_labels.clone(),
978 edge_properties.clone(),
979 edge_types.clone(),
980 *direction,
981 edge_constraint_var.clone(),
982 )),
983 LogicalPlan::OptionalEdgeExpand {
984 input,
985 src_var,
986 edge_var,
987 dst_var,
988 dst_labels,
989 dst_properties,
990 edge_types,
991 direction,
992 dst_constraint_var,
993 edge_constraint_var,
994 } => Box::new(OptionalEdgeExpandOp::new(
995 child!(input),
996 src_var.clone(),
997 edge_var.clone(),
998 dst_var.clone(),
999 dst_labels.clone(),
1000 dst_properties.clone(),
1001 edge_types.clone(),
1002 *direction,
1003 dst_constraint_var.clone(),
1004 edge_constraint_var.clone(),
1005 )),
1006 LogicalPlan::VarLengthExpand {
1007 input,
1008 src_var,
1009 edge_var,
1010 dst_var,
1011 dst_labels,
1012 edge_types,
1013 edge_properties,
1014 direction,
1015 min_hops,
1016 max_hops,
1017 path_var,
1018 optional,
1019 dst_constraint_var,
1020 bound_edge_list_var,
1021 excluded_edge_vars,
1022 } => Box::new(VarLengthExpandOp::new(
1023 child!(input),
1024 src_var.clone(),
1025 edge_var.clone(),
1026 dst_var.clone(),
1027 dst_labels.clone(),
1028 edge_types.clone(),
1029 edge_properties.clone(),
1030 *direction,
1031 *min_hops,
1032 *max_hops,
1033 path_var.clone(),
1034 *optional,
1035 dst_constraint_var.clone(),
1036 bound_edge_list_var.clone(),
1037 excluded_edge_vars.clone(),
1038 )),
1039 LogicalPlan::Filter { input, predicate } => {
1040 Box::new(FilterOp::new(child!(input), predicate.clone()))
1041 }
1042 LogicalPlan::Project { input, items } => {
1043 Box::new(ProjectOp::new(child!(input), items.clone()))
1044 }
1045 LogicalPlan::Aggregate {
1046 input,
1047 group_keys,
1048 aggregates,
1049 } => Box::new(AggregateOp::new(
1050 child!(input),
1051 group_keys.clone(),
1052 aggregates.clone(),
1053 )),
1054 LogicalPlan::Identity { input } => Box::new(IdentityOp::new(child!(input))),
1055 LogicalPlan::CoalesceNullRow { input, null_vars } => {
1056 Box::new(CoalesceNullRowOp::new(child!(input), null_vars.clone()))
1057 }
1058 LogicalPlan::Distinct { input } => Box::new(DistinctOp::new(child!(input))),
1059 LogicalPlan::OrderBy { input, sort_items } => {
1060 Box::new(OrderByOp::new(child!(input), sort_items.clone()))
1061 }
1062 LogicalPlan::Skip { input, count } => Box::new(SkipOp::new(child!(input), count.clone())),
1063 LogicalPlan::Limit { input, count } => {
1064 let drain_on_complete = plan_contains_writes(input);
1070 Box::new(LimitOp::new(
1071 child!(input),
1072 count.clone(),
1073 drain_on_complete,
1074 ))
1075 }
1076 LogicalPlan::MergeNode {
1077 input,
1078 var,
1079 labels,
1080 properties,
1081 on_create,
1082 on_match,
1083 } => Box::new(MergeNodeOp::new(
1084 input.as_ref().map(|p| child!(p)),
1085 var.clone(),
1086 labels.clone(),
1087 properties.clone(),
1088 on_create.clone(),
1089 on_match.clone(),
1090 )),
1091 LogicalPlan::MergeEdge {
1092 input,
1093 edge_var,
1094 src_var,
1095 dst_var,
1096 edge_type,
1097 undirected,
1098 properties,
1099 on_create,
1100 on_match,
1101 } => Box::new(MergeEdgeOp::new(
1102 child!(input),
1103 edge_var.clone(),
1104 src_var.clone(),
1105 dst_var.clone(),
1106 edge_type.clone(),
1107 *undirected,
1108 properties.clone(),
1109 on_create.clone(),
1110 on_match.clone(),
1111 )),
1112 LogicalPlan::Unwind { var, expr } => Box::new(UnwindOp::new(var.clone(), expr.clone())),
1113 LogicalPlan::UnwindChain { input, var, expr } => {
1114 Box::new(UnwindChainOp::new(child!(input), var.clone(), expr.clone()))
1115 }
1116 LogicalPlan::IndexSeek {
1117 var,
1118 label,
1119 properties,
1120 values,
1121 } => Box::new(IndexSeekOp::new(
1122 var.clone(),
1123 label.clone(),
1124 properties.clone(),
1125 values.clone(),
1126 )),
1127 LogicalPlan::PointIndexSeek {
1128 var,
1129 label,
1130 property,
1131 bounds,
1132 } => Box::new(PointIndexSeekOp::new(
1133 var.clone(),
1134 label.clone(),
1135 property.clone(),
1136 bounds.clone(),
1137 )),
1138 LogicalPlan::EdgeSeek {
1139 edge_var,
1140 src_var,
1141 dst_var,
1142 edge_type,
1143 property,
1144 value,
1145 direction,
1146 residual_properties,
1147 } => Box::new(EdgeSeekOp::new(
1148 edge_var.clone(),
1149 src_var.clone(),
1150 dst_var.clone(),
1151 edge_type.clone(),
1152 property.clone(),
1153 value.clone(),
1154 *direction,
1155 residual_properties.clone(),
1156 )),
1157 LogicalPlan::EdgePointIndexSeek {
1158 edge_var,
1159 src_var,
1160 dst_var,
1161 edge_type,
1162 property,
1163 direction,
1164 bounds,
1165 } => Box::new(EdgePointIndexSeekOp::new(
1166 edge_var.clone(),
1167 src_var.clone(),
1168 dst_var.clone(),
1169 edge_type.clone(),
1170 property.clone(),
1171 *direction,
1172 bounds.clone(),
1173 )),
1174 LogicalPlan::Union { branches, all } => {
1179 let branch_ops: Vec<Box<dyn Operator>> = branches.iter().map(|b| child!(b)).collect();
1180 Box::new(UnionOp::new(branch_ops, *all))
1181 }
1182 LogicalPlan::BindPath {
1183 input,
1184 path_var,
1185 node_vars,
1186 edge_vars,
1187 } => Box::new(BindPathOp::new(
1188 child!(input),
1189 path_var.clone(),
1190 node_vars.clone(),
1191 edge_vars.clone(),
1192 )),
1193 LogicalPlan::ShortestPath {
1194 input,
1195 src_var,
1196 dst_var,
1197 path_var,
1198 edge_types,
1199 direction,
1200 max_hops,
1201 kind,
1202 } => Box::new(ShortestPathOp::new(
1203 child!(input),
1204 src_var.clone(),
1205 dst_var.clone(),
1206 path_var.clone(),
1207 edge_types.clone(),
1208 *direction,
1209 *max_hops,
1210 *kind,
1211 )),
1212 LogicalPlan::CreatePropertyIndex { .. }
1213 | LogicalPlan::DropPropertyIndex { .. }
1214 | LogicalPlan::CreateEdgePropertyIndex { .. }
1215 | LogicalPlan::DropEdgePropertyIndex { .. }
1216 | LogicalPlan::ShowPropertyIndexes
1217 | LogicalPlan::CreatePointIndex { .. }
1218 | LogicalPlan::DropPointIndex { .. }
1219 | LogicalPlan::CreateEdgePointIndex { .. }
1220 | LogicalPlan::DropEdgePointIndex { .. }
1221 | LogicalPlan::ShowPointIndexes
1222 | LogicalPlan::CreatePropertyConstraint { .. }
1223 | LogicalPlan::DropPropertyConstraint { .. }
1224 | LogicalPlan::ShowPropertyConstraints => {
1225 panic!("schema DDL must be dispatched via try_execute_ddl before build_op")
1226 }
1227 }
1228}
1229
1230struct UnwindOp {
1231 var: String,
1232 expr: Expr,
1233 items: Option<Vec<Value>>,
1234 cursor: usize,
1235}
1236
1237impl UnwindOp {
1238 fn new(var: String, expr: Expr) -> Self {
1239 Self {
1240 var,
1241 expr,
1242 items: None,
1243 cursor: 0,
1244 }
1245 }
1246}
1247
1248impl Operator for UnwindOp {
1249 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1250 if self.items.is_none() {
1251 let empty = Row::new();
1252 let ectx = EvalCtx {
1253 row: &empty,
1254 params: ctx.params,
1255 reader: ctx.store,
1256 procedures: ctx.procedures,
1257 outer_rows: ctx.outer_rows,
1258 tombstones: ctx.tombstones,
1259 };
1260 let val = eval_expr(&self.expr, &ectx)?;
1261 self.items = Some(coerce_unwind_list(val)?);
1262 }
1263 let items = self.items.as_ref().unwrap();
1264 if self.cursor < items.len() {
1265 let v = items[self.cursor].clone();
1266 self.cursor += 1;
1267 let mut row = Row::new();
1268 row.insert(self.var.clone(), v);
1269 Ok(Some(row))
1270 } else {
1271 Ok(None)
1272 }
1273 }
1274}
1275
1276struct UnwindChainOp {
1282 input: Box<dyn Operator>,
1283 var: String,
1284 expr: Expr,
1285 current_row: Option<Row>,
1286 items: Vec<Value>,
1287 cursor: usize,
1288}
1289
1290impl UnwindChainOp {
1291 fn new(input: Box<dyn Operator>, var: String, expr: Expr) -> Self {
1292 Self {
1293 input,
1294 var,
1295 expr,
1296 current_row: None,
1297 items: Vec::new(),
1298 cursor: 0,
1299 }
1300 }
1301}
1302
1303impl Operator for UnwindChainOp {
1304 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1305 loop {
1306 if let Some(base) = &self.current_row {
1307 if self.cursor < self.items.len() {
1308 let v = self.items[self.cursor].clone();
1309 self.cursor += 1;
1310 let mut row = base.clone();
1311 row.insert(self.var.clone(), v);
1312 return Ok(Some(row));
1313 }
1314 self.current_row = None;
1315 self.items.clear();
1316 self.cursor = 0;
1317 }
1318 let base = match self.input.next(ctx)? {
1319 Some(r) => r,
1320 None => return Ok(None),
1321 };
1322 let ectx = EvalCtx {
1323 row: &base,
1324 params: ctx.params,
1325 reader: ctx.store,
1326 procedures: ctx.procedures,
1327 outer_rows: ctx.outer_rows,
1328 tombstones: ctx.tombstones,
1329 };
1330 let val = eval_expr(&self.expr, &ectx)?;
1331 self.items = coerce_unwind_list(val)?;
1332 self.current_row = Some(base);
1333 }
1334 }
1335}
1336
1337fn coerce_unwind_list(val: Value) -> Result<Vec<Value>> {
1342 match val {
1343 Value::List(items) => Ok(items),
1344 Value::Property(Property::List(props)) => {
1345 Ok(props.into_iter().map(Value::Property).collect())
1346 }
1347 Value::Null => Ok(Vec::new()),
1348 _ => Err(Error::TypeMismatch),
1349 }
1350}
1351
1352struct CreatePathOp {
1353 input: Option<Box<dyn Operator>>,
1354 nodes: Vec<CreateNodeSpec>,
1355 edges: Vec<CreateEdgeSpec>,
1356 done: bool,
1357 buffered: Option<Vec<Row>>,
1358 cursor: usize,
1359}
1360
1361impl CreatePathOp {
1362 fn new(
1363 input: Option<Box<dyn Operator>>,
1364 nodes: Vec<CreateNodeSpec>,
1365 edges: Vec<CreateEdgeSpec>,
1366 ) -> Self {
1367 Self {
1368 input,
1369 nodes,
1370 edges,
1371 done: false,
1372 buffered: None,
1373 cursor: 0,
1374 }
1375 }
1376
1377 fn apply(&self, ctx: &ExecCtx, row: &Row) -> Result<Row> {
1378 let mut out = row.clone();
1379 let mut node_ids: Vec<NodeId> = Vec::with_capacity(self.nodes.len());
1380 for spec in &self.nodes {
1381 match spec {
1382 CreateNodeSpec::New {
1383 var,
1384 labels,
1385 properties,
1386 } => {
1387 let mut node = Node::new();
1388 for label in labels {
1389 node.labels.push(label.clone());
1390 }
1391 for (k, expr) in properties {
1399 let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1400 let prop = value_to_property(value)?;
1401 if matches!(prop, Property::Null) {
1402 continue;
1403 }
1404 node.properties.insert(k.clone(), prop);
1405 }
1406 ctx.writer.put_node(&node)?;
1407 node_ids.push(node.id);
1408 if let Some(v) = var {
1409 out.insert(v.clone(), Value::Node(node));
1410 }
1411 }
1412 CreateNodeSpec::Reference(name) => {
1413 let id = match out.get(name) {
1414 Some(Value::Node(n)) => n.id,
1415 _ => return Err(Error::UnboundVariable(name.clone())),
1416 };
1417 node_ids.push(id);
1418 }
1419 }
1420 }
1421 for spec in &self.edges {
1422 let src = node_ids[spec.src_idx];
1423 let dst = node_ids[spec.dst_idx];
1424 let mut edge = Edge::new(spec.edge_type.clone(), src, dst);
1425 for (k, expr) in &spec.properties {
1426 let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1427 let prop = value_to_property(value)?;
1428 if matches!(prop, Property::Null) {
1429 continue;
1430 }
1431 edge.properties.insert(k.clone(), prop);
1432 }
1433 ctx.writer.put_edge(&edge)?;
1434 if let Some(v) = &spec.var {
1435 out.insert(v.clone(), Value::Edge(edge));
1436 }
1437 }
1438 Ok(out)
1439 }
1440}
1441
1442impl Operator for CreatePathOp {
1443 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1444 if self.input.is_some() {
1445 if let Some(buffered) = self.buffered.as_mut() {
1458 if self.cursor < buffered.len() {
1459 let row = buffered[self.cursor].clone();
1460 self.cursor += 1;
1461 return Ok(Some(row));
1462 }
1463 return Ok(None);
1464 }
1465 let input_rows: Vec<Row> = {
1466 let input = self.input.as_mut().unwrap();
1467 let mut acc = Vec::new();
1468 while let Some(row) = input.next(ctx)? {
1469 acc.push(row);
1470 }
1471 acc
1472 };
1473 let mut applied: Vec<Row> = Vec::with_capacity(input_rows.len());
1474 for row in input_rows {
1475 applied.push(self.apply(ctx, &row)?);
1476 }
1477 self.buffered = Some(applied);
1478 self.cursor = 0;
1479 self.next(ctx)
1482 } else {
1483 if self.done {
1484 return Ok(None);
1485 }
1486 self.done = true;
1487 let empty = Row::new();
1488 Ok(Some(self.apply(ctx, &empty)?))
1489 }
1490 }
1491}
1492
1493struct CartesianProductOp {
1494 left: Box<dyn Operator>,
1495 right_plan: LogicalPlan,
1496 left_row: Option<Row>,
1497 right_op: Option<Box<dyn Operator>>,
1498}
1499
1500impl CartesianProductOp {
1501 fn new(left: Box<dyn Operator>, right_plan: LogicalPlan) -> Self {
1502 Self {
1503 left,
1504 right_plan,
1505 left_row: None,
1506 right_op: None,
1507 }
1508 }
1509}
1510
1511impl Operator for CartesianProductOp {
1512 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1513 loop {
1514 if self.left_row.is_none() {
1515 match self.left.next(ctx)? {
1516 None => return Ok(None),
1517 Some(row) => {
1518 self.left_row = Some(row);
1519 self.right_op = Some(build_op(&self.right_plan));
1520 }
1521 }
1522 }
1523 let right_op = self.right_op.as_mut().expect("right_op set");
1524 let left_ref = self.left_row.as_ref().unwrap();
1529 let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1530 stacked.push(left_ref);
1531 stacked.extend_from_slice(ctx.outer_rows);
1532 let inner_ctx = ExecCtx {
1533 store: ctx.store,
1534 writer: ctx.writer,
1535 params: ctx.params,
1536 procedures: ctx.procedures,
1537 outer_rows: &stacked,
1538 tombstones: ctx.tombstones,
1539 };
1540 match right_op.next(&inner_ctx)? {
1541 Some(right_row) => {
1542 let mut combined = left_ref.clone();
1543 for (k, v) in right_row {
1544 combined.insert(k, v);
1545 }
1546 return Ok(Some(combined));
1547 }
1548 None => {
1549 self.left_row = None;
1550 self.right_op = None;
1551 }
1552 }
1553 }
1554 }
1555}
1556
1557struct DeleteOp {
1558 input: Box<dyn Operator>,
1559 detach: bool,
1560 #[allow(dead_code)]
1561 vars: Vec<String>,
1562 exprs: Vec<Expr>,
1563 buffered: Option<Vec<Row>>,
1571 cursor: usize,
1572}
1573
1574impl DeleteOp {
1575 fn new(input: Box<dyn Operator>, detach: bool, vars: Vec<String>, exprs: Vec<Expr>) -> Self {
1576 Self {
1577 input,
1578 detach,
1579 vars,
1580 exprs,
1581 buffered: None,
1582 cursor: 0,
1583 }
1584 }
1585
1586 fn apply_deletes(
1596 &self,
1597 ctx: &ExecCtx,
1598 row: &Row,
1599 seen_edges: &mut HashSet<meshdb_core::EdgeId>,
1600 seen_nodes: &mut HashSet<meshdb_core::NodeId>,
1601 ) -> Result<()> {
1602 let mut edge_ids: Vec<meshdb_core::EdgeId> = Vec::new();
1603 let mut node_ids: Vec<meshdb_core::NodeId> = Vec::new();
1604 for expr in &self.exprs {
1605 let v = eval_expr(expr, &ctx.eval_ctx(row))?;
1606 match v {
1607 Value::Node(n) => node_ids.push(n.id),
1608 Value::Edge(e) => edge_ids.push(e.id),
1609 Value::Path { nodes, edges } => {
1610 for e in edges {
1611 edge_ids.push(e.id);
1612 }
1613 for n in nodes {
1614 node_ids.push(n.id);
1615 }
1616 }
1617 Value::Null | Value::Property(Property::Null) => continue,
1618 _ => return Err(Error::TypeMismatch),
1619 }
1620 }
1621 for eid in &edge_ids {
1622 if seen_edges.insert(*eid) {
1623 ctx.writer.delete_edge(*eid)?;
1624 ctx.tombstones.edges.borrow_mut().insert(*eid);
1625 }
1626 }
1627 for nid in &node_ids {
1628 if !seen_nodes.insert(*nid) {
1629 continue;
1630 }
1631 if self.detach {
1632 for (eid, _) in ctx.store.outgoing(*nid)? {
1637 ctx.tombstones.edges.borrow_mut().insert(eid);
1638 }
1639 for (eid, _) in ctx.store.incoming(*nid)? {
1640 ctx.tombstones.edges.borrow_mut().insert(eid);
1641 }
1642 ctx.writer.detach_delete_node(*nid)?;
1643 } else {
1644 let out = ctx.store.outgoing(*nid)?;
1645 let inc = ctx.store.incoming(*nid)?;
1646 let still_attached = out
1647 .iter()
1648 .chain(inc.iter())
1649 .any(|(eid, _)| !seen_edges.contains(eid));
1650 if still_attached {
1651 return Err(Error::CannotDeleteAttachedNode);
1652 }
1653 ctx.writer.detach_delete_node(*nid)?;
1654 }
1655 ctx.tombstones.nodes.borrow_mut().insert(*nid);
1656 }
1657 Ok(())
1658 }
1659}
1660
1661impl Operator for DeleteOp {
1662 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1663 if self.buffered.is_none() {
1671 let mut rows: Vec<Row> = Vec::new();
1672 while let Some(row) = self.input.next(ctx)? {
1673 rows.push(row);
1674 }
1675 let mut seen_edges: HashSet<meshdb_core::EdgeId> = HashSet::new();
1676 let mut seen_nodes: HashSet<meshdb_core::NodeId> = HashSet::new();
1677 for row in &rows {
1678 self.apply_deletes(ctx, row, &mut seen_edges, &mut seen_nodes)?;
1679 }
1680 self.buffered = Some(rows);
1681 self.cursor = 0;
1682 }
1683 let rows = self.buffered.as_ref().unwrap();
1684 if self.cursor < rows.len() {
1685 let row = rows[self.cursor].clone();
1686 self.cursor += 1;
1687 return Ok(Some(row));
1688 }
1689 Ok(None)
1690 }
1691}
1692
1693struct SetPropertyOp {
1694 input: Box<dyn Operator>,
1695 assignments: Vec<SetAssignment>,
1696 buffered: Option<Vec<Row>>,
1706 cursor: usize,
1707}
1708
1709impl SetPropertyOp {
1710 fn new(input: Box<dyn Operator>, assignments: Vec<SetAssignment>) -> Self {
1711 Self {
1712 input,
1713 assignments,
1714 buffered: None,
1715 cursor: 0,
1716 }
1717 }
1718
1719 fn apply_one(&self, ctx: &ExecCtx, mut row: Row) -> Result<Row> {
1720 enum Action {
1722 SetKey {
1723 var: String,
1724 key: String,
1725 prop: Property,
1726 },
1727 AddLabels {
1728 var: String,
1729 labels: Vec<String>,
1730 },
1731 Replace {
1732 var: String,
1733 props: Vec<(String, Property)>,
1734 },
1735 Merge {
1736 var: String,
1737 props: Vec<(String, Property)>,
1738 },
1739 }
1740 let mut actions: Vec<Action> = Vec::with_capacity(self.assignments.len());
1741 for a in &self.assignments {
1742 match a {
1743 SetAssignment::Property { var, key, value } => {
1744 let evaluated = eval_expr(value, &ctx.eval_ctx(&row))?;
1745 let prop = value_to_property(evaluated)?;
1746 actions.push(Action::SetKey {
1747 var: var.clone(),
1748 key: key.clone(),
1749 prop,
1750 });
1751 }
1752 SetAssignment::Labels { var, labels } => {
1753 actions.push(Action::AddLabels {
1754 var: var.clone(),
1755 labels: labels.clone(),
1756 });
1757 }
1758 SetAssignment::Replace { var, properties } => {
1759 let props = properties
1764 .iter()
1765 .map(|(k, expr)| {
1766 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1767 Ok((k.clone(), value_to_property(v)?))
1768 })
1769 .collect::<Result<Vec<(String, Property)>>>()?;
1770 actions.push(Action::Replace {
1771 var: var.clone(),
1772 props,
1773 });
1774 }
1775 SetAssignment::Merge { var, properties } => {
1776 let props = properties
1777 .iter()
1778 .map(|(k, expr)| {
1779 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1780 Ok((k.clone(), value_to_property(v)?))
1781 })
1782 .collect::<Result<Vec<(String, Property)>>>()?;
1783 actions.push(Action::Merge {
1784 var: var.clone(),
1785 props,
1786 });
1787 }
1788 SetAssignment::ReplaceFromExpr {
1789 var,
1790 source,
1791 replace,
1792 } => {
1793 let v = eval_expr(source, &ctx.eval_ctx(&row))?;
1794 let props = extract_property_map(&v)?;
1795 if *replace {
1796 actions.push(Action::Replace {
1797 var: var.clone(),
1798 props,
1799 });
1800 } else {
1801 actions.push(Action::Merge {
1802 var: var.clone(),
1803 props,
1804 });
1805 }
1806 }
1807 }
1808 }
1809
1810 let mut updated_nodes: HashSet<String> = HashSet::new();
1812 let mut updated_edges: HashSet<String> = HashSet::new();
1813 for action in actions {
1814 match action {
1815 Action::SetKey { var, key, prop } => match row.get_mut(&var) {
1816 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1820 Some(Value::Node(n)) => {
1823 if matches!(prop, Property::Null) {
1824 n.properties.remove(&key);
1825 } else {
1826 n.properties.insert(key, prop);
1827 }
1828 updated_nodes.insert(var);
1829 }
1830 Some(Value::Edge(e)) => {
1831 if matches!(prop, Property::Null) {
1832 e.properties.remove(&key);
1833 } else {
1834 e.properties.insert(key, prop);
1835 }
1836 updated_edges.insert(var);
1837 }
1838 _ => return Err(Error::UnboundVariable(var)),
1839 },
1840 Action::AddLabels { var, labels } => match row.get_mut(&var) {
1841 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1842 Some(Value::Node(n)) => {
1843 for label in labels {
1844 if !n.labels.contains(&label) {
1845 n.labels.push(label);
1846 }
1847 }
1848 updated_nodes.insert(var);
1849 }
1850 _ => return Err(Error::UnboundVariable(var)),
1851 },
1852 Action::Replace { var, props } => match row.get_mut(&var) {
1853 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1854 Some(Value::Node(n)) => {
1855 n.properties.clear();
1856 for (k, v) in props {
1857 if !matches!(v, Property::Null) {
1858 n.properties.insert(k, v);
1859 }
1860 }
1861 updated_nodes.insert(var);
1862 }
1863 Some(Value::Edge(e)) => {
1864 e.properties.clear();
1865 for (k, v) in props {
1866 if !matches!(v, Property::Null) {
1867 e.properties.insert(k, v);
1868 }
1869 }
1870 updated_edges.insert(var);
1871 }
1872 _ => return Err(Error::UnboundVariable(var)),
1873 },
1874 Action::Merge { var, props } => match row.get_mut(&var) {
1875 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1876 Some(Value::Node(n)) => {
1877 for (k, v) in props {
1878 if matches!(v, Property::Null) {
1879 n.properties.remove(&k);
1880 } else {
1881 n.properties.insert(k, v);
1882 }
1883 }
1884 updated_nodes.insert(var);
1885 }
1886 Some(Value::Edge(e)) => {
1887 for (k, v) in props {
1888 if matches!(v, Property::Null) {
1889 e.properties.remove(&k);
1890 } else {
1891 e.properties.insert(k, v);
1892 }
1893 }
1894 updated_edges.insert(var);
1895 }
1896 _ => return Err(Error::UnboundVariable(var)),
1897 },
1898 }
1899 }
1900
1901 for var in &updated_nodes {
1903 if let Some(Value::Node(n)) = row.get(var) {
1904 ctx.writer.put_node(n)?;
1905 }
1906 }
1907 for var in &updated_edges {
1908 if let Some(Value::Edge(e)) = row.get(var) {
1909 ctx.writer.put_edge(e)?;
1910 }
1911 }
1912
1913 Ok(row)
1914 }
1915}
1916
1917impl Operator for SetPropertyOp {
1918 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1919 if self.buffered.is_none() {
1923 let mut input_rows: Vec<Row> = Vec::new();
1924 while let Some(row) = self.input.next(ctx)? {
1925 input_rows.push(row);
1926 }
1927 let mut applied: Vec<Row> = Vec::with_capacity(input_rows.len());
1928 for row in input_rows {
1929 applied.push(self.apply_one(ctx, row)?);
1930 }
1931 self.buffered = Some(applied);
1932 self.cursor = 0;
1933 }
1934 let rows = self.buffered.as_ref().unwrap();
1935 if self.cursor < rows.len() {
1936 let row = rows[self.cursor].clone();
1937 self.cursor += 1;
1938 return Ok(Some(row));
1939 }
1940 Ok(None)
1941 }
1942}
1943
1944struct RemoveOp {
1945 input: Box<dyn Operator>,
1946 items: Vec<RemoveSpec>,
1947 buffered: Option<Vec<Row>>,
1953 cursor: usize,
1954}
1955
1956impl RemoveOp {
1957 fn new(input: Box<dyn Operator>, items: Vec<RemoveSpec>) -> Self {
1958 Self {
1959 input,
1960 items,
1961 buffered: None,
1962 cursor: 0,
1963 }
1964 }
1965
1966 fn apply_one(&self, ctx: &ExecCtx, mut row: Row) -> Result<Row> {
1967 let mut updated_nodes: HashSet<String> = HashSet::new();
1968 let mut updated_edges: HashSet<String> = HashSet::new();
1969 for item in &self.items {
1970 match item {
1971 RemoveSpec::Property { var, key } => match row.get_mut(var) {
1972 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1975 Some(Value::Node(n)) => {
1976 n.properties.remove(key);
1977 updated_nodes.insert(var.clone());
1978 }
1979 Some(Value::Edge(e)) => {
1980 e.properties.remove(key);
1981 updated_edges.insert(var.clone());
1982 }
1983 _ => return Err(Error::UnboundVariable(var.clone())),
1984 },
1985 RemoveSpec::Labels { var, labels } => match row.get_mut(var) {
1986 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1987 Some(Value::Node(n)) => {
1988 n.labels.retain(|l| !labels.contains(l));
1989 updated_nodes.insert(var.clone());
1990 }
1991 _ => return Err(Error::UnboundVariable(var.clone())),
1992 },
1993 }
1994 }
1995 for var in &updated_nodes {
1996 if let Some(Value::Node(n)) = row.get(var) {
1997 ctx.writer.put_node(n)?;
1998 }
1999 }
2000 for var in &updated_edges {
2001 if let Some(Value::Edge(e)) = row.get(var) {
2002 ctx.writer.put_edge(e)?;
2003 }
2004 }
2005 Ok(row)
2006 }
2007}
2008
2009impl Operator for RemoveOp {
2010 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2011 if self.buffered.is_none() {
2012 let mut input_rows: Vec<Row> = Vec::new();
2013 while let Some(row) = self.input.next(ctx)? {
2014 input_rows.push(row);
2015 }
2016 let mut applied: Vec<Row> = Vec::with_capacity(input_rows.len());
2017 for row in input_rows {
2018 applied.push(self.apply_one(ctx, row)?);
2019 }
2020 self.buffered = Some(applied);
2021 self.cursor = 0;
2022 }
2023 let rows = self.buffered.as_ref().unwrap();
2024 if self.cursor < rows.len() {
2025 let row = rows[self.cursor].clone();
2026 self.cursor += 1;
2027 return Ok(Some(row));
2028 }
2029 Ok(None)
2030 }
2031}
2032
2033struct LoadCsvOp {
2034 input: Option<Box<dyn Operator>>,
2035 path_expr: Expr,
2036 var: String,
2037 with_headers: bool,
2038 rows: Option<Vec<Value>>,
2039 cursor: usize,
2040}
2041
2042impl LoadCsvOp {
2043 fn new(
2044 input: Option<Box<dyn Operator>>,
2045 path_expr: Expr,
2046 var: String,
2047 with_headers: bool,
2048 ) -> Self {
2049 Self {
2050 input,
2051 path_expr,
2052 var,
2053 with_headers,
2054 rows: None,
2055 cursor: 0,
2056 }
2057 }
2058
2059 fn load(&mut self, ctx: &ExecCtx, base_row: &Row) -> Result<()> {
2060 let ectx = ctx.eval_ctx(base_row);
2061 let path_val = eval_expr(&self.path_expr, &ectx)?;
2062 let path = match path_val {
2063 Value::Property(Property::String(s)) => s,
2064 _ => return Err(Error::TypeMismatch),
2065 };
2066 let content = std::fs::read_to_string(&path).map_err(|e| {
2067 Error::Unsupported(format!("LOAD CSV: cannot read file '{}': {}", path, e))
2068 })?;
2069 let mut lines = content.lines();
2070 let headers: Option<Vec<String>> = if self.with_headers {
2071 lines
2072 .next()
2073 .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
2074 } else {
2075 None
2076 };
2077 let mut csv_rows = Vec::new();
2078 for line in lines {
2079 if line.trim().is_empty() {
2080 continue;
2081 }
2082 let fields: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
2083 if let Some(hdrs) = &headers {
2084 let mut map = std::collections::HashMap::new();
2085 for (i, h) in hdrs.iter().enumerate() {
2086 let val = fields.get(i).cloned().unwrap_or_default();
2087 map.insert(h.clone(), Property::String(val));
2088 }
2089 csv_rows.push(Value::Property(Property::Map(map)));
2090 } else {
2091 let list: Vec<Value> = fields
2092 .into_iter()
2093 .map(|f| Value::Property(Property::String(f)))
2094 .collect();
2095 csv_rows.push(Value::List(list));
2096 }
2097 }
2098 self.rows = Some(csv_rows);
2099 self.cursor = 0;
2100 Ok(())
2101 }
2102}
2103
2104impl Operator for LoadCsvOp {
2105 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2106 if self.rows.is_none() {
2107 let base = if let Some(input) = &mut self.input {
2108 match input.next(ctx)? {
2109 Some(r) => r,
2110 None => return Ok(None),
2111 }
2112 } else {
2113 Row::new()
2114 };
2115 self.load(ctx, &base)?;
2116 }
2117 let rows = self.rows.as_ref().unwrap();
2118 if self.cursor < rows.len() {
2119 let val = rows[self.cursor].clone();
2120 self.cursor += 1;
2121 let mut row = Row::new();
2122 row.insert(self.var.clone(), val);
2123 Ok(Some(row))
2124 } else {
2125 Ok(None)
2126 }
2127 }
2128}
2129
2130struct ForeachOp {
2131 input: Box<dyn Operator>,
2132 var: String,
2133 list_expr: Expr,
2134 set_assignments: Vec<SetAssignment>,
2135 remove_items: Vec<RemoveSpec>,
2136}
2137
2138impl ForeachOp {
2139 fn new(
2140 input: Box<dyn Operator>,
2141 var: String,
2142 list_expr: Expr,
2143 set_assignments: Vec<SetAssignment>,
2144 remove_items: Vec<RemoveSpec>,
2145 ) -> Self {
2146 Self {
2147 input,
2148 var,
2149 list_expr,
2150 set_assignments,
2151 remove_items,
2152 }
2153 }
2154}
2155
2156impl Operator for ForeachOp {
2157 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2158 let Some(row) = self.input.next(ctx)? else {
2159 return Ok(None);
2160 };
2161 let ectx = ctx.eval_ctx(&row);
2162 let list_val = eval_expr(&self.list_expr, &ectx)?;
2163 let items = match list_val {
2164 Value::List(items) => items,
2165 Value::Property(Property::List(props)) => {
2166 props.into_iter().map(Value::Property).collect()
2167 }
2168 Value::Null | Value::Property(Property::Null) => Vec::new(),
2169 _ => return Err(Error::TypeMismatch),
2170 };
2171 for item in items {
2172 let mut scratch = row.clone();
2173 scratch.insert(self.var.clone(), item);
2174 for a in &self.set_assignments {
2175 match a {
2176 SetAssignment::Property { var, key, value } => {
2177 let evaluated = eval_expr(value, &ctx.eval_ctx(&scratch))?;
2178 let prop = value_to_property(evaluated)?;
2179 match scratch.get_mut(var) {
2180 Some(Value::Node(n)) => {
2181 n.properties.insert(key.clone(), prop);
2182 }
2183 Some(Value::Edge(e)) => {
2184 e.properties.insert(key.clone(), prop);
2185 }
2186 _ => return Err(Error::UnboundVariable(var.clone())),
2187 }
2188 }
2189 SetAssignment::Labels { var, labels } => {
2190 if let Some(Value::Node(n)) = scratch.get_mut(var) {
2191 for l in labels {
2192 if !n.labels.contains(l) {
2193 n.labels.push(l.clone());
2194 }
2195 }
2196 }
2197 }
2198 _ => {}
2199 }
2200 }
2201 for ri in &self.remove_items {
2202 match ri {
2203 RemoveSpec::Property { var, key } => {
2204 if let Some(Value::Node(n)) = scratch.get_mut(var) {
2205 n.properties.remove(key);
2206 } else if let Some(Value::Edge(e)) = scratch.get_mut(var) {
2207 e.properties.remove(key);
2208 }
2209 }
2210 RemoveSpec::Labels { var, labels } => {
2211 if let Some(Value::Node(n)) = scratch.get_mut(var) {
2212 n.labels.retain(|l| !labels.contains(l));
2213 }
2214 }
2215 }
2216 }
2217 for (_, val) in scratch.iter() {
2219 match val {
2220 Value::Node(n) => ctx.writer.put_node(n)?,
2221 Value::Edge(e) => ctx.writer.put_edge(e)?,
2222 _ => {}
2223 }
2224 }
2225 }
2226 Ok(Some(row))
2227 }
2228}
2229
2230struct SeedRowOp {
2231 done: bool,
2232}
2233
2234impl Operator for SeedRowOp {
2235 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
2236 if self.done {
2237 return Ok(None);
2238 }
2239 self.done = true;
2240 Ok(Some(Row::new()))
2241 }
2242}
2243
2244struct SeededRowOp {
2245 row: Option<Row>,
2246}
2247
2248impl Operator for SeededRowOp {
2249 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
2250 Ok(self.row.take())
2251 }
2252}
2253
2254struct CallSubqueryOp {
2255 input: Box<dyn Operator>,
2256 body_plan: LogicalPlan,
2257 pending: Vec<Row>,
2258 pending_idx: usize,
2259}
2260
2261impl CallSubqueryOp {
2262 fn new(input: Box<dyn Operator>, body_plan: LogicalPlan) -> Self {
2263 Self {
2264 input,
2265 body_plan,
2266 pending: Vec::new(),
2267 pending_idx: 0,
2268 }
2269 }
2270}
2271
2272impl Operator for CallSubqueryOp {
2273 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2274 loop {
2275 if self.pending_idx < self.pending.len() {
2276 let row = self.pending[self.pending_idx].clone();
2277 self.pending_idx += 1;
2278 return Ok(Some(row));
2279 }
2280 let outer_row = match self.input.next(ctx)? {
2281 Some(r) => r,
2282 None => return Ok(None),
2283 };
2284 let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row), &mut None);
2285 let mut results = Vec::new();
2286 while let Some(body_row) = body_op.next(ctx)? {
2287 let mut merged = outer_row.clone();
2288 for (k, v) in body_row {
2289 merged.insert(k, v);
2290 }
2291 results.push(merged);
2292 }
2293 if results.is_empty() {
2294 continue;
2295 }
2296 self.pending = results;
2297 self.pending_idx = 0;
2298 }
2299 }
2300}
2301
2302struct OptionalApplyOp {
2309 input: Box<dyn Operator>,
2310 body_plan: LogicalPlan,
2311 null_vars: Vec<String>,
2312 pending: Vec<Row>,
2313 pending_idx: usize,
2314}
2315
2316impl OptionalApplyOp {
2317 fn new(input: Box<dyn Operator>, body_plan: LogicalPlan, null_vars: Vec<String>) -> Self {
2318 Self {
2319 input,
2320 body_plan,
2321 null_vars,
2322 pending: Vec::new(),
2323 pending_idx: 0,
2324 }
2325 }
2326}
2327
2328impl Operator for OptionalApplyOp {
2329 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2330 loop {
2331 if self.pending_idx < self.pending.len() {
2332 let row = self.pending[self.pending_idx].clone();
2333 self.pending_idx += 1;
2334 return Ok(Some(row));
2335 }
2336 let outer_row = match self.input.next(ctx)? {
2337 Some(r) => r,
2338 None => return Ok(None),
2339 };
2340 let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row), &mut None);
2341 let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
2345 stacked.push(&outer_row);
2346 stacked.extend_from_slice(ctx.outer_rows);
2347 let inner_ctx = ExecCtx {
2348 store: ctx.store,
2349 writer: ctx.writer,
2350 params: ctx.params,
2351 procedures: ctx.procedures,
2352 outer_rows: &stacked,
2353 tombstones: ctx.tombstones,
2354 };
2355 let mut results = Vec::new();
2356 while let Some(body_row) = body_op.next(&inner_ctx)? {
2357 let mut merged = outer_row.clone();
2358 for (k, v) in body_row {
2359 merged.insert(k, v);
2360 }
2361 results.push(merged);
2362 }
2363 if results.is_empty() {
2364 let mut fallback = outer_row;
2365 for v in &self.null_vars {
2366 fallback.insert(v.clone(), Value::Null);
2367 }
2368 return Ok(Some(fallback));
2369 }
2370 self.pending = results;
2371 self.pending_idx = 0;
2372 }
2373 }
2374}
2375
2376struct ProcedureCallOp {
2395 input: Option<Box<dyn Operator>>,
2396 qualified_name: Vec<String>,
2397 args: Option<Vec<Expr>>,
2398 yield_spec: Option<YieldSpec>,
2399 standalone: bool,
2400 buffered: Vec<Row>,
2401 buffered_idx: usize,
2402 done: bool,
2405}
2406
2407impl ProcedureCallOp {
2408 fn new(
2409 input: Option<Box<dyn Operator>>,
2410 qualified_name: Vec<String>,
2411 args: Option<Vec<Expr>>,
2412 yield_spec: Option<YieldSpec>,
2413 standalone: bool,
2414 ) -> Self {
2415 Self {
2416 input,
2417 qualified_name,
2418 args,
2419 yield_spec,
2420 standalone,
2421 buffered: Vec::new(),
2422 buffered_idx: 0,
2423 done: false,
2424 }
2425 }
2426
2427 fn resolve_projection(
2433 &self,
2434 proc: &crate::procedures::Procedure,
2435 ) -> Result<Vec<(String, String)>> {
2436 match &self.yield_spec {
2437 None => {
2438 if !self.standalone {
2439 if proc.outputs.is_empty() {
2448 return Ok(Vec::new());
2449 }
2450 return Err(Error::Procedure(format!(
2451 "procedure '{}' has outputs but no YIELD clause",
2452 self.qualified_name.join(".")
2453 )));
2454 }
2455 Ok(proc
2456 .outputs
2457 .iter()
2458 .map(|o| (o.name.clone(), o.name.clone()))
2459 .collect())
2460 }
2461 Some(YieldSpec::Star) => {
2462 if !self.standalone {
2463 return Err(Error::Procedure(
2464 "YIELD * is only allowed on standalone CALL".into(),
2465 ));
2466 }
2467 Ok(proc
2468 .outputs
2469 .iter()
2470 .map(|o| (o.name.clone(), o.name.clone()))
2471 .collect())
2472 }
2473 Some(YieldSpec::Items(items)) => {
2474 let mut projection = Vec::with_capacity(items.len());
2475 let mut seen_aliases: std::collections::HashSet<String> =
2476 std::collections::HashSet::new();
2477 for yi in items {
2478 if !proc.outputs.iter().any(|o| o.name == yi.column) {
2479 return Err(Error::Procedure(format!(
2480 "procedure '{}' has no output column '{}'",
2481 self.qualified_name.join("."),
2482 yi.column
2483 )));
2484 }
2485 let alias = yi.alias.clone().unwrap_or_else(|| yi.column.clone());
2486 if !seen_aliases.insert(alias.clone()) {
2487 return Err(Error::Procedure(format!(
2488 "variable '{alias}' already bound by YIELD"
2489 )));
2490 }
2491 projection.push((yi.column.clone(), alias));
2492 }
2493 Ok(projection)
2494 }
2495 }
2496 }
2497
2498 fn evaluate_args(
2505 &self,
2506 ctx: &ExecCtx,
2507 row: &Row,
2508 proc: &crate::procedures::Procedure,
2509 ) -> Result<Vec<Value>> {
2510 match &self.args {
2511 Some(exprs) => {
2512 if exprs.len() != proc.inputs.len() {
2513 return Err(Error::Procedure(format!(
2514 "procedure '{}' expects {} argument(s), got {}",
2515 self.qualified_name.join("."),
2516 proc.inputs.len(),
2517 exprs.len()
2518 )));
2519 }
2520 let eval_ctx = ctx.eval_ctx(row);
2521 let mut values = Vec::with_capacity(exprs.len());
2522 for (expr, spec) in exprs.iter().zip(proc.inputs.iter()) {
2523 let v = eval_expr(expr, &eval_ctx)?;
2524 if !spec.ty.accepts(&v) {
2525 return Err(Error::Procedure(format!(
2526 "argument '{}' has wrong type for procedure '{}'",
2527 spec.name,
2528 self.qualified_name.join(".")
2529 )));
2530 }
2531 values.push(coerce_arg(v, spec.ty));
2532 }
2533 Ok(values)
2534 }
2535 None => {
2536 if !self.standalone {
2538 return Err(Error::Procedure(
2539 "in-query CALL requires explicit argument list".into(),
2540 ));
2541 }
2542 let mut values = Vec::with_capacity(proc.inputs.len());
2543 for spec in &proc.inputs {
2544 let v = ctx.params.get(&spec.name).cloned().ok_or_else(|| {
2545 Error::Procedure(format!(
2546 "missing parameter ${} for procedure '{}'",
2547 spec.name,
2548 self.qualified_name.join(".")
2549 ))
2550 })?;
2551 if !spec.ty.accepts(&v) {
2552 return Err(Error::Procedure(format!(
2553 "parameter '{}' has wrong type",
2554 spec.name
2555 )));
2556 }
2557 values.push(coerce_arg(v, spec.ty));
2558 }
2559 Ok(values)
2560 }
2561 }
2562 }
2563
2564 fn invoke_once(
2569 &self,
2570 ctx: &ExecCtx,
2571 input_row: &Row,
2572 proc: &crate::procedures::Procedure,
2573 projection: &[(String, String)],
2574 out: &mut Vec<Row>,
2575 ) -> Result<()> {
2576 if proc.outputs.is_empty() {
2580 if !self.standalone {
2581 out.push(input_row.clone());
2582 }
2583 return Ok(());
2584 }
2585 let args = self.evaluate_args(ctx, input_row, proc)?;
2586 let is_write = proc.is_write_builtin();
2591 let rows = if is_write {
2592 #[cfg(any(feature = "apoc-create", feature = "apoc-refactor"))]
2597 {
2598 proc.resolve_write_rows(ctx.store, ctx.writer, &args)?
2599 }
2600 #[cfg(not(any(feature = "apoc-create", feature = "apoc-refactor")))]
2601 {
2602 let _ = (ctx, &args);
2603 return Err(Error::Procedure(
2604 "write procedure dispatched in a non-write-apoc build".into(),
2605 ));
2606 }
2607 } else {
2608 proc.resolve_rows(ctx.store)?
2609 };
2610 for proc_row in &rows {
2611 if !is_write && !proc.row_matches(proc_row, &args) {
2612 continue;
2613 }
2614 let mut merged = if self.standalone {
2615 Row::new()
2616 } else {
2617 input_row.clone()
2618 };
2619 for (src, alias) in projection {
2620 let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
2621 merged.insert(alias.clone(), v);
2622 }
2623 out.push(merged);
2624 }
2625 Ok(())
2626 }
2627}
2628
2629fn coerce_arg(v: Value, ty: crate::procedures::ProcType) -> Value {
2634 use crate::procedures::ProcType;
2635 if matches!(ty, ProcType::Float) {
2636 if let Value::Property(Property::Int64(n)) = v {
2637 return Value::Property(Property::Float64(n as f64));
2638 }
2639 }
2640 v
2641}
2642
2643impl Operator for ProcedureCallOp {
2644 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2645 loop {
2646 if self.buffered_idx < self.buffered.len() {
2647 let row = self.buffered[self.buffered_idx].clone();
2648 self.buffered_idx += 1;
2649 return Ok(Some(row));
2650 }
2651 self.buffered.clear();
2652 self.buffered_idx = 0;
2653
2654 let proc = match ctx.procedures.get(&self.qualified_name) {
2655 Some(p) => p,
2656 None => {
2657 return Err(Error::Procedure(format!(
2658 "procedure '{}' not found",
2659 self.qualified_name.join(".")
2660 )));
2661 }
2662 };
2663 let projection = self.resolve_projection(proc)?;
2664
2665 let input_row = match &mut self.input {
2666 Some(inp) => match inp.next(ctx)? {
2667 Some(r) => r,
2668 None => return Ok(None),
2669 },
2670 None => {
2671 if self.done {
2672 return Ok(None);
2673 }
2674 self.done = true;
2675 Row::new()
2676 }
2677 };
2678
2679 let mut produced = Vec::new();
2680 self.invoke_once(ctx, &input_row, proc, &projection, &mut produced)?;
2681 if produced.is_empty() {
2682 if self.input.is_some() {
2683 continue;
2684 }
2685 return Ok(None);
2686 }
2687 self.buffered = produced;
2688 }
2689 }
2690}
2691
2692fn extract_property_map(v: &Value) -> Result<Vec<(String, Property)>> {
2698 match v {
2699 Value::Node(n) => Ok(n.properties.clone().into_iter().collect()),
2700 Value::Edge(e) => Ok(e.properties.clone().into_iter().collect()),
2701 Value::Map(pairs) => pairs
2702 .iter()
2703 .map(|(k, vv)| Ok((k.clone(), value_to_property(vv.clone())?)))
2704 .collect(),
2705 Value::Property(Property::Map(entries)) => Ok(entries
2706 .iter()
2707 .map(|(k, p)| (k.clone(), p.clone()))
2708 .collect()),
2709 Value::Null | Value::Property(Property::Null) => Ok(Vec::new()),
2710 _ => Err(Error::InvalidSetValue),
2711 }
2712}
2713
2714fn value_to_property(v: Value) -> Result<Property> {
2715 match v {
2716 Value::Property(Property::Map(_)) => Err(Error::InvalidSetValue),
2717 Value::Property(p) => Ok(p),
2718 Value::Null => Ok(Property::Null),
2719 Value::List(items) => {
2720 let props: Vec<Property> = items
2721 .into_iter()
2722 .map(value_to_property)
2723 .collect::<Result<_>>()?;
2724 Ok(Property::List(props))
2725 }
2726 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
2730 Err(Error::InvalidSetValue)
2731 }
2732 }
2733}
2734
2735struct NodeScanAllOp {
2736 var: String,
2737 ids: Option<Vec<NodeId>>,
2738 cursor: usize,
2739}
2740
2741impl NodeScanAllOp {
2742 fn new(var: String) -> Self {
2743 Self {
2744 var,
2745 ids: None,
2746 cursor: 0,
2747 }
2748 }
2749}
2750
2751impl Operator for NodeScanAllOp {
2752 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2753 if self.ids.is_none() {
2754 self.ids = Some(ctx.store.all_node_ids()?);
2755 }
2756 let ids = self.ids.as_ref().unwrap();
2757 while self.cursor < ids.len() {
2758 let id = ids[self.cursor];
2759 self.cursor += 1;
2760 if let Some(node) = ctx.store.get_node(id)? {
2761 let mut row = Row::new();
2762 row.insert(self.var.clone(), Value::Node(node));
2763 return Ok(Some(row));
2764 }
2765 }
2766 Ok(None)
2767 }
2768}
2769
2770struct NodeScanByLabelsOp {
2771 var: String,
2772 labels: Vec<String>,
2773 ids: Option<Vec<NodeId>>,
2774 cursor: usize,
2775}
2776
2777impl NodeScanByLabelsOp {
2778 fn new(var: String, labels: Vec<String>) -> Self {
2779 Self {
2780 var,
2781 labels,
2782 ids: None,
2783 cursor: 0,
2784 }
2785 }
2786}
2787
2788impl Operator for NodeScanByLabelsOp {
2789 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2790 if self.ids.is_none() {
2791 let primary = self
2793 .labels
2794 .first()
2795 .expect("NodeScanByLabels must have at least one label");
2796 self.ids = Some(ctx.store.nodes_by_label(primary)?);
2797 }
2798 let ids = self.ids.as_ref().unwrap();
2799 while self.cursor < ids.len() {
2800 let id = ids[self.cursor];
2801 self.cursor += 1;
2802 if let Some(node) = ctx.store.get_node(id)? {
2803 if has_all_labels(&node, &self.labels) {
2804 let mut row = Row::new();
2805 row.insert(self.var.clone(), Value::Node(node));
2806 return Ok(Some(row));
2807 }
2808 }
2809 }
2810 Ok(None)
2811 }
2812}
2813
2814fn has_all_labels(node: &Node, labels: &[String]) -> bool {
2815 labels.iter().all(|l| node.labels.contains(l))
2816}
2817
2818struct IndexSeekOp {
2830 var: String,
2831 label: String,
2832 properties: Vec<String>,
2833 value_exprs: Vec<Expr>,
2834 results: Option<Vec<NodeId>>,
2835 cursor: usize,
2836}
2837
2838impl IndexSeekOp {
2839 fn new(var: String, label: String, properties: Vec<String>, value_exprs: Vec<Expr>) -> Self {
2840 assert_eq!(
2841 properties.len(),
2842 value_exprs.len(),
2843 "IndexSeekOp: properties and values must have equal length"
2844 );
2845 Self {
2846 var,
2847 label,
2848 properties,
2849 value_exprs,
2850 results: None,
2851 cursor: 0,
2852 }
2853 }
2854}
2855
2856impl Operator for IndexSeekOp {
2857 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2858 if self.results.is_none() {
2859 let empty = Row::new();
2860 let mut values: Vec<Property> = Vec::with_capacity(self.value_exprs.len());
2861 for expr in &self.value_exprs {
2862 let value = eval_expr(expr, &ctx.eval_ctx(&empty))?;
2863 let property = match value {
2864 Value::Property(p) => p,
2865 Value::Null => Property::Null,
2866 Value::Node(_)
2867 | Value::Edge(_)
2868 | Value::List(_)
2869 | Value::Map(_)
2870 | Value::Path { .. } => {
2871 return Err(Error::InvalidSetValue);
2872 }
2873 };
2874 values.push(property);
2875 }
2876 let ids = ctx
2877 .store
2878 .nodes_by_properties(&self.label, &self.properties, &values)?;
2879 self.results = Some(ids);
2880 }
2881 let ids = self.results.as_ref().unwrap();
2882 while self.cursor < ids.len() {
2883 let id = ids[self.cursor];
2884 self.cursor += 1;
2885 if let Some(node) = ctx.store.get_node(id)? {
2886 let mut row = Row::new();
2887 row.insert(self.var.clone(), Value::Node(node));
2888 return Ok(Some(row));
2889 }
2890 }
2891 Ok(None)
2892 }
2893}
2894
2895struct PointIndexSeekOp {
2914 var: String,
2915 label: String,
2916 property: String,
2917 bounds: PointSeekBounds,
2918 results: Option<Vec<NodeId>>,
2919 cursor: usize,
2920}
2921
2922impl PointIndexSeekOp {
2923 fn new(var: String, label: String, property: String, bounds: PointSeekBounds) -> Self {
2924 Self {
2925 var,
2926 label,
2927 property,
2928 bounds,
2929 results: None,
2930 cursor: 0,
2931 }
2932 }
2933}
2934
2935impl Operator for PointIndexSeekOp {
2936 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2937 if self.results.is_none() {
2938 let empty = Row::new();
2939 let ectx = ctx.eval_ctx(&empty);
2940 let ids = match &self.bounds {
2941 PointSeekBounds::Corners { lo, hi } => {
2942 let lo_pt = extract_point(&eval_expr(lo, &ectx)?);
2943 let hi_pt = extract_point(&eval_expr(hi, &ectx)?);
2944 match (lo_pt, hi_pt) {
2945 (Some(lo), Some(hi)) if lo.srid == hi.srid => ctx.store.nodes_in_bbox(
2946 &self.label,
2947 &self.property,
2948 lo.srid,
2949 lo.x,
2950 lo.y,
2951 hi.x,
2952 hi.y,
2953 )?,
2954 _ => Vec::new(),
2955 }
2956 }
2957 PointSeekBounds::Radius { center, radius } => {
2958 let center_pt = extract_point(&eval_expr(center, &ectx)?);
2959 let radius_val = extract_f64(&eval_expr(radius, &ectx)?);
2960 match (center_pt, radius_val) {
2961 (Some(c), Some(r)) if r.is_finite() && r >= 0.0 => {
2962 let (xlo, ylo, xhi, yhi) = enclosing_bbox(&c, r);
2963 ctx.store.nodes_in_bbox(
2964 &self.label,
2965 &self.property,
2966 c.srid,
2967 xlo,
2968 ylo,
2969 xhi,
2970 yhi,
2971 )?
2972 }
2973 _ => Vec::new(),
2975 }
2976 }
2977 };
2978 self.results = Some(ids);
2979 }
2980 let ids = self.results.as_ref().unwrap();
2981 while self.cursor < ids.len() {
2982 let id = ids[self.cursor];
2983 self.cursor += 1;
2984 if let Some(node) = ctx.store.get_node(id)? {
2985 let mut row = Row::new();
2986 row.insert(self.var.clone(), Value::Node(node));
2987 return Ok(Some(row));
2988 }
2989 }
2990 Ok(None)
2991 }
2992}
2993
2994fn extract_point(v: &Value) -> Option<meshdb_core::Point> {
2995 match v {
2996 Value::Property(Property::Point(p)) => Some(*p),
2997 _ => None,
2998 }
2999}
3000
3001fn extract_f64(v: &Value) -> Option<f64> {
3002 match v {
3003 Value::Property(Property::Float64(f)) => Some(*f),
3004 Value::Property(Property::Int64(i)) => Some(*i as f64),
3005 _ => None,
3006 }
3007}
3008
3009fn enclosing_bbox(center: &meshdb_core::Point, r: f64) -> (f64, f64, f64, f64) {
3019 if center.is_geographic() {
3020 const METRES_PER_DEG: f64 = 111_320.0;
3023 let dlat = r / METRES_PER_DEG;
3024 let cos_lat = center.y.to_radians().cos().abs();
3025 let cos_lat_floor = cos_lat.max(1.0e-6);
3029 let dlon = r / (METRES_PER_DEG * cos_lat_floor);
3030 (
3031 center.x - dlon,
3032 center.y - dlat,
3033 center.x + dlon,
3034 center.y + dlat,
3035 )
3036 } else {
3037 (center.x - r, center.y - r, center.x + r, center.y + r)
3038 }
3039}
3040
3041struct EdgeSeekOp {
3050 edge_var: String,
3051 src_var: String,
3052 dst_var: String,
3053 edge_type: String,
3054 property: String,
3055 value_expr: Expr,
3056 direction: Direction,
3057 residual_properties: Vec<(String, Expr)>,
3058 results: Option<Vec<Row>>,
3061 cursor: usize,
3062}
3063
3064impl EdgeSeekOp {
3065 #[allow(clippy::too_many_arguments)]
3066 fn new(
3067 edge_var: String,
3068 src_var: String,
3069 dst_var: String,
3070 edge_type: String,
3071 property: String,
3072 value_expr: Expr,
3073 direction: Direction,
3074 residual_properties: Vec<(String, Expr)>,
3075 ) -> Self {
3076 Self {
3077 edge_var,
3078 src_var,
3079 dst_var,
3080 edge_type,
3081 property,
3082 value_expr,
3083 direction,
3084 residual_properties,
3085 results: None,
3086 cursor: 0,
3087 }
3088 }
3089}
3090
3091impl Operator for EdgeSeekOp {
3092 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3093 if self.results.is_none() {
3094 let empty = Row::new();
3095 let seek_value = eval_expr(&self.value_expr, &ctx.eval_ctx(&empty))?;
3096 let property = match seek_value {
3097 Value::Property(p) => p,
3098 Value::Null => Property::Null,
3099 Value::Node(_)
3100 | Value::Edge(_)
3101 | Value::List(_)
3102 | Value::Map(_)
3103 | Value::Path { .. } => {
3104 return Err(Error::InvalidSetValue);
3105 }
3106 };
3107 let ids = ctx
3108 .store
3109 .edges_by_property(&self.edge_type, &self.property, &property)?;
3110 let mut rows: Vec<Row> = Vec::with_capacity(ids.len());
3111 for id in ids {
3112 let Some(edge) = ctx.store.get_edge(id)? else {
3113 continue;
3114 };
3115 let mut residuals_ok = true;
3121 for (key, expr) in &self.residual_properties {
3122 let wanted = eval_expr(expr, &ctx.eval_ctx(&empty))?;
3123 let Some(stored) = edge.properties.get(key) else {
3124 residuals_ok = false;
3125 break;
3126 };
3127 if !values_equal(&Value::Property(stored.clone()), &wanted) {
3128 residuals_ok = false;
3129 break;
3130 }
3131 }
3132 if !residuals_ok {
3133 continue;
3134 }
3135 let Some(src_node) = ctx.store.get_node(edge.source)? else {
3140 continue;
3141 };
3142 let Some(dst_node) = ctx.store.get_node(edge.target)? else {
3143 continue;
3144 };
3145 match self.direction {
3150 Direction::Outgoing => {
3151 rows.push(self.make_row(&edge, &src_node, &dst_node));
3152 }
3153 Direction::Incoming => {
3154 rows.push(self.make_row(&edge, &dst_node, &src_node));
3155 }
3156 Direction::Both => {
3157 rows.push(self.make_row(&edge, &src_node, &dst_node));
3158 if edge.source != edge.target {
3161 rows.push(self.make_row(&edge, &dst_node, &src_node));
3162 }
3163 }
3164 }
3165 }
3166 self.results = Some(rows);
3167 }
3168 let rows = self.results.as_ref().unwrap();
3169 if self.cursor < rows.len() {
3170 let row = rows[self.cursor].clone();
3171 self.cursor += 1;
3172 return Ok(Some(row));
3173 }
3174 Ok(None)
3175 }
3176}
3177
3178impl EdgeSeekOp {
3179 fn make_row(&self, edge: &Edge, src: &Node, dst: &Node) -> Row {
3180 let mut row = Row::new();
3181 row.insert(self.edge_var.clone(), Value::Edge(edge.clone()));
3182 row.insert(self.src_var.clone(), Value::Node(src.clone()));
3183 row.insert(self.dst_var.clone(), Value::Node(dst.clone()));
3184 row
3185 }
3186}
3187
3188struct EdgePointIndexSeekOp {
3195 edge_var: String,
3196 src_var: String,
3197 dst_var: String,
3198 edge_type: String,
3199 property: String,
3200 direction: Direction,
3201 bounds: PointSeekBounds,
3202 results: Option<Vec<Row>>,
3203 cursor: usize,
3204}
3205
3206impl EdgePointIndexSeekOp {
3207 #[allow(clippy::too_many_arguments)]
3208 fn new(
3209 edge_var: String,
3210 src_var: String,
3211 dst_var: String,
3212 edge_type: String,
3213 property: String,
3214 direction: Direction,
3215 bounds: PointSeekBounds,
3216 ) -> Self {
3217 Self {
3218 edge_var,
3219 src_var,
3220 dst_var,
3221 edge_type,
3222 property,
3223 direction,
3224 bounds,
3225 results: None,
3226 cursor: 0,
3227 }
3228 }
3229
3230 fn make_row(&self, edge: &Edge, src: &Node, dst: &Node) -> Row {
3231 let mut row = Row::new();
3232 row.insert(self.edge_var.clone(), Value::Edge(edge.clone()));
3233 row.insert(self.src_var.clone(), Value::Node(src.clone()));
3234 row.insert(self.dst_var.clone(), Value::Node(dst.clone()));
3235 row
3236 }
3237}
3238
3239impl Operator for EdgePointIndexSeekOp {
3240 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3241 if self.results.is_none() {
3242 let empty = Row::new();
3243 let ectx = ctx.eval_ctx(&empty);
3244 let ids = match &self.bounds {
3245 PointSeekBounds::Corners { lo, hi } => {
3246 let lo_pt = extract_point(&eval_expr(lo, &ectx)?);
3247 let hi_pt = extract_point(&eval_expr(hi, &ectx)?);
3248 match (lo_pt, hi_pt) {
3249 (Some(lo), Some(hi)) if lo.srid == hi.srid => ctx.store.edges_in_bbox(
3250 &self.edge_type,
3251 &self.property,
3252 lo.srid,
3253 lo.x,
3254 lo.y,
3255 hi.x,
3256 hi.y,
3257 )?,
3258 _ => Vec::new(),
3259 }
3260 }
3261 PointSeekBounds::Radius { center, radius } => {
3262 let center_pt = extract_point(&eval_expr(center, &ectx)?);
3263 let radius_val = extract_f64(&eval_expr(radius, &ectx)?);
3264 match (center_pt, radius_val) {
3265 (Some(c), Some(r)) if r.is_finite() && r >= 0.0 => {
3266 let (xlo, ylo, xhi, yhi) = enclosing_bbox(&c, r);
3267 ctx.store.edges_in_bbox(
3268 &self.edge_type,
3269 &self.property,
3270 c.srid,
3271 xlo,
3272 ylo,
3273 xhi,
3274 yhi,
3275 )?
3276 }
3277 _ => Vec::new(),
3278 }
3279 }
3280 };
3281
3282 let mut rows: Vec<Row> = Vec::with_capacity(ids.len());
3283 for id in ids {
3284 let Some(edge) = ctx.store.get_edge(id)? else {
3285 continue;
3286 };
3287 let Some(src_node) = ctx.store.get_node(edge.source)? else {
3288 continue;
3289 };
3290 let Some(dst_node) = ctx.store.get_node(edge.target)? else {
3291 continue;
3292 };
3293 match self.direction {
3294 Direction::Outgoing => rows.push(self.make_row(&edge, &src_node, &dst_node)),
3295 Direction::Incoming => rows.push(self.make_row(&edge, &dst_node, &src_node)),
3296 Direction::Both => {
3297 rows.push(self.make_row(&edge, &src_node, &dst_node));
3298 if edge.source != edge.target {
3299 rows.push(self.make_row(&edge, &dst_node, &src_node));
3300 }
3301 }
3302 }
3303 }
3304 self.results = Some(rows);
3305 }
3306 let rows = self.results.as_ref().unwrap();
3307 if self.cursor < rows.len() {
3308 let row = rows[self.cursor].clone();
3309 self.cursor += 1;
3310 return Ok(Some(row));
3311 }
3312 Ok(None)
3313 }
3314}
3315
3316fn matches_pattern_props(node: &Node, props: &[(String, Property)]) -> bool {
3317 props.iter().all(|(k, v)| {
3318 node.properties
3319 .get(k)
3320 .map(|stored| stored == v)
3321 .unwrap_or(false)
3322 })
3323}
3324
3325struct MergeNodeOp {
3326 var: String,
3327 labels: Vec<String>,
3328 properties: Vec<(String, Expr)>,
3332 on_create: Vec<SetAssignment>,
3337 on_match: Vec<SetAssignment>,
3341 input: Option<Box<dyn Operator>>,
3348 merged_nodes: Vec<Node>,
3355 merge_done: bool,
3359 cursor: usize,
3360 input_buffered: Option<Vec<Row>>,
3369 input_cursor: usize,
3370}
3371
3372impl MergeNodeOp {
3373 fn new(
3374 input: Option<Box<dyn Operator>>,
3375 var: String,
3376 labels: Vec<String>,
3377 properties: Vec<(String, Expr)>,
3378 on_create: Vec<SetAssignment>,
3379 on_match: Vec<SetAssignment>,
3380 ) -> Self {
3381 Self {
3382 var,
3383 labels,
3384 properties,
3385 on_create,
3386 on_match,
3387 input,
3388 merged_nodes: Vec::new(),
3389 merge_done: false,
3390 cursor: 0,
3391 input_buffered: None,
3392 input_cursor: 0,
3393 }
3394 }
3395
3396 fn run_merge_for(&mut self, ctx: &ExecCtx, base: &Row) -> Result<Vec<Node>> {
3408 let resolved_props: Vec<(String, Property)> = self
3409 .properties
3410 .iter()
3411 .map(|(k, expr)| {
3412 let v = eval_expr(expr, &ctx.eval_ctx(base))?;
3413 Ok((k.clone(), value_to_property(v)?))
3414 })
3415 .collect::<Result<Vec<_>>>()?;
3416
3417 let candidate_ids: Vec<NodeId> = if let Some(primary) = self.labels.first() {
3418 ctx.store.nodes_by_label(primary)?
3419 } else {
3420 ctx.store.all_node_ids()?
3421 };
3422 let mut merged_nodes: Vec<Node> = Vec::new();
3423 for id in candidate_ids {
3424 if let Some(node) = ctx.store.get_node(id)? {
3425 if has_all_labels(&node, &self.labels)
3426 && matches_pattern_props(&node, &resolved_props)
3427 {
3428 merged_nodes.push(node);
3429 }
3430 }
3431 }
3432
3433 if merged_nodes.is_empty() {
3434 let mut node = Node::new();
3435 for label in &self.labels {
3436 node.labels.push(label.clone());
3437 }
3438 for (k, prop) in resolved_props {
3439 node.properties.insert(k, prop);
3440 }
3441 apply_merge_actions(&mut node, &self.on_create, &self.var, ctx, base)?;
3442 ctx.writer.put_node(&node)?;
3443 merged_nodes.push(node);
3444 } else if !self.on_match.is_empty() {
3445 for node in merged_nodes.iter_mut() {
3446 apply_merge_actions(node, &self.on_match, &self.var, ctx, base)?;
3447 ctx.writer.put_node(node)?;
3448 }
3449 }
3450 Ok(merged_nodes)
3451 }
3452}
3453
3454impl Operator for MergeNodeOp {
3455 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3456 if self.input.is_none() {
3461 if !self.merge_done {
3462 let empty = Row::new();
3463 let nodes = self.run_merge_for(ctx, &empty)?;
3464 self.merged_nodes = nodes;
3465 self.merge_done = true;
3466 }
3467 if self.cursor < self.merged_nodes.len() {
3468 let node = self.merged_nodes[self.cursor].clone();
3469 self.cursor += 1;
3470 let mut row = Row::new();
3471 row.insert(self.var.clone(), Value::Node(node));
3472 return Ok(Some(row));
3473 }
3474 return Ok(None);
3475 }
3476
3477 if self.input_buffered.is_none() {
3485 let mut input_rows: Vec<Row> = Vec::new();
3486 while let Some(row) = self.input.as_mut().unwrap().next(ctx)? {
3487 input_rows.push(row);
3488 }
3489 let mut output: Vec<Row> = Vec::new();
3490 for input_row in input_rows {
3491 let nodes = self.run_merge_for(ctx, &input_row)?;
3492 for node in nodes {
3493 let mut out = input_row.clone();
3494 out.insert(self.var.clone(), Value::Node(node));
3495 output.push(out);
3496 }
3497 }
3498 self.input_buffered = Some(output);
3499 self.input_cursor = 0;
3500 }
3501 let rows = self.input_buffered.as_ref().unwrap();
3502 if self.input_cursor < rows.len() {
3503 let row = rows[self.input_cursor].clone();
3504 self.input_cursor += 1;
3505 return Ok(Some(row));
3506 }
3507 Ok(None)
3508 }
3509}
3510
3511struct MergeEdgeOp {
3530 input: Box<dyn Operator>,
3531 edge_var: String,
3532 src_var: String,
3533 dst_var: String,
3534 edge_type: String,
3535 undirected: bool,
3536 properties: Vec<(String, Expr)>,
3540 on_create: Vec<SetAssignment>,
3541 on_match: Vec<SetAssignment>,
3542 pending: std::collections::VecDeque<Row>,
3549 drained: bool,
3553}
3554
3555impl MergeEdgeOp {
3556 #[allow(clippy::too_many_arguments)]
3557 fn new(
3558 input: Box<dyn Operator>,
3559 edge_var: String,
3560 src_var: String,
3561 dst_var: String,
3562 edge_type: String,
3563 undirected: bool,
3564 properties: Vec<(String, Expr)>,
3565 on_create: Vec<SetAssignment>,
3566 on_match: Vec<SetAssignment>,
3567 ) -> Self {
3568 Self {
3569 input,
3570 edge_var,
3571 src_var,
3572 dst_var,
3573 edge_type,
3574 undirected,
3575 properties,
3576 on_create,
3577 on_match,
3578 pending: std::collections::VecDeque::new(),
3579 drained: false,
3580 }
3581 }
3582}
3583
3584impl MergeEdgeOp {
3585 fn merge_for(&self, ctx: &ExecCtx, row: Row, out: &mut Vec<Row>) -> Result<()> {
3590 let src_node = match row.get(&self.src_var) {
3595 Some(Value::Node(n)) => n.clone(),
3596 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3597 };
3598 let dst_node = match row.get(&self.dst_var) {
3599 Some(Value::Node(n)) => n.clone(),
3600 _ => return Err(Error::UnboundVariable(self.dst_var.clone())),
3601 };
3602
3603 let required_props: Vec<(String, Property)> = self
3607 .properties
3608 .iter()
3609 .map(|(k, expr)| {
3610 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
3611 Ok((k.clone(), value_to_property(v)?))
3612 })
3613 .collect::<Result<Vec<_>>>()?;
3614 let edge_matches = |edge: &Edge| -> bool {
3615 required_props.iter().all(|(k, want)| {
3616 edge.properties
3617 .get(k)
3618 .map(|have| have == want)
3619 .unwrap_or(false)
3620 })
3621 };
3622
3623 let mut matched: Vec<Edge> = Vec::new();
3627 for (edge_id, neighbor_id) in ctx.store.outgoing(src_node.id)? {
3628 if neighbor_id != dst_node.id {
3629 continue;
3630 }
3631 if let Some(edge) = ctx.store.get_edge(edge_id)? {
3632 if edge.edge_type == self.edge_type && edge_matches(&edge) {
3633 matched.push(edge);
3634 }
3635 }
3636 }
3637 if self.undirected {
3638 for (edge_id, neighbor_id) in ctx.store.incoming(src_node.id)? {
3639 if neighbor_id != dst_node.id {
3640 continue;
3641 }
3642 if let Some(edge) = ctx.store.get_edge(edge_id)? {
3643 if edge.edge_type == self.edge_type && edge_matches(&edge) {
3644 matched.push(edge);
3645 }
3646 }
3647 }
3648 }
3649
3650 if matched.is_empty() {
3651 let mut new_edge = Edge::new(&self.edge_type, src_node.id, dst_node.id);
3652 for (k, p) in &required_props {
3653 new_edge.properties.insert(k.clone(), p.clone());
3654 }
3655 let mut row_out = row.clone();
3656 apply_merge_edge_actions(
3657 &mut new_edge,
3658 &self.on_create,
3659 &self.edge_var,
3660 ctx,
3661 &mut row_out,
3662 )?;
3663 ctx.writer.put_edge(&new_edge)?;
3664 row_out.insert(self.edge_var.clone(), Value::Edge(new_edge));
3665 out.push(row_out);
3666 } else {
3667 for mut existing in matched {
3668 let mut row_out = row.clone();
3669 if !self.on_match.is_empty() {
3670 apply_merge_edge_actions(
3671 &mut existing,
3672 &self.on_match,
3673 &self.edge_var,
3674 ctx,
3675 &mut row_out,
3676 )?;
3677 ctx.writer.put_edge(&existing)?;
3678 }
3679 row_out.insert(self.edge_var.clone(), Value::Edge(existing));
3680 out.push(row_out);
3681 }
3682 }
3683 Ok(())
3684 }
3685}
3686
3687impl Operator for MergeEdgeOp {
3688 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3689 if self.pending.is_empty() && !self.drained {
3694 let mut input_rows: Vec<Row> = Vec::new();
3695 while let Some(row) = self.input.next(ctx)? {
3696 input_rows.push(row);
3697 }
3698 let mut out: Vec<Row> = Vec::new();
3699 for row in input_rows {
3700 self.merge_for(ctx, row, &mut out)?;
3701 }
3702 self.pending.extend(out);
3703 self.drained = true;
3704 }
3705 Ok(self.pending.pop_front())
3706 }
3707}
3708
3709fn apply_merge_edge_actions(
3719 edge: &mut Edge,
3720 actions: &[SetAssignment],
3721 var: &str,
3722 exec_ctx: &ExecCtx,
3723 outer: &mut Row,
3724) -> Result<()> {
3725 if actions.is_empty() {
3726 return Ok(());
3727 }
3728 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3731 for action in actions {
3732 match action {
3733 SetAssignment::Property {
3734 var: target,
3735 key,
3736 value,
3737 } => {
3738 let sub_ctx = exec_ctx.eval_ctx(outer);
3739 let evaluated = eval_expr(value, &sub_ctx)?;
3740 let prop = value_to_property(evaluated)?;
3741 if target == var {
3742 if matches!(prop, Property::Null) {
3743 edge.properties.remove(key);
3744 } else {
3745 edge.properties.insert(key.clone(), prop);
3746 }
3747 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3748 } else {
3749 apply_set_prop_to_outer(outer, exec_ctx, target, key, prop)?;
3750 }
3751 }
3752 SetAssignment::Merge {
3753 var: target,
3754 properties,
3755 } => {
3756 let sub_ctx = exec_ctx.eval_ctx(outer);
3757 let resolved: Vec<(String, Property)> = properties
3758 .iter()
3759 .map(|(k, expr)| {
3760 let v = eval_expr(expr, &sub_ctx)?;
3761 Ok((k.clone(), value_to_property(v)?))
3762 })
3763 .collect::<Result<Vec<_>>>()?;
3764 if target == var {
3765 for (k, p) in resolved {
3766 edge.properties.insert(k, p);
3767 }
3768 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3769 } else {
3770 apply_set_map_to_outer(outer, exec_ctx, target, resolved, false)?;
3771 }
3772 }
3773 SetAssignment::Replace {
3774 var: target,
3775 properties,
3776 } => {
3777 let sub_ctx = exec_ctx.eval_ctx(outer);
3778 let resolved: Vec<(String, Property)> = properties
3779 .iter()
3780 .map(|(k, expr)| {
3781 let v = eval_expr(expr, &sub_ctx)?;
3782 Ok((k.clone(), value_to_property(v)?))
3783 })
3784 .collect::<Result<Vec<_>>>()?;
3785 if target == var {
3786 edge.properties.clear();
3787 for (k, p) in resolved {
3788 edge.properties.insert(k, p);
3789 }
3790 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3791 } else {
3792 apply_set_map_to_outer(outer, exec_ctx, target, resolved, true)?;
3793 }
3794 }
3795 SetAssignment::Labels {
3796 var: target,
3797 labels,
3798 } => {
3799 if target == var {
3800 return Err(Error::UnboundVariable(target.clone()));
3802 }
3803 apply_set_labels_to_outer(outer, exec_ctx, target, labels)?;
3804 }
3805 SetAssignment::ReplaceFromExpr {
3806 var: target,
3807 source,
3808 replace,
3809 } => {
3810 let sub_ctx = exec_ctx.eval_ctx(outer);
3811 let v = eval_expr(source, &sub_ctx)?;
3812 let props = extract_property_map(&v)?;
3813 if target == var {
3814 if *replace {
3815 edge.properties.clear();
3816 }
3817 for (k, p) in props {
3818 edge.properties.insert(k, p);
3819 }
3820 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3821 } else {
3822 apply_set_map_to_outer(outer, exec_ctx, target, props, *replace)?;
3823 }
3824 }
3825 }
3826 }
3827 Ok(())
3828}
3829
3830fn apply_set_prop_to_outer(
3835 outer: &mut Row,
3836 exec_ctx: &ExecCtx,
3837 target: &str,
3838 key: &str,
3839 prop: Property,
3840) -> Result<()> {
3841 match outer.get_mut(target) {
3842 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
3843 return Ok(());
3846 }
3847 Some(Value::Node(n)) => {
3848 if matches!(prop, Property::Null) {
3849 n.properties.remove(key);
3850 } else {
3851 n.properties.insert(key.to_string(), prop);
3852 }
3853 exec_ctx.writer.put_node(n)?;
3854 }
3855 Some(Value::Edge(e)) => {
3856 if matches!(prop, Property::Null) {
3857 e.properties.remove(key);
3858 } else {
3859 e.properties.insert(key.to_string(), prop);
3860 }
3861 exec_ctx.writer.put_edge(e)?;
3862 }
3863 _ => return Err(Error::UnboundVariable(target.to_string())),
3864 }
3865 Ok(())
3866}
3867
3868fn apply_set_map_to_outer(
3872 outer: &mut Row,
3873 exec_ctx: &ExecCtx,
3874 target: &str,
3875 props: Vec<(String, Property)>,
3876 replace: bool,
3877) -> Result<()> {
3878 match outer.get_mut(target) {
3879 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
3880 Some(Value::Node(n)) => {
3881 if replace {
3882 n.properties.clear();
3883 }
3884 for (k, p) in props {
3885 if replace || !matches!(p, Property::Null) {
3886 n.properties.insert(k, p);
3887 } else {
3888 n.properties.remove(&k);
3889 }
3890 }
3891 exec_ctx.writer.put_node(n)?;
3892 Ok(())
3893 }
3894 Some(Value::Edge(e)) => {
3895 if replace {
3896 e.properties.clear();
3897 }
3898 for (k, p) in props {
3899 if replace || !matches!(p, Property::Null) {
3900 e.properties.insert(k, p);
3901 } else {
3902 e.properties.remove(&k);
3903 }
3904 }
3905 exec_ctx.writer.put_edge(e)?;
3906 Ok(())
3907 }
3908 _ => Err(Error::UnboundVariable(target.to_string())),
3909 }
3910}
3911
3912fn apply_set_labels_to_outer(
3914 outer: &mut Row,
3915 exec_ctx: &ExecCtx,
3916 target: &str,
3917 labels: &[String],
3918) -> Result<()> {
3919 match outer.get_mut(target) {
3920 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
3921 Some(Value::Node(n)) => {
3922 for label in labels {
3923 if !n.labels.contains(label) {
3924 n.labels.push(label.clone());
3925 }
3926 }
3927 exec_ctx.writer.put_node(n)?;
3928 Ok(())
3929 }
3930 _ => Err(Error::UnboundVariable(target.to_string())),
3931 }
3932}
3933
3934fn apply_merge_actions(
3943 node: &mut Node,
3944 actions: &[SetAssignment],
3945 var: &str,
3946 exec_ctx: &ExecCtx,
3947 base_row: &Row,
3948) -> Result<()> {
3949 if actions.is_empty() {
3950 return Ok(());
3951 }
3952 let mut row = base_row.clone();
3955 row.insert(var.to_string(), Value::Node(node.clone()));
3956 for action in actions {
3957 let sub_ctx = exec_ctx.eval_ctx(&row);
3958 match action {
3959 SetAssignment::Property {
3960 var: target,
3961 key,
3962 value,
3963 } => {
3964 if target != var {
3965 return Err(Error::UnboundVariable(target.clone()));
3966 }
3967 let evaluated = eval_expr(value, &sub_ctx)?;
3968 let prop = value_to_property(evaluated)?;
3969 node.properties.insert(key.clone(), prop);
3970 row.insert(var.to_string(), Value::Node(node.clone()));
3971 }
3972 SetAssignment::Labels {
3973 var: target,
3974 labels,
3975 } => {
3976 if target != var {
3977 return Err(Error::UnboundVariable(target.clone()));
3978 }
3979 for label in labels {
3980 if !node.labels.contains(label) {
3981 node.labels.push(label.clone());
3982 }
3983 }
3984 row.insert(var.to_string(), Value::Node(node.clone()));
3985 }
3986 SetAssignment::Replace {
3987 var: target,
3988 properties,
3989 } => {
3990 if target != var {
3991 return Err(Error::UnboundVariable(target.clone()));
3992 }
3993 let resolved: Vec<(String, Property)> = properties
3994 .iter()
3995 .map(|(k, expr)| {
3996 let v = eval_expr(expr, &sub_ctx)?;
3997 Ok((k.clone(), value_to_property(v)?))
3998 })
3999 .collect::<Result<Vec<_>>>()?;
4000 node.properties.clear();
4001 for (k, p) in resolved {
4002 node.properties.insert(k, p);
4003 }
4004 row.insert(var.to_string(), Value::Node(node.clone()));
4005 }
4006 SetAssignment::Merge {
4007 var: target,
4008 properties,
4009 } => {
4010 if target != var {
4011 return Err(Error::UnboundVariable(target.clone()));
4012 }
4013 let resolved: Vec<(String, Property)> = properties
4014 .iter()
4015 .map(|(k, expr)| {
4016 let v = eval_expr(expr, &sub_ctx)?;
4017 Ok((k.clone(), value_to_property(v)?))
4018 })
4019 .collect::<Result<Vec<_>>>()?;
4020 for (k, p) in resolved {
4021 node.properties.insert(k, p);
4022 }
4023 row.insert(var.to_string(), Value::Node(node.clone()));
4024 }
4025 SetAssignment::ReplaceFromExpr {
4026 var: target,
4027 source,
4028 replace,
4029 } => {
4030 if target != var {
4031 return Err(Error::UnboundVariable(target.clone()));
4032 }
4033 let v = eval_expr(source, &sub_ctx)?;
4034 let props = extract_property_map(&v)?;
4035 if *replace {
4036 node.properties.clear();
4037 }
4038 for (k, p) in props {
4039 node.properties.insert(k, p);
4040 }
4041 row.insert(var.to_string(), Value::Node(node.clone()));
4042 }
4043 }
4044 }
4045 Ok(())
4046}
4047
4048struct EdgeExpandOp {
4049 input: Box<dyn Operator>,
4050 src_var: String,
4051 edge_var: Option<String>,
4052 dst_var: String,
4053 dst_labels: Vec<String>,
4054 edge_properties: Vec<(String, Expr)>,
4055 edge_types: Vec<String>,
4056 direction: Direction,
4057 edge_constraint_var: Option<String>,
4063 current_row: Option<Row>,
4064 pending: Vec<(EdgeId, NodeId)>,
4065 pending_idx: usize,
4066}
4067
4068impl EdgeExpandOp {
4069 #[allow(clippy::too_many_arguments)]
4070 fn new(
4071 input: Box<dyn Operator>,
4072 src_var: String,
4073 edge_var: Option<String>,
4074 dst_var: String,
4075 dst_labels: Vec<String>,
4076 edge_properties: Vec<(String, Expr)>,
4077 edge_types: Vec<String>,
4078 direction: Direction,
4079 edge_constraint_var: Option<String>,
4080 ) -> Self {
4081 Self {
4082 input,
4083 src_var,
4084 edge_var,
4085 dst_var,
4086 dst_labels,
4087 edge_properties,
4088 edge_types,
4089 direction,
4090 edge_constraint_var,
4091 current_row: None,
4092 pending: Vec::new(),
4093 pending_idx: 0,
4094 }
4095 }
4096}
4097
4098impl Operator for EdgeExpandOp {
4099 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4100 loop {
4101 while self.pending_idx < self.pending.len() {
4102 let (edge_id, neighbor_id) = self.pending[self.pending_idx];
4103 self.pending_idx += 1;
4104
4105 let edge = match ctx.store.get_edge(edge_id)? {
4106 Some(e) => e,
4107 None => continue,
4108 };
4109 if !self.edge_types.is_empty()
4110 && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4111 {
4112 continue;
4113 }
4114 if let Some(constraint_var) = &self.edge_constraint_var {
4122 let base = self
4123 .current_row
4124 .as_ref()
4125 .expect("pending edges without source row");
4126 let expected = match ctx.lookup_binding(base, constraint_var) {
4127 Some(Value::Edge(e)) => Some(e.id),
4128 _ => None,
4129 };
4130 match expected {
4131 Some(id) if id != edge.id => continue,
4132 None => continue,
4133 _ => {}
4134 }
4135 }
4136 if !self.edge_properties.is_empty() {
4141 let base = self
4142 .current_row
4143 .as_ref()
4144 .expect("pending edges without source row");
4145 let ectx = ctx.eval_ctx(base);
4146 let mut ok = true;
4147 for (key, expr) in &self.edge_properties {
4148 let expected = eval_expr(expr, &ectx)?;
4149 let actual = match edge.properties.get(key) {
4150 Some(v) => Value::Property(v.clone()),
4151 None => {
4152 ok = false;
4153 break;
4154 }
4155 };
4156 if !values_equal(&actual, &expected) {
4157 ok = false;
4158 break;
4159 }
4160 }
4161 if !ok {
4162 continue;
4163 }
4164 }
4165
4166 let neighbor = match ctx.store.get_node(neighbor_id)? {
4167 Some(n) => n,
4168 None => continue,
4169 };
4170 if !has_all_labels(&neighbor, &self.dst_labels) {
4171 continue;
4172 }
4173
4174 let base = self
4175 .current_row
4176 .as_ref()
4177 .expect("pending edges without source row");
4178 let mut out = base.clone();
4179 if let Some(ev) = &self.edge_var {
4180 out.insert(ev.clone(), Value::Edge(edge));
4181 }
4182 out.insert(self.dst_var.clone(), Value::Node(neighbor));
4183 return Ok(Some(out));
4184 }
4185
4186 match self.input.next(ctx)? {
4187 None => return Ok(None),
4188 Some(row) => {
4189 let src_id = match row.get(&self.src_var) {
4190 Some(Value::Node(n)) => n.id,
4191 Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
4196 continue
4197 }
4198 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4199 };
4200 self.pending = match self.direction {
4201 Direction::Outgoing => ctx.store.outgoing(src_id)?,
4202 Direction::Incoming => ctx.store.incoming(src_id)?,
4203 Direction::Both => {
4204 let mut all = ctx.store.outgoing(src_id)?;
4210 let mut seen: std::collections::HashSet<EdgeId> =
4211 all.iter().map(|(e, _)| *e).collect();
4212 for (e, n) in ctx.store.incoming(src_id)? {
4213 if seen.insert(e) {
4214 all.push((e, n));
4215 }
4216 }
4217 all
4218 }
4219 };
4220 self.pending_idx = 0;
4221 self.current_row = Some(row);
4222 }
4223 }
4224 }
4225 }
4226}
4227
4228struct OptionalEdgeExpandOp {
4243 input: Box<dyn Operator>,
4244 src_var: String,
4245 edge_var: Option<String>,
4246 dst_var: String,
4247 dst_labels: Vec<String>,
4248 dst_properties: Vec<(String, Expr)>,
4249 edge_types: Vec<String>,
4250 direction: Direction,
4251 dst_constraint_var: Option<String>,
4257 edge_constraint_var: Option<String>,
4262 current_row: Option<Row>,
4263 pending: Vec<(EdgeId, NodeId)>,
4264 pending_idx: usize,
4265 yielded_for_current: bool,
4266}
4267
4268impl OptionalEdgeExpandOp {
4269 #[allow(clippy::too_many_arguments)]
4270 fn new(
4271 input: Box<dyn Operator>,
4272 src_var: String,
4273 edge_var: Option<String>,
4274 dst_var: String,
4275 dst_labels: Vec<String>,
4276 dst_properties: Vec<(String, Expr)>,
4277 edge_types: Vec<String>,
4278 direction: Direction,
4279 dst_constraint_var: Option<String>,
4280 edge_constraint_var: Option<String>,
4281 ) -> Self {
4282 Self {
4283 input,
4284 src_var,
4285 edge_var,
4286 dst_var,
4287 dst_labels,
4288 dst_properties,
4289 edge_types,
4290 direction,
4291 dst_constraint_var,
4292 edge_constraint_var,
4293 current_row: None,
4294 pending: Vec::new(),
4295 pending_idx: 0,
4296 yielded_for_current: false,
4297 }
4298 }
4299}
4300
4301impl Operator for OptionalEdgeExpandOp {
4302 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4303 loop {
4304 while self.pending_idx < self.pending.len() {
4305 let (edge_id, neighbor_id) = self.pending[self.pending_idx];
4306 self.pending_idx += 1;
4307
4308 let edge = match ctx.store.get_edge(edge_id)? {
4309 Some(e) => e,
4310 None => continue,
4311 };
4312 if !self.edge_types.is_empty()
4313 && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4314 {
4315 continue;
4316 }
4317 if let Some(constraint_var) = &self.edge_constraint_var {
4323 let base = self
4324 .current_row
4325 .as_ref()
4326 .expect("pending without source row");
4327 let expected = match ctx.lookup_binding(base, constraint_var) {
4328 Some(Value::Edge(e)) => Some(e.id),
4329 _ => None,
4330 };
4331 match expected {
4332 Some(id) if id != edge.id => continue,
4333 None => continue,
4334 _ => {}
4335 }
4336 }
4337
4338 let neighbor = match ctx.store.get_node(neighbor_id)? {
4339 Some(n) => n,
4340 None => continue,
4341 };
4342 if !has_all_labels(&neighbor, &self.dst_labels) {
4343 continue;
4344 }
4345 if let Some(constraint_var) = &self.dst_constraint_var {
4352 let base = self
4353 .current_row
4354 .as_ref()
4355 .expect("pending without source row");
4356 let bound_id = match base.get(constraint_var) {
4357 Some(Value::Node(n)) => Some(n.id),
4358 Some(Value::Null)
4359 | Some(Value::Property(meshdb_core::Property::Null))
4360 | None => None,
4361 _ => None,
4362 };
4363 match bound_id {
4364 Some(id) if id != neighbor.id => continue,
4365 None => continue,
4366 _ => {}
4367 }
4368 }
4369 if !self.dst_properties.is_empty() {
4370 let base = self
4371 .current_row
4372 .as_ref()
4373 .expect("pending without source row");
4374 let ectx = ctx.eval_ctx(base);
4375 let mut props_ok = true;
4376 for (key, expr) in &self.dst_properties {
4377 let expected = eval_expr(expr, &ectx)?;
4378 let actual = neighbor
4379 .properties
4380 .get(key)
4381 .cloned()
4382 .map(Value::Property)
4383 .unwrap_or(Value::Null);
4384 if !values_equal(&expected, &actual) {
4385 props_ok = false;
4386 break;
4387 }
4388 }
4389 if !props_ok {
4390 continue;
4391 }
4392 }
4393
4394 let base = self
4395 .current_row
4396 .as_ref()
4397 .expect("pending edges without source row");
4398 let mut out = base.clone();
4399 if let Some(ev) = &self.edge_var {
4400 out.insert(ev.clone(), Value::Edge(edge));
4401 }
4402 out.insert(self.dst_var.clone(), Value::Node(neighbor));
4403 self.yielded_for_current = true;
4404 return Ok(Some(out));
4405 }
4406
4407 if let Some(base) = self.current_row.take() {
4417 if !self.yielded_for_current {
4418 let mut out = base;
4419 if let Some(ev) = &self.edge_var {
4420 let preserve = self
4421 .edge_constraint_var
4422 .as_ref()
4423 .map(|c| c == ev)
4424 .unwrap_or(false);
4425 if !preserve {
4426 out.insert(ev.clone(), Value::Null);
4427 }
4428 }
4429 let preserve_dst = self
4430 .dst_constraint_var
4431 .as_ref()
4432 .map(|c| c == &self.dst_var)
4433 .unwrap_or(false);
4434 if !preserve_dst {
4435 out.insert(self.dst_var.clone(), Value::Null);
4436 }
4437 self.yielded_for_current = true;
4438 return Ok(Some(out));
4439 }
4440 }
4441
4442 match self.input.next(ctx)? {
4443 None => return Ok(None),
4444 Some(row) => {
4445 let src_id = match row.get(&self.src_var) {
4446 Some(Value::Node(n)) => n.id,
4447 Some(Value::Null) => {
4454 self.pending = Vec::new();
4455 self.pending_idx = 0;
4456 self.yielded_for_current = false;
4457 self.current_row = Some(row);
4458 continue;
4459 }
4460 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4461 };
4462 self.pending = match self.direction {
4463 Direction::Outgoing => ctx.store.outgoing(src_id)?,
4464 Direction::Incoming => ctx.store.incoming(src_id)?,
4465 Direction::Both => {
4466 let mut all = ctx.store.outgoing(src_id)?;
4472 let mut seen: std::collections::HashSet<EdgeId> =
4473 all.iter().map(|(e, _)| *e).collect();
4474 for (e, n) in ctx.store.incoming(src_id)? {
4475 if seen.insert(e) {
4476 all.push((e, n));
4477 }
4478 }
4479 all
4480 }
4481 };
4482 self.pending_idx = 0;
4483 self.yielded_for_current = false;
4484 self.current_row = Some(row);
4485 }
4486 }
4487 }
4488 }
4489}
4490
4491struct VarLengthExpandOp {
4492 input: Box<dyn Operator>,
4493 src_var: String,
4494 edge_var: Option<String>,
4495 dst_var: String,
4496 dst_labels: Vec<String>,
4497 edge_types: Vec<String>,
4498 edge_properties: Vec<(String, Expr)>,
4504 direction: Direction,
4505 min_hops: u64,
4506 max_hops: u64,
4507 path_var: Option<String>,
4508 optional: bool,
4514 dst_constraint_var: Option<String>,
4521 bound_edge_list_var: Option<String>,
4525 excluded_edge_vars: Vec<String>,
4533 current_row: Option<Row>,
4534 pending_paths: Vec<Vec<Edge>>,
4535 pending_node_paths: Vec<Vec<NodeId>>,
4536 pending_targets: Vec<NodeId>,
4537 pending_idx: usize,
4538}
4539
4540impl VarLengthExpandOp {
4541 #[allow(clippy::too_many_arguments)]
4542 fn new(
4543 input: Box<dyn Operator>,
4544 src_var: String,
4545 edge_var: Option<String>,
4546 dst_var: String,
4547 dst_labels: Vec<String>,
4548 edge_types: Vec<String>,
4549 edge_properties: Vec<(String, Expr)>,
4550 direction: Direction,
4551 min_hops: u64,
4552 max_hops: u64,
4553 path_var: Option<String>,
4554 optional: bool,
4555 dst_constraint_var: Option<String>,
4556 bound_edge_list_var: Option<String>,
4557 excluded_edge_vars: Vec<String>,
4558 ) -> Self {
4559 Self {
4560 input,
4561 src_var,
4562 edge_var,
4563 dst_var,
4564 dst_labels,
4565 edge_types,
4566 edge_properties,
4567 direction,
4568 min_hops,
4569 max_hops,
4570 path_var,
4571 optional,
4572 dst_constraint_var,
4573 bound_edge_list_var,
4574 excluded_edge_vars,
4575 current_row: None,
4576 pending_paths: Vec::new(),
4577 pending_node_paths: Vec::new(),
4578 pending_targets: Vec::new(),
4579 pending_idx: 0,
4580 }
4581 }
4582
4583 fn enumerate(
4584 &self,
4585 ctx: &ExecCtx,
4586 start: NodeId,
4587 input_row: &Row,
4588 ) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
4589 let mut paths: Vec<Vec<Edge>> = Vec::new();
4590 let mut node_paths: Vec<Vec<NodeId>> = Vec::new();
4591 let mut targets: Vec<NodeId> = Vec::new();
4592 let mut edge_buf: Vec<Edge> = Vec::new();
4593 let mut node_buf: Vec<NodeId> = vec![start];
4594 let mut used: HashSet<EdgeId> = HashSet::new();
4601 for var in &self.excluded_edge_vars {
4602 match ctx.lookup_binding(input_row, var) {
4603 Some(Value::Edge(e)) => {
4604 used.insert(e.id);
4605 }
4606 Some(Value::List(items)) => {
4607 for item in items {
4608 if let Value::Edge(e) = item {
4609 used.insert(e.id);
4610 }
4611 }
4612 }
4613 _ => {}
4614 }
4615 }
4616 let expected_edge_props: Vec<(String, Value)> = if self.edge_properties.is_empty() {
4620 Vec::new()
4621 } else {
4622 let ectx = ctx.eval_ctx(input_row);
4623 self.edge_properties
4624 .iter()
4625 .map(|(k, expr)| eval_expr(expr, &ectx).map(|v| (k.clone(), v)))
4626 .collect::<Result<Vec<_>>>()?
4627 };
4628 self.dfs(
4629 ctx,
4630 start,
4631 &expected_edge_props,
4632 &mut edge_buf,
4633 &mut node_buf,
4634 &mut used,
4635 &mut paths,
4636 &mut node_paths,
4637 &mut targets,
4638 )?;
4639 Ok((paths, node_paths, targets))
4640 }
4641
4642 #[allow(clippy::too_many_arguments)]
4643 fn dfs(
4644 &self,
4645 ctx: &ExecCtx,
4646 current_node: NodeId,
4647 expected_edge_props: &[(String, Value)],
4648 edge_buf: &mut Vec<Edge>,
4649 node_buf: &mut Vec<NodeId>,
4650 used: &mut HashSet<EdgeId>,
4651 out_paths: &mut Vec<Vec<Edge>>,
4652 out_node_paths: &mut Vec<Vec<NodeId>>,
4653 out_targets: &mut Vec<NodeId>,
4654 ) -> Result<()> {
4655 let depth = edge_buf.len() as u64;
4656
4657 if depth >= self.min_hops && depth <= self.max_hops {
4658 let terminal_ok = match ctx.store.get_node(current_node)? {
4659 Some(node) => has_all_labels(&node, &self.dst_labels),
4660 None => false,
4661 };
4662 if terminal_ok {
4663 out_paths.push(edge_buf.clone());
4664 out_node_paths.push(node_buf.clone());
4665 out_targets.push(current_node);
4666 }
4667 }
4668
4669 if depth >= self.max_hops {
4670 return Ok(());
4671 }
4672
4673 let neighbors = match self.direction {
4674 Direction::Outgoing => ctx.store.outgoing(current_node)?,
4675 Direction::Incoming => ctx.store.incoming(current_node)?,
4676 Direction::Both => {
4677 let mut all = ctx.store.outgoing(current_node)?;
4678 all.extend(ctx.store.incoming(current_node)?);
4679 all
4680 }
4681 };
4682
4683 for (eid, neighbor_id) in neighbors {
4684 if used.contains(&eid) {
4685 continue;
4686 }
4687 let edge = match ctx.store.get_edge(eid)? {
4688 Some(e) => e,
4689 None => continue,
4690 };
4691 if !self.edge_types.is_empty() && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4692 {
4693 continue;
4694 }
4695 if !expected_edge_props.is_empty() {
4700 let mut ok = true;
4701 for (key, expected) in expected_edge_props {
4702 let actual = match edge.properties.get(key) {
4703 Some(v) => Value::Property(v.clone()),
4704 None => {
4705 ok = false;
4706 break;
4707 }
4708 };
4709 if !values_equal(&actual, expected) {
4710 ok = false;
4711 break;
4712 }
4713 }
4714 if !ok {
4715 continue;
4716 }
4717 }
4718 used.insert(eid);
4719 edge_buf.push(edge);
4720 node_buf.push(neighbor_id);
4721 self.dfs(
4722 ctx,
4723 neighbor_id,
4724 expected_edge_props,
4725 edge_buf,
4726 node_buf,
4727 used,
4728 out_paths,
4729 out_node_paths,
4730 out_targets,
4731 )?;
4732 edge_buf.pop();
4733 node_buf.pop();
4734 used.remove(&eid);
4735 }
4736
4737 Ok(())
4738 }
4739}
4740
4741fn replay_edge_list(
4759 ctx: &ExecCtx,
4760 row: &Row,
4761 list_var: &str,
4762 src_id: Option<NodeId>,
4763 direction: Direction,
4764 edge_types: &[String],
4765) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
4766 let start = match src_id {
4767 Some(id) => id,
4768 None => return Ok((Vec::new(), Vec::new(), Vec::new())),
4769 };
4770 let list = match ctx.lookup_binding(row, list_var) {
4771 Some(Value::List(items)) => items.clone(),
4772 Some(Value::Property(meshdb_core::Property::List(items))) => items
4773 .iter()
4774 .cloned()
4775 .map(Value::Property)
4776 .collect::<Vec<_>>(),
4777 _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
4778 };
4779 let mut edge_buf: Vec<Edge> = Vec::with_capacity(list.len());
4780 let mut node_buf: Vec<NodeId> = Vec::with_capacity(list.len() + 1);
4781 node_buf.push(start);
4782 let mut current = start;
4783 for item in list {
4784 let edge = match item {
4785 Value::Edge(e) => e,
4786 _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
4787 };
4788 if !edge_types.is_empty() && !edge_types.iter().any(|t| t == &edge.edge_type) {
4789 return Ok((Vec::new(), Vec::new(), Vec::new()));
4790 }
4791 let next_node = match direction {
4792 Direction::Outgoing => {
4793 if edge.source != current {
4794 return Ok((Vec::new(), Vec::new(), Vec::new()));
4795 }
4796 edge.target
4797 }
4798 Direction::Incoming => {
4799 if edge.target != current {
4800 return Ok((Vec::new(), Vec::new(), Vec::new()));
4801 }
4802 edge.source
4803 }
4804 Direction::Both => {
4805 if edge.source == current {
4806 edge.target
4807 } else if edge.target == current {
4808 edge.source
4809 } else {
4810 return Ok((Vec::new(), Vec::new(), Vec::new()));
4811 }
4812 }
4813 };
4814 if ctx.store.get_node(next_node)?.is_none() {
4818 return Ok((Vec::new(), Vec::new(), Vec::new()));
4819 }
4820 edge_buf.push(edge);
4821 node_buf.push(next_node);
4822 current = next_node;
4823 }
4824 Ok((vec![edge_buf], vec![node_buf], vec![current]))
4825}
4826
4827impl Operator for VarLengthExpandOp {
4828 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4829 loop {
4830 while self.pending_idx < self.pending_paths.len() {
4831 let i = self.pending_idx;
4832 self.pending_idx += 1;
4833
4834 let target_id = self.pending_targets[i];
4835 let target = match ctx.store.get_node(target_id)? {
4836 Some(n) => n,
4837 None => continue,
4838 };
4839
4840 let base = self
4841 .current_row
4842 .as_ref()
4843 .expect("pending without source row");
4844 let mut out = base.clone();
4845 out.insert(self.dst_var.clone(), Value::Node(target.clone()));
4846 if let Some(ev) = &self.edge_var {
4847 let edges: Vec<Value> = self.pending_paths[i]
4848 .iter()
4849 .cloned()
4850 .map(Value::Edge)
4851 .collect();
4852 out.insert(ev.clone(), Value::List(edges));
4853 }
4854 if let Some(pv) = &self.path_var {
4855 let mut nodes = Vec::with_capacity(self.pending_node_paths[i].len());
4856 for nid in &self.pending_node_paths[i] {
4857 match ctx.store.get_node(*nid)? {
4858 Some(n) => nodes.push(n),
4859 None => continue,
4860 }
4861 }
4862 let edges = self.pending_paths[i].clone();
4863 out.insert(pv.clone(), Value::Path { nodes, edges });
4864 }
4865 return Ok(Some(out));
4866 }
4867
4868 match self.input.next(ctx)? {
4869 None => return Ok(None),
4870 Some(row) => {
4871 let src_id = match row.get(&self.src_var) {
4872 Some(Value::Node(n)) => Some(n.id),
4873 Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
4879 None
4880 }
4881 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4882 };
4883 let (mut paths, mut node_paths, mut targets) =
4891 if let Some(list_var) = &self.bound_edge_list_var {
4892 replay_edge_list(
4893 ctx,
4894 &row,
4895 list_var,
4896 src_id,
4897 self.direction,
4898 &self.edge_types,
4899 )?
4900 } else {
4901 match src_id {
4902 Some(id) => self.enumerate(ctx, id, &row)?,
4903 None => (Vec::new(), Vec::new(), Vec::new()),
4904 }
4905 };
4906 if let Some(constraint_var) = &self.dst_constraint_var {
4913 let target_id = match row.get(constraint_var) {
4914 Some(Value::Node(n)) => Some(n.id),
4915 _ => None,
4916 };
4917 match target_id {
4918 Some(id) => {
4919 let mut kept_paths = Vec::new();
4920 let mut kept_node_paths = Vec::new();
4921 let mut kept_targets = Vec::new();
4922 for ((p, np), t) in paths
4923 .drain(..)
4924 .zip(node_paths.drain(..))
4925 .zip(targets.drain(..))
4926 {
4927 if t == id {
4928 kept_paths.push(p);
4929 kept_node_paths.push(np);
4930 kept_targets.push(t);
4931 }
4932 }
4933 paths = kept_paths;
4934 node_paths = kept_node_paths;
4935 targets = kept_targets;
4936 }
4937 None => {
4938 paths.clear();
4939 node_paths.clear();
4940 targets.clear();
4941 }
4942 }
4943 }
4944 if paths.is_empty() && self.optional {
4945 let mut out = row;
4950 if let Some(ev) = &self.edge_var {
4951 out.insert(ev.clone(), Value::Null);
4952 }
4953 out.insert(self.dst_var.clone(), Value::Null);
4954 if let Some(pv) = &self.path_var {
4955 out.insert(pv.clone(), Value::Null);
4956 }
4957 return Ok(Some(out));
4958 }
4959 self.pending_paths = paths;
4960 self.pending_node_paths = node_paths;
4961 self.pending_targets = targets;
4962 self.pending_idx = 0;
4963 self.current_row = Some(row);
4964 }
4965 }
4966 }
4967 }
4968}
4969
4970struct FilterOp {
4971 input: Box<dyn Operator>,
4972 predicate: Expr,
4973}
4974
4975impl FilterOp {
4976 fn new(input: Box<dyn Operator>, predicate: Expr) -> Self {
4977 Self { input, predicate }
4978 }
4979}
4980
4981impl Operator for FilterOp {
4982 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4983 while let Some(row) = self.input.next(ctx)? {
4984 let v = match eval_expr(&self.predicate, &ctx.eval_ctx(&row)) {
4985 Ok(v) => v,
4986 Err(Error::TypeMismatch) | Err(Error::NotBoolean) => Value::Null,
4989 Err(e) => return Err(e),
4990 };
4991 if to_bool(&v).unwrap_or(false) {
4992 return Ok(Some(row));
4993 }
4994 }
4995 Ok(None)
4996 }
4997}
4998
4999struct IdentityOp {
5002 input: Box<dyn Operator>,
5003}
5004
5005impl IdentityOp {
5006 fn new(input: Box<dyn Operator>) -> Self {
5007 Self { input }
5008 }
5009}
5010
5011impl Operator for IdentityOp {
5012 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5013 self.input.next(ctx)
5014 }
5015}
5016
5017struct CoalesceNullRowOp {
5023 input: Box<dyn Operator>,
5024 null_vars: Vec<String>,
5025 produced_any: bool,
5026 done: bool,
5027}
5028
5029impl CoalesceNullRowOp {
5030 fn new(input: Box<dyn Operator>, null_vars: Vec<String>) -> Self {
5031 Self {
5032 input,
5033 null_vars,
5034 produced_any: false,
5035 done: false,
5036 }
5037 }
5038}
5039
5040impl Operator for CoalesceNullRowOp {
5041 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5042 if self.done {
5043 return Ok(None);
5044 }
5045 match self.input.next(ctx)? {
5046 Some(row) => {
5047 self.produced_any = true;
5048 Ok(Some(row))
5049 }
5050 None => {
5051 self.done = true;
5052 if self.produced_any {
5053 Ok(None)
5054 } else {
5055 let mut row = Row::new();
5056 for v in &self.null_vars {
5057 row.insert(v.clone(), Value::Null);
5058 }
5059 Ok(Some(row))
5060 }
5061 }
5062 }
5063 }
5064}
5065
5066struct ProjectOp {
5067 input: Box<dyn Operator>,
5068 items: Vec<ReturnItem>,
5069}
5070
5071impl ProjectOp {
5072 fn new(input: Box<dyn Operator>, items: Vec<ReturnItem>) -> Self {
5073 Self { input, items }
5074 }
5075}
5076
5077impl Operator for ProjectOp {
5078 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5079 match self.input.next(ctx)? {
5080 Some(row) => {
5081 let mut out = Row::new();
5082 for (i, item) in self.items.iter().enumerate() {
5083 let name = item.alias.clone().unwrap_or_else(|| {
5084 item.raw_text
5085 .clone()
5086 .unwrap_or_else(|| default_name(&item.expr, i))
5087 });
5088 let value = eval_expr(&item.expr, &ctx.eval_ctx(&row))?;
5089 out.insert(name, value);
5090 }
5091 Ok(Some(out))
5092 }
5093 None => Ok(None),
5094 }
5095 }
5096}
5097
5098fn default_name(expr: &Expr, idx: usize) -> String {
5099 render_expr_name(expr).unwrap_or_else(|| format!("col{}", idx))
5100}
5101
5102fn render_expr_name(expr: &Expr) -> Option<String> {
5103 Some(match expr {
5104 Expr::Identifier(s) => s.clone(),
5105 Expr::Property { var, key } => format!("{var}.{key}"),
5106 Expr::PropertyAccess { base, key } => {
5107 if matches!(
5111 base.as_ref(),
5112 Expr::IndexAccess { .. } | Expr::SliceAccess { .. }
5113 ) {
5114 format!("({}).{key}", render_expr_name(base)?)
5115 } else {
5116 format!("{}.{key}", render_expr_name(base)?)
5117 }
5118 }
5119 Expr::Parameter(name) => format!("${name}"),
5120 Expr::Literal(Literal::String(s)) => format!("'{s}'"),
5121 Expr::Literal(Literal::Integer(i)) => i.to_string(),
5122 Expr::Literal(Literal::Float(f)) => f.to_string(),
5123 Expr::Literal(Literal::Boolean(b)) => b.to_string(),
5124 Expr::Literal(Literal::Null) => "NULL".into(),
5125 Expr::Call { name, args } => {
5126 let arg_str = match args {
5127 CallArgs::Star => "*".into(),
5128 CallArgs::Exprs(es) | CallArgs::DistinctExprs(es) => {
5129 let prefix = if matches!(args, CallArgs::DistinctExprs(_)) {
5130 "DISTINCT "
5131 } else {
5132 ""
5133 };
5134 let inner: Vec<String> = es.iter().filter_map(render_expr_name).collect();
5135 if inner.len() != es.len() {
5136 return None;
5137 }
5138 format!("{prefix}{}", inner.join(", "))
5139 }
5140 };
5141 format!("{name}({arg_str})")
5142 }
5143 Expr::BinaryOp { op, left, right } => {
5144 let op_str = match op {
5145 BinaryOp::Add => " + ",
5146 BinaryOp::Sub => " - ",
5147 BinaryOp::Mul => " * ",
5148 BinaryOp::Div => " / ",
5149 BinaryOp::Mod => " % ",
5150 BinaryOp::Pow => " ^ ",
5151 };
5152 format!(
5153 "{}{op_str}{}",
5154 render_expr_name(left)?,
5155 render_expr_name(right)?
5156 )
5157 }
5158 Expr::UnaryOp { op, operand } => {
5159 let op_str = match op {
5160 UnaryOp::Neg => "-",
5161 };
5162 format!("{op_str}{}", render_expr_name(operand)?)
5163 }
5164 Expr::Not(inner) => format!("NOT {}", render_expr_name(inner)?),
5165 Expr::IsNull { negated, inner } => {
5166 if *negated {
5167 format!("{} IS NOT NULL", render_expr_name(inner)?)
5168 } else {
5169 format!("{} IS NULL", render_expr_name(inner)?)
5170 }
5171 }
5172 Expr::Compare { op, left, right } => {
5173 let op_str = match op {
5174 CompareOp::Eq => " = ",
5175 CompareOp::Ne => " <> ",
5176 CompareOp::Lt => " < ",
5177 CompareOp::Le => " <= ",
5178 CompareOp::Gt => " > ",
5179 CompareOp::Ge => " >= ",
5180 CompareOp::StartsWith => " STARTS WITH ",
5181 CompareOp::EndsWith => " ENDS WITH ",
5182 CompareOp::Contains => " CONTAINS ",
5183 CompareOp::RegexMatch => " =~ ",
5184 };
5185 format!(
5186 "{}{op_str}{}",
5187 render_expr_name(left)?,
5188 render_expr_name(right)?
5189 )
5190 }
5191 Expr::List(items) => {
5192 let inner: Vec<String> = items.iter().filter_map(render_expr_name).collect();
5193 if inner.len() != items.len() {
5194 return None;
5195 }
5196 format!("[{}]", inner.join(", "))
5197 }
5198 Expr::Map(entries) => {
5199 let inner: Vec<String> = entries
5200 .iter()
5201 .map(|(k, v)| render_expr_name(v).map(|vn| format!("{k}: {vn}")))
5202 .collect::<Option<Vec<_>>>()?;
5203 format!("{{{}}}", inner.join(", "))
5204 }
5205 Expr::IndexAccess { base, index } => {
5206 format!("{}[{}]", render_expr_name(base)?, render_expr_name(index)?)
5207 }
5208 Expr::InList { element, list } => {
5209 format!(
5210 "{} IN {}",
5211 render_expr_name(element)?,
5212 render_expr_name(list)?
5213 )
5214 }
5215 Expr::HasLabels { expr, labels } => {
5216 let mut s = format!("({}", render_expr_name(expr)?);
5217 for l in labels {
5218 s.push(':');
5219 s.push_str(l);
5220 }
5221 s.push(')');
5222 s
5223 }
5224 _ => return None,
5225 })
5226}
5227
5228struct DistinctOp {
5229 input: Box<dyn Operator>,
5230 seen: HashSet<String>,
5231}
5232
5233impl DistinctOp {
5234 fn new(input: Box<dyn Operator>) -> Self {
5235 Self {
5236 input,
5237 seen: HashSet::new(),
5238 }
5239 }
5240}
5241
5242impl Operator for DistinctOp {
5243 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5244 while let Some(row) = self.input.next(ctx)? {
5245 let key = row_key(&row);
5246 if self.seen.insert(key) {
5247 return Ok(Some(row));
5248 }
5249 }
5250 Ok(None)
5251 }
5252}
5253
5254struct BindPathOp {
5270 input: Box<dyn Operator>,
5271 path_var: String,
5272 node_vars: Vec<String>,
5273 edge_vars: Vec<String>,
5274}
5275
5276impl BindPathOp {
5277 fn new(
5278 input: Box<dyn Operator>,
5279 path_var: String,
5280 node_vars: Vec<String>,
5281 edge_vars: Vec<String>,
5282 ) -> Self {
5283 Self {
5284 input,
5285 path_var,
5286 node_vars,
5287 edge_vars,
5288 }
5289 }
5290}
5291
5292impl Operator for BindPathOp {
5293 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5294 let Some(mut row) = self.input.next(ctx)? else {
5295 return Ok(None);
5296 };
5297 let mut nodes: Vec<meshdb_core::Node> = Vec::new();
5301 let mut edges: Vec<meshdb_core::Edge> = Vec::new();
5302 let mut abort = false;
5303 if let Some(Value::Node(n)) = row.get(&self.node_vars[0]) {
5310 nodes.push(n.clone());
5311 } else {
5312 abort = true;
5313 }
5314 if !abort {
5315 for (i, ev) in self.edge_vars.iter().enumerate() {
5316 match row.get(ev) {
5317 Some(Value::Edge(e)) => {
5318 edges.push(e.clone());
5319 match row.get(&self.node_vars[i + 1]) {
5320 Some(Value::Node(n)) => nodes.push(n.clone()),
5321 _ => {
5322 abort = true;
5323 break;
5324 }
5325 }
5326 }
5327 Some(Value::Path {
5328 nodes: sub_nodes,
5329 edges: sub_edges,
5330 }) => {
5331 edges.extend(sub_edges.iter().cloned());
5337 if sub_nodes.len() > 1 {
5338 nodes.extend(sub_nodes[1..].iter().cloned());
5339 }
5340 }
5341 _ => {
5342 abort = true;
5343 break;
5344 }
5345 }
5346 }
5347 }
5348 if abort {
5349 row.insert(self.path_var.clone(), Value::Null);
5350 } else {
5351 row.insert(self.path_var.clone(), Value::Path { nodes, edges });
5352 }
5353 Ok(Some(row))
5354 }
5355}
5356
5357struct ShortestPathOp {
5376 input: Box<dyn Operator>,
5377 src_var: String,
5378 dst_var: String,
5379 path_var: String,
5380 edge_types: Vec<String>,
5381 direction: meshdb_cypher::Direction,
5382 max_hops: u64,
5383 kind: meshdb_cypher::ShortestKind,
5384 pending: std::collections::VecDeque<(Row, Value)>,
5391}
5392
5393impl ShortestPathOp {
5394 #[allow(clippy::too_many_arguments)]
5395 fn new(
5396 input: Box<dyn Operator>,
5397 src_var: String,
5398 dst_var: String,
5399 path_var: String,
5400 edge_types: Vec<String>,
5401 direction: meshdb_cypher::Direction,
5402 max_hops: u64,
5403 kind: meshdb_cypher::ShortestKind,
5404 ) -> Self {
5405 Self {
5406 input,
5407 src_var,
5408 dst_var,
5409 path_var,
5410 edge_types,
5411 direction,
5412 max_hops,
5413 kind,
5414 pending: std::collections::VecDeque::new(),
5415 }
5416 }
5417}
5418
5419impl Operator for ShortestPathOp {
5420 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5421 loop {
5422 if let Some((mut row, path)) = self.pending.pop_front() {
5427 row.insert(self.path_var.clone(), path);
5428 return Ok(Some(row));
5429 }
5430 let Some(row) = self.input.next(ctx)? else {
5431 return Ok(None);
5432 };
5433 let src = match row.get(&self.src_var) {
5434 Some(Value::Node(n)) => n.clone(),
5435 _ => continue,
5436 };
5437 let dst = match row.get(&self.dst_var) {
5438 Some(Value::Node(n)) => n.clone(),
5439 _ => continue,
5440 };
5441 let paths = bfs_shortest_paths(
5442 &src,
5443 &dst,
5444 &self.edge_types,
5445 self.direction,
5446 self.max_hops,
5447 self.kind,
5448 ctx.store,
5449 )?;
5450 if paths.is_empty() {
5451 continue;
5453 }
5454 for path in paths {
5455 self.pending.push_back((row.clone(), path));
5456 }
5457 }
5458 }
5459}
5460
5461fn bfs_shortest_paths(
5480 src: &Node,
5481 dst: &Node,
5482 edge_types: &[String],
5483 direction: meshdb_cypher::Direction,
5484 max_hops: u64,
5485 kind: meshdb_cypher::ShortestKind,
5486 reader: &dyn crate::reader::GraphReader,
5487) -> Result<Vec<Value>> {
5488 use meshdb_cypher::Direction;
5489
5490 if src.id == dst.id {
5491 return Ok(vec![Value::Path {
5492 nodes: vec![src.clone()],
5493 edges: vec![],
5494 }]);
5495 }
5496
5497 let mut dist: HashMap<NodeId, u64> = HashMap::new();
5503 dist.insert(src.id, 0);
5504 let mut parents: HashMap<NodeId, Vec<(NodeId, EdgeId)>> = HashMap::new();
5505
5506 let mut frontier: Vec<NodeId> = vec![src.id];
5507 let mut depth: u64 = 0;
5508 let mut found = false;
5509
5510 while !frontier.is_empty() && depth < max_hops && !found {
5511 let mut next_frontier: Vec<NodeId> = Vec::new();
5512 for node_id in &frontier {
5513 let neighbors = match direction {
5514 Direction::Outgoing => reader.outgoing(*node_id)?,
5515 Direction::Incoming => reader.incoming(*node_id)?,
5516 Direction::Both => {
5517 let mut out = reader.outgoing(*node_id)?;
5518 out.extend(reader.incoming(*node_id)?);
5519 out
5520 }
5521 };
5522 for (edge_id, neighbor_id) in neighbors {
5523 if !edge_types.is_empty() {
5526 let edge = match reader.get_edge(edge_id)? {
5527 Some(e) => e,
5528 None => continue,
5529 };
5530 if !edge_types.iter().any(|t| t == &edge.edge_type) {
5531 continue;
5532 }
5533 }
5534 match dist.get(&neighbor_id) {
5535 Some(&d) if d == depth + 1 => {
5536 parents
5542 .entry(neighbor_id)
5543 .or_default()
5544 .push((*node_id, edge_id));
5545 }
5546 Some(_) => {
5547 }
5551 None => {
5552 dist.insert(neighbor_id, depth + 1);
5553 parents
5554 .entry(neighbor_id)
5555 .or_default()
5556 .push((*node_id, edge_id));
5557 if neighbor_id == dst.id {
5558 found = true;
5559 } else {
5560 next_frontier.push(neighbor_id);
5561 }
5562 }
5563 }
5564 }
5565 }
5566 depth += 1;
5567 if !found {
5568 frontier = next_frontier;
5569 }
5570 }
5571
5572 if !found {
5573 return Ok(Vec::new());
5574 }
5575
5576 let mut out: Vec<Value> = Vec::new();
5580 let mut nodes_rev: Vec<Node> = Vec::new();
5581 let mut edges_rev: Vec<Edge> = Vec::new();
5582 let only_first = matches!(kind, meshdb_cypher::ShortestKind::Shortest);
5583 collect_shortest_paths(
5584 src,
5585 dst,
5586 &parents,
5587 reader,
5588 &mut nodes_rev,
5589 &mut edges_rev,
5590 &mut out,
5591 only_first,
5592 )?;
5593 Ok(out)
5594}
5595
5596#[allow(clippy::too_many_arguments)]
5608fn collect_shortest_paths(
5609 src: &Node,
5610 current: &Node,
5611 parents: &HashMap<NodeId, Vec<(NodeId, EdgeId)>>,
5612 reader: &dyn crate::reader::GraphReader,
5613 nodes_rev: &mut Vec<Node>,
5614 edges_rev: &mut Vec<Edge>,
5615 out: &mut Vec<Value>,
5616 only_first: bool,
5617) -> Result<()> {
5618 if current.id == src.id {
5619 let mut nodes: Vec<Node> = Vec::with_capacity(nodes_rev.len() + 1);
5624 nodes.push(src.clone());
5625 nodes.extend(nodes_rev.iter().rev().cloned());
5626 let edges: Vec<Edge> = edges_rev.iter().rev().cloned().collect();
5627 out.push(Value::Path { nodes, edges });
5628 return Ok(());
5629 }
5630 let Some(parent_edges) = parents.get(¤t.id) else {
5631 return Ok(());
5635 };
5636 for (parent_id, edge_id) in parent_edges {
5637 if only_first && !out.is_empty() {
5638 return Ok(());
5639 }
5640 let edge = reader
5641 .get_edge(*edge_id)?
5642 .expect("BFS inserted this edge id; it must still exist");
5643 let parent_node = reader
5644 .get_node(*parent_id)?
5645 .expect("BFS visited this node id; it must still exist");
5646 nodes_rev.push(current.clone());
5647 edges_rev.push(edge);
5648 collect_shortest_paths(
5649 src,
5650 &parent_node,
5651 parents,
5652 reader,
5653 nodes_rev,
5654 edges_rev,
5655 out,
5656 only_first,
5657 )?;
5658 nodes_rev.pop();
5659 edges_rev.pop();
5660 }
5661 Ok(())
5662}
5663
5664struct UnionOp {
5673 branches: Vec<Box<dyn Operator>>,
5674 current: usize,
5675 seen: Option<HashSet<String>>,
5676}
5677
5678impl UnionOp {
5679 fn new(branches: Vec<Box<dyn Operator>>, all: bool) -> Self {
5680 Self {
5681 branches,
5682 current: 0,
5683 seen: if all { None } else { Some(HashSet::new()) },
5684 }
5685 }
5686}
5687
5688impl Operator for UnionOp {
5689 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5690 while self.current < self.branches.len() {
5691 match self.branches[self.current].next(ctx)? {
5692 Some(row) => {
5693 if let Some(seen) = self.seen.as_mut() {
5694 let key = row_key(&row);
5695 if !seen.insert(key) {
5696 continue;
5697 }
5698 }
5699 return Ok(Some(row));
5700 }
5701 None => {
5702 self.current += 1;
5703 }
5704 }
5705 }
5706 Ok(None)
5707 }
5708}
5709
5710struct OrderByOp {
5711 input: Box<dyn Operator>,
5712 sort_items: Vec<SortItem>,
5713 sorted: Option<Vec<Row>>,
5714 cursor: usize,
5715}
5716
5717impl OrderByOp {
5718 fn new(input: Box<dyn Operator>, sort_items: Vec<SortItem>) -> Self {
5719 Self {
5720 input,
5721 sort_items,
5722 sorted: None,
5723 cursor: 0,
5724 }
5725 }
5726}
5727
5728impl Operator for OrderByOp {
5729 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5730 if self.sorted.is_none() {
5731 let mut rows: Vec<Row> = Vec::new();
5732 while let Some(row) = self.input.next(ctx)? {
5733 rows.push(row);
5734 }
5735 let mut keyed: Vec<(Vec<Value>, Row)> = Vec::with_capacity(rows.len());
5736 for row in rows {
5737 let mut keys = Vec::with_capacity(self.sort_items.len());
5738 for item in &self.sort_items {
5739 keys.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
5740 }
5741 keyed.push((keys, row));
5742 }
5743 let descs: Vec<bool> = self.sort_items.iter().map(|s| s.descending).collect();
5744 keyed.sort_by(|a, b| {
5745 for (i, (va, vb)) in a.0.iter().zip(b.0.iter()).enumerate() {
5746 let ord = compare_values(va, vb);
5747 let ord = if descs[i] { ord.reverse() } else { ord };
5748 if ord != Ordering::Equal {
5749 return ord;
5750 }
5751 }
5752 Ordering::Equal
5753 });
5754 self.sorted = Some(keyed.into_iter().map(|(_, r)| r).collect());
5755 }
5756 let rows = self.sorted.as_ref().unwrap();
5757 if self.cursor < rows.len() {
5758 let row = rows[self.cursor].clone();
5759 self.cursor += 1;
5760 Ok(Some(row))
5761 } else {
5762 Ok(None)
5763 }
5764 }
5765}
5766
5767struct AggregateOp {
5768 input: Box<dyn Operator>,
5769 group_keys: Vec<ReturnItem>,
5770 aggregates: Vec<AggregateSpec>,
5771 results: Option<Vec<Row>>,
5772 cursor: usize,
5773}
5774
5775impl AggregateOp {
5776 fn new(
5777 input: Box<dyn Operator>,
5778 group_keys: Vec<ReturnItem>,
5779 aggregates: Vec<AggregateSpec>,
5780 ) -> Self {
5781 Self {
5782 input,
5783 group_keys,
5784 aggregates,
5785 results: None,
5786 cursor: 0,
5787 }
5788 }
5789
5790 fn compute(&mut self, ctx: &ExecCtx) -> Result<()> {
5791 let mut groups: HashMap<String, GroupState> = HashMap::new();
5792 let mut order: Vec<String> = Vec::new();
5793
5794 let mut saw_any = false;
5797
5798 while let Some(row) = self.input.next(ctx)? {
5799 saw_any = true;
5800 let mut key_values = Vec::with_capacity(self.group_keys.len());
5801 for item in &self.group_keys {
5802 key_values.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
5803 }
5804 let mut hash_key = String::new();
5805 for v in &key_values {
5806 hash_key.push_str(&value_key(v));
5807 hash_key.push('|');
5808 }
5809 let entry = groups.entry(hash_key.clone()).or_insert_with(|| {
5810 order.push(hash_key.clone());
5811 GroupState {
5812 key_values: key_values.clone(),
5813 agg_states: self
5814 .aggregates
5815 .iter()
5816 .map(|a| AggState::initial(a.function))
5817 .collect(),
5818 distinct_seen: self.aggregates.iter().map(|_| None).collect(),
5819 }
5820 });
5821 for (i, spec) in self.aggregates.iter().enumerate() {
5822 if let AggregateArg::DistinctExpr(expr) = &spec.arg {
5823 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
5824 if matches!(v, Value::Null) {
5825 continue;
5826 }
5827 let key = value_key(&v);
5828 let seen = entry.distinct_seen[i].get_or_insert_with(HashSet::new);
5829 if !seen.insert(key) {
5830 continue;
5831 }
5832 }
5833 if let Some(extra_expr) = &spec.extra_arg {
5839 let need_resolve_percentile = matches!(
5840 &entry.agg_states[i],
5841 AggState::PercentileDisc {
5842 percentile: None,
5843 ..
5844 } | AggState::PercentileCont {
5845 percentile: None,
5846 ..
5847 }
5848 );
5849 let need_resolve_nth =
5850 matches!(&entry.agg_states[i], AggState::ApocNth { target: None, .. });
5851 if need_resolve_percentile {
5852 let pv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
5853 let p = match pv {
5854 Value::Property(Property::Float64(f)) => f,
5855 Value::Property(Property::Int64(i)) => i as f64,
5856 _ => 0.0,
5857 };
5858 if !(0.0..=1.0).contains(&p) || p.is_nan() {
5862 return Err(Error::Procedure(format!("percentile out of range: {p}")));
5863 }
5864 match &mut entry.agg_states[i] {
5865 AggState::PercentileDisc { percentile, .. }
5866 | AggState::PercentileCont { percentile, .. } => {
5867 *percentile = Some(p);
5868 }
5869 _ => {}
5870 }
5871 }
5872 if need_resolve_nth {
5873 let nv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
5874 let n = match nv {
5875 Value::Property(Property::Int64(i)) => i,
5876 _ => {
5877 return Err(Error::Procedure(
5878 "apoc.agg.nth expects an integer index".into(),
5879 ))
5880 }
5881 };
5882 if n < 0 {
5883 return Err(Error::Procedure(format!(
5884 "apoc.agg.nth index out of range: {n}"
5885 )));
5886 }
5887 if let AggState::ApocNth { target, .. } = &mut entry.agg_states[i] {
5888 *target = Some(n);
5889 }
5890 }
5891 }
5892 entry.agg_states[i].update(&spec.arg, &ctx.eval_ctx(&row))?;
5893 }
5894 }
5895
5896 let mut out = Vec::new();
5897 if !saw_any && self.group_keys.is_empty() && !self.aggregates.is_empty() {
5898 let mut row = Row::new();
5900 for spec in &self.aggregates {
5901 row.insert(
5902 spec.alias.clone(),
5903 AggState::initial(spec.function).finalize(),
5904 );
5905 }
5906 out.push(row);
5907 } else {
5908 for key in order {
5909 let state = groups.remove(&key).unwrap();
5910 let mut row = Row::new();
5911 for (i, item) in self.group_keys.iter().enumerate() {
5912 let name = item
5913 .alias
5914 .clone()
5915 .unwrap_or_else(|| default_name(&item.expr, i));
5916 row.insert(name, state.key_values[i].clone());
5917 }
5918 for (i, spec) in self.aggregates.iter().enumerate() {
5919 row.insert(spec.alias.clone(), state.agg_states[i].finalize());
5920 }
5921 out.push(row);
5922 }
5923 }
5924 self.results = Some(out);
5925 Ok(())
5926 }
5927}
5928
5929impl Operator for AggregateOp {
5930 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5931 if self.results.is_none() {
5932 self.compute(ctx)?;
5933 }
5934 let rows = self.results.as_ref().unwrap();
5935 if self.cursor < rows.len() {
5936 let row = rows[self.cursor].clone();
5937 self.cursor += 1;
5938 Ok(Some(row))
5939 } else {
5940 Ok(None)
5941 }
5942 }
5943}
5944
5945struct GroupState {
5946 key_values: Vec<Value>,
5947 agg_states: Vec<AggState>,
5948 distinct_seen: Vec<Option<HashSet<String>>>,
5949}
5950
5951enum AggState {
5952 Count(i64),
5953 Sum {
5954 int_part: i64,
5955 float_part: f64,
5956 is_float: bool,
5957 },
5958 Avg {
5959 total: f64,
5960 count: i64,
5961 },
5962 Min(Option<Value>),
5963 Max(Option<Value>),
5964 Collect(Vec<Value>),
5965 StDev {
5966 sum: f64,
5967 sum_sq: f64,
5968 count: i64,
5969 },
5970 StDevP {
5971 sum: f64,
5972 sum_sq: f64,
5973 count: i64,
5974 },
5975 PercentileDisc {
5976 items: Vec<Value>,
5977 percentile: Option<f64>,
5978 },
5979 PercentileCont {
5980 items: Vec<Value>,
5981 percentile: Option<f64>,
5982 },
5983 ApocFirst(Option<Value>),
5985 ApocLast(Option<Value>),
5987 ApocNth {
5992 target: Option<i64>,
5993 count: i64,
5994 slot: Option<Value>,
5995 },
5996 ApocMedian(Vec<f64>),
5999 ApocProduct {
6002 int_part: i64,
6003 float_part: f64,
6004 is_float: bool,
6005 seen: bool,
6006 },
6007}
6008
6009impl AggState {
6010 fn initial(func: AggregateFn) -> Self {
6011 match func {
6012 AggregateFn::Count => AggState::Count(0),
6013 AggregateFn::Sum => AggState::Sum {
6014 int_part: 0,
6015 float_part: 0.0,
6016 is_float: false,
6017 },
6018 AggregateFn::Avg => AggState::Avg {
6019 total: 0.0,
6020 count: 0,
6021 },
6022 AggregateFn::Min => AggState::Min(None),
6023 AggregateFn::Max => AggState::Max(None),
6024 AggregateFn::Collect => AggState::Collect(Vec::new()),
6025 AggregateFn::StDev => AggState::StDev {
6026 sum: 0.0,
6027 sum_sq: 0.0,
6028 count: 0,
6029 },
6030 AggregateFn::StDevP => AggState::StDevP {
6031 sum: 0.0,
6032 sum_sq: 0.0,
6033 count: 0,
6034 },
6035 AggregateFn::PercentileDisc => AggState::PercentileDisc {
6036 items: Vec::new(),
6037 percentile: None,
6038 },
6039 AggregateFn::PercentileCont => AggState::PercentileCont {
6040 items: Vec::new(),
6041 percentile: None,
6042 },
6043 AggregateFn::ApocFirst => AggState::ApocFirst(None),
6044 AggregateFn::ApocLast => AggState::ApocLast(None),
6045 AggregateFn::ApocNth => AggState::ApocNth {
6046 target: None,
6047 count: 0,
6048 slot: None,
6049 },
6050 AggregateFn::ApocMedian => AggState::ApocMedian(Vec::new()),
6051 AggregateFn::ApocProduct => AggState::ApocProduct {
6052 int_part: 1,
6053 float_part: 1.0,
6054 is_float: false,
6055 seen: false,
6056 },
6057 }
6058 }
6059
6060 fn update(&mut self, arg: &AggregateArg, ctx: &EvalCtx) -> Result<()> {
6061 match self {
6062 AggState::Count(c) => match arg {
6063 AggregateArg::Star => *c += 1,
6064 AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => {
6065 if !matches!(eval_expr(e, ctx)?, Value::Null) {
6066 *c += 1;
6067 }
6068 }
6069 },
6070 AggState::Sum {
6071 int_part,
6072 float_part,
6073 is_float,
6074 } => {
6075 let v = expr_arg_value(arg, ctx)?;
6076 match v {
6077 Value::Null => {}
6078 Value::Property(Property::Int64(i)) => *int_part += i,
6079 Value::Property(Property::Float64(f)) => {
6080 *float_part += f;
6081 *is_float = true;
6082 }
6083 _ => return Err(Error::AggregateTypeError),
6084 }
6085 }
6086 AggState::Avg { total, count } => {
6087 let v = expr_arg_value(arg, ctx)?;
6088 match v {
6089 Value::Null => {}
6090 Value::Property(Property::Int64(i)) => {
6091 *total += i as f64;
6092 *count += 1;
6093 }
6094 Value::Property(Property::Float64(f)) => {
6095 *total += f;
6096 *count += 1;
6097 }
6098 _ => return Err(Error::AggregateTypeError),
6099 }
6100 }
6101 AggState::Min(slot) => {
6102 let v = expr_arg_value(arg, ctx)?;
6109 if matches!(v, Value::Null | Value::Property(Property::Null)) {
6110 } else {
6112 match slot {
6113 None => *slot = Some(v),
6114 Some(cur) => {
6115 if compare_values(&v, cur) == Ordering::Less {
6116 *cur = v;
6117 }
6118 }
6119 }
6120 }
6121 }
6122 AggState::Max(slot) => {
6123 let v = expr_arg_value(arg, ctx)?;
6124 if matches!(v, Value::Null | Value::Property(Property::Null)) {
6125 } else {
6127 match slot {
6128 None => *slot = Some(v),
6129 Some(cur) => {
6130 if compare_values(&v, cur) == Ordering::Greater {
6131 *cur = v;
6132 }
6133 }
6134 }
6135 }
6136 }
6137 AggState::Collect(items) => {
6138 let v = expr_arg_value(arg, ctx)?;
6139 if !matches!(v, Value::Null) {
6140 items.push(v);
6141 }
6142 }
6143 AggState::PercentileDisc { items, .. } | AggState::PercentileCont { items, .. } => {
6144 let v = expr_arg_value(arg, ctx)?;
6145 if !matches!(v, Value::Null) {
6146 items.push(v);
6147 }
6148 }
6149 AggState::StDev { sum, sum_sq, count } | AggState::StDevP { sum, sum_sq, count } => {
6150 let v = expr_arg_value(arg, ctx)?;
6151 match v {
6152 Value::Null => {}
6153 Value::Property(Property::Int64(i)) => {
6154 let f = i as f64;
6155 *sum += f;
6156 *sum_sq += f * f;
6157 *count += 1;
6158 }
6159 Value::Property(Property::Float64(f)) => {
6160 *sum += f;
6161 *sum_sq += f * f;
6162 *count += 1;
6163 }
6164 _ => return Err(Error::AggregateTypeError),
6165 }
6166 }
6167 AggState::ApocFirst(slot) => {
6168 if slot.is_some() {
6169 return Ok(());
6170 }
6171 let v = expr_arg_value(arg, ctx)?;
6172 if !matches!(v, Value::Null | Value::Property(Property::Null)) {
6173 *slot = Some(v);
6174 }
6175 }
6176 AggState::ApocLast(slot) => {
6177 let v = expr_arg_value(arg, ctx)?;
6178 if !matches!(v, Value::Null | Value::Property(Property::Null)) {
6179 *slot = Some(v);
6180 }
6181 }
6182 AggState::ApocNth {
6183 target,
6184 count,
6185 slot,
6186 } => {
6187 if slot.is_some() {
6188 return Ok(());
6189 }
6190 let v = expr_arg_value(arg, ctx)?;
6191 if matches!(v, Value::Null | Value::Property(Property::Null)) {
6192 return Ok(());
6193 }
6194 if let Some(t) = *target {
6195 if *count == t {
6196 *slot = Some(v);
6197 }
6198 *count += 1;
6199 }
6200 }
6201 AggState::ApocMedian(items) => {
6202 let v = expr_arg_value(arg, ctx)?;
6203 match v {
6204 Value::Null | Value::Property(Property::Null) => {}
6205 Value::Property(Property::Int64(i)) => items.push(i as f64),
6206 Value::Property(Property::Float64(f)) => items.push(f),
6207 _ => return Err(Error::AggregateTypeError),
6208 }
6209 }
6210 AggState::ApocProduct {
6211 int_part,
6212 float_part,
6213 is_float,
6214 seen,
6215 } => {
6216 let v = expr_arg_value(arg, ctx)?;
6217 match v {
6218 Value::Null | Value::Property(Property::Null) => {}
6219 Value::Property(Property::Int64(i)) => {
6220 *int_part = int_part.saturating_mul(i);
6221 *seen = true;
6222 }
6223 Value::Property(Property::Float64(f)) => {
6224 *float_part *= f;
6225 *is_float = true;
6226 *seen = true;
6227 }
6228 _ => return Err(Error::AggregateTypeError),
6229 }
6230 }
6231 }
6232 Ok(())
6233 }
6234
6235 fn finalize(&self) -> Value {
6236 match self {
6237 AggState::Count(c) => Value::Property(Property::Int64(*c)),
6238 AggState::Sum {
6239 int_part,
6240 float_part,
6241 is_float,
6242 } => {
6243 if *is_float {
6244 Value::Property(Property::Float64(*float_part + *int_part as f64))
6245 } else {
6246 Value::Property(Property::Int64(*int_part))
6247 }
6248 }
6249 AggState::Avg { total, count } => {
6250 if *count == 0 {
6251 Value::Null
6252 } else {
6253 Value::Property(Property::Float64(*total / *count as f64))
6254 }
6255 }
6256 AggState::Min(slot) | AggState::Max(slot) => match slot {
6257 Some(v) => v.clone(),
6258 None => Value::Null,
6259 },
6260 AggState::Collect(items) => Value::List(items.clone()),
6261 AggState::StDevP { sum, sum_sq, count } => {
6262 if *count == 0 {
6263 Value::Property(Property::Float64(0.0))
6264 } else {
6265 let n = *count as f64;
6266 let variance = *sum_sq / n - (*sum / n).powi(2);
6267 Value::Property(Property::Float64(variance.max(0.0).sqrt()))
6268 }
6269 }
6270 AggState::StDev { sum, sum_sq, count } => {
6271 if *count < 2 {
6272 Value::Property(Property::Float64(0.0))
6273 } else {
6274 let n = *count as f64;
6275 let variance = (*sum_sq - *sum * *sum / n) / (n - 1.0);
6276 Value::Property(Property::Float64(variance.max(0.0).sqrt()))
6277 }
6278 }
6279 AggState::PercentileDisc { items, percentile } => {
6280 percentile_disc(items, percentile.unwrap_or(0.0))
6281 }
6282 AggState::PercentileCont { items, percentile } => {
6283 percentile_cont(items, percentile.unwrap_or(0.0))
6284 }
6285 AggState::ApocFirst(slot) | AggState::ApocLast(slot) => match slot {
6286 Some(v) => v.clone(),
6287 None => Value::Null,
6288 },
6289 AggState::ApocNth { slot, .. } => match slot {
6290 Some(v) => v.clone(),
6291 None => Value::Null,
6292 },
6293 AggState::ApocMedian(items) => {
6294 if items.is_empty() {
6295 return Value::Null;
6296 }
6297 let mut sorted = items.clone();
6298 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
6299 let n = sorted.len();
6300 let median = if n % 2 == 1 {
6301 sorted[n / 2]
6302 } else {
6303 (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0
6304 };
6305 Value::Property(Property::Float64(median))
6306 }
6307 AggState::ApocProduct {
6308 int_part,
6309 float_part,
6310 is_float,
6311 seen,
6312 } => {
6313 if !*seen {
6316 return Value::Null;
6317 }
6318 if *is_float {
6319 Value::Property(Property::Float64(*float_part * *int_part as f64))
6320 } else {
6321 Value::Property(Property::Int64(*int_part))
6322 }
6323 }
6324 }
6325 }
6326}
6327
6328fn expr_arg_value(arg: &AggregateArg, ctx: &EvalCtx) -> Result<Value> {
6329 match arg {
6330 AggregateArg::Star => Err(Error::AggregateTypeError),
6331 AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => eval_expr(e, ctx),
6332 }
6333}
6334
6335fn value_to_f64(v: &Value) -> f64 {
6339 match v {
6340 Value::Property(Property::Int64(i)) => *i as f64,
6341 Value::Property(Property::Float64(f)) => *f,
6342 _ => f64::NAN,
6343 }
6344}
6345
6346fn percentile_disc(items: &[Value], p: f64) -> Value {
6351 let mut nums: Vec<(f64, Value)> = items
6352 .iter()
6353 .map(|v| (value_to_f64(v), v.clone()))
6354 .filter(|(f, _)| !f.is_nan())
6355 .collect();
6356 if nums.is_empty() {
6357 return Value::Null;
6358 }
6359 nums.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
6360 let p = p.clamp(0.0, 1.0);
6361 let n = nums.len();
6362 let idx = ((p * n as f64).ceil() as isize - 1).max(0) as usize;
6364 nums[idx.min(n - 1)].1.clone()
6365}
6366
6367fn percentile_cont(items: &[Value], p: f64) -> Value {
6371 let mut nums: Vec<f64> = items
6372 .iter()
6373 .map(value_to_f64)
6374 .filter(|f| !f.is_nan())
6375 .collect();
6376 if nums.is_empty() {
6377 return Value::Null;
6378 }
6379 nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
6380 let p = p.clamp(0.0, 1.0);
6381 let n = nums.len();
6382 if n == 1 {
6383 return Value::Property(Property::Float64(nums[0]));
6384 }
6385 let pos = p * (n as f64 - 1.0);
6386 let lo = pos.floor() as usize;
6387 let hi = pos.ceil() as usize;
6388 let frac = pos - lo as f64;
6389 let v = nums[lo] + (nums[hi] - nums[lo]) * frac;
6390 Value::Property(Property::Float64(v))
6391}
6392
6393struct SkipOp {
6394 input: Box<dyn Operator>,
6395 count_expr: Expr,
6396 remaining: Option<i64>,
6397}
6398
6399impl SkipOp {
6400 fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
6401 Self {
6402 input,
6403 count_expr,
6404 remaining: None,
6405 }
6406 }
6407}
6408
6409impl Operator for SkipOp {
6410 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
6411 if self.remaining.is_none() {
6412 let empty = Row::new();
6413 let ectx = ctx.eval_ctx(&empty);
6414 let val = eval_expr(&self.count_expr, &ectx)?;
6415 self.remaining = Some(expr_to_count(val)?);
6416 }
6417 let rem = self.remaining.as_mut().unwrap();
6418 while *rem > 0 {
6419 if self.input.next(ctx)?.is_none() {
6420 return Ok(None);
6421 }
6422 *rem -= 1;
6423 }
6424 self.input.next(ctx)
6425 }
6426}
6427
6428struct LimitOp {
6429 input: Box<dyn Operator>,
6430 count_expr: Expr,
6431 remaining: Option<i64>,
6432 drain_on_complete: bool,
6439 drained: bool,
6442}
6443
6444impl LimitOp {
6445 fn new(input: Box<dyn Operator>, count_expr: Expr, drain_on_complete: bool) -> Self {
6446 Self {
6447 input,
6448 count_expr,
6449 remaining: None,
6450 drain_on_complete,
6451 drained: false,
6452 }
6453 }
6454
6455 fn drain_input(&mut self, ctx: &ExecCtx) -> Result<()> {
6459 if self.drained {
6460 return Ok(());
6461 }
6462 while self.input.next(ctx)?.is_some() {}
6463 self.drained = true;
6464 Ok(())
6465 }
6466}
6467
6468impl Operator for LimitOp {
6469 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
6470 if self.remaining.is_none() {
6471 let empty = Row::new();
6472 let ectx = ctx.eval_ctx(&empty);
6473 let val = eval_expr(&self.count_expr, &ectx)?;
6474 self.remaining = Some(expr_to_count(val)?);
6475 }
6476 let rem = self.remaining.as_mut().unwrap();
6477 if *rem <= 0 {
6478 if self.drain_on_complete {
6479 self.drain_input(ctx)?;
6480 }
6481 return Ok(None);
6482 }
6483 match self.input.next(ctx)? {
6484 Some(row) => {
6485 *rem -= 1;
6486 Ok(Some(row))
6487 }
6488 None => Ok(None),
6489 }
6490 }
6491}
6492
6493fn expr_to_count(val: Value) -> Result<i64> {
6494 match val {
6495 Value::Null | Value::Property(Property::Null) => Ok(0),
6496 Value::Property(Property::Int64(n)) if n >= 0 => Ok(n),
6497 _ => Err(Error::TypeMismatch),
6502 }
6503}
6504
6505struct RowsLiteralOp {
6512 rows: Vec<Row>,
6513 cursor: usize,
6514}
6515
6516impl RowsLiteralOp {
6517 fn new(rows: Vec<Row>) -> Self {
6518 Self { rows, cursor: 0 }
6519 }
6520}
6521
6522impl Operator for RowsLiteralOp {
6523 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
6524 if self.cursor < self.rows.len() {
6525 let row = self.rows[self.cursor].clone();
6526 self.cursor += 1;
6527 Ok(Some(row))
6528 } else {
6529 Ok(None)
6530 }
6531 }
6532}