1use crate::{
2 error::{Error, Result},
3 eval::{compare_values, eval_expr, row_key, to_bool, value_key, values_equal, EvalCtx},
4 procedures::ProcedureRegistry,
5 reader::GraphReader,
6 value::{ParamMap, Row, Value},
7 writer::GraphWriter,
8};
9use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
10use meshdb_cypher::{
11 AggregateArg, AggregateFn, AggregateSpec, BinaryOp, CallArgs, CompareOp, ConstraintKind,
12 ConstraintScope as CypherConstraintScope, CreateEdgeSpec, CreateNodeSpec, Direction, Expr,
13 Literal, LogicalPlan, PointSeekBounds, PropertyType as CypherPropertyType, RemoveSpec,
14 ReturnItem, SetAssignment, SortItem, UnaryOp, YieldSpec,
15};
16use meshdb_storage::{
17 ConstraintScope as StorageConstraintScope, PropertyConstraintKind,
18 PropertyType as StoragePropertyType, RocksDbStorageEngine,
19};
20use std::cell::RefCell;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23
24#[derive(Default)]
30pub struct Tombstones {
31 pub nodes: RefCell<HashSet<meshdb_core::NodeId>>,
32 pub edges: RefCell<HashSet<meshdb_core::EdgeId>>,
33}
34
/// Shared, read-only context handed to every [`Operator::next`] call.
pub struct ExecCtx<'a> {
    /// Graph read handle; forwarded to expression evaluation as `reader`.
    pub store: &'a dyn GraphReader,
    /// Graph write handle used by mutating operators.
    pub writer: &'a dyn GraphWriter,
    /// Query parameters made available to expression evaluation.
    pub params: &'a ParamMap,
    /// Procedure registry made available to expression evaluation.
    pub procedures: &'a ProcedureRegistry,
    /// Enclosing-scope rows; `lookup_binding` consults them, in order, after
    /// the current row.
    pub outer_rows: &'a [&'a Row],
    /// Tombstone sets forwarded to expression evaluation.
    pub tombstones: &'a Tombstones,
}
61
62pub(crate) struct NoOpWriter;
63impl GraphWriter for NoOpWriter {
64 fn put_node(&self, _: &Node) -> Result<()> {
65 Ok(())
66 }
67 fn put_edge(&self, _: &Edge) -> Result<()> {
68 Ok(())
69 }
70 fn delete_edge(&self, _: EdgeId) -> Result<()> {
71 Ok(())
72 }
73 fn detach_delete_node(&self, _: NodeId) -> Result<()> {
74 Ok(())
75 }
76}
77
78impl<'a> ExecCtx<'a> {
79 pub(crate) fn eval_ctx<'b>(&self, row: &'b Row) -> EvalCtx<'b>
85 where
86 'a: 'b,
87 {
88 EvalCtx {
89 row,
90 params: self.params,
91 reader: self.store,
92 procedures: self.procedures,
93 outer_rows: self.outer_rows,
94 tombstones: self.tombstones,
95 }
96 }
97
98 pub(crate) fn lookup_binding<'r>(&'r self, row: &'r Row, name: &str) -> Option<&'r Value> {
104 if let Some(v) = row.get(name) {
105 return Some(v);
106 }
107 for outer in self.outer_rows {
108 if let Some(v) = outer.get(name) {
109 return Some(v);
110 }
111 }
112 None
113 }
114}
115
/// A pull-based query operator.
///
/// Callers invoke [`Operator::next`] repeatedly; each call yields the next
/// output row, `Ok(None)` once the operator is exhausted, or an error.
pub trait Operator {
    /// Produces the next row, if any, using the shared execution context.
    fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>>;
}
119
120pub fn execute(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
124 let params = ParamMap::new();
125 execute_with_reader(
126 plan,
127 store as &dyn GraphReader,
128 store as &dyn GraphWriter,
129 ¶ms,
130 )
131}
132
133pub fn execute_with_writer(
138 plan: &LogicalPlan,
139 store: &RocksDbStorageEngine,
140 writer: &dyn GraphWriter,
141) -> Result<Vec<Row>> {
142 let params = ParamMap::new();
143 execute_with_reader(plan, store as &dyn GraphReader, writer, ¶ms)
144}
145
146pub fn explain(plan: &LogicalPlan) -> Vec<Row> {
151 let text = meshdb_cypher::format_plan(plan);
152 let mut row = Row::new();
153 row.insert("plan".to_string(), Value::Property(Property::String(text)));
154 vec![row]
155}
156
157pub fn profile(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
158 let result_rows = execute(plan, store)?;
159 let row_count = result_rows.len() as i64;
160 let plan_text = meshdb_cypher::format_plan(plan);
161 let summary = format!("{plan_text}\nRows: {row_count}");
162 let mut row = Row::new();
163 row.insert(
164 "profile".to_string(),
165 Value::Property(Property::String(summary)),
166 );
167 row.insert(
168 "rows".to_string(),
169 Value::Property(Property::Int64(row_count)),
170 );
171 Ok(vec![row])
172}
173
174pub fn execute_with_reader(
175 plan: &LogicalPlan,
176 reader: &dyn GraphReader,
177 writer: &dyn GraphWriter,
178 params: &ParamMap,
179) -> Result<Vec<Row>> {
180 let empty_procs = ProcedureRegistry::new();
181 execute_with_reader_and_procs(plan, reader, writer, params, &empty_procs)
182}
183
184pub fn execute_with_seed(
202 plan: &LogicalPlan,
203 seed: Option<&Row>,
204 reader: &dyn GraphReader,
205 writer: &dyn GraphWriter,
206 params: &ParamMap,
207 procedures: &ProcedureRegistry,
208) -> Result<Vec<Row>> {
209 crate::eval::reset_statement_time();
210 if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
211 return Ok(rows);
212 }
213 let suppress_output = is_write_only_plan(plan);
214 let mut op = build_op_inner(plan, seed, &mut None);
215 let tombstones = Tombstones::default();
216 let outer_rows: Vec<&Row> = seed.into_iter().collect();
217 let ctx = ExecCtx {
218 store: reader,
219 writer,
220 params,
221 procedures,
222 outer_rows: &outer_rows,
223 tombstones: &tombstones,
224 };
225 let mut rows = Vec::new();
226 while let Some(row) = op.next(&ctx)? {
227 rows.push(row);
228 }
229 if suppress_output {
230 Ok(Vec::new())
231 } else {
232 Ok(rows)
233 }
234}
235
236pub fn execute_with_in_tx_substitute(
249 plan: &LogicalPlan,
250 in_tx_rows: Vec<Row>,
251 reader: &dyn GraphReader,
252 writer: &dyn GraphWriter,
253 params: &ParamMap,
254 procedures: &ProcedureRegistry,
255) -> Result<Vec<Row>> {
256 crate::eval::reset_statement_time();
257 if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
258 return Ok(rows);
259 }
260 let suppress_output = is_write_only_plan(plan);
261 let mut substitute = Some(in_tx_rows);
262 let mut op = build_op_inner(plan, None, &mut substitute);
263 let tombstones = Tombstones::default();
264 let ctx = ExecCtx {
265 store: reader,
266 writer,
267 params,
268 procedures,
269 outer_rows: &[],
270 tombstones: &tombstones,
271 };
272 let mut rows = Vec::new();
273 while let Some(row) = op.next(&ctx)? {
274 rows.push(row);
275 }
276 if suppress_output {
277 Ok(Vec::new())
278 } else {
279 Ok(rows)
280 }
281}
282
283pub fn execute_with_reader_and_procs(
284 plan: &LogicalPlan,
285 reader: &dyn GraphReader,
286 writer: &dyn GraphWriter,
287 params: &ParamMap,
288 procedures: &ProcedureRegistry,
289) -> Result<Vec<Row>> {
290 crate::eval::reset_statement_time();
295 if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
300 return Ok(rows);
301 }
302 let suppress_output = is_write_only_plan(plan);
303 let mut op = build_op(plan);
304 let tombstones = Tombstones::default();
305 let ctx = ExecCtx {
306 store: reader,
307 writer,
308 params,
309 procedures,
310 outer_rows: &[],
311 tombstones: &tombstones,
312 };
313 let mut rows = Vec::new();
314 while let Some(row) = op.next(&ctx)? {
315 rows.push(row);
316 }
317 if suppress_output {
318 Ok(Vec::new())
319 } else {
320 Ok(rows)
321 }
322}
323
324fn is_write_only_plan(plan: &LogicalPlan) -> bool {
325 match plan {
329 LogicalPlan::CreatePath { .. }
330 | LogicalPlan::Delete { .. }
331 | LogicalPlan::SetProperty { .. }
332 | LogicalPlan::Remove { .. }
333 | LogicalPlan::Foreach { .. }
334 | LogicalPlan::MergeNode { .. }
335 | LogicalPlan::MergeEdge { .. } => true,
336 _ => false,
337 }
338}
339
340pub(crate) fn plan_contains_writes(plan: &LogicalPlan) -> bool {
347 use LogicalPlan::*;
348 match plan {
349 CreatePath { .. }
351 | Delete { .. }
352 | SetProperty { .. }
353 | Remove { .. }
354 | MergeNode { .. }
355 | MergeEdge { .. }
356 | Foreach { .. } => true,
357
358 Filter { input, .. }
360 | Project { input, .. }
361 | Aggregate { input, .. }
362 | Distinct { input, .. }
363 | OrderBy { input, .. }
364 | Skip { input, .. }
365 | Limit { input, .. }
366 | EdgeExpand { input, .. }
367 | OptionalEdgeExpand { input, .. }
368 | VarLengthExpand { input, .. }
369 | Identity { input, .. }
370 | CoalesceNullRow { input, .. }
371 | UnwindChain { input, .. }
372 | BindPath { input, .. }
373 | ShortestPath { input, .. } => plan_contains_writes(input),
374
375 CartesianProduct { left, right } => {
377 plan_contains_writes(left) || plan_contains_writes(right)
378 }
379
380 Union { branches, .. } => branches.iter().any(plan_contains_writes),
382
383 CallSubquery { input, body } | OptionalApply { input, body, .. } => {
386 plan_contains_writes(input) || plan_contains_writes(body)
387 }
388 CallSubqueryInTransactions { input, .. } => plan_contains_writes(input),
394 ApocPeriodicIterate { .. } => false,
400
401 ProcedureCall { input, .. } | LoadCsv { input, .. } => {
404 input.as_ref().map_or(false, |i| plan_contains_writes(i))
405 }
406
407 _ => false,
409 }
410}
411
412fn try_execute_ddl(
422 plan: &LogicalPlan,
423 reader: &dyn GraphReader,
424 writer: &dyn GraphWriter,
425) -> Result<Option<Vec<Row>>> {
426 match plan {
427 LogicalPlan::CreatePropertyIndex { label, properties } => {
428 writer.create_property_index(label, properties)?;
429 Ok(Some(vec![node_index_ddl_ack_row(
430 "created", label, properties,
431 )]))
432 }
433 LogicalPlan::DropPropertyIndex { label, properties } => {
434 writer.drop_property_index(label, properties)?;
435 Ok(Some(vec![node_index_ddl_ack_row(
436 "dropped", label, properties,
437 )]))
438 }
439 LogicalPlan::CreateEdgePropertyIndex {
440 edge_type,
441 properties,
442 } => {
443 writer.create_edge_property_index(edge_type, properties)?;
444 Ok(Some(vec![edge_index_ddl_ack_row(
445 "created", edge_type, properties,
446 )]))
447 }
448 LogicalPlan::DropEdgePropertyIndex {
449 edge_type,
450 properties,
451 } => {
452 writer.drop_edge_property_index(edge_type, properties)?;
453 Ok(Some(vec![edge_index_ddl_ack_row(
454 "dropped", edge_type, properties,
455 )]))
456 }
457 LogicalPlan::ShowPropertyIndexes => {
458 let mut rows: Vec<Row> = Vec::new();
464 for (label, properties) in reader.list_property_indexes()? {
465 rows.push(show_index_row("NODE", label, properties));
466 }
467 for (edge_type, properties) in reader.list_edge_property_indexes()? {
468 rows.push(show_index_row("RELATIONSHIP", edge_type, properties));
469 }
470 Ok(Some(rows))
471 }
472 LogicalPlan::CreatePointIndex { label, property } => {
473 writer.create_point_index(label, property)?;
474 Ok(Some(vec![point_index_ddl_ack_row(
475 "created", "NODE", label, property,
476 )]))
477 }
478 LogicalPlan::DropPointIndex { label, property } => {
479 writer.drop_point_index(label, property)?;
480 Ok(Some(vec![point_index_ddl_ack_row(
481 "dropped", "NODE", label, property,
482 )]))
483 }
484 LogicalPlan::CreateEdgePointIndex {
485 edge_type,
486 property,
487 } => {
488 writer.create_edge_point_index(edge_type, property)?;
489 Ok(Some(vec![point_index_ddl_ack_row(
490 "created",
491 "RELATIONSHIP",
492 edge_type,
493 property,
494 )]))
495 }
496 LogicalPlan::DropEdgePointIndex {
497 edge_type,
498 property,
499 } => {
500 writer.drop_edge_point_index(edge_type, property)?;
501 Ok(Some(vec![point_index_ddl_ack_row(
502 "dropped",
503 "RELATIONSHIP",
504 edge_type,
505 property,
506 )]))
507 }
508 LogicalPlan::ShowPointIndexes => {
509 let mut rows: Vec<Row> = Vec::new();
516 for (label, property) in reader.list_point_indexes()? {
517 rows.push(show_point_index_row("NODE", label, property));
518 }
519 for (edge_type, property) in reader.list_edge_point_indexes()? {
520 rows.push(show_point_index_row("RELATIONSHIP", edge_type, property));
521 }
522 Ok(Some(rows))
523 }
524 LogicalPlan::CreatePropertyConstraint {
525 name,
526 scope,
527 properties,
528 kind,
529 if_not_exists,
530 } => {
531 let storage_kind = match kind {
532 ConstraintKind::Unique => PropertyConstraintKind::Unique,
533 ConstraintKind::NotNull => PropertyConstraintKind::NotNull,
534 ConstraintKind::NodeKey => PropertyConstraintKind::NodeKey,
535 ConstraintKind::PropertyType(t) => {
536 PropertyConstraintKind::PropertyType(cypher_to_storage_property_type(*t))
537 }
538 };
539 let storage_scope = cypher_to_storage_scope(scope);
540 let spec = writer.create_property_constraint(
541 name.as_deref(),
542 &storage_scope,
543 properties,
544 storage_kind,
545 *if_not_exists,
546 )?;
547 Ok(Some(vec![constraint_ack_row("created", &spec)]))
548 }
549 LogicalPlan::DropPropertyConstraint { name, if_exists } => {
550 writer.drop_property_constraint(name, *if_exists)?;
551 let mut row = Row::default();
552 row.insert(
553 "state".into(),
554 Value::Property(Property::String("dropped".into())),
555 );
556 row.insert(
557 "name".into(),
558 Value::Property(Property::String(name.clone())),
559 );
560 Ok(Some(vec![row]))
561 }
562 LogicalPlan::ShowPropertyConstraints => {
563 let specs = reader.list_property_constraints()?;
564 let rows = specs.into_iter().map(constraint_show_row).collect();
565 Ok(Some(rows))
566 }
567 _ => Ok(None),
568 }
569}
570
571fn constraint_ack_row(state: &str, spec: &meshdb_storage::PropertyConstraintSpec) -> Row {
576 let mut row = constraint_show_row(spec.clone());
577 row.insert(
578 "state".into(),
579 Value::Property(Property::String(state.into())),
580 );
581 row
582}
583
584fn constraint_show_row(spec: meshdb_storage::PropertyConstraintSpec) -> Row {
590 let mut row = Row::default();
591 row.insert("name".into(), Value::Property(Property::String(spec.name)));
592 let (scope_tag, target) = match spec.scope {
593 meshdb_storage::ConstraintScope::Node(l) => ("NODE", l),
594 meshdb_storage::ConstraintScope::Relationship(t) => ("RELATIONSHIP", t),
595 };
596 row.insert(
597 "scope".into(),
598 Value::Property(Property::String(scope_tag.into())),
599 );
600 row.insert("label".into(), Value::Property(Property::String(target)));
605 let props: Vec<Property> = spec.properties.into_iter().map(Property::String).collect();
606 row.insert("properties".into(), Value::Property(Property::List(props)));
607 row.insert(
608 "type".into(),
609 Value::Property(Property::String(spec.kind.as_string())),
610 );
611 row
612}
613
614fn cypher_to_storage_scope(scope: &CypherConstraintScope) -> StorageConstraintScope {
618 match scope {
619 CypherConstraintScope::Node(l) => StorageConstraintScope::Node(l.clone()),
620 CypherConstraintScope::Relationship(t) => StorageConstraintScope::Relationship(t.clone()),
621 }
622}
623
624fn cypher_to_storage_property_type(t: CypherPropertyType) -> StoragePropertyType {
629 match t {
630 CypherPropertyType::String => StoragePropertyType::String,
631 CypherPropertyType::Integer => StoragePropertyType::Integer,
632 CypherPropertyType::Float => StoragePropertyType::Float,
633 CypherPropertyType::Boolean => StoragePropertyType::Boolean,
634 }
635}
636
637fn node_index_ddl_ack_row(state: &str, label: &str, properties: &[String]) -> Row {
644 let mut row = Row::default();
645 row.insert(
646 "state".into(),
647 Value::Property(Property::String(state.into())),
648 );
649 row.insert(
650 "scope".into(),
651 Value::Property(Property::String("NODE".into())),
652 );
653 row.insert(
654 "label".into(),
655 Value::Property(Property::String(label.into())),
656 );
657 row.insert(
662 "property".into(),
663 Value::Property(Property::String(properties.join(","))),
664 );
665 row.insert("properties".into(), properties_list_value(properties));
666 row
667}
668
669fn edge_index_ddl_ack_row(state: &str, edge_type: &str, properties: &[String]) -> Row {
675 let mut row = Row::default();
676 row.insert(
677 "state".into(),
678 Value::Property(Property::String(state.into())),
679 );
680 row.insert(
681 "scope".into(),
682 Value::Property(Property::String("RELATIONSHIP".into())),
683 );
684 row.insert(
685 "label".into(),
686 Value::Property(Property::String(edge_type.into())),
687 );
688 row.insert(
689 "edge_type".into(),
690 Value::Property(Property::String(edge_type.into())),
691 );
692 row.insert(
693 "property".into(),
694 Value::Property(Property::String(properties.join(","))),
695 );
696 row.insert("properties".into(), properties_list_value(properties));
697 row
698}
699
700fn properties_list_value(properties: &[String]) -> Value {
701 Value::List(
702 properties
703 .iter()
704 .map(|p| Value::Property(Property::String(p.clone())))
705 .collect(),
706 )
707}
708
709fn point_index_ddl_ack_row(state: &str, scope: &str, target: &str, property: &str) -> Row {
716 let mut row = Row::default();
717 row.insert(
718 "state".into(),
719 Value::Property(Property::String(state.into())),
720 );
721 row.insert(
722 "scope".into(),
723 Value::Property(Property::String(scope.into())),
724 );
725 row.insert(
726 "type".into(),
727 Value::Property(Property::String("POINT".into())),
728 );
729 row.insert(
730 "label".into(),
731 Value::Property(Property::String(target.into())),
732 );
733 if scope == "RELATIONSHIP" {
734 row.insert(
735 "edge_type".into(),
736 Value::Property(Property::String(target.into())),
737 );
738 }
739 row.insert(
740 "property".into(),
741 Value::Property(Property::String(property.into())),
742 );
743 row
744}
745
746fn show_point_index_row(scope: &str, target: String, property: String) -> Row {
751 let mut row = Row::default();
752 row.insert(
753 "scope".into(),
754 Value::Property(Property::String(scope.into())),
755 );
756 row.insert(
757 "type".into(),
758 Value::Property(Property::String("POINT".into())),
759 );
760 row.insert(
761 "label".into(),
762 Value::Property(Property::String(target.clone())),
763 );
764 if scope == "RELATIONSHIP" {
765 row.insert(
766 "edge_type".into(),
767 Value::Property(Property::String(target)),
768 );
769 }
770 row.insert(
771 "property".into(),
772 Value::Property(Property::String(property)),
773 );
774 row.insert(
775 "state".into(),
776 Value::Property(Property::String("online".into())),
777 );
778 row
779}
780
781fn show_index_row(scope: &str, target: String, properties: Vec<String>) -> Row {
789 let mut row = Row::default();
790 row.insert(
791 "scope".into(),
792 Value::Property(Property::String(scope.into())),
793 );
794 row.insert(
795 "label".into(),
796 Value::Property(Property::String(target.clone())),
797 );
798 if scope == "RELATIONSHIP" {
799 row.insert(
800 "edge_type".into(),
801 Value::Property(Property::String(target)),
802 );
803 }
804 row.insert(
805 "property".into(),
806 Value::Property(Property::String(properties.join(","))),
807 );
808 row.insert("properties".into(), properties_list_value(&properties));
809 row.insert(
810 "state".into(),
811 Value::Property(Property::String("online".into())),
812 );
813 row
814}
815
816fn build_op(plan: &LogicalPlan) -> Box<dyn Operator> {
817 build_op_inner(plan, None, &mut None)
818}
819
/// Recursively translates a [`LogicalPlan`] into a tree of boxed
/// [`Operator`]s.
///
/// * `seed` — when `Some`, a `SeedRow` plan node yields a clone of this row
///   (via `SeededRowOp`); when `None` it becomes a plain `SeedRowOp`.
/// * `in_tx_substitute` — rows that replace a `CallSubqueryInTransactions` /
///   `ApocPeriodicIterate` node. The first such node encountered `take()`s
///   the rows, leaving `None` behind.
///
/// Both arguments are threaded unchanged into every child built here. Plans
/// that are executed lazily by their operator (the right side of a
/// `CartesianProduct`, the body of `CallSubquery` / `OptionalApply`) are
/// cloned into the operator instead of being built at this point.
///
/// # Panics
///
/// * If a batched-commit variant is reached while `in_tx_substitute` is
///   `None`.
/// * If a schema DDL variant is reached; those must be handled by
///   `try_execute_ddl` before building operators.
pub(crate) fn build_op_inner(
    plan: &LogicalPlan,
    seed: Option<&Row>,
    in_tx_substitute: &mut Option<Vec<Row>>,
) -> Box<dyn Operator> {
    // Shorthand for recursing with the same seed and substitute slot.
    macro_rules! child {
        ($p:expr) => {
            build_op_inner($p, seed, in_tx_substitute)
        };
    }
    match plan {
        LogicalPlan::CreatePath {
            input,
            nodes,
            edges,
        } => Box::new(CreatePathOp::new(
            input.as_ref().map(|p| child!(p)),
            nodes.clone(),
            edges.clone(),
        )),
        // Only the left side is built now; the right plan is cloned into the
        // operator.
        LogicalPlan::CartesianProduct { left, right } => {
            Box::new(CartesianProductOp::new(child!(left), (**right).clone()))
        }
        LogicalPlan::Delete {
            input,
            detach,
            vars,
            exprs,
        } => Box::new(DeleteOp::new(
            child!(input),
            *detach,
            vars.clone(),
            exprs.clone(),
        )),
        LogicalPlan::SetProperty { input, assignments } => {
            Box::new(SetPropertyOp::new(child!(input), assignments.clone()))
        }
        LogicalPlan::Remove { input, items } => {
            Box::new(RemoveOp::new(child!(input), items.clone()))
        }
        LogicalPlan::LoadCsv {
            input,
            path_expr,
            var,
            with_headers,
        } => Box::new(LoadCsvOp::new(
            input.as_ref().map(|p| child!(p)),
            path_expr.clone(),
            var.clone(),
            *with_headers,
        )),
        LogicalPlan::Foreach {
            input,
            var,
            list_expr,
            set_assignments,
            remove_items,
        } => Box::new(ForeachOp::new(
            child!(input),
            var.clone(),
            list_expr.clone(),
            set_assignments.clone(),
            remove_items.clone(),
        )),
        // The subquery body is cloned into the operator rather than built
        // here.
        LogicalPlan::CallSubquery { input, body } => {
            Box::new(CallSubqueryOp::new(child!(input), (**body).clone()))
        }
        LogicalPlan::CallSubqueryInTransactions { .. }
        | LogicalPlan::ApocPeriodicIterate { .. } => {
            // Batched-commit nodes are never executed by the pipeline itself:
            // they are replaced by the caller-supplied rows, which are
            // consumed here so a second such node would find `None`.
            match in_tx_substitute.take() {
                Some(rows) => Box::new(RowsLiteralOp::new(rows)),
                None => panic!(
                    "Batched-commit LogicalPlan variant reached `build_op` without a row \
                     substitute — the server-side dispatcher \
                     (execute_cypher_in_tx → execute_call_in_transactions / \
                     execute_apoc_periodic_iterate) must inject one before invoking the \
                     operator pipeline"
                ),
            }
        }
        // As with `CallSubquery`, the body plan is cloned, not built.
        LogicalPlan::OptionalApply {
            input,
            body,
            null_vars,
        } => Box::new(OptionalApplyOp::new(
            child!(input),
            (**body).clone(),
            null_vars.clone(),
        )),
        LogicalPlan::ProcedureCall {
            input,
            qualified_name,
            args,
            yield_spec,
            standalone,
        } => Box::new(ProcedureCallOp::new(
            input.as_ref().map(|p| child!(p)),
            qualified_name.clone(),
            args.clone(),
            yield_spec.clone(),
            *standalone,
        )),
        // With a seed, the leaf yields a clone of that row; otherwise it is
        // the plain seed-row operator.
        LogicalPlan::SeedRow => match seed {
            Some(r) => Box::new(SeededRowOp {
                row: Some(r.clone()),
            }),
            None => Box::new(SeedRowOp { done: false }),
        },
        LogicalPlan::NodeScanAll { var } => Box::new(NodeScanAllOp::new(var.clone())),
        LogicalPlan::NodeScanByLabels { var, labels } => {
            Box::new(NodeScanByLabelsOp::new(var.clone(), labels.clone()))
        }
        LogicalPlan::EdgeExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            edge_properties,
            edge_types,
            direction,
            edge_constraint_var,
        } => Box::new(EdgeExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            edge_properties.clone(),
            edge_types.clone(),
            *direction,
            edge_constraint_var.clone(),
        )),
        LogicalPlan::OptionalEdgeExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            dst_properties,
            edge_types,
            direction,
            dst_constraint_var,
            edge_constraint_var,
        } => Box::new(OptionalEdgeExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            dst_properties.clone(),
            edge_types.clone(),
            *direction,
            dst_constraint_var.clone(),
            edge_constraint_var.clone(),
        )),
        LogicalPlan::VarLengthExpand {
            input,
            src_var,
            edge_var,
            dst_var,
            dst_labels,
            edge_types,
            edge_properties,
            direction,
            min_hops,
            max_hops,
            path_var,
            optional,
            dst_constraint_var,
            bound_edge_list_var,
            excluded_edge_vars,
        } => Box::new(VarLengthExpandOp::new(
            child!(input),
            src_var.clone(),
            edge_var.clone(),
            dst_var.clone(),
            dst_labels.clone(),
            edge_types.clone(),
            edge_properties.clone(),
            *direction,
            *min_hops,
            *max_hops,
            path_var.clone(),
            *optional,
            dst_constraint_var.clone(),
            bound_edge_list_var.clone(),
            excluded_edge_vars.clone(),
        )),
        LogicalPlan::Filter { input, predicate } => {
            Box::new(FilterOp::new(child!(input), predicate.clone()))
        }
        LogicalPlan::Project { input, items } => {
            Box::new(ProjectOp::new(child!(input), items.clone()))
        }
        LogicalPlan::Aggregate {
            input,
            group_keys,
            aggregates,
        } => Box::new(AggregateOp::new(
            child!(input),
            group_keys.clone(),
            aggregates.clone(),
        )),
        LogicalPlan::Identity { input } => Box::new(IdentityOp::new(child!(input))),
        LogicalPlan::CoalesceNullRow { input, null_vars } => {
            Box::new(CoalesceNullRowOp::new(child!(input), null_vars.clone()))
        }
        LogicalPlan::Distinct { input } => Box::new(DistinctOp::new(child!(input))),
        LogicalPlan::OrderBy { input, sort_items } => {
            Box::new(OrderByOp::new(child!(input), sort_items.clone()))
        }
        LogicalPlan::Skip { input, count } => Box::new(SkipOp::new(child!(input), count.clone())),
        LogicalPlan::Limit { input, count } => {
            // The flag is set when the limited sub-plan contains writes.
            // NOTE(review): presumably so LimitOp keeps draining its input
            // after the limit is reached and the writes still run — confirm
            // against LimitOp.
            let drain_on_complete = plan_contains_writes(input);
            Box::new(LimitOp::new(
                child!(input),
                count.clone(),
                drain_on_complete,
            ))
        }
        LogicalPlan::MergeNode {
            input,
            var,
            labels,
            properties,
            on_create,
            on_match,
        } => Box::new(MergeNodeOp::new(
            input.as_ref().map(|p| child!(p)),
            var.clone(),
            labels.clone(),
            properties.clone(),
            on_create.clone(),
            on_match.clone(),
        )),
        LogicalPlan::MergeEdge {
            input,
            edge_var,
            src_var,
            dst_var,
            edge_type,
            undirected,
            properties,
            on_create,
            on_match,
        } => Box::new(MergeEdgeOp::new(
            child!(input),
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            *undirected,
            properties.clone(),
            on_create.clone(),
            on_match.clone(),
        )),
        LogicalPlan::Unwind { var, expr } => Box::new(UnwindOp::new(var.clone(), expr.clone())),
        LogicalPlan::UnwindChain { input, var, expr } => {
            Box::new(UnwindChainOp::new(child!(input), var.clone(), expr.clone()))
        }
        LogicalPlan::IndexSeek {
            var,
            label,
            properties,
            values,
        } => Box::new(IndexSeekOp::new(
            var.clone(),
            label.clone(),
            properties.clone(),
            values.clone(),
        )),
        LogicalPlan::PointIndexSeek {
            var,
            label,
            property,
            bounds,
        } => Box::new(PointIndexSeekOp::new(
            var.clone(),
            label.clone(),
            property.clone(),
            bounds.clone(),
        )),
        LogicalPlan::EdgeSeek {
            edge_var,
            src_var,
            dst_var,
            edge_type,
            property,
            value,
            direction,
            residual_properties,
        } => Box::new(EdgeSeekOp::new(
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            property.clone(),
            value.clone(),
            *direction,
            residual_properties.clone(),
        )),
        LogicalPlan::EdgePointIndexSeek {
            edge_var,
            src_var,
            dst_var,
            edge_type,
            property,
            direction,
            bounds,
        } => Box::new(EdgePointIndexSeekOp::new(
            edge_var.clone(),
            src_var.clone(),
            dst_var.clone(),
            edge_type.clone(),
            property.clone(),
            *direction,
            bounds.clone(),
        )),
        LogicalPlan::Union { branches, all } => {
            // Every branch is built eagerly, in declaration order.
            let branch_ops: Vec<Box<dyn Operator>> = branches.iter().map(|b| child!(b)).collect();
            Box::new(UnionOp::new(branch_ops, *all))
        }
        LogicalPlan::BindPath {
            input,
            path_var,
            node_vars,
            edge_vars,
        } => Box::new(BindPathOp::new(
            child!(input),
            path_var.clone(),
            node_vars.clone(),
            edge_vars.clone(),
        )),
        LogicalPlan::ShortestPath {
            input,
            src_var,
            dst_var,
            path_var,
            edge_types,
            direction,
            max_hops,
            kind,
        } => Box::new(ShortestPathOp::new(
            child!(input),
            src_var.clone(),
            dst_var.clone(),
            path_var.clone(),
            edge_types.clone(),
            *direction,
            *max_hops,
            *kind,
        )),
        // Schema statements never become operators; reaching one here means
        // the caller skipped `try_execute_ddl`.
        LogicalPlan::CreatePropertyIndex { .. }
        | LogicalPlan::DropPropertyIndex { .. }
        | LogicalPlan::CreateEdgePropertyIndex { .. }
        | LogicalPlan::DropEdgePropertyIndex { .. }
        | LogicalPlan::ShowPropertyIndexes
        | LogicalPlan::CreatePointIndex { .. }
        | LogicalPlan::DropPointIndex { .. }
        | LogicalPlan::CreateEdgePointIndex { .. }
        | LogicalPlan::DropEdgePointIndex { .. }
        | LogicalPlan::ShowPointIndexes
        | LogicalPlan::CreatePropertyConstraint { .. }
        | LogicalPlan::DropPropertyConstraint { .. }
        | LogicalPlan::ShowPropertyConstraints => {
            panic!("schema DDL must be dispatched via try_execute_ddl before build_op")
        }
    }
}
1229
1230struct UnwindOp {
1231 var: String,
1232 expr: Expr,
1233 items: Option<Vec<Value>>,
1234 cursor: usize,
1235}
1236
1237impl UnwindOp {
1238 fn new(var: String, expr: Expr) -> Self {
1239 Self {
1240 var,
1241 expr,
1242 items: None,
1243 cursor: 0,
1244 }
1245 }
1246}
1247
1248impl Operator for UnwindOp {
1249 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1250 if self.items.is_none() {
1251 let empty = Row::new();
1252 let ectx = EvalCtx {
1253 row: &empty,
1254 params: ctx.params,
1255 reader: ctx.store,
1256 procedures: ctx.procedures,
1257 outer_rows: ctx.outer_rows,
1258 tombstones: ctx.tombstones,
1259 };
1260 let val = eval_expr(&self.expr, &ectx)?;
1261 self.items = Some(coerce_unwind_list(val)?);
1262 }
1263 let items = self.items.as_ref().unwrap();
1264 if self.cursor < items.len() {
1265 let v = items[self.cursor].clone();
1266 self.cursor += 1;
1267 let mut row = Row::new();
1268 row.insert(self.var.clone(), v);
1269 Ok(Some(row))
1270 } else {
1271 Ok(None)
1272 }
1273 }
1274}
1275
1276struct UnwindChainOp {
1282 input: Box<dyn Operator>,
1283 var: String,
1284 expr: Expr,
1285 current_row: Option<Row>,
1286 items: Vec<Value>,
1287 cursor: usize,
1288}
1289
1290impl UnwindChainOp {
1291 fn new(input: Box<dyn Operator>, var: String, expr: Expr) -> Self {
1292 Self {
1293 input,
1294 var,
1295 expr,
1296 current_row: None,
1297 items: Vec::new(),
1298 cursor: 0,
1299 }
1300 }
1301}
1302
1303impl Operator for UnwindChainOp {
1304 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1305 loop {
1306 if let Some(base) = &self.current_row {
1307 if self.cursor < self.items.len() {
1308 let v = self.items[self.cursor].clone();
1309 self.cursor += 1;
1310 let mut row = base.clone();
1311 row.insert(self.var.clone(), v);
1312 return Ok(Some(row));
1313 }
1314 self.current_row = None;
1315 self.items.clear();
1316 self.cursor = 0;
1317 }
1318 let base = match self.input.next(ctx)? {
1319 Some(r) => r,
1320 None => return Ok(None),
1321 };
1322 let ectx = EvalCtx {
1323 row: &base,
1324 params: ctx.params,
1325 reader: ctx.store,
1326 procedures: ctx.procedures,
1327 outer_rows: ctx.outer_rows,
1328 tombstones: ctx.tombstones,
1329 };
1330 let val = eval_expr(&self.expr, &ectx)?;
1331 self.items = coerce_unwind_list(val)?;
1332 self.current_row = Some(base);
1333 }
1334 }
1335}
1336
1337fn coerce_unwind_list(val: Value) -> Result<Vec<Value>> {
1342 match val {
1343 Value::List(items) => Ok(items),
1344 Value::Property(Property::List(props)) => {
1345 Ok(props.into_iter().map(Value::Property).collect())
1346 }
1347 Value::Null => Ok(Vec::new()),
1348 _ => Err(Error::TypeMismatch),
1349 }
1350}
1351
1352struct CreatePathOp {
1353 input: Option<Box<dyn Operator>>,
1354 nodes: Vec<CreateNodeSpec>,
1355 edges: Vec<CreateEdgeSpec>,
1356 done: bool,
1357 buffered: Option<Vec<Row>>,
1358 cursor: usize,
1359}
1360
1361impl CreatePathOp {
1362 fn new(
1363 input: Option<Box<dyn Operator>>,
1364 nodes: Vec<CreateNodeSpec>,
1365 edges: Vec<CreateEdgeSpec>,
1366 ) -> Self {
1367 Self {
1368 input,
1369 nodes,
1370 edges,
1371 done: false,
1372 buffered: None,
1373 cursor: 0,
1374 }
1375 }
1376
1377 fn apply(&self, ctx: &ExecCtx, row: &Row) -> Result<Row> {
1378 let mut out = row.clone();
1379 let mut node_ids: Vec<NodeId> = Vec::with_capacity(self.nodes.len());
1380 for spec in &self.nodes {
1381 match spec {
1382 CreateNodeSpec::New {
1383 var,
1384 labels,
1385 properties,
1386 } => {
1387 let mut node = Node::new();
1388 for label in labels {
1389 node.labels.push(label.clone());
1390 }
1391 for (k, expr) in properties {
1399 let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1400 let prop = value_to_property(value)?;
1401 if matches!(prop, Property::Null) {
1402 continue;
1403 }
1404 node.properties.insert(k.clone(), prop);
1405 }
1406 ctx.writer.put_node(&node)?;
1407 node_ids.push(node.id);
1408 if let Some(v) = var {
1409 out.insert(v.clone(), Value::Node(node));
1410 }
1411 }
1412 CreateNodeSpec::Reference(name) => {
1413 let id = match out.get(name) {
1414 Some(Value::Node(n)) => n.id,
1415 _ => return Err(Error::UnboundVariable(name.clone())),
1416 };
1417 node_ids.push(id);
1418 }
1419 }
1420 }
1421 for spec in &self.edges {
1422 let src = node_ids[spec.src_idx];
1423 let dst = node_ids[spec.dst_idx];
1424 let mut edge = Edge::new(spec.edge_type.clone(), src, dst);
1425 for (k, expr) in &spec.properties {
1426 let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
1427 let prop = value_to_property(value)?;
1428 if matches!(prop, Property::Null) {
1429 continue;
1430 }
1431 edge.properties.insert(k.clone(), prop);
1432 }
1433 ctx.writer.put_edge(&edge)?;
1434 if let Some(v) = &spec.var {
1435 out.insert(v.clone(), Value::Edge(edge));
1436 }
1437 }
1438 Ok(out)
1439 }
1440}
1441
1442impl Operator for CreatePathOp {
1443 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1444 if self.input.is_some() {
1445 if let Some(buffered) = self.buffered.as_mut() {
1458 if self.cursor < buffered.len() {
1459 let row = buffered[self.cursor].clone();
1460 self.cursor += 1;
1461 return Ok(Some(row));
1462 }
1463 return Ok(None);
1464 }
1465 let input_rows: Vec<Row> = {
1466 let input = self.input.as_mut().unwrap();
1467 let mut acc = Vec::new();
1468 while let Some(row) = input.next(ctx)? {
1469 acc.push(row);
1470 }
1471 acc
1472 };
1473 let mut applied: Vec<Row> = Vec::with_capacity(input_rows.len());
1474 for row in input_rows {
1475 applied.push(self.apply(ctx, &row)?);
1476 }
1477 self.buffered = Some(applied);
1478 self.cursor = 0;
1479 self.next(ctx)
1482 } else {
1483 if self.done {
1484 return Ok(None);
1485 }
1486 self.done = true;
1487 let empty = Row::new();
1488 Ok(Some(self.apply(ctx, &empty)?))
1489 }
1490 }
1491}
1492
1493struct CartesianProductOp {
1494 left: Box<dyn Operator>,
1495 right_plan: LogicalPlan,
1496 left_row: Option<Row>,
1497 right_op: Option<Box<dyn Operator>>,
1498}
1499
1500impl CartesianProductOp {
1501 fn new(left: Box<dyn Operator>, right_plan: LogicalPlan) -> Self {
1502 Self {
1503 left,
1504 right_plan,
1505 left_row: None,
1506 right_op: None,
1507 }
1508 }
1509}
1510
1511impl Operator for CartesianProductOp {
1512 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1513 loop {
1514 if self.left_row.is_none() {
1515 match self.left.next(ctx)? {
1516 None => return Ok(None),
1517 Some(row) => {
1518 self.left_row = Some(row);
1519 self.right_op = Some(build_op(&self.right_plan));
1520 }
1521 }
1522 }
1523 let right_op = self.right_op.as_mut().expect("right_op set");
1524 let left_ref = self.left_row.as_ref().unwrap();
1529 let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1530 stacked.push(left_ref);
1531 stacked.extend_from_slice(ctx.outer_rows);
1532 let inner_ctx = ExecCtx {
1533 store: ctx.store,
1534 writer: ctx.writer,
1535 params: ctx.params,
1536 procedures: ctx.procedures,
1537 outer_rows: &stacked,
1538 tombstones: ctx.tombstones,
1539 };
1540 match right_op.next(&inner_ctx)? {
1541 Some(right_row) => {
1542 let mut combined = left_ref.clone();
1543 for (k, v) in right_row {
1544 combined.insert(k, v);
1545 }
1546 return Ok(Some(combined));
1547 }
1548 None => {
1549 self.left_row = None;
1550 self.right_op = None;
1551 }
1552 }
1553 }
1554 }
1555}
1556
1557struct DeleteOp {
1558 input: Box<dyn Operator>,
1559 detach: bool,
1560 #[allow(dead_code)]
1561 vars: Vec<String>,
1562 exprs: Vec<Expr>,
1563 buffered: Option<Vec<Row>>,
1571 cursor: usize,
1572}
1573
1574impl DeleteOp {
1575 fn new(input: Box<dyn Operator>, detach: bool, vars: Vec<String>, exprs: Vec<Expr>) -> Self {
1576 Self {
1577 input,
1578 detach,
1579 vars,
1580 exprs,
1581 buffered: None,
1582 cursor: 0,
1583 }
1584 }
1585
1586 fn apply_deletes(
1596 &self,
1597 ctx: &ExecCtx,
1598 row: &Row,
1599 seen_edges: &mut HashSet<meshdb_core::EdgeId>,
1600 seen_nodes: &mut HashSet<meshdb_core::NodeId>,
1601 ) -> Result<()> {
1602 let mut edge_ids: Vec<meshdb_core::EdgeId> = Vec::new();
1603 let mut node_ids: Vec<meshdb_core::NodeId> = Vec::new();
1604 for expr in &self.exprs {
1605 let v = eval_expr(expr, &ctx.eval_ctx(row))?;
1606 match v {
1607 Value::Node(n) => node_ids.push(n.id),
1608 Value::Edge(e) => edge_ids.push(e.id),
1609 Value::Path { nodes, edges } => {
1610 for e in edges {
1611 edge_ids.push(e.id);
1612 }
1613 for n in nodes {
1614 node_ids.push(n.id);
1615 }
1616 }
1617 Value::Null | Value::Property(Property::Null) => continue,
1618 _ => return Err(Error::TypeMismatch),
1619 }
1620 }
1621 for eid in &edge_ids {
1622 if seen_edges.insert(*eid) {
1623 ctx.writer.delete_edge(*eid)?;
1624 ctx.tombstones.edges.borrow_mut().insert(*eid);
1625 }
1626 }
1627 for nid in &node_ids {
1628 if !seen_nodes.insert(*nid) {
1629 continue;
1630 }
1631 if self.detach {
1632 for (eid, _) in ctx.store.outgoing(*nid)? {
1637 ctx.tombstones.edges.borrow_mut().insert(eid);
1638 }
1639 for (eid, _) in ctx.store.incoming(*nid)? {
1640 ctx.tombstones.edges.borrow_mut().insert(eid);
1641 }
1642 ctx.writer.detach_delete_node(*nid)?;
1643 } else {
1644 let out = ctx.store.outgoing(*nid)?;
1645 let inc = ctx.store.incoming(*nid)?;
1646 let still_attached = out
1647 .iter()
1648 .chain(inc.iter())
1649 .any(|(eid, _)| !seen_edges.contains(eid));
1650 if still_attached {
1651 return Err(Error::CannotDeleteAttachedNode);
1652 }
1653 ctx.writer.detach_delete_node(*nid)?;
1654 }
1655 ctx.tombstones.nodes.borrow_mut().insert(*nid);
1656 }
1657 Ok(())
1658 }
1659}
1660
1661impl Operator for DeleteOp {
1662 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1663 if self.buffered.is_none() {
1671 let mut rows: Vec<Row> = Vec::new();
1672 while let Some(row) = self.input.next(ctx)? {
1673 rows.push(row);
1674 }
1675 let mut seen_edges: HashSet<meshdb_core::EdgeId> = HashSet::new();
1676 let mut seen_nodes: HashSet<meshdb_core::NodeId> = HashSet::new();
1677 for row in &rows {
1678 self.apply_deletes(ctx, row, &mut seen_edges, &mut seen_nodes)?;
1679 }
1680 self.buffered = Some(rows);
1681 self.cursor = 0;
1682 }
1683 let rows = self.buffered.as_ref().unwrap();
1684 if self.cursor < rows.len() {
1685 let row = rows[self.cursor].clone();
1686 self.cursor += 1;
1687 return Ok(Some(row));
1688 }
1689 Ok(None)
1690 }
1691}
1692
1693struct SetPropertyOp {
1694 input: Box<dyn Operator>,
1695 assignments: Vec<SetAssignment>,
1696 buffered: Option<Vec<Row>>,
1706 cursor: usize,
1707}
1708
1709impl SetPropertyOp {
1710 fn new(input: Box<dyn Operator>, assignments: Vec<SetAssignment>) -> Self {
1711 Self {
1712 input,
1713 assignments,
1714 buffered: None,
1715 cursor: 0,
1716 }
1717 }
1718
1719 fn apply_one(&self, ctx: &ExecCtx, mut row: Row) -> Result<Row> {
1720 enum Action {
1722 SetKey {
1723 var: String,
1724 key: String,
1725 prop: Property,
1726 },
1727 AddLabels {
1728 var: String,
1729 labels: Vec<String>,
1730 },
1731 Replace {
1732 var: String,
1733 props: Vec<(String, Property)>,
1734 },
1735 Merge {
1736 var: String,
1737 props: Vec<(String, Property)>,
1738 },
1739 }
1740 let mut actions: Vec<Action> = Vec::with_capacity(self.assignments.len());
1741 for a in &self.assignments {
1742 match a {
1743 SetAssignment::Property { var, key, value } => {
1744 let evaluated = eval_expr(value, &ctx.eval_ctx(&row))?;
1745 let prop = value_to_property(evaluated)?;
1746 actions.push(Action::SetKey {
1747 var: var.clone(),
1748 key: key.clone(),
1749 prop,
1750 });
1751 }
1752 SetAssignment::Labels { var, labels } => {
1753 actions.push(Action::AddLabels {
1754 var: var.clone(),
1755 labels: labels.clone(),
1756 });
1757 }
1758 SetAssignment::Replace { var, properties } => {
1759 let props = properties
1764 .iter()
1765 .map(|(k, expr)| {
1766 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1767 Ok((k.clone(), value_to_property(v)?))
1768 })
1769 .collect::<Result<Vec<(String, Property)>>>()?;
1770 actions.push(Action::Replace {
1771 var: var.clone(),
1772 props,
1773 });
1774 }
1775 SetAssignment::Merge { var, properties } => {
1776 let props = properties
1777 .iter()
1778 .map(|(k, expr)| {
1779 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1780 Ok((k.clone(), value_to_property(v)?))
1781 })
1782 .collect::<Result<Vec<(String, Property)>>>()?;
1783 actions.push(Action::Merge {
1784 var: var.clone(),
1785 props,
1786 });
1787 }
1788 SetAssignment::ReplaceFromExpr {
1789 var,
1790 source,
1791 replace,
1792 } => {
1793 let v = eval_expr(source, &ctx.eval_ctx(&row))?;
1794 let props = extract_property_map(&v)?;
1795 if *replace {
1796 actions.push(Action::Replace {
1797 var: var.clone(),
1798 props,
1799 });
1800 } else {
1801 actions.push(Action::Merge {
1802 var: var.clone(),
1803 props,
1804 });
1805 }
1806 }
1807 }
1808 }
1809
1810 let mut updated_nodes: HashSet<String> = HashSet::new();
1812 let mut updated_edges: HashSet<String> = HashSet::new();
1813 for action in actions {
1814 match action {
1815 Action::SetKey { var, key, prop } => match row.get_mut(&var) {
1816 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1820 Some(Value::Node(n)) => {
1823 if matches!(prop, Property::Null) {
1824 n.properties.remove(&key);
1825 } else {
1826 n.properties.insert(key, prop);
1827 }
1828 updated_nodes.insert(var);
1829 }
1830 Some(Value::Edge(e)) => {
1831 if matches!(prop, Property::Null) {
1832 e.properties.remove(&key);
1833 } else {
1834 e.properties.insert(key, prop);
1835 }
1836 updated_edges.insert(var);
1837 }
1838 _ => return Err(Error::UnboundVariable(var)),
1839 },
1840 Action::AddLabels { var, labels } => match row.get_mut(&var) {
1841 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1842 Some(Value::Node(n)) => {
1843 for label in labels {
1844 if !n.labels.contains(&label) {
1845 n.labels.push(label);
1846 }
1847 }
1848 updated_nodes.insert(var);
1849 }
1850 _ => return Err(Error::UnboundVariable(var)),
1851 },
1852 Action::Replace { var, props } => match row.get_mut(&var) {
1853 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1854 Some(Value::Node(n)) => {
1855 n.properties.clear();
1856 for (k, v) in props {
1857 if !matches!(v, Property::Null) {
1858 n.properties.insert(k, v);
1859 }
1860 }
1861 updated_nodes.insert(var);
1862 }
1863 Some(Value::Edge(e)) => {
1864 e.properties.clear();
1865 for (k, v) in props {
1866 if !matches!(v, Property::Null) {
1867 e.properties.insert(k, v);
1868 }
1869 }
1870 updated_edges.insert(var);
1871 }
1872 _ => return Err(Error::UnboundVariable(var)),
1873 },
1874 Action::Merge { var, props } => match row.get_mut(&var) {
1875 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1876 Some(Value::Node(n)) => {
1877 for (k, v) in props {
1878 if matches!(v, Property::Null) {
1879 n.properties.remove(&k);
1880 } else {
1881 n.properties.insert(k, v);
1882 }
1883 }
1884 updated_nodes.insert(var);
1885 }
1886 Some(Value::Edge(e)) => {
1887 for (k, v) in props {
1888 if matches!(v, Property::Null) {
1889 e.properties.remove(&k);
1890 } else {
1891 e.properties.insert(k, v);
1892 }
1893 }
1894 updated_edges.insert(var);
1895 }
1896 _ => return Err(Error::UnboundVariable(var)),
1897 },
1898 }
1899 }
1900
1901 for var in &updated_nodes {
1903 if let Some(Value::Node(n)) = row.get(var) {
1904 ctx.writer.put_node(n)?;
1905 }
1906 }
1907 for var in &updated_edges {
1908 if let Some(Value::Edge(e)) = row.get(var) {
1909 ctx.writer.put_edge(e)?;
1910 }
1911 }
1912
1913 Ok(row)
1914 }
1915}
1916
1917impl Operator for SetPropertyOp {
1918 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1919 if self.buffered.is_none() {
1923 let mut input_rows: Vec<Row> = Vec::new();
1924 while let Some(row) = self.input.next(ctx)? {
1925 input_rows.push(row);
1926 }
1927 let mut applied: Vec<Row> = Vec::with_capacity(input_rows.len());
1928 for row in input_rows {
1929 applied.push(self.apply_one(ctx, row)?);
1930 }
1931 self.buffered = Some(applied);
1932 self.cursor = 0;
1933 }
1934 let rows = self.buffered.as_ref().unwrap();
1935 if self.cursor < rows.len() {
1936 let row = rows[self.cursor].clone();
1937 self.cursor += 1;
1938 return Ok(Some(row));
1939 }
1940 Ok(None)
1941 }
1942}
1943
1944struct RemoveOp {
1945 input: Box<dyn Operator>,
1946 items: Vec<RemoveSpec>,
1947 buffered: Option<Vec<Row>>,
1953 cursor: usize,
1954}
1955
1956impl RemoveOp {
1957 fn new(input: Box<dyn Operator>, items: Vec<RemoveSpec>) -> Self {
1958 Self {
1959 input,
1960 items,
1961 buffered: None,
1962 cursor: 0,
1963 }
1964 }
1965
1966 fn apply_one(&self, ctx: &ExecCtx, mut row: Row) -> Result<Row> {
1967 let mut updated_nodes: HashSet<String> = HashSet::new();
1968 let mut updated_edges: HashSet<String> = HashSet::new();
1969 for item in &self.items {
1970 match item {
1971 RemoveSpec::Property { var, key } => match row.get_mut(var) {
1972 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1975 Some(Value::Node(n)) => {
1976 n.properties.remove(key);
1977 updated_nodes.insert(var.clone());
1978 }
1979 Some(Value::Edge(e)) => {
1980 e.properties.remove(key);
1981 updated_edges.insert(var.clone());
1982 }
1983 _ => return Err(Error::UnboundVariable(var.clone())),
1984 },
1985 RemoveSpec::Labels { var, labels } => match row.get_mut(var) {
1986 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => continue,
1987 Some(Value::Node(n)) => {
1988 n.labels.retain(|l| !labels.contains(l));
1989 updated_nodes.insert(var.clone());
1990 }
1991 _ => return Err(Error::UnboundVariable(var.clone())),
1992 },
1993 }
1994 }
1995 for var in &updated_nodes {
1996 if let Some(Value::Node(n)) = row.get(var) {
1997 ctx.writer.put_node(n)?;
1998 }
1999 }
2000 for var in &updated_edges {
2001 if let Some(Value::Edge(e)) = row.get(var) {
2002 ctx.writer.put_edge(e)?;
2003 }
2004 }
2005 Ok(row)
2006 }
2007}
2008
2009impl Operator for RemoveOp {
2010 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2011 if self.buffered.is_none() {
2012 let mut input_rows: Vec<Row> = Vec::new();
2013 while let Some(row) = self.input.next(ctx)? {
2014 input_rows.push(row);
2015 }
2016 let mut applied: Vec<Row> = Vec::with_capacity(input_rows.len());
2017 for row in input_rows {
2018 applied.push(self.apply_one(ctx, row)?);
2019 }
2020 self.buffered = Some(applied);
2021 self.cursor = 0;
2022 }
2023 let rows = self.buffered.as_ref().unwrap();
2024 if self.cursor < rows.len() {
2025 let row = rows[self.cursor].clone();
2026 self.cursor += 1;
2027 return Ok(Some(row));
2028 }
2029 Ok(None)
2030 }
2031}
2032
2033struct LoadCsvOp {
2034 input: Option<Box<dyn Operator>>,
2035 path_expr: Expr,
2036 var: String,
2037 with_headers: bool,
2038 rows: Option<Vec<Value>>,
2039 cursor: usize,
2040}
2041
2042impl LoadCsvOp {
2043 fn new(
2044 input: Option<Box<dyn Operator>>,
2045 path_expr: Expr,
2046 var: String,
2047 with_headers: bool,
2048 ) -> Self {
2049 Self {
2050 input,
2051 path_expr,
2052 var,
2053 with_headers,
2054 rows: None,
2055 cursor: 0,
2056 }
2057 }
2058
2059 fn load(&mut self, ctx: &ExecCtx, base_row: &Row) -> Result<()> {
2060 let ectx = ctx.eval_ctx(base_row);
2061 let path_val = eval_expr(&self.path_expr, &ectx)?;
2062 let path = match path_val {
2063 Value::Property(Property::String(s)) => s,
2064 _ => return Err(Error::TypeMismatch),
2065 };
2066 let content = std::fs::read_to_string(&path).map_err(|e| {
2067 Error::Unsupported(format!("LOAD CSV: cannot read file '{}': {}", path, e))
2068 })?;
2069 let mut lines = content.lines();
2070 let headers: Option<Vec<String>> = if self.with_headers {
2071 lines
2072 .next()
2073 .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
2074 } else {
2075 None
2076 };
2077 let mut csv_rows = Vec::new();
2078 for line in lines {
2079 if line.trim().is_empty() {
2080 continue;
2081 }
2082 let fields: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
2083 if let Some(hdrs) = &headers {
2084 let mut map = std::collections::HashMap::new();
2085 for (i, h) in hdrs.iter().enumerate() {
2086 let val = fields.get(i).cloned().unwrap_or_default();
2087 map.insert(h.clone(), Property::String(val));
2088 }
2089 csv_rows.push(Value::Property(Property::Map(map)));
2090 } else {
2091 let list: Vec<Value> = fields
2092 .into_iter()
2093 .map(|f| Value::Property(Property::String(f)))
2094 .collect();
2095 csv_rows.push(Value::List(list));
2096 }
2097 }
2098 self.rows = Some(csv_rows);
2099 self.cursor = 0;
2100 Ok(())
2101 }
2102}
2103
2104impl Operator for LoadCsvOp {
2105 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2106 if self.rows.is_none() {
2107 let base = if let Some(input) = &mut self.input {
2108 match input.next(ctx)? {
2109 Some(r) => r,
2110 None => return Ok(None),
2111 }
2112 } else {
2113 Row::new()
2114 };
2115 self.load(ctx, &base)?;
2116 }
2117 let rows = self.rows.as_ref().unwrap();
2118 if self.cursor < rows.len() {
2119 let val = rows[self.cursor].clone();
2120 self.cursor += 1;
2121 let mut row = Row::new();
2122 row.insert(self.var.clone(), val);
2123 Ok(Some(row))
2124 } else {
2125 Ok(None)
2126 }
2127 }
2128}
2129
2130struct ForeachOp {
2131 input: Box<dyn Operator>,
2132 var: String,
2133 list_expr: Expr,
2134 set_assignments: Vec<SetAssignment>,
2135 remove_items: Vec<RemoveSpec>,
2136}
2137
2138impl ForeachOp {
2139 fn new(
2140 input: Box<dyn Operator>,
2141 var: String,
2142 list_expr: Expr,
2143 set_assignments: Vec<SetAssignment>,
2144 remove_items: Vec<RemoveSpec>,
2145 ) -> Self {
2146 Self {
2147 input,
2148 var,
2149 list_expr,
2150 set_assignments,
2151 remove_items,
2152 }
2153 }
2154}
2155
2156impl Operator for ForeachOp {
2157 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2158 let Some(row) = self.input.next(ctx)? else {
2159 return Ok(None);
2160 };
2161 let ectx = ctx.eval_ctx(&row);
2162 let list_val = eval_expr(&self.list_expr, &ectx)?;
2163 let items = match list_val {
2164 Value::List(items) => items,
2165 Value::Property(Property::List(props)) => {
2166 props.into_iter().map(Value::Property).collect()
2167 }
2168 Value::Null | Value::Property(Property::Null) => Vec::new(),
2169 _ => return Err(Error::TypeMismatch),
2170 };
2171 for item in items {
2172 let mut scratch = row.clone();
2173 scratch.insert(self.var.clone(), item);
2174 for a in &self.set_assignments {
2175 match a {
2176 SetAssignment::Property { var, key, value } => {
2177 let evaluated = eval_expr(value, &ctx.eval_ctx(&scratch))?;
2178 let prop = value_to_property(evaluated)?;
2179 match scratch.get_mut(var) {
2180 Some(Value::Node(n)) => {
2181 n.properties.insert(key.clone(), prop);
2182 }
2183 Some(Value::Edge(e)) => {
2184 e.properties.insert(key.clone(), prop);
2185 }
2186 _ => return Err(Error::UnboundVariable(var.clone())),
2187 }
2188 }
2189 SetAssignment::Labels { var, labels } => {
2190 if let Some(Value::Node(n)) = scratch.get_mut(var) {
2191 for l in labels {
2192 if !n.labels.contains(l) {
2193 n.labels.push(l.clone());
2194 }
2195 }
2196 }
2197 }
2198 _ => {}
2199 }
2200 }
2201 for ri in &self.remove_items {
2202 match ri {
2203 RemoveSpec::Property { var, key } => {
2204 if let Some(Value::Node(n)) = scratch.get_mut(var) {
2205 n.properties.remove(key);
2206 } else if let Some(Value::Edge(e)) = scratch.get_mut(var) {
2207 e.properties.remove(key);
2208 }
2209 }
2210 RemoveSpec::Labels { var, labels } => {
2211 if let Some(Value::Node(n)) = scratch.get_mut(var) {
2212 n.labels.retain(|l| !labels.contains(l));
2213 }
2214 }
2215 }
2216 }
2217 for (_, val) in scratch.iter() {
2219 match val {
2220 Value::Node(n) => ctx.writer.put_node(n)?,
2221 Value::Edge(e) => ctx.writer.put_edge(e)?,
2222 _ => {}
2223 }
2224 }
2225 }
2226 Ok(Some(row))
2227 }
2228}
2229
2230struct SeedRowOp {
2231 done: bool,
2232}
2233
2234impl Operator for SeedRowOp {
2235 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
2236 if self.done {
2237 return Ok(None);
2238 }
2239 self.done = true;
2240 Ok(Some(Row::new()))
2241 }
2242}
2243
2244struct SeededRowOp {
2245 row: Option<Row>,
2246}
2247
2248impl Operator for SeededRowOp {
2249 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
2250 Ok(self.row.take())
2251 }
2252}
2253
2254struct CallSubqueryOp {
2255 input: Box<dyn Operator>,
2256 body_plan: LogicalPlan,
2257 pending: Vec<Row>,
2258 pending_idx: usize,
2259}
2260
2261impl CallSubqueryOp {
2262 fn new(input: Box<dyn Operator>, body_plan: LogicalPlan) -> Self {
2263 Self {
2264 input,
2265 body_plan,
2266 pending: Vec::new(),
2267 pending_idx: 0,
2268 }
2269 }
2270}
2271
2272impl Operator for CallSubqueryOp {
2273 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2274 loop {
2275 if self.pending_idx < self.pending.len() {
2276 let row = self.pending[self.pending_idx].clone();
2277 self.pending_idx += 1;
2278 return Ok(Some(row));
2279 }
2280 let outer_row = match self.input.next(ctx)? {
2281 Some(r) => r,
2282 None => return Ok(None),
2283 };
2284 let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row), &mut None);
2285 let mut results = Vec::new();
2286 while let Some(body_row) = body_op.next(ctx)? {
2287 let mut merged = outer_row.clone();
2288 for (k, v) in body_row {
2289 merged.insert(k, v);
2290 }
2291 results.push(merged);
2292 }
2293 if results.is_empty() {
2294 continue;
2295 }
2296 self.pending = results;
2297 self.pending_idx = 0;
2298 }
2299 }
2300}
2301
2302struct OptionalApplyOp {
2309 input: Box<dyn Operator>,
2310 body_plan: LogicalPlan,
2311 null_vars: Vec<String>,
2312 pending: Vec<Row>,
2313 pending_idx: usize,
2314}
2315
2316impl OptionalApplyOp {
2317 fn new(input: Box<dyn Operator>, body_plan: LogicalPlan, null_vars: Vec<String>) -> Self {
2318 Self {
2319 input,
2320 body_plan,
2321 null_vars,
2322 pending: Vec::new(),
2323 pending_idx: 0,
2324 }
2325 }
2326}
2327
2328impl Operator for OptionalApplyOp {
2329 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2330 loop {
2331 if self.pending_idx < self.pending.len() {
2332 let row = self.pending[self.pending_idx].clone();
2333 self.pending_idx += 1;
2334 return Ok(Some(row));
2335 }
2336 let outer_row = match self.input.next(ctx)? {
2337 Some(r) => r,
2338 None => return Ok(None),
2339 };
2340 let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row), &mut None);
2341 let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
2345 stacked.push(&outer_row);
2346 stacked.extend_from_slice(ctx.outer_rows);
2347 let inner_ctx = ExecCtx {
2348 store: ctx.store,
2349 writer: ctx.writer,
2350 params: ctx.params,
2351 procedures: ctx.procedures,
2352 outer_rows: &stacked,
2353 tombstones: ctx.tombstones,
2354 };
2355 let mut results = Vec::new();
2356 while let Some(body_row) = body_op.next(&inner_ctx)? {
2357 let mut merged = outer_row.clone();
2358 for (k, v) in body_row {
2359 merged.insert(k, v);
2360 }
2361 results.push(merged);
2362 }
2363 if results.is_empty() {
2364 let mut fallback = outer_row;
2365 for v in &self.null_vars {
2366 fallback.insert(v.clone(), Value::Null);
2367 }
2368 return Ok(Some(fallback));
2369 }
2370 self.pending = results;
2371 self.pending_idx = 0;
2372 }
2373 }
2374}
2375
/// Operator for a `CALL` of a registered procedure, either standalone or fed
/// by an upstream operator.
struct ProcedureCallOp {
    /// Upstream rows; with `None` the procedure is invoked once against an
    /// empty row.
    input: Option<Box<dyn Operator>>,
    /// Name segments used to look the procedure up in the registry; joined
    /// with `.` in error messages.
    qualified_name: Vec<String>,
    /// Explicit argument expressions. `None` means the arguments are read from
    /// query parameters named after the procedure's inputs, which is only
    /// accepted for standalone calls.
    args: Option<Vec<Expr>>,
    /// Requested `YIELD` projection, if any.
    yield_spec: Option<YieldSpec>,
    /// Whether this is a standalone `CALL`; standalone output rows start empty
    /// instead of extending the input row.
    standalone: bool,
    /// Rows or cursor from the most recent invocation that are still being
    /// emitted.
    active: ProcActiveSource,
    /// Set once the single input-less invocation has been started.
    done: bool,
}
2409
/// Where `ProcedureCallOp` is currently drawing its output rows from.
enum ProcActiveSource {
    /// Nothing in flight; the next call pulls a new input row and invokes the
    /// procedure again.
    None,
    /// Already-merged output rows, emitted by index.
    Buffered {
        /// Output rows ready to emit.
        rows: Vec<Row>,
        /// Position of the next row to emit.
        idx: usize,
    },
    /// A procedure cursor advanced on demand; each procedure row is merged
    /// with `input_row` as it is emitted.
    Streaming {
        /// Cursor yielding procedure rows.
        cursor: Box<dyn crate::procedures::ProcCursor>,
        /// Input row the invocation was made for.
        input_row: Row,
        /// `(procedure column, output alias)` pairs to copy into each row.
        projection: Vec<(String, String)>,
    },
}
2423
2424impl ProcedureCallOp {
2425 fn new(
2426 input: Option<Box<dyn Operator>>,
2427 qualified_name: Vec<String>,
2428 args: Option<Vec<Expr>>,
2429 yield_spec: Option<YieldSpec>,
2430 standalone: bool,
2431 ) -> Self {
2432 Self {
2433 input,
2434 qualified_name,
2435 args,
2436 yield_spec,
2437 standalone,
2438 active: ProcActiveSource::None,
2439 done: false,
2440 }
2441 }
2442
2443 fn resolve_projection(
2449 &self,
2450 proc: &crate::procedures::Procedure,
2451 ) -> Result<Vec<(String, String)>> {
2452 match &self.yield_spec {
2453 None => {
2454 if !self.standalone {
2455 if proc.outputs.is_empty() {
2464 return Ok(Vec::new());
2465 }
2466 return Err(Error::Procedure(format!(
2467 "procedure '{}' has outputs but no YIELD clause",
2468 self.qualified_name.join(".")
2469 )));
2470 }
2471 Ok(proc
2472 .outputs
2473 .iter()
2474 .map(|o| (o.name.clone(), o.name.clone()))
2475 .collect())
2476 }
2477 Some(YieldSpec::Star) => {
2478 if !self.standalone {
2479 return Err(Error::Procedure(
2480 "YIELD * is only allowed on standalone CALL".into(),
2481 ));
2482 }
2483 Ok(proc
2484 .outputs
2485 .iter()
2486 .map(|o| (o.name.clone(), o.name.clone()))
2487 .collect())
2488 }
2489 Some(YieldSpec::Items(items)) => {
2490 let mut projection = Vec::with_capacity(items.len());
2491 let mut seen_aliases: std::collections::HashSet<String> =
2492 std::collections::HashSet::new();
2493 for yi in items {
2494 if !proc.outputs.iter().any(|o| o.name == yi.column) {
2495 return Err(Error::Procedure(format!(
2496 "procedure '{}' has no output column '{}'",
2497 self.qualified_name.join("."),
2498 yi.column
2499 )));
2500 }
2501 let alias = yi.alias.clone().unwrap_or_else(|| yi.column.clone());
2502 if !seen_aliases.insert(alias.clone()) {
2503 return Err(Error::Procedure(format!(
2504 "variable '{alias}' already bound by YIELD"
2505 )));
2506 }
2507 projection.push((yi.column.clone(), alias));
2508 }
2509 Ok(projection)
2510 }
2511 }
2512 }
2513
2514 fn evaluate_args(
2521 &self,
2522 ctx: &ExecCtx,
2523 row: &Row,
2524 proc: &crate::procedures::Procedure,
2525 ) -> Result<Vec<Value>> {
2526 match &self.args {
2527 Some(exprs) => {
2528 if exprs.len() != proc.inputs.len() {
2529 return Err(Error::Procedure(format!(
2530 "procedure '{}' expects {} argument(s), got {}",
2531 self.qualified_name.join("."),
2532 proc.inputs.len(),
2533 exprs.len()
2534 )));
2535 }
2536 let eval_ctx = ctx.eval_ctx(row);
2537 let mut values = Vec::with_capacity(exprs.len());
2538 for (expr, spec) in exprs.iter().zip(proc.inputs.iter()) {
2539 let v = eval_expr(expr, &eval_ctx)?;
2540 if !spec.ty.accepts(&v) {
2541 return Err(Error::Procedure(format!(
2542 "argument '{}' has wrong type for procedure '{}'",
2543 spec.name,
2544 self.qualified_name.join(".")
2545 )));
2546 }
2547 values.push(coerce_arg(v, spec.ty));
2548 }
2549 Ok(values)
2550 }
2551 None => {
2552 if !self.standalone {
2554 return Err(Error::Procedure(
2555 "in-query CALL requires explicit argument list".into(),
2556 ));
2557 }
2558 let mut values = Vec::with_capacity(proc.inputs.len());
2559 for spec in &proc.inputs {
2560 let v = ctx.params.get(&spec.name).cloned().ok_or_else(|| {
2561 Error::Procedure(format!(
2562 "missing parameter ${} for procedure '{}'",
2563 spec.name,
2564 self.qualified_name.join(".")
2565 ))
2566 })?;
2567 if !spec.ty.accepts(&v) {
2568 return Err(Error::Procedure(format!(
2569 "parameter '{}' has wrong type",
2570 spec.name
2571 )));
2572 }
2573 values.push(coerce_arg(v, spec.ty));
2574 }
2575 Ok(values)
2576 }
2577 }
2578 }
2579
2580 fn merge_proc_row(
2584 &self,
2585 proc_row: &crate::procedures::ProcRow,
2586 input_row: &Row,
2587 projection: &[(String, String)],
2588 ) -> Row {
2589 let mut merged = if self.standalone {
2590 Row::new()
2591 } else {
2592 input_row.clone()
2593 };
2594 for (src, alias) in projection {
2595 let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
2596 merged.insert(alias.clone(), v);
2597 }
2598 merged
2599 }
2600
2601 fn invoke_once(
2610 &mut self,
2611 ctx: &ExecCtx,
2612 input_row: Row,
2613 proc: &crate::procedures::Procedure,
2614 projection: Vec<(String, String)>,
2615 ) -> Result<()> {
2616 if proc.outputs.is_empty() {
2620 if self.standalone {
2621 self.active = ProcActiveSource::None;
2622 } else {
2623 self.active = ProcActiveSource::Buffered {
2624 rows: vec![input_row],
2625 idx: 0,
2626 };
2627 }
2628 return Ok(());
2629 }
2630 let args = self.evaluate_args(ctx, &input_row, proc)?;
2631 let is_write = proc.is_write_builtin();
2632 if is_write {
2639 #[cfg(any(
2644 feature = "apoc-create",
2645 feature = "apoc-refactor",
2646 feature = "apoc-cypher"
2647 ))]
2648 {
2649 let rows = proc.resolve_write_rows(ctx.store, ctx.writer, &args, ctx.procedures)?;
2650 let merged: Vec<Row> = rows
2651 .iter()
2652 .map(|pr| self.merge_proc_row(pr, &input_row, &projection))
2653 .collect();
2654 self.active = ProcActiveSource::Buffered {
2655 rows: merged,
2656 idx: 0,
2657 };
2658 return Ok(());
2659 }
2660 #[cfg(not(any(
2661 feature = "apoc-create",
2662 feature = "apoc-refactor",
2663 feature = "apoc-cypher"
2664 )))]
2665 {
2666 let _ = (ctx, &args);
2667 return Err(Error::Procedure(
2668 "write procedure dispatched in a non-write-apoc build".into(),
2669 ));
2670 }
2671 }
2672 match proc.resolve_rows(ctx.store, &args, ctx.procedures)? {
2673 crate::procedures::ProcRows::Eager(rows) => {
2674 let is_static = proc.builtin.is_none();
2679 let merged: Vec<Row> = rows
2680 .iter()
2681 .filter(|pr| !is_static || proc.row_matches(pr, &args))
2682 .map(|pr| self.merge_proc_row(pr, &input_row, &projection))
2683 .collect();
2684 self.active = ProcActiveSource::Buffered {
2685 rows: merged,
2686 idx: 0,
2687 };
2688 }
2689 crate::procedures::ProcRows::Streaming(cursor) => {
2690 self.active = ProcActiveSource::Streaming {
2691 cursor,
2692 input_row,
2693 projection,
2694 };
2695 }
2696 }
2697 Ok(())
2698 }
2699}
2700
2701fn coerce_arg(v: Value, ty: crate::procedures::ProcType) -> Value {
2706 use crate::procedures::ProcType;
2707 if matches!(ty, ProcType::Float) {
2708 if let Value::Property(Property::Int64(n)) = v {
2709 return Value::Property(Property::Float64(n as f64));
2710 }
2711 }
2712 v
2713}
2714
2715impl Operator for ProcedureCallOp {
2716 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2717 loop {
2718 match &mut self.active {
2722 ProcActiveSource::Buffered { rows, idx } => {
2723 if *idx < rows.len() {
2724 let row = rows[*idx].clone();
2725 *idx += 1;
2726 return Ok(Some(row));
2727 }
2728 self.active = ProcActiveSource::None;
2729 }
2730 ProcActiveSource::Streaming {
2731 cursor,
2732 input_row,
2733 projection,
2734 } => match cursor.advance(ctx.store)? {
2735 Some(proc_row) => {
2736 let mut merged = if self.standalone {
2740 Row::new()
2741 } else {
2742 input_row.clone()
2743 };
2744 for (src, alias) in projection.iter() {
2745 let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
2746 merged.insert(alias.clone(), v);
2747 }
2748 return Ok(Some(merged));
2749 }
2750 None => {
2751 self.active = ProcActiveSource::None;
2752 }
2753 },
2754 ProcActiveSource::None => {}
2755 }
2756
2757 let proc = match ctx.procedures.get(&self.qualified_name) {
2763 Some(p) => p,
2764 None => {
2765 return Err(Error::Procedure(format!(
2766 "procedure '{}' not found",
2767 self.qualified_name.join(".")
2768 )));
2769 }
2770 };
2771 let projection = self.resolve_projection(proc)?;
2772 let input_row = match &mut self.input {
2773 Some(inp) => match inp.next(ctx)? {
2774 Some(r) => r,
2775 None => return Ok(None),
2776 },
2777 None => {
2778 if self.done {
2779 return Ok(None);
2780 }
2781 self.done = true;
2782 Row::new()
2783 }
2784 };
2785 self.invoke_once(ctx, input_row, proc, projection)?;
2786 }
2791 }
2792}
2793
2794fn extract_property_map(v: &Value) -> Result<Vec<(String, Property)>> {
2800 match v {
2801 Value::Node(n) => Ok(n.properties.clone().into_iter().collect()),
2802 Value::Edge(e) => Ok(e.properties.clone().into_iter().collect()),
2803 Value::Map(pairs) => pairs
2804 .iter()
2805 .map(|(k, vv)| Ok((k.clone(), value_to_property(vv.clone())?)))
2806 .collect(),
2807 Value::Property(Property::Map(entries)) => Ok(entries
2808 .iter()
2809 .map(|(k, p)| (k.clone(), p.clone()))
2810 .collect()),
2811 Value::Null | Value::Property(Property::Null) => Ok(Vec::new()),
2812 _ => Err(Error::InvalidSetValue),
2813 }
2814}
2815
2816fn value_to_property(v: Value) -> Result<Property> {
2817 match v {
2818 Value::Property(Property::Map(_)) => Err(Error::InvalidSetValue),
2819 Value::Property(p) => Ok(p),
2820 Value::Null => Ok(Property::Null),
2821 Value::List(items) => {
2822 let props: Vec<Property> = items
2823 .into_iter()
2824 .map(value_to_property)
2825 .collect::<Result<_>>()?;
2826 Ok(Property::List(props))
2827 }
2828 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
2832 Err(Error::InvalidSetValue)
2833 }
2834 }
2835}
2836
2837struct NodeScanAllOp {
2838 var: String,
2839 ids: Option<Vec<NodeId>>,
2840 cursor: usize,
2841}
2842
2843impl NodeScanAllOp {
2844 fn new(var: String) -> Self {
2845 Self {
2846 var,
2847 ids: None,
2848 cursor: 0,
2849 }
2850 }
2851}
2852
2853impl Operator for NodeScanAllOp {
2854 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2855 if self.ids.is_none() {
2856 self.ids = Some(ctx.store.all_node_ids()?);
2857 }
2858 let ids = self.ids.as_ref().unwrap();
2859 while self.cursor < ids.len() {
2860 let id = ids[self.cursor];
2861 self.cursor += 1;
2862 if let Some(node) = ctx.store.get_node(id)? {
2863 let mut row = Row::new();
2864 row.insert(self.var.clone(), Value::Node(node));
2865 return Ok(Some(row));
2866 }
2867 }
2868 Ok(None)
2869 }
2870}
2871
2872struct NodeScanByLabelsOp {
2873 var: String,
2874 labels: Vec<String>,
2875 ids: Option<Vec<NodeId>>,
2876 cursor: usize,
2877}
2878
2879impl NodeScanByLabelsOp {
2880 fn new(var: String, labels: Vec<String>) -> Self {
2881 Self {
2882 var,
2883 labels,
2884 ids: None,
2885 cursor: 0,
2886 }
2887 }
2888}
2889
2890impl Operator for NodeScanByLabelsOp {
2891 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2892 if self.ids.is_none() {
2893 let primary = self
2895 .labels
2896 .first()
2897 .expect("NodeScanByLabels must have at least one label");
2898 self.ids = Some(ctx.store.nodes_by_label(primary)?);
2899 }
2900 let ids = self.ids.as_ref().unwrap();
2901 while self.cursor < ids.len() {
2902 let id = ids[self.cursor];
2903 self.cursor += 1;
2904 if let Some(node) = ctx.store.get_node(id)? {
2905 if has_all_labels(&node, &self.labels) {
2906 let mut row = Row::new();
2907 row.insert(self.var.clone(), Value::Node(node));
2908 return Ok(Some(row));
2909 }
2910 }
2911 }
2912 Ok(None)
2913 }
2914}
2915
2916fn has_all_labels(node: &Node, labels: &[String]) -> bool {
2917 labels.iter().all(|l| node.labels.contains(l))
2918}
2919
2920struct IndexSeekOp {
2932 var: String,
2933 label: String,
2934 properties: Vec<String>,
2935 value_exprs: Vec<Expr>,
2936 results: Option<Vec<NodeId>>,
2937 cursor: usize,
2938}
2939
2940impl IndexSeekOp {
2941 fn new(var: String, label: String, properties: Vec<String>, value_exprs: Vec<Expr>) -> Self {
2942 assert_eq!(
2943 properties.len(),
2944 value_exprs.len(),
2945 "IndexSeekOp: properties and values must have equal length"
2946 );
2947 Self {
2948 var,
2949 label,
2950 properties,
2951 value_exprs,
2952 results: None,
2953 cursor: 0,
2954 }
2955 }
2956}
2957
2958impl Operator for IndexSeekOp {
2959 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2960 if self.results.is_none() {
2961 let empty = Row::new();
2962 let mut values: Vec<Property> = Vec::with_capacity(self.value_exprs.len());
2963 for expr in &self.value_exprs {
2964 let value = eval_expr(expr, &ctx.eval_ctx(&empty))?;
2965 let property = match value {
2966 Value::Property(p) => p,
2967 Value::Null => Property::Null,
2968 Value::Node(_)
2969 | Value::Edge(_)
2970 | Value::List(_)
2971 | Value::Map(_)
2972 | Value::Path { .. } => {
2973 return Err(Error::InvalidSetValue);
2974 }
2975 };
2976 values.push(property);
2977 }
2978 let ids = ctx
2979 .store
2980 .nodes_by_properties(&self.label, &self.properties, &values)?;
2981 self.results = Some(ids);
2982 }
2983 let ids = self.results.as_ref().unwrap();
2984 while self.cursor < ids.len() {
2985 let id = ids[self.cursor];
2986 self.cursor += 1;
2987 if let Some(node) = ctx.store.get_node(id)? {
2988 let mut row = Row::new();
2989 row.insert(self.var.clone(), Value::Node(node));
2990 return Ok(Some(row));
2991 }
2992 }
2993 Ok(None)
2994 }
2995}
2996
2997struct PointIndexSeekOp {
3016 var: String,
3017 label: String,
3018 property: String,
3019 bounds: PointSeekBounds,
3020 results: Option<Vec<NodeId>>,
3021 cursor: usize,
3022}
3023
3024impl PointIndexSeekOp {
3025 fn new(var: String, label: String, property: String, bounds: PointSeekBounds) -> Self {
3026 Self {
3027 var,
3028 label,
3029 property,
3030 bounds,
3031 results: None,
3032 cursor: 0,
3033 }
3034 }
3035}
3036
3037impl Operator for PointIndexSeekOp {
3038 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3039 if self.results.is_none() {
3040 let empty = Row::new();
3041 let ectx = ctx.eval_ctx(&empty);
3042 let ids = match &self.bounds {
3043 PointSeekBounds::Corners { lo, hi } => {
3044 let lo_pt = extract_point(&eval_expr(lo, &ectx)?);
3045 let hi_pt = extract_point(&eval_expr(hi, &ectx)?);
3046 match (lo_pt, hi_pt) {
3047 (Some(lo), Some(hi)) if lo.srid == hi.srid => ctx.store.nodes_in_bbox(
3048 &self.label,
3049 &self.property,
3050 lo.srid,
3051 lo.x,
3052 lo.y,
3053 hi.x,
3054 hi.y,
3055 )?,
3056 _ => Vec::new(),
3057 }
3058 }
3059 PointSeekBounds::Radius { center, radius } => {
3060 let center_pt = extract_point(&eval_expr(center, &ectx)?);
3061 let radius_val = extract_f64(&eval_expr(radius, &ectx)?);
3062 match (center_pt, radius_val) {
3063 (Some(c), Some(r)) if r.is_finite() && r >= 0.0 => {
3064 let (xlo, ylo, xhi, yhi) = enclosing_bbox(&c, r);
3065 ctx.store.nodes_in_bbox(
3066 &self.label,
3067 &self.property,
3068 c.srid,
3069 xlo,
3070 ylo,
3071 xhi,
3072 yhi,
3073 )?
3074 }
3075 _ => Vec::new(),
3077 }
3078 }
3079 };
3080 self.results = Some(ids);
3081 }
3082 let ids = self.results.as_ref().unwrap();
3083 while self.cursor < ids.len() {
3084 let id = ids[self.cursor];
3085 self.cursor += 1;
3086 if let Some(node) = ctx.store.get_node(id)? {
3087 let mut row = Row::new();
3088 row.insert(self.var.clone(), Value::Node(node));
3089 return Ok(Some(row));
3090 }
3091 }
3092 Ok(None)
3093 }
3094}
3095
3096fn extract_point(v: &Value) -> Option<meshdb_core::Point> {
3097 match v {
3098 Value::Property(Property::Point(p)) => Some(*p),
3099 _ => None,
3100 }
3101}
3102
3103fn extract_f64(v: &Value) -> Option<f64> {
3104 match v {
3105 Value::Property(Property::Float64(f)) => Some(*f),
3106 Value::Property(Property::Int64(i)) => Some(*i as f64),
3107 _ => None,
3108 }
3109}
3110
3111fn enclosing_bbox(center: &meshdb_core::Point, r: f64) -> (f64, f64, f64, f64) {
3121 if center.is_geographic() {
3122 const METRES_PER_DEG: f64 = 111_320.0;
3125 let dlat = r / METRES_PER_DEG;
3126 let cos_lat = center.y.to_radians().cos().abs();
3127 let cos_lat_floor = cos_lat.max(1.0e-6);
3131 let dlon = r / (METRES_PER_DEG * cos_lat_floor);
3132 (
3133 center.x - dlon,
3134 center.y - dlat,
3135 center.x + dlon,
3136 center.y + dlat,
3137 )
3138 } else {
3139 (center.x - r, center.y - r, center.x + r, center.y + r)
3140 }
3141}
3142
3143struct EdgeSeekOp {
3152 edge_var: String,
3153 src_var: String,
3154 dst_var: String,
3155 edge_type: String,
3156 property: String,
3157 value_expr: Expr,
3158 direction: Direction,
3159 residual_properties: Vec<(String, Expr)>,
3160 results: Option<Vec<Row>>,
3163 cursor: usize,
3164}
3165
3166impl EdgeSeekOp {
3167 #[allow(clippy::too_many_arguments)]
3168 fn new(
3169 edge_var: String,
3170 src_var: String,
3171 dst_var: String,
3172 edge_type: String,
3173 property: String,
3174 value_expr: Expr,
3175 direction: Direction,
3176 residual_properties: Vec<(String, Expr)>,
3177 ) -> Self {
3178 Self {
3179 edge_var,
3180 src_var,
3181 dst_var,
3182 edge_type,
3183 property,
3184 value_expr,
3185 direction,
3186 residual_properties,
3187 results: None,
3188 cursor: 0,
3189 }
3190 }
3191}
3192
3193impl Operator for EdgeSeekOp {
3194 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3195 if self.results.is_none() {
3196 let empty = Row::new();
3197 let seek_value = eval_expr(&self.value_expr, &ctx.eval_ctx(&empty))?;
3198 let property = match seek_value {
3199 Value::Property(p) => p,
3200 Value::Null => Property::Null,
3201 Value::Node(_)
3202 | Value::Edge(_)
3203 | Value::List(_)
3204 | Value::Map(_)
3205 | Value::Path { .. } => {
3206 return Err(Error::InvalidSetValue);
3207 }
3208 };
3209 let ids = ctx
3210 .store
3211 .edges_by_property(&self.edge_type, &self.property, &property)?;
3212 let mut rows: Vec<Row> = Vec::with_capacity(ids.len());
3213 for id in ids {
3214 let Some(edge) = ctx.store.get_edge(id)? else {
3215 continue;
3216 };
3217 let mut residuals_ok = true;
3223 for (key, expr) in &self.residual_properties {
3224 let wanted = eval_expr(expr, &ctx.eval_ctx(&empty))?;
3225 let Some(stored) = edge.properties.get(key) else {
3226 residuals_ok = false;
3227 break;
3228 };
3229 if !values_equal(&Value::Property(stored.clone()), &wanted) {
3230 residuals_ok = false;
3231 break;
3232 }
3233 }
3234 if !residuals_ok {
3235 continue;
3236 }
3237 let Some(src_node) = ctx.store.get_node(edge.source)? else {
3242 continue;
3243 };
3244 let Some(dst_node) = ctx.store.get_node(edge.target)? else {
3245 continue;
3246 };
3247 match self.direction {
3252 Direction::Outgoing => {
3253 rows.push(self.make_row(&edge, &src_node, &dst_node));
3254 }
3255 Direction::Incoming => {
3256 rows.push(self.make_row(&edge, &dst_node, &src_node));
3257 }
3258 Direction::Both => {
3259 rows.push(self.make_row(&edge, &src_node, &dst_node));
3260 if edge.source != edge.target {
3263 rows.push(self.make_row(&edge, &dst_node, &src_node));
3264 }
3265 }
3266 }
3267 }
3268 self.results = Some(rows);
3269 }
3270 let rows = self.results.as_ref().unwrap();
3271 if self.cursor < rows.len() {
3272 let row = rows[self.cursor].clone();
3273 self.cursor += 1;
3274 return Ok(Some(row));
3275 }
3276 Ok(None)
3277 }
3278}
3279
3280impl EdgeSeekOp {
3281 fn make_row(&self, edge: &Edge, src: &Node, dst: &Node) -> Row {
3282 let mut row = Row::new();
3283 row.insert(self.edge_var.clone(), Value::Edge(edge.clone()));
3284 row.insert(self.src_var.clone(), Value::Node(src.clone()));
3285 row.insert(self.dst_var.clone(), Value::Node(dst.clone()));
3286 row
3287 }
3288}
3289
3290struct EdgePointIndexSeekOp {
3297 edge_var: String,
3298 src_var: String,
3299 dst_var: String,
3300 edge_type: String,
3301 property: String,
3302 direction: Direction,
3303 bounds: PointSeekBounds,
3304 results: Option<Vec<Row>>,
3305 cursor: usize,
3306}
3307
3308impl EdgePointIndexSeekOp {
3309 #[allow(clippy::too_many_arguments)]
3310 fn new(
3311 edge_var: String,
3312 src_var: String,
3313 dst_var: String,
3314 edge_type: String,
3315 property: String,
3316 direction: Direction,
3317 bounds: PointSeekBounds,
3318 ) -> Self {
3319 Self {
3320 edge_var,
3321 src_var,
3322 dst_var,
3323 edge_type,
3324 property,
3325 direction,
3326 bounds,
3327 results: None,
3328 cursor: 0,
3329 }
3330 }
3331
3332 fn make_row(&self, edge: &Edge, src: &Node, dst: &Node) -> Row {
3333 let mut row = Row::new();
3334 row.insert(self.edge_var.clone(), Value::Edge(edge.clone()));
3335 row.insert(self.src_var.clone(), Value::Node(src.clone()));
3336 row.insert(self.dst_var.clone(), Value::Node(dst.clone()));
3337 row
3338 }
3339}
3340
3341impl Operator for EdgePointIndexSeekOp {
3342 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3343 if self.results.is_none() {
3344 let empty = Row::new();
3345 let ectx = ctx.eval_ctx(&empty);
3346 let ids = match &self.bounds {
3347 PointSeekBounds::Corners { lo, hi } => {
3348 let lo_pt = extract_point(&eval_expr(lo, &ectx)?);
3349 let hi_pt = extract_point(&eval_expr(hi, &ectx)?);
3350 match (lo_pt, hi_pt) {
3351 (Some(lo), Some(hi)) if lo.srid == hi.srid => ctx.store.edges_in_bbox(
3352 &self.edge_type,
3353 &self.property,
3354 lo.srid,
3355 lo.x,
3356 lo.y,
3357 hi.x,
3358 hi.y,
3359 )?,
3360 _ => Vec::new(),
3361 }
3362 }
3363 PointSeekBounds::Radius { center, radius } => {
3364 let center_pt = extract_point(&eval_expr(center, &ectx)?);
3365 let radius_val = extract_f64(&eval_expr(radius, &ectx)?);
3366 match (center_pt, radius_val) {
3367 (Some(c), Some(r)) if r.is_finite() && r >= 0.0 => {
3368 let (xlo, ylo, xhi, yhi) = enclosing_bbox(&c, r);
3369 ctx.store.edges_in_bbox(
3370 &self.edge_type,
3371 &self.property,
3372 c.srid,
3373 xlo,
3374 ylo,
3375 xhi,
3376 yhi,
3377 )?
3378 }
3379 _ => Vec::new(),
3380 }
3381 }
3382 };
3383
3384 let mut rows: Vec<Row> = Vec::with_capacity(ids.len());
3385 for id in ids {
3386 let Some(edge) = ctx.store.get_edge(id)? else {
3387 continue;
3388 };
3389 let Some(src_node) = ctx.store.get_node(edge.source)? else {
3390 continue;
3391 };
3392 let Some(dst_node) = ctx.store.get_node(edge.target)? else {
3393 continue;
3394 };
3395 match self.direction {
3396 Direction::Outgoing => rows.push(self.make_row(&edge, &src_node, &dst_node)),
3397 Direction::Incoming => rows.push(self.make_row(&edge, &dst_node, &src_node)),
3398 Direction::Both => {
3399 rows.push(self.make_row(&edge, &src_node, &dst_node));
3400 if edge.source != edge.target {
3401 rows.push(self.make_row(&edge, &dst_node, &src_node));
3402 }
3403 }
3404 }
3405 }
3406 self.results = Some(rows);
3407 }
3408 let rows = self.results.as_ref().unwrap();
3409 if self.cursor < rows.len() {
3410 let row = rows[self.cursor].clone();
3411 self.cursor += 1;
3412 return Ok(Some(row));
3413 }
3414 Ok(None)
3415 }
3416}
3417
3418fn matches_pattern_props(node: &Node, props: &[(String, Property)]) -> bool {
3419 props.iter().all(|(k, v)| {
3420 node.properties
3421 .get(k)
3422 .map(|stored| stored == v)
3423 .unwrap_or(false)
3424 })
3425}
3426
3427struct MergeNodeOp {
3428 var: String,
3429 labels: Vec<String>,
3430 properties: Vec<(String, Expr)>,
3434 on_create: Vec<SetAssignment>,
3439 on_match: Vec<SetAssignment>,
3443 input: Option<Box<dyn Operator>>,
3450 merged_nodes: Vec<Node>,
3457 merge_done: bool,
3461 cursor: usize,
3462 input_buffered: Option<Vec<Row>>,
3471 input_cursor: usize,
3472}
3473
3474impl MergeNodeOp {
3475 fn new(
3476 input: Option<Box<dyn Operator>>,
3477 var: String,
3478 labels: Vec<String>,
3479 properties: Vec<(String, Expr)>,
3480 on_create: Vec<SetAssignment>,
3481 on_match: Vec<SetAssignment>,
3482 ) -> Self {
3483 Self {
3484 var,
3485 labels,
3486 properties,
3487 on_create,
3488 on_match,
3489 input,
3490 merged_nodes: Vec::new(),
3491 merge_done: false,
3492 cursor: 0,
3493 input_buffered: None,
3494 input_cursor: 0,
3495 }
3496 }
3497
3498 fn run_merge_for(&mut self, ctx: &ExecCtx, base: &Row) -> Result<Vec<Node>> {
3510 let resolved_props: Vec<(String, Property)> = self
3511 .properties
3512 .iter()
3513 .map(|(k, expr)| {
3514 let v = eval_expr(expr, &ctx.eval_ctx(base))?;
3515 Ok((k.clone(), value_to_property(v)?))
3516 })
3517 .collect::<Result<Vec<_>>>()?;
3518
3519 let candidate_ids: Vec<NodeId> = if let Some(primary) = self.labels.first() {
3520 ctx.store.nodes_by_label(primary)?
3521 } else {
3522 ctx.store.all_node_ids()?
3523 };
3524 let mut merged_nodes: Vec<Node> = Vec::new();
3525 for id in candidate_ids {
3526 if let Some(node) = ctx.store.get_node(id)? {
3527 if has_all_labels(&node, &self.labels)
3528 && matches_pattern_props(&node, &resolved_props)
3529 {
3530 merged_nodes.push(node);
3531 }
3532 }
3533 }
3534
3535 if merged_nodes.is_empty() {
3536 let mut node = Node::new();
3537 for label in &self.labels {
3538 node.labels.push(label.clone());
3539 }
3540 for (k, prop) in resolved_props {
3541 node.properties.insert(k, prop);
3542 }
3543 apply_merge_actions(&mut node, &self.on_create, &self.var, ctx, base)?;
3544 ctx.writer.put_node(&node)?;
3545 merged_nodes.push(node);
3546 } else if !self.on_match.is_empty() {
3547 for node in merged_nodes.iter_mut() {
3548 apply_merge_actions(node, &self.on_match, &self.var, ctx, base)?;
3549 ctx.writer.put_node(node)?;
3550 }
3551 }
3552 Ok(merged_nodes)
3553 }
3554}
3555
3556impl Operator for MergeNodeOp {
3557 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3558 if self.input.is_none() {
3563 if !self.merge_done {
3564 let empty = Row::new();
3565 let nodes = self.run_merge_for(ctx, &empty)?;
3566 self.merged_nodes = nodes;
3567 self.merge_done = true;
3568 }
3569 if self.cursor < self.merged_nodes.len() {
3570 let node = self.merged_nodes[self.cursor].clone();
3571 self.cursor += 1;
3572 let mut row = Row::new();
3573 row.insert(self.var.clone(), Value::Node(node));
3574 return Ok(Some(row));
3575 }
3576 return Ok(None);
3577 }
3578
3579 if self.input_buffered.is_none() {
3587 let mut input_rows: Vec<Row> = Vec::new();
3588 while let Some(row) = self.input.as_mut().unwrap().next(ctx)? {
3589 input_rows.push(row);
3590 }
3591 let mut output: Vec<Row> = Vec::new();
3592 for input_row in input_rows {
3593 let nodes = self.run_merge_for(ctx, &input_row)?;
3594 for node in nodes {
3595 let mut out = input_row.clone();
3596 out.insert(self.var.clone(), Value::Node(node));
3597 output.push(out);
3598 }
3599 }
3600 self.input_buffered = Some(output);
3601 self.input_cursor = 0;
3602 }
3603 let rows = self.input_buffered.as_ref().unwrap();
3604 if self.input_cursor < rows.len() {
3605 let row = rows[self.input_cursor].clone();
3606 self.input_cursor += 1;
3607 return Ok(Some(row));
3608 }
3609 Ok(None)
3610 }
3611}
3612
3613struct MergeEdgeOp {
3632 input: Box<dyn Operator>,
3633 edge_var: String,
3634 src_var: String,
3635 dst_var: String,
3636 edge_type: String,
3637 undirected: bool,
3638 properties: Vec<(String, Expr)>,
3642 on_create: Vec<SetAssignment>,
3643 on_match: Vec<SetAssignment>,
3644 pending: std::collections::VecDeque<Row>,
3651 drained: bool,
3655}
3656
3657impl MergeEdgeOp {
3658 #[allow(clippy::too_many_arguments)]
3659 fn new(
3660 input: Box<dyn Operator>,
3661 edge_var: String,
3662 src_var: String,
3663 dst_var: String,
3664 edge_type: String,
3665 undirected: bool,
3666 properties: Vec<(String, Expr)>,
3667 on_create: Vec<SetAssignment>,
3668 on_match: Vec<SetAssignment>,
3669 ) -> Self {
3670 Self {
3671 input,
3672 edge_var,
3673 src_var,
3674 dst_var,
3675 edge_type,
3676 undirected,
3677 properties,
3678 on_create,
3679 on_match,
3680 pending: std::collections::VecDeque::new(),
3681 drained: false,
3682 }
3683 }
3684}
3685
3686impl MergeEdgeOp {
3687 fn merge_for(&self, ctx: &ExecCtx, row: Row, out: &mut Vec<Row>) -> Result<()> {
3692 let src_node = match row.get(&self.src_var) {
3697 Some(Value::Node(n)) => n.clone(),
3698 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3699 };
3700 let dst_node = match row.get(&self.dst_var) {
3701 Some(Value::Node(n)) => n.clone(),
3702 _ => return Err(Error::UnboundVariable(self.dst_var.clone())),
3703 };
3704
3705 let required_props: Vec<(String, Property)> = self
3709 .properties
3710 .iter()
3711 .map(|(k, expr)| {
3712 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
3713 Ok((k.clone(), value_to_property(v)?))
3714 })
3715 .collect::<Result<Vec<_>>>()?;
3716 let edge_matches = |edge: &Edge| -> bool {
3717 required_props.iter().all(|(k, want)| {
3718 edge.properties
3719 .get(k)
3720 .map(|have| have == want)
3721 .unwrap_or(false)
3722 })
3723 };
3724
3725 let mut matched: Vec<Edge> = Vec::new();
3729 for (edge_id, neighbor_id) in ctx.store.outgoing(src_node.id)? {
3730 if neighbor_id != dst_node.id {
3731 continue;
3732 }
3733 if let Some(edge) = ctx.store.get_edge(edge_id)? {
3734 if edge.edge_type == self.edge_type && edge_matches(&edge) {
3735 matched.push(edge);
3736 }
3737 }
3738 }
3739 if self.undirected {
3740 for (edge_id, neighbor_id) in ctx.store.incoming(src_node.id)? {
3741 if neighbor_id != dst_node.id {
3742 continue;
3743 }
3744 if let Some(edge) = ctx.store.get_edge(edge_id)? {
3745 if edge.edge_type == self.edge_type && edge_matches(&edge) {
3746 matched.push(edge);
3747 }
3748 }
3749 }
3750 }
3751
3752 if matched.is_empty() {
3753 let mut new_edge = Edge::new(&self.edge_type, src_node.id, dst_node.id);
3754 for (k, p) in &required_props {
3755 new_edge.properties.insert(k.clone(), p.clone());
3756 }
3757 let mut row_out = row.clone();
3758 apply_merge_edge_actions(
3759 &mut new_edge,
3760 &self.on_create,
3761 &self.edge_var,
3762 ctx,
3763 &mut row_out,
3764 )?;
3765 ctx.writer.put_edge(&new_edge)?;
3766 row_out.insert(self.edge_var.clone(), Value::Edge(new_edge));
3767 out.push(row_out);
3768 } else {
3769 for mut existing in matched {
3770 let mut row_out = row.clone();
3771 if !self.on_match.is_empty() {
3772 apply_merge_edge_actions(
3773 &mut existing,
3774 &self.on_match,
3775 &self.edge_var,
3776 ctx,
3777 &mut row_out,
3778 )?;
3779 ctx.writer.put_edge(&existing)?;
3780 }
3781 row_out.insert(self.edge_var.clone(), Value::Edge(existing));
3782 out.push(row_out);
3783 }
3784 }
3785 Ok(())
3786 }
3787}
3788
3789impl Operator for MergeEdgeOp {
3790 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3791 if self.pending.is_empty() && !self.drained {
3796 let mut input_rows: Vec<Row> = Vec::new();
3797 while let Some(row) = self.input.next(ctx)? {
3798 input_rows.push(row);
3799 }
3800 let mut out: Vec<Row> = Vec::new();
3801 for row in input_rows {
3802 self.merge_for(ctx, row, &mut out)?;
3803 }
3804 self.pending.extend(out);
3805 self.drained = true;
3806 }
3807 Ok(self.pending.pop_front())
3808 }
3809}
3810
3811fn apply_merge_edge_actions(
3821 edge: &mut Edge,
3822 actions: &[SetAssignment],
3823 var: &str,
3824 exec_ctx: &ExecCtx,
3825 outer: &mut Row,
3826) -> Result<()> {
3827 if actions.is_empty() {
3828 return Ok(());
3829 }
3830 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3833 for action in actions {
3834 match action {
3835 SetAssignment::Property {
3836 var: target,
3837 key,
3838 value,
3839 } => {
3840 let sub_ctx = exec_ctx.eval_ctx(outer);
3841 let evaluated = eval_expr(value, &sub_ctx)?;
3842 let prop = value_to_property(evaluated)?;
3843 if target == var {
3844 if matches!(prop, Property::Null) {
3845 edge.properties.remove(key);
3846 } else {
3847 edge.properties.insert(key.clone(), prop);
3848 }
3849 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3850 } else {
3851 apply_set_prop_to_outer(outer, exec_ctx, target, key, prop)?;
3852 }
3853 }
3854 SetAssignment::Merge {
3855 var: target,
3856 properties,
3857 } => {
3858 let sub_ctx = exec_ctx.eval_ctx(outer);
3859 let resolved: Vec<(String, Property)> = properties
3860 .iter()
3861 .map(|(k, expr)| {
3862 let v = eval_expr(expr, &sub_ctx)?;
3863 Ok((k.clone(), value_to_property(v)?))
3864 })
3865 .collect::<Result<Vec<_>>>()?;
3866 if target == var {
3867 for (k, p) in resolved {
3868 edge.properties.insert(k, p);
3869 }
3870 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3871 } else {
3872 apply_set_map_to_outer(outer, exec_ctx, target, resolved, false)?;
3873 }
3874 }
3875 SetAssignment::Replace {
3876 var: target,
3877 properties,
3878 } => {
3879 let sub_ctx = exec_ctx.eval_ctx(outer);
3880 let resolved: Vec<(String, Property)> = properties
3881 .iter()
3882 .map(|(k, expr)| {
3883 let v = eval_expr(expr, &sub_ctx)?;
3884 Ok((k.clone(), value_to_property(v)?))
3885 })
3886 .collect::<Result<Vec<_>>>()?;
3887 if target == var {
3888 edge.properties.clear();
3889 for (k, p) in resolved {
3890 edge.properties.insert(k, p);
3891 }
3892 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3893 } else {
3894 apply_set_map_to_outer(outer, exec_ctx, target, resolved, true)?;
3895 }
3896 }
3897 SetAssignment::Labels {
3898 var: target,
3899 labels,
3900 } => {
3901 if target == var {
3902 return Err(Error::UnboundVariable(target.clone()));
3904 }
3905 apply_set_labels_to_outer(outer, exec_ctx, target, labels)?;
3906 }
3907 SetAssignment::ReplaceFromExpr {
3908 var: target,
3909 source,
3910 replace,
3911 } => {
3912 let sub_ctx = exec_ctx.eval_ctx(outer);
3913 let v = eval_expr(source, &sub_ctx)?;
3914 let props = extract_property_map(&v)?;
3915 if target == var {
3916 if *replace {
3917 edge.properties.clear();
3918 }
3919 for (k, p) in props {
3920 edge.properties.insert(k, p);
3921 }
3922 outer.insert(var.to_string(), Value::Edge(edge.clone()));
3923 } else {
3924 apply_set_map_to_outer(outer, exec_ctx, target, props, *replace)?;
3925 }
3926 }
3927 }
3928 }
3929 Ok(())
3930}
3931
3932fn apply_set_prop_to_outer(
3937 outer: &mut Row,
3938 exec_ctx: &ExecCtx,
3939 target: &str,
3940 key: &str,
3941 prop: Property,
3942) -> Result<()> {
3943 match outer.get_mut(target) {
3944 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
3945 return Ok(());
3948 }
3949 Some(Value::Node(n)) => {
3950 if matches!(prop, Property::Null) {
3951 n.properties.remove(key);
3952 } else {
3953 n.properties.insert(key.to_string(), prop);
3954 }
3955 exec_ctx.writer.put_node(n)?;
3956 }
3957 Some(Value::Edge(e)) => {
3958 if matches!(prop, Property::Null) {
3959 e.properties.remove(key);
3960 } else {
3961 e.properties.insert(key.to_string(), prop);
3962 }
3963 exec_ctx.writer.put_edge(e)?;
3964 }
3965 _ => return Err(Error::UnboundVariable(target.to_string())),
3966 }
3967 Ok(())
3968}
3969
3970fn apply_set_map_to_outer(
3974 outer: &mut Row,
3975 exec_ctx: &ExecCtx,
3976 target: &str,
3977 props: Vec<(String, Property)>,
3978 replace: bool,
3979) -> Result<()> {
3980 match outer.get_mut(target) {
3981 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
3982 Some(Value::Node(n)) => {
3983 if replace {
3984 n.properties.clear();
3985 }
3986 for (k, p) in props {
3987 if replace || !matches!(p, Property::Null) {
3988 n.properties.insert(k, p);
3989 } else {
3990 n.properties.remove(&k);
3991 }
3992 }
3993 exec_ctx.writer.put_node(n)?;
3994 Ok(())
3995 }
3996 Some(Value::Edge(e)) => {
3997 if replace {
3998 e.properties.clear();
3999 }
4000 for (k, p) in props {
4001 if replace || !matches!(p, Property::Null) {
4002 e.properties.insert(k, p);
4003 } else {
4004 e.properties.remove(&k);
4005 }
4006 }
4007 exec_ctx.writer.put_edge(e)?;
4008 Ok(())
4009 }
4010 _ => Err(Error::UnboundVariable(target.to_string())),
4011 }
4012}
4013
4014fn apply_set_labels_to_outer(
4016 outer: &mut Row,
4017 exec_ctx: &ExecCtx,
4018 target: &str,
4019 labels: &[String],
4020) -> Result<()> {
4021 match outer.get_mut(target) {
4022 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
4023 Some(Value::Node(n)) => {
4024 for label in labels {
4025 if !n.labels.contains(label) {
4026 n.labels.push(label.clone());
4027 }
4028 }
4029 exec_ctx.writer.put_node(n)?;
4030 Ok(())
4031 }
4032 _ => Err(Error::UnboundVariable(target.to_string())),
4033 }
4034}
4035
4036fn apply_merge_actions(
4045 node: &mut Node,
4046 actions: &[SetAssignment],
4047 var: &str,
4048 exec_ctx: &ExecCtx,
4049 base_row: &Row,
4050) -> Result<()> {
4051 if actions.is_empty() {
4052 return Ok(());
4053 }
4054 let mut row = base_row.clone();
4057 row.insert(var.to_string(), Value::Node(node.clone()));
4058 for action in actions {
4059 let sub_ctx = exec_ctx.eval_ctx(&row);
4060 match action {
4061 SetAssignment::Property {
4062 var: target,
4063 key,
4064 value,
4065 } => {
4066 if target != var {
4067 return Err(Error::UnboundVariable(target.clone()));
4068 }
4069 let evaluated = eval_expr(value, &sub_ctx)?;
4070 let prop = value_to_property(evaluated)?;
4071 node.properties.insert(key.clone(), prop);
4072 row.insert(var.to_string(), Value::Node(node.clone()));
4073 }
4074 SetAssignment::Labels {
4075 var: target,
4076 labels,
4077 } => {
4078 if target != var {
4079 return Err(Error::UnboundVariable(target.clone()));
4080 }
4081 for label in labels {
4082 if !node.labels.contains(label) {
4083 node.labels.push(label.clone());
4084 }
4085 }
4086 row.insert(var.to_string(), Value::Node(node.clone()));
4087 }
4088 SetAssignment::Replace {
4089 var: target,
4090 properties,
4091 } => {
4092 if target != var {
4093 return Err(Error::UnboundVariable(target.clone()));
4094 }
4095 let resolved: Vec<(String, Property)> = properties
4096 .iter()
4097 .map(|(k, expr)| {
4098 let v = eval_expr(expr, &sub_ctx)?;
4099 Ok((k.clone(), value_to_property(v)?))
4100 })
4101 .collect::<Result<Vec<_>>>()?;
4102 node.properties.clear();
4103 for (k, p) in resolved {
4104 node.properties.insert(k, p);
4105 }
4106 row.insert(var.to_string(), Value::Node(node.clone()));
4107 }
4108 SetAssignment::Merge {
4109 var: target,
4110 properties,
4111 } => {
4112 if target != var {
4113 return Err(Error::UnboundVariable(target.clone()));
4114 }
4115 let resolved: Vec<(String, Property)> = properties
4116 .iter()
4117 .map(|(k, expr)| {
4118 let v = eval_expr(expr, &sub_ctx)?;
4119 Ok((k.clone(), value_to_property(v)?))
4120 })
4121 .collect::<Result<Vec<_>>>()?;
4122 for (k, p) in resolved {
4123 node.properties.insert(k, p);
4124 }
4125 row.insert(var.to_string(), Value::Node(node.clone()));
4126 }
4127 SetAssignment::ReplaceFromExpr {
4128 var: target,
4129 source,
4130 replace,
4131 } => {
4132 if target != var {
4133 return Err(Error::UnboundVariable(target.clone()));
4134 }
4135 let v = eval_expr(source, &sub_ctx)?;
4136 let props = extract_property_map(&v)?;
4137 if *replace {
4138 node.properties.clear();
4139 }
4140 for (k, p) in props {
4141 node.properties.insert(k, p);
4142 }
4143 row.insert(var.to_string(), Value::Node(node.clone()));
4144 }
4145 }
4146 }
4147 Ok(())
4148}
4149
4150struct EdgeExpandOp {
4151 input: Box<dyn Operator>,
4152 src_var: String,
4153 edge_var: Option<String>,
4154 dst_var: String,
4155 dst_labels: Vec<String>,
4156 edge_properties: Vec<(String, Expr)>,
4157 edge_types: Vec<String>,
4158 direction: Direction,
4159 edge_constraint_var: Option<String>,
4165 current_row: Option<Row>,
4166 pending: Vec<(EdgeId, NodeId)>,
4167 pending_idx: usize,
4168}
4169
4170impl EdgeExpandOp {
4171 #[allow(clippy::too_many_arguments)]
4172 fn new(
4173 input: Box<dyn Operator>,
4174 src_var: String,
4175 edge_var: Option<String>,
4176 dst_var: String,
4177 dst_labels: Vec<String>,
4178 edge_properties: Vec<(String, Expr)>,
4179 edge_types: Vec<String>,
4180 direction: Direction,
4181 edge_constraint_var: Option<String>,
4182 ) -> Self {
4183 Self {
4184 input,
4185 src_var,
4186 edge_var,
4187 dst_var,
4188 dst_labels,
4189 edge_properties,
4190 edge_types,
4191 direction,
4192 edge_constraint_var,
4193 current_row: None,
4194 pending: Vec::new(),
4195 pending_idx: 0,
4196 }
4197 }
4198}
4199
4200impl Operator for EdgeExpandOp {
4201 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4202 loop {
4203 while self.pending_idx < self.pending.len() {
4204 let (edge_id, neighbor_id) = self.pending[self.pending_idx];
4205 self.pending_idx += 1;
4206
4207 let edge = match ctx.store.get_edge(edge_id)? {
4208 Some(e) => e,
4209 None => continue,
4210 };
4211 if !self.edge_types.is_empty()
4212 && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4213 {
4214 continue;
4215 }
4216 if let Some(constraint_var) = &self.edge_constraint_var {
4224 let base = self
4225 .current_row
4226 .as_ref()
4227 .expect("pending edges without source row");
4228 let expected = match ctx.lookup_binding(base, constraint_var) {
4229 Some(Value::Edge(e)) => Some(e.id),
4230 _ => None,
4231 };
4232 match expected {
4233 Some(id) if id != edge.id => continue,
4234 None => continue,
4235 _ => {}
4236 }
4237 }
4238 if !self.edge_properties.is_empty() {
4243 let base = self
4244 .current_row
4245 .as_ref()
4246 .expect("pending edges without source row");
4247 let ectx = ctx.eval_ctx(base);
4248 let mut ok = true;
4249 for (key, expr) in &self.edge_properties {
4250 let expected = eval_expr(expr, &ectx)?;
4251 let actual = match edge.properties.get(key) {
4252 Some(v) => Value::Property(v.clone()),
4253 None => {
4254 ok = false;
4255 break;
4256 }
4257 };
4258 if !values_equal(&actual, &expected) {
4259 ok = false;
4260 break;
4261 }
4262 }
4263 if !ok {
4264 continue;
4265 }
4266 }
4267
4268 let neighbor = match ctx.store.get_node(neighbor_id)? {
4269 Some(n) => n,
4270 None => continue,
4271 };
4272 if !has_all_labels(&neighbor, &self.dst_labels) {
4273 continue;
4274 }
4275
4276 let base = self
4277 .current_row
4278 .as_ref()
4279 .expect("pending edges without source row");
4280 let mut out = base.clone();
4281 if let Some(ev) = &self.edge_var {
4282 out.insert(ev.clone(), Value::Edge(edge));
4283 }
4284 out.insert(self.dst_var.clone(), Value::Node(neighbor));
4285 return Ok(Some(out));
4286 }
4287
4288 match self.input.next(ctx)? {
4289 None => return Ok(None),
4290 Some(row) => {
4291 let src_id = match row.get(&self.src_var) {
4292 Some(Value::Node(n)) => n.id,
4293 Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
4298 continue
4299 }
4300 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4301 };
4302 self.pending = match self.direction {
4303 Direction::Outgoing => ctx.store.outgoing(src_id)?,
4304 Direction::Incoming => ctx.store.incoming(src_id)?,
4305 Direction::Both => {
4306 let mut all = ctx.store.outgoing(src_id)?;
4312 let mut seen: std::collections::HashSet<EdgeId> =
4313 all.iter().map(|(e, _)| *e).collect();
4314 for (e, n) in ctx.store.incoming(src_id)? {
4315 if seen.insert(e) {
4316 all.push((e, n));
4317 }
4318 }
4319 all
4320 }
4321 };
4322 self.pending_idx = 0;
4323 self.current_row = Some(row);
4324 }
4325 }
4326 }
4327 }
4328}
4329
/// Single-hop expansion that still emits a row when nothing matches: a source
/// row with no qualifying edge is emitted once with the edge/destination
/// bindings set to `Value::Null`.
struct OptionalEdgeExpandOp {
    /// Upstream operator supplying the rows to expand.
    input: Box<dyn Operator>,
    /// Row binding that holds the source node.
    src_var: String,
    /// Binding that receives the matched edge, if the pattern names one.
    edge_var: Option<String>,
    /// Binding that receives the neighbor node.
    dst_var: String,
    /// Labels the neighbor must all carry.
    dst_labels: Vec<String>,
    /// `(key, expr)` pairs; the neighbor's property `key` must equal the
    /// value `expr` evaluates to against the input row.
    dst_properties: Vec<(String, Expr)>,
    /// Accepted edge types; empty means any type.
    edge_types: Vec<String>,
    /// Which adjacency lists to read from the source node.
    direction: Direction,
    /// When set, the neighbor must be the node already bound under this name
    /// in the input row.
    dst_constraint_var: Option<String>,
    /// When set, the edge must be the edge already bound under this name.
    edge_constraint_var: Option<String>,
    /// Input row currently being expanded.
    current_row: Option<Row>,
    /// Candidate `(edge, neighbor)` pairs for `current_row`.
    pending: Vec<(EdgeId, NodeId)>,
    /// Index of the next unexamined entry in `pending`.
    pending_idx: usize,
    /// Whether at least one row has been emitted for `current_row`; when it is
    /// still `false` after the candidates run out, the null-padded row is
    /// emitted.
    yielded_for_current: bool,
}
4369
4370impl OptionalEdgeExpandOp {
4371 #[allow(clippy::too_many_arguments)]
4372 fn new(
4373 input: Box<dyn Operator>,
4374 src_var: String,
4375 edge_var: Option<String>,
4376 dst_var: String,
4377 dst_labels: Vec<String>,
4378 dst_properties: Vec<(String, Expr)>,
4379 edge_types: Vec<String>,
4380 direction: Direction,
4381 dst_constraint_var: Option<String>,
4382 edge_constraint_var: Option<String>,
4383 ) -> Self {
4384 Self {
4385 input,
4386 src_var,
4387 edge_var,
4388 dst_var,
4389 dst_labels,
4390 dst_properties,
4391 edge_types,
4392 direction,
4393 dst_constraint_var,
4394 edge_constraint_var,
4395 current_row: None,
4396 pending: Vec::new(),
4397 pending_idx: 0,
4398 yielded_for_current: false,
4399 }
4400 }
4401}
4402
4403impl Operator for OptionalEdgeExpandOp {
4404 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4405 loop {
4406 while self.pending_idx < self.pending.len() {
4407 let (edge_id, neighbor_id) = self.pending[self.pending_idx];
4408 self.pending_idx += 1;
4409
4410 let edge = match ctx.store.get_edge(edge_id)? {
4411 Some(e) => e,
4412 None => continue,
4413 };
4414 if !self.edge_types.is_empty()
4415 && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4416 {
4417 continue;
4418 }
4419 if let Some(constraint_var) = &self.edge_constraint_var {
4425 let base = self
4426 .current_row
4427 .as_ref()
4428 .expect("pending without source row");
4429 let expected = match ctx.lookup_binding(base, constraint_var) {
4430 Some(Value::Edge(e)) => Some(e.id),
4431 _ => None,
4432 };
4433 match expected {
4434 Some(id) if id != edge.id => continue,
4435 None => continue,
4436 _ => {}
4437 }
4438 }
4439
4440 let neighbor = match ctx.store.get_node(neighbor_id)? {
4441 Some(n) => n,
4442 None => continue,
4443 };
4444 if !has_all_labels(&neighbor, &self.dst_labels) {
4445 continue;
4446 }
4447 if let Some(constraint_var) = &self.dst_constraint_var {
4454 let base = self
4455 .current_row
4456 .as_ref()
4457 .expect("pending without source row");
4458 let bound_id = match base.get(constraint_var) {
4459 Some(Value::Node(n)) => Some(n.id),
4460 Some(Value::Null)
4461 | Some(Value::Property(meshdb_core::Property::Null))
4462 | None => None,
4463 _ => None,
4464 };
4465 match bound_id {
4466 Some(id) if id != neighbor.id => continue,
4467 None => continue,
4468 _ => {}
4469 }
4470 }
4471 if !self.dst_properties.is_empty() {
4472 let base = self
4473 .current_row
4474 .as_ref()
4475 .expect("pending without source row");
4476 let ectx = ctx.eval_ctx(base);
4477 let mut props_ok = true;
4478 for (key, expr) in &self.dst_properties {
4479 let expected = eval_expr(expr, &ectx)?;
4480 let actual = neighbor
4481 .properties
4482 .get(key)
4483 .cloned()
4484 .map(Value::Property)
4485 .unwrap_or(Value::Null);
4486 if !values_equal(&expected, &actual) {
4487 props_ok = false;
4488 break;
4489 }
4490 }
4491 if !props_ok {
4492 continue;
4493 }
4494 }
4495
4496 let base = self
4497 .current_row
4498 .as_ref()
4499 .expect("pending edges without source row");
4500 let mut out = base.clone();
4501 if let Some(ev) = &self.edge_var {
4502 out.insert(ev.clone(), Value::Edge(edge));
4503 }
4504 out.insert(self.dst_var.clone(), Value::Node(neighbor));
4505 self.yielded_for_current = true;
4506 return Ok(Some(out));
4507 }
4508
4509 if let Some(base) = self.current_row.take() {
4519 if !self.yielded_for_current {
4520 let mut out = base;
4521 if let Some(ev) = &self.edge_var {
4522 let preserve = self
4523 .edge_constraint_var
4524 .as_ref()
4525 .map(|c| c == ev)
4526 .unwrap_or(false);
4527 if !preserve {
4528 out.insert(ev.clone(), Value::Null);
4529 }
4530 }
4531 let preserve_dst = self
4532 .dst_constraint_var
4533 .as_ref()
4534 .map(|c| c == &self.dst_var)
4535 .unwrap_or(false);
4536 if !preserve_dst {
4537 out.insert(self.dst_var.clone(), Value::Null);
4538 }
4539 self.yielded_for_current = true;
4540 return Ok(Some(out));
4541 }
4542 }
4543
4544 match self.input.next(ctx)? {
4545 None => return Ok(None),
4546 Some(row) => {
4547 let src_id = match row.get(&self.src_var) {
4548 Some(Value::Node(n)) => n.id,
4549 Some(Value::Null) => {
4556 self.pending = Vec::new();
4557 self.pending_idx = 0;
4558 self.yielded_for_current = false;
4559 self.current_row = Some(row);
4560 continue;
4561 }
4562 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4563 };
4564 self.pending = match self.direction {
4565 Direction::Outgoing => ctx.store.outgoing(src_id)?,
4566 Direction::Incoming => ctx.store.incoming(src_id)?,
4567 Direction::Both => {
4568 let mut all = ctx.store.outgoing(src_id)?;
4574 let mut seen: std::collections::HashSet<EdgeId> =
4575 all.iter().map(|(e, _)| *e).collect();
4576 for (e, n) in ctx.store.incoming(src_id)? {
4577 if seen.insert(e) {
4578 all.push((e, n));
4579 }
4580 }
4581 all
4582 }
4583 };
4584 self.pending_idx = 0;
4585 self.yielded_for_current = false;
4586 self.current_row = Some(row);
4587 }
4588 }
4589 }
4590 }
4591}
4592
/// Variable-length expansion: for each input row, finds every path of
/// `min_hops..=max_hops` edges from the source node and emits one row per path.
struct VarLengthExpandOp {
    /// Upstream operator supplying the rows to expand.
    input: Box<dyn Operator>,
    /// Row binding that holds the source node.
    src_var: String,
    /// Binding that receives the path's edges as a `Value::List`, if named.
    edge_var: Option<String>,
    /// Binding that receives the path's final node.
    dst_var: String,
    /// Labels the final node must all carry.
    dst_labels: Vec<String>,
    /// Accepted edge types; empty means any type.
    edge_types: Vec<String>,
    /// `(key, expr)` pairs every traversed edge must satisfy; the expressions
    /// are evaluated once per input row.
    edge_properties: Vec<(String, Expr)>,
    /// Which adjacency lists to follow at each step.
    direction: Direction,
    /// Minimum number of edges in an emitted path (inclusive).
    min_hops: u64,
    /// Maximum number of edges in an emitted path (inclusive).
    max_hops: u64,
    /// Binding that receives the whole path as a `Value::Path`, if named.
    path_var: Option<String>,
    /// When `true`, an input row with no matching path is emitted once with
    /// the edge, destination and path bindings set to `Value::Null`.
    optional: bool,
    /// When set, only paths ending at the node bound under this name are kept.
    dst_constraint_var: Option<String>,
    /// When set, the path is replayed from the edge list bound under this name
    /// instead of being enumerated.
    bound_edge_list_var: Option<String>,
    /// Bindings (an edge or a list of edges) whose edges are marked as already
    /// used before enumeration starts, so the traversal never follows them.
    excluded_edge_vars: Vec<String>,
    /// Input row currently being expanded.
    current_row: Option<Row>,
    /// Edge sequence of each pending path.
    pending_paths: Vec<Vec<Edge>>,
    /// Node-id sequence of each pending path, parallel to `pending_paths`.
    pending_node_paths: Vec<Vec<NodeId>>,
    /// Final node id of each pending path, parallel to `pending_paths`.
    pending_targets: Vec<NodeId>,
    /// Index of the next pending path to emit.
    pending_idx: usize,
}
4641
4642impl VarLengthExpandOp {
4643 #[allow(clippy::too_many_arguments)]
4644 fn new(
4645 input: Box<dyn Operator>,
4646 src_var: String,
4647 edge_var: Option<String>,
4648 dst_var: String,
4649 dst_labels: Vec<String>,
4650 edge_types: Vec<String>,
4651 edge_properties: Vec<(String, Expr)>,
4652 direction: Direction,
4653 min_hops: u64,
4654 max_hops: u64,
4655 path_var: Option<String>,
4656 optional: bool,
4657 dst_constraint_var: Option<String>,
4658 bound_edge_list_var: Option<String>,
4659 excluded_edge_vars: Vec<String>,
4660 ) -> Self {
4661 Self {
4662 input,
4663 src_var,
4664 edge_var,
4665 dst_var,
4666 dst_labels,
4667 edge_types,
4668 edge_properties,
4669 direction,
4670 min_hops,
4671 max_hops,
4672 path_var,
4673 optional,
4674 dst_constraint_var,
4675 bound_edge_list_var,
4676 excluded_edge_vars,
4677 current_row: None,
4678 pending_paths: Vec::new(),
4679 pending_node_paths: Vec::new(),
4680 pending_targets: Vec::new(),
4681 pending_idx: 0,
4682 }
4683 }
4684
4685 fn enumerate(
4686 &self,
4687 ctx: &ExecCtx,
4688 start: NodeId,
4689 input_row: &Row,
4690 ) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
4691 let mut paths: Vec<Vec<Edge>> = Vec::new();
4692 let mut node_paths: Vec<Vec<NodeId>> = Vec::new();
4693 let mut targets: Vec<NodeId> = Vec::new();
4694 let mut edge_buf: Vec<Edge> = Vec::new();
4695 let mut node_buf: Vec<NodeId> = vec![start];
4696 let mut used: HashSet<EdgeId> = HashSet::new();
4703 for var in &self.excluded_edge_vars {
4704 match ctx.lookup_binding(input_row, var) {
4705 Some(Value::Edge(e)) => {
4706 used.insert(e.id);
4707 }
4708 Some(Value::List(items)) => {
4709 for item in items {
4710 if let Value::Edge(e) = item {
4711 used.insert(e.id);
4712 }
4713 }
4714 }
4715 _ => {}
4716 }
4717 }
4718 let expected_edge_props: Vec<(String, Value)> = if self.edge_properties.is_empty() {
4722 Vec::new()
4723 } else {
4724 let ectx = ctx.eval_ctx(input_row);
4725 self.edge_properties
4726 .iter()
4727 .map(|(k, expr)| eval_expr(expr, &ectx).map(|v| (k.clone(), v)))
4728 .collect::<Result<Vec<_>>>()?
4729 };
4730 self.dfs(
4731 ctx,
4732 start,
4733 &expected_edge_props,
4734 &mut edge_buf,
4735 &mut node_buf,
4736 &mut used,
4737 &mut paths,
4738 &mut node_paths,
4739 &mut targets,
4740 )?;
4741 Ok((paths, node_paths, targets))
4742 }
4743
4744 #[allow(clippy::too_many_arguments)]
4745 fn dfs(
4746 &self,
4747 ctx: &ExecCtx,
4748 current_node: NodeId,
4749 expected_edge_props: &[(String, Value)],
4750 edge_buf: &mut Vec<Edge>,
4751 node_buf: &mut Vec<NodeId>,
4752 used: &mut HashSet<EdgeId>,
4753 out_paths: &mut Vec<Vec<Edge>>,
4754 out_node_paths: &mut Vec<Vec<NodeId>>,
4755 out_targets: &mut Vec<NodeId>,
4756 ) -> Result<()> {
4757 let depth = edge_buf.len() as u64;
4758
4759 if depth >= self.min_hops && depth <= self.max_hops {
4760 let terminal_ok = match ctx.store.get_node(current_node)? {
4761 Some(node) => has_all_labels(&node, &self.dst_labels),
4762 None => false,
4763 };
4764 if terminal_ok {
4765 out_paths.push(edge_buf.clone());
4766 out_node_paths.push(node_buf.clone());
4767 out_targets.push(current_node);
4768 }
4769 }
4770
4771 if depth >= self.max_hops {
4772 return Ok(());
4773 }
4774
4775 let neighbors = match self.direction {
4776 Direction::Outgoing => ctx.store.outgoing(current_node)?,
4777 Direction::Incoming => ctx.store.incoming(current_node)?,
4778 Direction::Both => {
4779 let mut all = ctx.store.outgoing(current_node)?;
4780 all.extend(ctx.store.incoming(current_node)?);
4781 all
4782 }
4783 };
4784
4785 for (eid, neighbor_id) in neighbors {
4786 if used.contains(&eid) {
4787 continue;
4788 }
4789 let edge = match ctx.store.get_edge(eid)? {
4790 Some(e) => e,
4791 None => continue,
4792 };
4793 if !self.edge_types.is_empty() && !self.edge_types.iter().any(|t| t == &edge.edge_type)
4794 {
4795 continue;
4796 }
4797 if !expected_edge_props.is_empty() {
4802 let mut ok = true;
4803 for (key, expected) in expected_edge_props {
4804 let actual = match edge.properties.get(key) {
4805 Some(v) => Value::Property(v.clone()),
4806 None => {
4807 ok = false;
4808 break;
4809 }
4810 };
4811 if !values_equal(&actual, expected) {
4812 ok = false;
4813 break;
4814 }
4815 }
4816 if !ok {
4817 continue;
4818 }
4819 }
4820 used.insert(eid);
4821 edge_buf.push(edge);
4822 node_buf.push(neighbor_id);
4823 self.dfs(
4824 ctx,
4825 neighbor_id,
4826 expected_edge_props,
4827 edge_buf,
4828 node_buf,
4829 used,
4830 out_paths,
4831 out_node_paths,
4832 out_targets,
4833 )?;
4834 edge_buf.pop();
4835 node_buf.pop();
4836 used.remove(&eid);
4837 }
4838
4839 Ok(())
4840 }
4841}
4842
4843fn replay_edge_list(
4861 ctx: &ExecCtx,
4862 row: &Row,
4863 list_var: &str,
4864 src_id: Option<NodeId>,
4865 direction: Direction,
4866 edge_types: &[String],
4867) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
4868 let start = match src_id {
4869 Some(id) => id,
4870 None => return Ok((Vec::new(), Vec::new(), Vec::new())),
4871 };
4872 let list = match ctx.lookup_binding(row, list_var) {
4873 Some(Value::List(items)) => items.clone(),
4874 Some(Value::Property(meshdb_core::Property::List(items))) => items
4875 .iter()
4876 .cloned()
4877 .map(Value::Property)
4878 .collect::<Vec<_>>(),
4879 _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
4880 };
4881 let mut edge_buf: Vec<Edge> = Vec::with_capacity(list.len());
4882 let mut node_buf: Vec<NodeId> = Vec::with_capacity(list.len() + 1);
4883 node_buf.push(start);
4884 let mut current = start;
4885 for item in list {
4886 let edge = match item {
4887 Value::Edge(e) => e,
4888 _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
4889 };
4890 if !edge_types.is_empty() && !edge_types.iter().any(|t| t == &edge.edge_type) {
4891 return Ok((Vec::new(), Vec::new(), Vec::new()));
4892 }
4893 let next_node = match direction {
4894 Direction::Outgoing => {
4895 if edge.source != current {
4896 return Ok((Vec::new(), Vec::new(), Vec::new()));
4897 }
4898 edge.target
4899 }
4900 Direction::Incoming => {
4901 if edge.target != current {
4902 return Ok((Vec::new(), Vec::new(), Vec::new()));
4903 }
4904 edge.source
4905 }
4906 Direction::Both => {
4907 if edge.source == current {
4908 edge.target
4909 } else if edge.target == current {
4910 edge.source
4911 } else {
4912 return Ok((Vec::new(), Vec::new(), Vec::new()));
4913 }
4914 }
4915 };
4916 if ctx.store.get_node(next_node)?.is_none() {
4920 return Ok((Vec::new(), Vec::new(), Vec::new()));
4921 }
4922 edge_buf.push(edge);
4923 node_buf.push(next_node);
4924 current = next_node;
4925 }
4926 Ok((vec![edge_buf], vec![node_buf], vec![current]))
4927}
4928
4929impl Operator for VarLengthExpandOp {
4930 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4931 loop {
4932 while self.pending_idx < self.pending_paths.len() {
4933 let i = self.pending_idx;
4934 self.pending_idx += 1;
4935
4936 let target_id = self.pending_targets[i];
4937 let target = match ctx.store.get_node(target_id)? {
4938 Some(n) => n,
4939 None => continue,
4940 };
4941
4942 let base = self
4943 .current_row
4944 .as_ref()
4945 .expect("pending without source row");
4946 let mut out = base.clone();
4947 out.insert(self.dst_var.clone(), Value::Node(target.clone()));
4948 if let Some(ev) = &self.edge_var {
4949 let edges: Vec<Value> = self.pending_paths[i]
4950 .iter()
4951 .cloned()
4952 .map(Value::Edge)
4953 .collect();
4954 out.insert(ev.clone(), Value::List(edges));
4955 }
4956 if let Some(pv) = &self.path_var {
4957 let mut nodes = Vec::with_capacity(self.pending_node_paths[i].len());
4958 for nid in &self.pending_node_paths[i] {
4959 match ctx.store.get_node(*nid)? {
4960 Some(n) => nodes.push(n),
4961 None => continue,
4962 }
4963 }
4964 let edges = self.pending_paths[i].clone();
4965 out.insert(pv.clone(), Value::Path { nodes, edges });
4966 }
4967 return Ok(Some(out));
4968 }
4969
4970 match self.input.next(ctx)? {
4971 None => return Ok(None),
4972 Some(row) => {
4973 let src_id = match row.get(&self.src_var) {
4974 Some(Value::Node(n)) => Some(n.id),
4975 Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
4981 None
4982 }
4983 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
4984 };
4985 let (mut paths, mut node_paths, mut targets) =
4993 if let Some(list_var) = &self.bound_edge_list_var {
4994 replay_edge_list(
4995 ctx,
4996 &row,
4997 list_var,
4998 src_id,
4999 self.direction,
5000 &self.edge_types,
5001 )?
5002 } else {
5003 match src_id {
5004 Some(id) => self.enumerate(ctx, id, &row)?,
5005 None => (Vec::new(), Vec::new(), Vec::new()),
5006 }
5007 };
5008 if let Some(constraint_var) = &self.dst_constraint_var {
5015 let target_id = match row.get(constraint_var) {
5016 Some(Value::Node(n)) => Some(n.id),
5017 _ => None,
5018 };
5019 match target_id {
5020 Some(id) => {
5021 let mut kept_paths = Vec::new();
5022 let mut kept_node_paths = Vec::new();
5023 let mut kept_targets = Vec::new();
5024 for ((p, np), t) in paths
5025 .drain(..)
5026 .zip(node_paths.drain(..))
5027 .zip(targets.drain(..))
5028 {
5029 if t == id {
5030 kept_paths.push(p);
5031 kept_node_paths.push(np);
5032 kept_targets.push(t);
5033 }
5034 }
5035 paths = kept_paths;
5036 node_paths = kept_node_paths;
5037 targets = kept_targets;
5038 }
5039 None => {
5040 paths.clear();
5041 node_paths.clear();
5042 targets.clear();
5043 }
5044 }
5045 }
5046 if paths.is_empty() && self.optional {
5047 let mut out = row;
5052 if let Some(ev) = &self.edge_var {
5053 out.insert(ev.clone(), Value::Null);
5054 }
5055 out.insert(self.dst_var.clone(), Value::Null);
5056 if let Some(pv) = &self.path_var {
5057 out.insert(pv.clone(), Value::Null);
5058 }
5059 return Ok(Some(out));
5060 }
5061 self.pending_paths = paths;
5062 self.pending_node_paths = node_paths;
5063 self.pending_targets = targets;
5064 self.pending_idx = 0;
5065 self.current_row = Some(row);
5066 }
5067 }
5068 }
5069 }
5070}
5071
/// Passes through only the input rows for which `predicate` evaluates to true.
struct FilterOp {
    /// Upstream operator supplying candidate rows.
    input: Box<dyn Operator>,
    /// Expression evaluated against each candidate row.
    predicate: Expr,
}
5076
5077impl FilterOp {
5078 fn new(input: Box<dyn Operator>, predicate: Expr) -> Self {
5079 Self { input, predicate }
5080 }
5081}
5082
5083impl Operator for FilterOp {
5084 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5085 while let Some(row) = self.input.next(ctx)? {
5086 let v = match eval_expr(&self.predicate, &ctx.eval_ctx(&row)) {
5087 Ok(v) => v,
5088 Err(Error::TypeMismatch) | Err(Error::NotBoolean) => Value::Null,
5091 Err(e) => return Err(e),
5092 };
5093 if to_bool(&v).unwrap_or(false) {
5094 return Ok(Some(row));
5095 }
5096 }
5097 Ok(None)
5098 }
5099}
5100
/// Pass-through operator: produces exactly the rows of `input`, unchanged.
struct IdentityOp {
    /// Upstream operator whose rows are forwarded.
    input: Box<dyn Operator>,
}
5106
5107impl IdentityOp {
5108 fn new(input: Box<dyn Operator>) -> Self {
5109 Self { input }
5110 }
5111}
5112
5113impl Operator for IdentityOp {
5114 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5115 self.input.next(ctx)
5116 }
5117}
5118
/// Forwards its input, but if the input produces no rows at all, emits a
/// single row in which every name in `null_vars` is bound to `Value::Null`.
struct CoalesceNullRowOp {
    /// Upstream operator whose rows are forwarded.
    input: Box<dyn Operator>,
    /// Names bound to null in the fallback row.
    null_vars: Vec<String>,
    /// Whether the input has produced at least one row.
    produced_any: bool,
    /// Set once the input has been exhausted; later calls return `None`.
    done: bool,
}
5130
5131impl CoalesceNullRowOp {
5132 fn new(input: Box<dyn Operator>, null_vars: Vec<String>) -> Self {
5133 Self {
5134 input,
5135 null_vars,
5136 produced_any: false,
5137 done: false,
5138 }
5139 }
5140}
5141
5142impl Operator for CoalesceNullRowOp {
5143 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5144 if self.done {
5145 return Ok(None);
5146 }
5147 match self.input.next(ctx)? {
5148 Some(row) => {
5149 self.produced_any = true;
5150 Ok(Some(row))
5151 }
5152 None => {
5153 self.done = true;
5154 if self.produced_any {
5155 Ok(None)
5156 } else {
5157 let mut row = Row::new();
5158 for v in &self.null_vars {
5159 row.insert(v.clone(), Value::Null);
5160 }
5161 Ok(Some(row))
5162 }
5163 }
5164 }
5165 }
5166}
5167
/// Replaces each input row with a new row holding one column per entry in
/// `items`.
struct ProjectOp {
    /// Upstream operator supplying the rows to project.
    input: Box<dyn Operator>,
    /// Expressions to evaluate, each with an optional alias / source text used
    /// as the output column name.
    items: Vec<ReturnItem>,
}
5172
5173impl ProjectOp {
5174 fn new(input: Box<dyn Operator>, items: Vec<ReturnItem>) -> Self {
5175 Self { input, items }
5176 }
5177}
5178
5179impl Operator for ProjectOp {
5180 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5181 match self.input.next(ctx)? {
5182 Some(row) => {
5183 let mut out = Row::new();
5184 for (i, item) in self.items.iter().enumerate() {
5185 let name = item.alias.clone().unwrap_or_else(|| {
5186 item.raw_text
5187 .clone()
5188 .unwrap_or_else(|| default_name(&item.expr, i))
5189 });
5190 let value = eval_expr(&item.expr, &ctx.eval_ctx(&row))?;
5191 out.insert(name, value);
5192 }
5193 Ok(Some(out))
5194 }
5195 None => Ok(None),
5196 }
5197 }
5198}
5199
5200fn default_name(expr: &Expr, idx: usize) -> String {
5201 render_expr_name(expr).unwrap_or_else(|| format!("col{}", idx))
5202}
5203
5204fn render_expr_name(expr: &Expr) -> Option<String> {
5205 Some(match expr {
5206 Expr::Identifier(s) => s.clone(),
5207 Expr::Property { var, key } => format!("{var}.{key}"),
5208 Expr::PropertyAccess { base, key } => {
5209 if matches!(
5213 base.as_ref(),
5214 Expr::IndexAccess { .. } | Expr::SliceAccess { .. }
5215 ) {
5216 format!("({}).{key}", render_expr_name(base)?)
5217 } else {
5218 format!("{}.{key}", render_expr_name(base)?)
5219 }
5220 }
5221 Expr::Parameter(name) => format!("${name}"),
5222 Expr::Literal(Literal::String(s)) => format!("'{s}'"),
5223 Expr::Literal(Literal::Integer(i)) => i.to_string(),
5224 Expr::Literal(Literal::Float(f)) => f.to_string(),
5225 Expr::Literal(Literal::Boolean(b)) => b.to_string(),
5226 Expr::Literal(Literal::Null) => "NULL".into(),
5227 Expr::Call { name, args } => {
5228 let arg_str = match args {
5229 CallArgs::Star => "*".into(),
5230 CallArgs::Exprs(es) | CallArgs::DistinctExprs(es) => {
5231 let prefix = if matches!(args, CallArgs::DistinctExprs(_)) {
5232 "DISTINCT "
5233 } else {
5234 ""
5235 };
5236 let inner: Vec<String> = es.iter().filter_map(render_expr_name).collect();
5237 if inner.len() != es.len() {
5238 return None;
5239 }
5240 format!("{prefix}{}", inner.join(", "))
5241 }
5242 };
5243 format!("{name}({arg_str})")
5244 }
5245 Expr::BinaryOp { op, left, right } => {
5246 let op_str = match op {
5247 BinaryOp::Add => " + ",
5248 BinaryOp::Sub => " - ",
5249 BinaryOp::Mul => " * ",
5250 BinaryOp::Div => " / ",
5251 BinaryOp::Mod => " % ",
5252 BinaryOp::Pow => " ^ ",
5253 };
5254 format!(
5255 "{}{op_str}{}",
5256 render_expr_name(left)?,
5257 render_expr_name(right)?
5258 )
5259 }
5260 Expr::UnaryOp { op, operand } => {
5261 let op_str = match op {
5262 UnaryOp::Neg => "-",
5263 };
5264 format!("{op_str}{}", render_expr_name(operand)?)
5265 }
5266 Expr::Not(inner) => format!("NOT {}", render_expr_name(inner)?),
5267 Expr::IsNull { negated, inner } => {
5268 if *negated {
5269 format!("{} IS NOT NULL", render_expr_name(inner)?)
5270 } else {
5271 format!("{} IS NULL", render_expr_name(inner)?)
5272 }
5273 }
5274 Expr::Compare { op, left, right } => {
5275 let op_str = match op {
5276 CompareOp::Eq => " = ",
5277 CompareOp::Ne => " <> ",
5278 CompareOp::Lt => " < ",
5279 CompareOp::Le => " <= ",
5280 CompareOp::Gt => " > ",
5281 CompareOp::Ge => " >= ",
5282 CompareOp::StartsWith => " STARTS WITH ",
5283 CompareOp::EndsWith => " ENDS WITH ",
5284 CompareOp::Contains => " CONTAINS ",
5285 CompareOp::RegexMatch => " =~ ",
5286 };
5287 format!(
5288 "{}{op_str}{}",
5289 render_expr_name(left)?,
5290 render_expr_name(right)?
5291 )
5292 }
5293 Expr::List(items) => {
5294 let inner: Vec<String> = items.iter().filter_map(render_expr_name).collect();
5295 if inner.len() != items.len() {
5296 return None;
5297 }
5298 format!("[{}]", inner.join(", "))
5299 }
5300 Expr::Map(entries) => {
5301 let inner: Vec<String> = entries
5302 .iter()
5303 .map(|(k, v)| render_expr_name(v).map(|vn| format!("{k}: {vn}")))
5304 .collect::<Option<Vec<_>>>()?;
5305 format!("{{{}}}", inner.join(", "))
5306 }
5307 Expr::IndexAccess { base, index } => {
5308 format!("{}[{}]", render_expr_name(base)?, render_expr_name(index)?)
5309 }
5310 Expr::InList { element, list } => {
5311 format!(
5312 "{} IN {}",
5313 render_expr_name(element)?,
5314 render_expr_name(list)?
5315 )
5316 }
5317 Expr::HasLabels { expr, labels } => {
5318 let mut s = format!("({}", render_expr_name(expr)?);
5319 for l in labels {
5320 s.push(':');
5321 s.push_str(l);
5322 }
5323 s.push(')');
5324 s
5325 }
5326 _ => return None,
5327 })
5328}
5329
/// Drops rows whose `row_key` has already been seen, so each distinct row is
/// produced once, in first-seen order.
struct DistinctOp {
    /// Upstream operator supplying possibly duplicated rows.
    input: Box<dyn Operator>,
    /// Keys (as computed by `row_key`) of the rows emitted so far.
    seen: HashSet<String>,
}
5334
5335impl DistinctOp {
5336 fn new(input: Box<dyn Operator>) -> Self {
5337 Self {
5338 input,
5339 seen: HashSet::new(),
5340 }
5341 }
5342}
5343
5344impl Operator for DistinctOp {
5345 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5346 while let Some(row) = self.input.next(ctx)? {
5347 let key = row_key(&row);
5348 if self.seen.insert(key) {
5349 return Ok(Some(row));
5350 }
5351 }
5352 Ok(None)
5353 }
5354}
5355
/// Assembles a `Value::Path` from node and edge bindings already present in
/// each row and stores it under `path_var`.
struct BindPathOp {
    /// Upstream operator supplying rows with the pattern's bindings.
    input: Box<dyn Operator>,
    /// Binding that receives the assembled path (or null if assembly fails).
    path_var: String,
    /// Node bindings along the pattern; entry 0 is the start node and entry
    /// `i + 1` is the node reached through `edge_vars[i]`.
    node_vars: Vec<String>,
    /// Edge bindings along the pattern. A binding may hold a single edge or a
    /// `Value::Path`, whose edges and non-initial nodes are spliced in.
    edge_vars: Vec<String>,
}
5377
5378impl BindPathOp {
5379 fn new(
5380 input: Box<dyn Operator>,
5381 path_var: String,
5382 node_vars: Vec<String>,
5383 edge_vars: Vec<String>,
5384 ) -> Self {
5385 Self {
5386 input,
5387 path_var,
5388 node_vars,
5389 edge_vars,
5390 }
5391 }
5392}
5393
5394impl Operator for BindPathOp {
5395 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5396 let Some(mut row) = self.input.next(ctx)? else {
5397 return Ok(None);
5398 };
5399 let mut nodes: Vec<meshdb_core::Node> = Vec::new();
5403 let mut edges: Vec<meshdb_core::Edge> = Vec::new();
5404 let mut abort = false;
5405 if let Some(Value::Node(n)) = row.get(&self.node_vars[0]) {
5412 nodes.push(n.clone());
5413 } else {
5414 abort = true;
5415 }
5416 if !abort {
5417 for (i, ev) in self.edge_vars.iter().enumerate() {
5418 match row.get(ev) {
5419 Some(Value::Edge(e)) => {
5420 edges.push(e.clone());
5421 match row.get(&self.node_vars[i + 1]) {
5422 Some(Value::Node(n)) => nodes.push(n.clone()),
5423 _ => {
5424 abort = true;
5425 break;
5426 }
5427 }
5428 }
5429 Some(Value::Path {
5430 nodes: sub_nodes,
5431 edges: sub_edges,
5432 }) => {
5433 edges.extend(sub_edges.iter().cloned());
5439 if sub_nodes.len() > 1 {
5440 nodes.extend(sub_nodes[1..].iter().cloned());
5441 }
5442 }
5443 _ => {
5444 abort = true;
5445 break;
5446 }
5447 }
5448 }
5449 }
5450 if abort {
5451 row.insert(self.path_var.clone(), Value::Null);
5452 } else {
5453 row.insert(self.path_var.clone(), Value::Path { nodes, edges });
5454 }
5455 Ok(Some(row))
5456 }
5457}
5458
/// For each input row, finds the shortest path(s) between the nodes bound
/// under `src_var` and `dst_var` and emits one row per path found.
struct ShortestPathOp {
    /// Upstream operator supplying rows with both endpoints bound.
    input: Box<dyn Operator>,
    /// Binding that holds the start node.
    src_var: String,
    /// Binding that holds the end node.
    dst_var: String,
    /// Binding that receives the path value.
    path_var: String,
    /// Accepted edge types; empty means any type.
    edge_types: Vec<String>,
    /// Which adjacency lists the search follows.
    direction: meshdb_cypher::Direction,
    /// Upper bound on the number of search levels expanded.
    max_hops: u64,
    /// Whether one shortest path or every shortest path is produced.
    kind: meshdb_cypher::ShortestKind,
    /// `(row, path)` pairs already computed and waiting to be emitted.
    pending: std::collections::VecDeque<(Row, Value)>,
}
5494
5495impl ShortestPathOp {
5496 #[allow(clippy::too_many_arguments)]
5497 fn new(
5498 input: Box<dyn Operator>,
5499 src_var: String,
5500 dst_var: String,
5501 path_var: String,
5502 edge_types: Vec<String>,
5503 direction: meshdb_cypher::Direction,
5504 max_hops: u64,
5505 kind: meshdb_cypher::ShortestKind,
5506 ) -> Self {
5507 Self {
5508 input,
5509 src_var,
5510 dst_var,
5511 path_var,
5512 edge_types,
5513 direction,
5514 max_hops,
5515 kind,
5516 pending: std::collections::VecDeque::new(),
5517 }
5518 }
5519}
5520
5521impl Operator for ShortestPathOp {
5522 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5523 loop {
5524 if let Some((mut row, path)) = self.pending.pop_front() {
5529 row.insert(self.path_var.clone(), path);
5530 return Ok(Some(row));
5531 }
5532 let Some(row) = self.input.next(ctx)? else {
5533 return Ok(None);
5534 };
5535 let src = match row.get(&self.src_var) {
5536 Some(Value::Node(n)) => n.clone(),
5537 _ => continue,
5538 };
5539 let dst = match row.get(&self.dst_var) {
5540 Some(Value::Node(n)) => n.clone(),
5541 _ => continue,
5542 };
5543 let paths = bfs_shortest_paths(
5544 &src,
5545 &dst,
5546 &self.edge_types,
5547 self.direction,
5548 self.max_hops,
5549 self.kind,
5550 ctx.store,
5551 )?;
5552 if paths.is_empty() {
5553 continue;
5555 }
5556 for path in paths {
5557 self.pending.push_back((row.clone(), path));
5558 }
5559 }
5560 }
5561}
5562
5563fn bfs_shortest_paths(
5582 src: &Node,
5583 dst: &Node,
5584 edge_types: &[String],
5585 direction: meshdb_cypher::Direction,
5586 max_hops: u64,
5587 kind: meshdb_cypher::ShortestKind,
5588 reader: &dyn crate::reader::GraphReader,
5589) -> Result<Vec<Value>> {
5590 use meshdb_cypher::Direction;
5591
5592 if src.id == dst.id {
5593 return Ok(vec![Value::Path {
5594 nodes: vec![src.clone()],
5595 edges: vec![],
5596 }]);
5597 }
5598
5599 let mut dist: HashMap<NodeId, u64> = HashMap::new();
5605 dist.insert(src.id, 0);
5606 let mut parents: HashMap<NodeId, Vec<(NodeId, EdgeId)>> = HashMap::new();
5607
5608 let mut frontier: Vec<NodeId> = vec![src.id];
5609 let mut depth: u64 = 0;
5610 let mut found = false;
5611
5612 while !frontier.is_empty() && depth < max_hops && !found {
5613 let mut next_frontier: Vec<NodeId> = Vec::new();
5614 for node_id in &frontier {
5615 let neighbors = match direction {
5616 Direction::Outgoing => reader.outgoing(*node_id)?,
5617 Direction::Incoming => reader.incoming(*node_id)?,
5618 Direction::Both => {
5619 let mut out = reader.outgoing(*node_id)?;
5620 out.extend(reader.incoming(*node_id)?);
5621 out
5622 }
5623 };
5624 for (edge_id, neighbor_id) in neighbors {
5625 if !edge_types.is_empty() {
5628 let edge = match reader.get_edge(edge_id)? {
5629 Some(e) => e,
5630 None => continue,
5631 };
5632 if !edge_types.iter().any(|t| t == &edge.edge_type) {
5633 continue;
5634 }
5635 }
5636 match dist.get(&neighbor_id) {
5637 Some(&d) if d == depth + 1 => {
5638 parents
5644 .entry(neighbor_id)
5645 .or_default()
5646 .push((*node_id, edge_id));
5647 }
5648 Some(_) => {
5649 }
5653 None => {
5654 dist.insert(neighbor_id, depth + 1);
5655 parents
5656 .entry(neighbor_id)
5657 .or_default()
5658 .push((*node_id, edge_id));
5659 if neighbor_id == dst.id {
5660 found = true;
5661 } else {
5662 next_frontier.push(neighbor_id);
5663 }
5664 }
5665 }
5666 }
5667 }
5668 depth += 1;
5669 if !found {
5670 frontier = next_frontier;
5671 }
5672 }
5673
5674 if !found {
5675 return Ok(Vec::new());
5676 }
5677
5678 let mut out: Vec<Value> = Vec::new();
5682 let mut nodes_rev: Vec<Node> = Vec::new();
5683 let mut edges_rev: Vec<Edge> = Vec::new();
5684 let only_first = matches!(kind, meshdb_cypher::ShortestKind::Shortest);
5685 collect_shortest_paths(
5686 src,
5687 dst,
5688 &parents,
5689 reader,
5690 &mut nodes_rev,
5691 &mut edges_rev,
5692 &mut out,
5693 only_first,
5694 )?;
5695 Ok(out)
5696}
5697
5698#[allow(clippy::too_many_arguments)]
5710fn collect_shortest_paths(
5711 src: &Node,
5712 current: &Node,
5713 parents: &HashMap<NodeId, Vec<(NodeId, EdgeId)>>,
5714 reader: &dyn crate::reader::GraphReader,
5715 nodes_rev: &mut Vec<Node>,
5716 edges_rev: &mut Vec<Edge>,
5717 out: &mut Vec<Value>,
5718 only_first: bool,
5719) -> Result<()> {
5720 if current.id == src.id {
5721 let mut nodes: Vec<Node> = Vec::with_capacity(nodes_rev.len() + 1);
5726 nodes.push(src.clone());
5727 nodes.extend(nodes_rev.iter().rev().cloned());
5728 let edges: Vec<Edge> = edges_rev.iter().rev().cloned().collect();
5729 out.push(Value::Path { nodes, edges });
5730 return Ok(());
5731 }
5732 let Some(parent_edges) = parents.get(¤t.id) else {
5733 return Ok(());
5737 };
5738 for (parent_id, edge_id) in parent_edges {
5739 if only_first && !out.is_empty() {
5740 return Ok(());
5741 }
5742 let edge = reader
5743 .get_edge(*edge_id)?
5744 .expect("BFS inserted this edge id; it must still exist");
5745 let parent_node = reader
5746 .get_node(*parent_id)?
5747 .expect("BFS visited this node id; it must still exist");
5748 nodes_rev.push(current.clone());
5749 edges_rev.push(edge);
5750 collect_shortest_paths(
5751 src,
5752 &parent_node,
5753 parents,
5754 reader,
5755 nodes_rev,
5756 edges_rev,
5757 out,
5758 only_first,
5759 )?;
5760 nodes_rev.pop();
5761 edges_rev.pop();
5762 }
5763 Ok(())
5764}
5765
/// Concatenates the output of several branches, one branch after another,
/// optionally removing duplicate rows.
struct UnionOp {
    /// Branch operators, drained in order.
    branches: Vec<Box<dyn Operator>>,
    /// Index of the branch currently being drained.
    current: usize,
    /// Row keys emitted so far when duplicates are removed; `None` when every
    /// row is kept.
    seen: Option<HashSet<String>>,
}
5779
5780impl UnionOp {
5781 fn new(branches: Vec<Box<dyn Operator>>, all: bool) -> Self {
5782 Self {
5783 branches,
5784 current: 0,
5785 seen: if all { None } else { Some(HashSet::new()) },
5786 }
5787 }
5788}
5789
5790impl Operator for UnionOp {
5791 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5792 while self.current < self.branches.len() {
5793 match self.branches[self.current].next(ctx)? {
5794 Some(row) => {
5795 if let Some(seen) = self.seen.as_mut() {
5796 let key = row_key(&row);
5797 if !seen.insert(key) {
5798 continue;
5799 }
5800 }
5801 return Ok(Some(row));
5802 }
5803 None => {
5804 self.current += 1;
5805 }
5806 }
5807 }
5808 Ok(None)
5809 }
5810}
5811
/// Buffers the whole input, sorts it by `sort_items`, then emits the rows in
/// sorted order.
struct OrderByOp {
    /// Upstream operator supplying the rows to sort.
    input: Box<dyn Operator>,
    /// Sort keys in priority order, each with its own direction flag.
    sort_items: Vec<SortItem>,
    /// Sorted rows; `None` until the first call to `next` has buffered and
    /// sorted the input.
    sorted: Option<Vec<Row>>,
    /// Index of the next row of `sorted` to emit.
    cursor: usize,
}
5818
5819impl OrderByOp {
5820 fn new(input: Box<dyn Operator>, sort_items: Vec<SortItem>) -> Self {
5821 Self {
5822 input,
5823 sort_items,
5824 sorted: None,
5825 cursor: 0,
5826 }
5827 }
5828}
5829
5830impl Operator for OrderByOp {
5831 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5832 if self.sorted.is_none() {
5833 let mut rows: Vec<Row> = Vec::new();
5834 while let Some(row) = self.input.next(ctx)? {
5835 rows.push(row);
5836 }
5837 let mut keyed: Vec<(Vec<Value>, Row)> = Vec::with_capacity(rows.len());
5838 for row in rows {
5839 let mut keys = Vec::with_capacity(self.sort_items.len());
5840 for item in &self.sort_items {
5841 keys.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
5842 }
5843 keyed.push((keys, row));
5844 }
5845 let descs: Vec<bool> = self.sort_items.iter().map(|s| s.descending).collect();
5846 keyed.sort_by(|a, b| {
5847 for (i, (va, vb)) in a.0.iter().zip(b.0.iter()).enumerate() {
5848 let ord = compare_values(va, vb);
5849 let ord = if descs[i] { ord.reverse() } else { ord };
5850 if ord != Ordering::Equal {
5851 return ord;
5852 }
5853 }
5854 Ordering::Equal
5855 });
5856 self.sorted = Some(keyed.into_iter().map(|(_, r)| r).collect());
5857 }
5858 let rows = self.sorted.as_ref().unwrap();
5859 if self.cursor < rows.len() {
5860 let row = rows[self.cursor].clone();
5861 self.cursor += 1;
5862 Ok(Some(row))
5863 } else {
5864 Ok(None)
5865 }
5866 }
5867}
5868
/// Blocking grouping/aggregation operator: consumes its whole input on the
/// first `next` call and then replays one output row per group.
struct AggregateOp {
    /// Upstream operator supplying the rows to aggregate.
    input: Box<dyn Operator>,
    /// Expressions whose values identify a group; also emitted as output
    /// columns.
    group_keys: Vec<ReturnItem>,
    /// Aggregates computed for every group.
    aggregates: Vec<AggregateSpec>,
    /// Output rows; `None` until `compute` has run.
    results: Option<Vec<Row>>,
    /// Index into `results` of the next row to emit.
    cursor: usize,
}
5876
impl AggregateOp {
    /// Creates an aggregation over `input`. Nothing is computed until
    /// `compute` runs.
    fn new(
        input: Box<dyn Operator>,
        group_keys: Vec<ReturnItem>,
        aggregates: Vec<AggregateSpec>,
    ) -> Self {
        Self {
            input,
            group_keys,
            aggregates,
            results: None,
            cursor: 0,
        }
    }

    /// Drains `input`, folds every row into its group's accumulators and
    /// stores the finished output rows in `self.results`.
    ///
    /// Groups are emitted in the order their key was first seen.
    ///
    /// # Errors
    ///
    /// Propagates errors from the input operator, from expression
    /// evaluation and from `AggState::update`. Returns `Error::Procedure`
    /// when a percentile argument lies outside `0.0..=1.0`, or when the
    /// `apoc.agg.nth` index is not an integer or is negative.
    fn compute(&mut self, ctx: &ExecCtx) -> Result<()> {
        // Accumulators per group, keyed by the serialised group-key values.
        let mut groups: HashMap<String, GroupState> = HashMap::new();
        // Group keys in first-seen order, used to emit groups deterministically.
        let mut order: Vec<String> = Vec::new();

        // Distinguishes "no input rows at all" from "input rows were seen".
        let mut saw_any = false;

        while let Some(row) = self.input.next(ctx)? {
            saw_any = true;
            let mut key_values = Vec::with_capacity(self.group_keys.len());
            for item in &self.group_keys {
                key_values.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
            }
            // Concatenate each key's `value_key`, separated by '|', into the
            // string that identifies the group.
            let mut hash_key = String::new();
            for v in &key_values {
                hash_key.push_str(&value_key(v));
                hash_key.push('|');
            }
            // A first-seen group records its position and gets fresh
            // accumulators, one per aggregate.
            let entry = groups.entry(hash_key.clone()).or_insert_with(|| {
                order.push(hash_key.clone());
                GroupState {
                    key_values: key_values.clone(),
                    agg_states: self
                        .aggregates
                        .iter()
                        .map(|a| AggState::initial(a.function))
                        .collect(),
                    distinct_seen: self.aggregates.iter().map(|_| None).collect(),
                }
            });
            for (i, spec) in self.aggregates.iter().enumerate() {
                // DISTINCT: this aggregate skips the row when the value is
                // `Value::Null` or was already seen within the group.
                if let AggregateArg::DistinctExpr(expr) = &spec.arg {
                    let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
                    if matches!(v, Value::Null) {
                        continue;
                    }
                    let key = value_key(&v);
                    let seen = entry.distinct_seen[i].get_or_insert_with(HashSet::new);
                    if !seen.insert(key) {
                        continue;
                    }
                }
                // The extra argument (percentile or nth index) is evaluated
                // only while the accumulator still holds `None` for it, i.e.
                // on the first row of the group that reaches this point.
                if let Some(extra_expr) = &spec.extra_arg {
                    let need_resolve_percentile = matches!(
                        &entry.agg_states[i],
                        AggState::PercentileDisc {
                            percentile: None,
                            ..
                        } | AggState::PercentileCont {
                            percentile: None,
                            ..
                        }
                    );
                    let need_resolve_nth =
                        matches!(&entry.agg_states[i], AggState::ApocNth { target: None, .. });
                    if need_resolve_percentile {
                        let pv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
                        // Anything that is not an integer or a float is
                        // treated as percentile 0.0.
                        let p = match pv {
                            Value::Property(Property::Float64(f)) => f,
                            Value::Property(Property::Int64(i)) => i as f64,
                            _ => 0.0,
                        };
                        if !(0.0..=1.0).contains(&p) || p.is_nan() {
                            return Err(Error::Procedure(format!("percentile out of range: {p}")));
                        }
                        match &mut entry.agg_states[i] {
                            AggState::PercentileDisc { percentile, .. }
                            | AggState::PercentileCont { percentile, .. } => {
                                *percentile = Some(p);
                            }
                            _ => {}
                        }
                    }
                    if need_resolve_nth {
                        let nv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
                        let n = match nv {
                            Value::Property(Property::Int64(i)) => i,
                            _ => {
                                return Err(Error::Procedure(
                                    "apoc.agg.nth expects an integer index".into(),
                                ))
                            }
                        };
                        if n < 0 {
                            return Err(Error::Procedure(format!(
                                "apoc.agg.nth index out of range: {n}"
                            )));
                        }
                        if let AggState::ApocNth { target, .. } = &mut entry.agg_states[i] {
                            *target = Some(n);
                        }
                    }
                }
                entry.agg_states[i].update(&spec.arg, &ctx.eval_ctx(&row))?;
            }
        }

        let mut out = Vec::new();
        if !saw_any && self.group_keys.is_empty() && !self.aggregates.is_empty() {
            // Aggregation without grouping keys over an empty input still
            // yields one row, built from each aggregate's initial state.
            let mut row = Row::new();
            for spec in &self.aggregates {
                row.insert(
                    spec.alias.clone(),
                    AggState::initial(spec.function).finalize(),
                );
            }
            out.push(row);
        } else {
            for key in order {
                // Every key in `order` was inserted into `groups` alongside it.
                let state = groups.remove(&key).unwrap();
                let mut row = Row::new();
                for (i, item) in self.group_keys.iter().enumerate() {
                    // A key without an alias is named by `default_name`.
                    let name = item
                        .alias
                        .clone()
                        .unwrap_or_else(|| default_name(&item.expr, i));
                    row.insert(name, state.key_values[i].clone());
                }
                for (i, spec) in self.aggregates.iter().enumerate() {
                    row.insert(spec.alias.clone(), state.agg_states[i].finalize());
                }
                out.push(row);
            }
        }
        self.results = Some(out);
        Ok(())
    }
}
6030
6031impl Operator for AggregateOp {
6032 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
6033 if self.results.is_none() {
6034 self.compute(ctx)?;
6035 }
6036 let rows = self.results.as_ref().unwrap();
6037 if self.cursor < rows.len() {
6038 let row = rows[self.cursor].clone();
6039 self.cursor += 1;
6040 Ok(Some(row))
6041 } else {
6042 Ok(None)
6043 }
6044 }
6045}
6046
/// Per-group bookkeeping kept by `AggregateOp::compute`.
struct GroupState {
    /// Evaluated group-key values, in `group_keys` order.
    key_values: Vec<Value>,
    /// One accumulator per aggregate, in `aggregates` order.
    agg_states: Vec<AggState>,
    /// Per aggregate: the `value_key`s already folded in for a DISTINCT
    /// argument. Stays `None` until a DISTINCT value is first recorded.
    distinct_seen: Vec<Option<HashSet<String>>>,
}
6052
/// Running accumulator for one aggregate within one group.
///
/// Created by `AggState::initial`, fed with `update`, and turned into the
/// result value by `finalize`.
enum AggState {
    /// Number of counted rows or non-null values.
    Count(i64),
    /// Integers and floats are summed separately; the result is a float only
    /// if at least one float was seen.
    Sum {
        int_part: i64,
        float_part: f64,
        is_float: bool,
    },
    /// Running total and number of numeric values; finalizes to
    /// `total / count`, or null when `count` is zero.
    Avg {
        total: f64,
        count: i64,
    },
    /// Smallest value seen so far, if any.
    Min(Option<Value>),
    /// Largest value seen so far, if any.
    Max(Option<Value>),
    /// Collected values, in arrival order.
    Collect(Vec<Value>),
    /// Sums for the standard deviation with an `n - 1` divisor.
    StDev {
        sum: f64,
        sum_sq: f64,
        count: i64,
    },
    /// Sums for the standard deviation with an `n` divisor.
    StDevP {
        sum: f64,
        sum_sq: f64,
        count: i64,
    },
    /// Values buffered for `percentile_disc`; `percentile` is `None` until
    /// the extra argument has been resolved.
    PercentileDisc {
        items: Vec<Value>,
        percentile: Option<f64>,
    },
    /// Values buffered for `percentile_cont`; `percentile` is `None` until
    /// the extra argument has been resolved.
    PercentileCont {
        items: Vec<Value>,
        percentile: Option<f64>,
    },
    /// First non-null value seen.
    ApocFirst(Option<Value>),
    /// Most recent non-null value seen.
    ApocLast(Option<Value>),
    /// Captures the non-null value at zero-based position `target`.
    ApocNth {
        /// Requested position; `None` until the extra argument is resolved.
        target: Option<i64>,
        /// Non-null values counted so far (only counted once `target` is set).
        count: i64,
        /// The captured value, once position `target` has been reached.
        slot: Option<Value>,
    },
    /// Numeric values buffered (as floats) for the median.
    ApocMedian(Vec<f64>),
    /// Integer and float factors are multiplied separately.
    ApocProduct {
        int_part: i64,
        float_part: f64,
        /// Set once any float factor was seen; makes the result a float.
        is_float: bool,
        /// Set once any numeric factor was seen; otherwise the result is null.
        seen: bool,
    },
}
6110
6111impl AggState {
6112 fn initial(func: AggregateFn) -> Self {
6113 match func {
6114 AggregateFn::Count => AggState::Count(0),
6115 AggregateFn::Sum => AggState::Sum {
6116 int_part: 0,
6117 float_part: 0.0,
6118 is_float: false,
6119 },
6120 AggregateFn::Avg => AggState::Avg {
6121 total: 0.0,
6122 count: 0,
6123 },
6124 AggregateFn::Min => AggState::Min(None),
6125 AggregateFn::Max => AggState::Max(None),
6126 AggregateFn::Collect => AggState::Collect(Vec::new()),
6127 AggregateFn::StDev => AggState::StDev {
6128 sum: 0.0,
6129 sum_sq: 0.0,
6130 count: 0,
6131 },
6132 AggregateFn::StDevP => AggState::StDevP {
6133 sum: 0.0,
6134 sum_sq: 0.0,
6135 count: 0,
6136 },
6137 AggregateFn::PercentileDisc => AggState::PercentileDisc {
6138 items: Vec::new(),
6139 percentile: None,
6140 },
6141 AggregateFn::PercentileCont => AggState::PercentileCont {
6142 items: Vec::new(),
6143 percentile: None,
6144 },
6145 AggregateFn::ApocFirst => AggState::ApocFirst(None),
6146 AggregateFn::ApocLast => AggState::ApocLast(None),
6147 AggregateFn::ApocNth => AggState::ApocNth {
6148 target: None,
6149 count: 0,
6150 slot: None,
6151 },
6152 AggregateFn::ApocMedian => AggState::ApocMedian(Vec::new()),
6153 AggregateFn::ApocProduct => AggState::ApocProduct {
6154 int_part: 1,
6155 float_part: 1.0,
6156 is_float: false,
6157 seen: false,
6158 },
6159 }
6160 }
6161
6162 fn update(&mut self, arg: &AggregateArg, ctx: &EvalCtx) -> Result<()> {
6163 match self {
6164 AggState::Count(c) => match arg {
6165 AggregateArg::Star => *c += 1,
6166 AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => {
6167 if !matches!(eval_expr(e, ctx)?, Value::Null) {
6168 *c += 1;
6169 }
6170 }
6171 },
6172 AggState::Sum {
6173 int_part,
6174 float_part,
6175 is_float,
6176 } => {
6177 let v = expr_arg_value(arg, ctx)?;
6178 match v {
6179 Value::Null => {}
6180 Value::Property(Property::Int64(i)) => *int_part += i,
6181 Value::Property(Property::Float64(f)) => {
6182 *float_part += f;
6183 *is_float = true;
6184 }
6185 _ => return Err(Error::AggregateTypeError),
6186 }
6187 }
6188 AggState::Avg { total, count } => {
6189 let v = expr_arg_value(arg, ctx)?;
6190 match v {
6191 Value::Null => {}
6192 Value::Property(Property::Int64(i)) => {
6193 *total += i as f64;
6194 *count += 1;
6195 }
6196 Value::Property(Property::Float64(f)) => {
6197 *total += f;
6198 *count += 1;
6199 }
6200 _ => return Err(Error::AggregateTypeError),
6201 }
6202 }
6203 AggState::Min(slot) => {
6204 let v = expr_arg_value(arg, ctx)?;
6211 if matches!(v, Value::Null | Value::Property(Property::Null)) {
6212 } else {
6214 match slot {
6215 None => *slot = Some(v),
6216 Some(cur) => {
6217 if compare_values(&v, cur) == Ordering::Less {
6218 *cur = v;
6219 }
6220 }
6221 }
6222 }
6223 }
6224 AggState::Max(slot) => {
6225 let v = expr_arg_value(arg, ctx)?;
6226 if matches!(v, Value::Null | Value::Property(Property::Null)) {
6227 } else {
6229 match slot {
6230 None => *slot = Some(v),
6231 Some(cur) => {
6232 if compare_values(&v, cur) == Ordering::Greater {
6233 *cur = v;
6234 }
6235 }
6236 }
6237 }
6238 }
6239 AggState::Collect(items) => {
6240 let v = expr_arg_value(arg, ctx)?;
6241 if !matches!(v, Value::Null) {
6242 items.push(v);
6243 }
6244 }
6245 AggState::PercentileDisc { items, .. } | AggState::PercentileCont { items, .. } => {
6246 let v = expr_arg_value(arg, ctx)?;
6247 if !matches!(v, Value::Null) {
6248 items.push(v);
6249 }
6250 }
6251 AggState::StDev { sum, sum_sq, count } | AggState::StDevP { sum, sum_sq, count } => {
6252 let v = expr_arg_value(arg, ctx)?;
6253 match v {
6254 Value::Null => {}
6255 Value::Property(Property::Int64(i)) => {
6256 let f = i as f64;
6257 *sum += f;
6258 *sum_sq += f * f;
6259 *count += 1;
6260 }
6261 Value::Property(Property::Float64(f)) => {
6262 *sum += f;
6263 *sum_sq += f * f;
6264 *count += 1;
6265 }
6266 _ => return Err(Error::AggregateTypeError),
6267 }
6268 }
6269 AggState::ApocFirst(slot) => {
6270 if slot.is_some() {
6271 return Ok(());
6272 }
6273 let v = expr_arg_value(arg, ctx)?;
6274 if !matches!(v, Value::Null | Value::Property(Property::Null)) {
6275 *slot = Some(v);
6276 }
6277 }
6278 AggState::ApocLast(slot) => {
6279 let v = expr_arg_value(arg, ctx)?;
6280 if !matches!(v, Value::Null | Value::Property(Property::Null)) {
6281 *slot = Some(v);
6282 }
6283 }
6284 AggState::ApocNth {
6285 target,
6286 count,
6287 slot,
6288 } => {
6289 if slot.is_some() {
6290 return Ok(());
6291 }
6292 let v = expr_arg_value(arg, ctx)?;
6293 if matches!(v, Value::Null | Value::Property(Property::Null)) {
6294 return Ok(());
6295 }
6296 if let Some(t) = *target {
6297 if *count == t {
6298 *slot = Some(v);
6299 }
6300 *count += 1;
6301 }
6302 }
6303 AggState::ApocMedian(items) => {
6304 let v = expr_arg_value(arg, ctx)?;
6305 match v {
6306 Value::Null | Value::Property(Property::Null) => {}
6307 Value::Property(Property::Int64(i)) => items.push(i as f64),
6308 Value::Property(Property::Float64(f)) => items.push(f),
6309 _ => return Err(Error::AggregateTypeError),
6310 }
6311 }
6312 AggState::ApocProduct {
6313 int_part,
6314 float_part,
6315 is_float,
6316 seen,
6317 } => {
6318 let v = expr_arg_value(arg, ctx)?;
6319 match v {
6320 Value::Null | Value::Property(Property::Null) => {}
6321 Value::Property(Property::Int64(i)) => {
6322 *int_part = int_part.saturating_mul(i);
6323 *seen = true;
6324 }
6325 Value::Property(Property::Float64(f)) => {
6326 *float_part *= f;
6327 *is_float = true;
6328 *seen = true;
6329 }
6330 _ => return Err(Error::AggregateTypeError),
6331 }
6332 }
6333 }
6334 Ok(())
6335 }
6336
6337 fn finalize(&self) -> Value {
6338 match self {
6339 AggState::Count(c) => Value::Property(Property::Int64(*c)),
6340 AggState::Sum {
6341 int_part,
6342 float_part,
6343 is_float,
6344 } => {
6345 if *is_float {
6346 Value::Property(Property::Float64(*float_part + *int_part as f64))
6347 } else {
6348 Value::Property(Property::Int64(*int_part))
6349 }
6350 }
6351 AggState::Avg { total, count } => {
6352 if *count == 0 {
6353 Value::Null
6354 } else {
6355 Value::Property(Property::Float64(*total / *count as f64))
6356 }
6357 }
6358 AggState::Min(slot) | AggState::Max(slot) => match slot {
6359 Some(v) => v.clone(),
6360 None => Value::Null,
6361 },
6362 AggState::Collect(items) => Value::List(items.clone()),
6363 AggState::StDevP { sum, sum_sq, count } => {
6364 if *count == 0 {
6365 Value::Property(Property::Float64(0.0))
6366 } else {
6367 let n = *count as f64;
6368 let variance = *sum_sq / n - (*sum / n).powi(2);
6369 Value::Property(Property::Float64(variance.max(0.0).sqrt()))
6370 }
6371 }
6372 AggState::StDev { sum, sum_sq, count } => {
6373 if *count < 2 {
6374 Value::Property(Property::Float64(0.0))
6375 } else {
6376 let n = *count as f64;
6377 let variance = (*sum_sq - *sum * *sum / n) / (n - 1.0);
6378 Value::Property(Property::Float64(variance.max(0.0).sqrt()))
6379 }
6380 }
6381 AggState::PercentileDisc { items, percentile } => {
6382 percentile_disc(items, percentile.unwrap_or(0.0))
6383 }
6384 AggState::PercentileCont { items, percentile } => {
6385 percentile_cont(items, percentile.unwrap_or(0.0))
6386 }
6387 AggState::ApocFirst(slot) | AggState::ApocLast(slot) => match slot {
6388 Some(v) => v.clone(),
6389 None => Value::Null,
6390 },
6391 AggState::ApocNth { slot, .. } => match slot {
6392 Some(v) => v.clone(),
6393 None => Value::Null,
6394 },
6395 AggState::ApocMedian(items) => {
6396 if items.is_empty() {
6397 return Value::Null;
6398 }
6399 let mut sorted = items.clone();
6400 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
6401 let n = sorted.len();
6402 let median = if n % 2 == 1 {
6403 sorted[n / 2]
6404 } else {
6405 (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0
6406 };
6407 Value::Property(Property::Float64(median))
6408 }
6409 AggState::ApocProduct {
6410 int_part,
6411 float_part,
6412 is_float,
6413 seen,
6414 } => {
6415 if !*seen {
6418 return Value::Null;
6419 }
6420 if *is_float {
6421 Value::Property(Property::Float64(*float_part * *int_part as f64))
6422 } else {
6423 Value::Property(Property::Int64(*int_part))
6424 }
6425 }
6426 }
6427 }
6428}
6429
6430fn expr_arg_value(arg: &AggregateArg, ctx: &EvalCtx) -> Result<Value> {
6431 match arg {
6432 AggregateArg::Star => Err(Error::AggregateTypeError),
6433 AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => eval_expr(e, ctx),
6434 }
6435}
6436
6437fn value_to_f64(v: &Value) -> f64 {
6441 match v {
6442 Value::Property(Property::Int64(i)) => *i as f64,
6443 Value::Property(Property::Float64(f)) => *f,
6444 _ => f64::NAN,
6445 }
6446}
6447
6448fn percentile_disc(items: &[Value], p: f64) -> Value {
6453 let mut nums: Vec<(f64, Value)> = items
6454 .iter()
6455 .map(|v| (value_to_f64(v), v.clone()))
6456 .filter(|(f, _)| !f.is_nan())
6457 .collect();
6458 if nums.is_empty() {
6459 return Value::Null;
6460 }
6461 nums.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
6462 let p = p.clamp(0.0, 1.0);
6463 let n = nums.len();
6464 let idx = ((p * n as f64).ceil() as isize - 1).max(0) as usize;
6466 nums[idx.min(n - 1)].1.clone()
6467}
6468
6469fn percentile_cont(items: &[Value], p: f64) -> Value {
6473 let mut nums: Vec<f64> = items
6474 .iter()
6475 .map(value_to_f64)
6476 .filter(|f| !f.is_nan())
6477 .collect();
6478 if nums.is_empty() {
6479 return Value::Null;
6480 }
6481 nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
6482 let p = p.clamp(0.0, 1.0);
6483 let n = nums.len();
6484 if n == 1 {
6485 return Value::Property(Property::Float64(nums[0]));
6486 }
6487 let pos = p * (n as f64 - 1.0);
6488 let lo = pos.floor() as usize;
6489 let hi = pos.ceil() as usize;
6490 let frac = pos - lo as f64;
6491 let v = nums[lo] + (nums[hi] - nums[lo]) * frac;
6492 Value::Property(Property::Float64(v))
6493}
6494
/// Operator that discards a number of leading input rows and passes the
/// rest through.
struct SkipOp {
    /// Upstream operator.
    input: Box<dyn Operator>,
    /// Expression giving the number of rows to skip; evaluated against an
    /// empty row on the first `next` call.
    count_expr: Expr,
    /// Rows still to be skipped; `None` until `count_expr` has been evaluated.
    remaining: Option<i64>,
}
6500
6501impl SkipOp {
6502 fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
6503 Self {
6504 input,
6505 count_expr,
6506 remaining: None,
6507 }
6508 }
6509}
6510
6511impl Operator for SkipOp {
6512 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
6513 if self.remaining.is_none() {
6514 let empty = Row::new();
6515 let ectx = ctx.eval_ctx(&empty);
6516 let val = eval_expr(&self.count_expr, &ectx)?;
6517 self.remaining = Some(expr_to_count(val)?);
6518 }
6519 let rem = self.remaining.as_mut().unwrap();
6520 while *rem > 0 {
6521 if self.input.next(ctx)?.is_none() {
6522 return Ok(None);
6523 }
6524 *rem -= 1;
6525 }
6526 self.input.next(ctx)
6527 }
6528}
6529
/// Operator that passes through at most a fixed number of input rows.
struct LimitOp {
    /// Upstream operator.
    input: Box<dyn Operator>,
    /// Expression giving the row limit; evaluated against an empty row on the
    /// first `next` call.
    count_expr: Expr,
    /// Rows that may still be emitted; `None` until `count_expr` has been
    /// evaluated.
    remaining: Option<i64>,
    /// When set, a `next` call that finds the limit used up pulls and
    /// discards the rest of `input` before reporting exhaustion.
    /// NOTE(review): presumably so upstream side effects still run — confirm
    /// with the planner code that sets this flag.
    drain_on_complete: bool,
    /// Set once `drain_input` has run to completion, so it runs only once.
    drained: bool,
}
6545
6546impl LimitOp {
6547 fn new(input: Box<dyn Operator>, count_expr: Expr, drain_on_complete: bool) -> Self {
6548 Self {
6549 input,
6550 count_expr,
6551 remaining: None,
6552 drain_on_complete,
6553 drained: false,
6554 }
6555 }
6556
6557 fn drain_input(&mut self, ctx: &ExecCtx) -> Result<()> {
6561 if self.drained {
6562 return Ok(());
6563 }
6564 while self.input.next(ctx)?.is_some() {}
6565 self.drained = true;
6566 Ok(())
6567 }
6568}
6569
6570impl Operator for LimitOp {
6571 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
6572 if self.remaining.is_none() {
6573 let empty = Row::new();
6574 let ectx = ctx.eval_ctx(&empty);
6575 let val = eval_expr(&self.count_expr, &ectx)?;
6576 self.remaining = Some(expr_to_count(val)?);
6577 }
6578 let rem = self.remaining.as_mut().unwrap();
6579 if *rem <= 0 {
6580 if self.drain_on_complete {
6581 self.drain_input(ctx)?;
6582 }
6583 return Ok(None);
6584 }
6585 match self.input.next(ctx)? {
6586 Some(row) => {
6587 *rem -= 1;
6588 Ok(Some(row))
6589 }
6590 None => Ok(None),
6591 }
6592 }
6593}
6594
6595fn expr_to_count(val: Value) -> Result<i64> {
6596 match val {
6597 Value::Null | Value::Property(Property::Null) => Ok(0),
6598 Value::Property(Property::Int64(n)) if n >= 0 => Ok(n),
6599 _ => Err(Error::TypeMismatch),
6604 }
6605}
6606
/// Operator that replays a fixed, pre-built list of rows.
struct RowsLiteralOp {
    /// Rows to emit, in order.
    rows: Vec<Row>,
    /// Index into `rows` of the next row to emit.
    cursor: usize,
}
6617
6618impl RowsLiteralOp {
6619 fn new(rows: Vec<Row>) -> Self {
6620 Self { rows, cursor: 0 }
6621 }
6622}
6623
6624impl Operator for RowsLiteralOp {
6625 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
6626 if self.cursor < self.rows.len() {
6627 let row = self.rows[self.cursor].clone();
6628 self.cursor += 1;
6629 Ok(Some(row))
6630 } else {
6631 Ok(None)
6632 }
6633 }
6634}