1use crate::{
2 error::{Error, Result},
3 eval::{compare_values, eval_expr, row_key, to_bool, value_key, values_equal, EvalCtx},
4 procedures::ProcedureRegistry,
5 reader::GraphReader,
6 value::{ParamMap, Row, Value},
7 writer::GraphWriter,
8};
9use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
10use meshdb_cypher::{
11 AggregateArg, AggregateFn, AggregateSpec, BinaryOp, CallArgs, CompareOp, ConstraintKind,
12 ConstraintScope as CypherConstraintScope, CreateEdgeSpec, CreateNodeSpec, Direction, Expr,
13 Literal, LogicalPlan, PropertyType as CypherPropertyType, RemoveSpec, ReturnItem,
14 SetAssignment, SortItem, UnaryOp, YieldSpec,
15};
16use meshdb_storage::{
17 ConstraintScope as StorageConstraintScope, PropertyConstraintKind,
18 PropertyType as StoragePropertyType, RocksDbStorageEngine,
19};
20use std::cell::RefCell;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23
24#[derive(Default)]
30pub struct Tombstones {
31 pub nodes: RefCell<HashSet<meshdb_core::NodeId>>,
32 pub edges: RefCell<HashSet<meshdb_core::EdgeId>>,
33}
34
35pub struct ExecCtx<'a> {
36 pub store: &'a dyn GraphReader,
37 pub writer: &'a dyn GraphWriter,
38 pub params: &'a ParamMap,
43 pub procedures: &'a ProcedureRegistry,
48 pub outer_rows: &'a [&'a Row],
56 pub tombstones: &'a Tombstones,
60}
61
62pub(crate) struct NoOpWriter;
63impl GraphWriter for NoOpWriter {
64 fn put_node(&self, _: &Node) -> Result<()> {
65 Ok(())
66 }
67 fn put_edge(&self, _: &Edge) -> Result<()> {
68 Ok(())
69 }
70 fn delete_edge(&self, _: EdgeId) -> Result<()> {
71 Ok(())
72 }
73 fn detach_delete_node(&self, _: NodeId) -> Result<()> {
74 Ok(())
75 }
76}
77
78impl<'a> ExecCtx<'a> {
79 pub(crate) fn eval_ctx<'b>(&self, row: &'b Row) -> EvalCtx<'b>
85 where
86 'a: 'b,
87 {
88 EvalCtx {
89 row,
90 params: self.params,
91 reader: self.store,
92 procedures: self.procedures,
93 outer_rows: self.outer_rows,
94 tombstones: self.tombstones,
95 }
96 }
97
98 pub(crate) fn lookup_binding<'r>(&'r self, row: &'r Row, name: &str) -> Option<&'r Value> {
104 if let Some(v) = row.get(name) {
105 return Some(v);
106 }
107 for outer in self.outer_rows {
108 if let Some(v) = outer.get(name) {
109 return Some(v);
110 }
111 }
112 None
113 }
114}
115
116pub trait Operator {
117 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>>;
118}
119
120pub fn execute(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
124 let params = ParamMap::new();
125 execute_with_reader(
126 plan,
127 store as &dyn GraphReader,
128 store as &dyn GraphWriter,
129 ¶ms,
130 )
131}
132
133pub fn execute_with_writer(
138 plan: &LogicalPlan,
139 store: &RocksDbStorageEngine,
140 writer: &dyn GraphWriter,
141) -> Result<Vec<Row>> {
142 let params = ParamMap::new();
143 execute_with_reader(plan, store as &dyn GraphReader, writer, ¶ms)
144}
145
146pub fn explain(plan: &LogicalPlan) -> Vec<Row> {
151 let text = meshdb_cypher::format_plan(plan);
152 let mut row = Row::new();
153 row.insert("plan".to_string(), Value::Property(Property::String(text)));
154 vec![row]
155}
156
157pub fn profile(plan: &LogicalPlan, store: &RocksDbStorageEngine) -> Result<Vec<Row>> {
158 let result_rows = execute(plan, store)?;
159 let row_count = result_rows.len() as i64;
160 let plan_text = meshdb_cypher::format_plan(plan);
161 let summary = format!("{plan_text}\nRows: {row_count}");
162 let mut row = Row::new();
163 row.insert(
164 "profile".to_string(),
165 Value::Property(Property::String(summary)),
166 );
167 row.insert(
168 "rows".to_string(),
169 Value::Property(Property::Int64(row_count)),
170 );
171 Ok(vec![row])
172}
173
174pub fn execute_with_reader(
175 plan: &LogicalPlan,
176 reader: &dyn GraphReader,
177 writer: &dyn GraphWriter,
178 params: &ParamMap,
179) -> Result<Vec<Row>> {
180 let empty_procs = ProcedureRegistry::new();
181 execute_with_reader_and_procs(plan, reader, writer, params, &empty_procs)
182}
183
184pub fn execute_with_reader_and_procs(
190 plan: &LogicalPlan,
191 reader: &dyn GraphReader,
192 writer: &dyn GraphWriter,
193 params: &ParamMap,
194 procedures: &ProcedureRegistry,
195) -> Result<Vec<Row>> {
196 crate::eval::reset_statement_time();
201 if let Some(rows) = try_execute_ddl(plan, reader, writer)? {
206 return Ok(rows);
207 }
208 let suppress_output = is_write_only_plan(plan);
209 let mut op = build_op(plan);
210 let tombstones = Tombstones::default();
211 let ctx = ExecCtx {
212 store: reader,
213 writer,
214 params,
215 procedures,
216 outer_rows: &[],
217 tombstones: &tombstones,
218 };
219 let mut rows = Vec::new();
220 while let Some(row) = op.next(&ctx)? {
221 rows.push(row);
222 }
223 if suppress_output {
224 Ok(Vec::new())
225 } else {
226 Ok(rows)
227 }
228}
229
230fn is_write_only_plan(plan: &LogicalPlan) -> bool {
231 match plan {
235 LogicalPlan::CreatePath { .. }
236 | LogicalPlan::Delete { .. }
237 | LogicalPlan::SetProperty { .. }
238 | LogicalPlan::Remove { .. }
239 | LogicalPlan::Foreach { .. }
240 | LogicalPlan::MergeNode { .. }
241 | LogicalPlan::MergeEdge { .. } => true,
242 _ => false,
243 }
244}
245
246fn try_execute_ddl(
256 plan: &LogicalPlan,
257 reader: &dyn GraphReader,
258 writer: &dyn GraphWriter,
259) -> Result<Option<Vec<Row>>> {
260 match plan {
261 LogicalPlan::CreatePropertyIndex { label, property } => {
262 writer.create_property_index(label, property)?;
263 Ok(Some(vec![ddl_ack_row("created", label, property)]))
264 }
265 LogicalPlan::DropPropertyIndex { label, property } => {
266 writer.drop_property_index(label, property)?;
267 Ok(Some(vec![ddl_ack_row("dropped", label, property)]))
268 }
269 LogicalPlan::ShowPropertyIndexes => {
270 let specs = reader.list_property_indexes()?;
274 let rows = specs
275 .into_iter()
276 .map(|(label, property)| {
277 let mut row = Row::default();
278 row.insert("label".into(), Value::Property(Property::String(label)));
279 row.insert(
280 "property".into(),
281 Value::Property(Property::String(property)),
282 );
283 row.insert(
284 "state".into(),
285 Value::Property(Property::String("online".into())),
286 );
287 row
288 })
289 .collect();
290 Ok(Some(rows))
291 }
292 LogicalPlan::CreatePropertyConstraint {
293 name,
294 scope,
295 properties,
296 kind,
297 if_not_exists,
298 } => {
299 let storage_kind = match kind {
300 ConstraintKind::Unique => PropertyConstraintKind::Unique,
301 ConstraintKind::NotNull => PropertyConstraintKind::NotNull,
302 ConstraintKind::NodeKey => PropertyConstraintKind::NodeKey,
303 ConstraintKind::PropertyType(t) => {
304 PropertyConstraintKind::PropertyType(cypher_to_storage_property_type(*t))
305 }
306 };
307 let storage_scope = cypher_to_storage_scope(scope);
308 let spec = writer.create_property_constraint(
309 name.as_deref(),
310 &storage_scope,
311 properties,
312 storage_kind,
313 *if_not_exists,
314 )?;
315 Ok(Some(vec![constraint_ack_row("created", &spec)]))
316 }
317 LogicalPlan::DropPropertyConstraint { name, if_exists } => {
318 writer.drop_property_constraint(name, *if_exists)?;
319 let mut row = Row::default();
320 row.insert(
321 "state".into(),
322 Value::Property(Property::String("dropped".into())),
323 );
324 row.insert(
325 "name".into(),
326 Value::Property(Property::String(name.clone())),
327 );
328 Ok(Some(vec![row]))
329 }
330 LogicalPlan::ShowPropertyConstraints => {
331 let specs = reader.list_property_constraints()?;
332 let rows = specs.into_iter().map(constraint_show_row).collect();
333 Ok(Some(rows))
334 }
335 _ => Ok(None),
336 }
337}
338
339fn constraint_ack_row(state: &str, spec: &meshdb_storage::PropertyConstraintSpec) -> Row {
344 let mut row = constraint_show_row(spec.clone());
345 row.insert(
346 "state".into(),
347 Value::Property(Property::String(state.into())),
348 );
349 row
350}
351
352fn constraint_show_row(spec: meshdb_storage::PropertyConstraintSpec) -> Row {
358 let mut row = Row::default();
359 row.insert("name".into(), Value::Property(Property::String(spec.name)));
360 let (scope_tag, target) = match spec.scope {
361 meshdb_storage::ConstraintScope::Node(l) => ("NODE", l),
362 meshdb_storage::ConstraintScope::Relationship(t) => ("RELATIONSHIP", t),
363 };
364 row.insert(
365 "scope".into(),
366 Value::Property(Property::String(scope_tag.into())),
367 );
368 row.insert("label".into(), Value::Property(Property::String(target)));
373 let props: Vec<Property> = spec.properties.into_iter().map(Property::String).collect();
374 row.insert("properties".into(), Value::Property(Property::List(props)));
375 row.insert(
376 "type".into(),
377 Value::Property(Property::String(spec.kind.as_string())),
378 );
379 row
380}
381
382fn cypher_to_storage_scope(scope: &CypherConstraintScope) -> StorageConstraintScope {
386 match scope {
387 CypherConstraintScope::Node(l) => StorageConstraintScope::Node(l.clone()),
388 CypherConstraintScope::Relationship(t) => StorageConstraintScope::Relationship(t.clone()),
389 }
390}
391
392fn cypher_to_storage_property_type(t: CypherPropertyType) -> StoragePropertyType {
397 match t {
398 CypherPropertyType::String => StoragePropertyType::String,
399 CypherPropertyType::Integer => StoragePropertyType::Integer,
400 CypherPropertyType::Float => StoragePropertyType::Float,
401 CypherPropertyType::Boolean => StoragePropertyType::Boolean,
402 }
403}
404
405fn ddl_ack_row(state: &str, label: &str, property: &str) -> Row {
406 let mut row = Row::default();
407 row.insert(
408 "state".into(),
409 Value::Property(Property::String(state.into())),
410 );
411 row.insert(
412 "label".into(),
413 Value::Property(Property::String(label.into())),
414 );
415 row.insert(
416 "property".into(),
417 Value::Property(Property::String(property.into())),
418 );
419 row
420}
421
422fn build_op(plan: &LogicalPlan) -> Box<dyn Operator> {
423 build_op_inner(plan, None)
424}
425
426pub(crate) fn build_op_inner(plan: &LogicalPlan, seed: Option<&Row>) -> Box<dyn Operator> {
427 macro_rules! child {
428 ($p:expr) => {
429 build_op_inner($p, seed)
430 };
431 }
432 match plan {
433 LogicalPlan::CreatePath {
434 input,
435 nodes,
436 edges,
437 } => Box::new(CreatePathOp::new(
438 input.as_ref().map(|p| child!(p)),
439 nodes.clone(),
440 edges.clone(),
441 )),
442 LogicalPlan::CartesianProduct { left, right } => {
443 Box::new(CartesianProductOp::new(child!(left), (**right).clone()))
444 }
445 LogicalPlan::Delete {
446 input,
447 detach,
448 vars,
449 exprs,
450 } => Box::new(DeleteOp::new(
451 child!(input),
452 *detach,
453 vars.clone(),
454 exprs.clone(),
455 )),
456 LogicalPlan::SetProperty { input, assignments } => {
457 Box::new(SetPropertyOp::new(child!(input), assignments.clone()))
458 }
459 LogicalPlan::Remove { input, items } => {
460 Box::new(RemoveOp::new(child!(input), items.clone()))
461 }
462 LogicalPlan::LoadCsv {
463 input,
464 path_expr,
465 var,
466 with_headers,
467 } => Box::new(LoadCsvOp::new(
468 input.as_ref().map(|p| child!(p)),
469 path_expr.clone(),
470 var.clone(),
471 *with_headers,
472 )),
473 LogicalPlan::Foreach {
474 input,
475 var,
476 list_expr,
477 set_assignments,
478 remove_items,
479 } => Box::new(ForeachOp::new(
480 child!(input),
481 var.clone(),
482 list_expr.clone(),
483 set_assignments.clone(),
484 remove_items.clone(),
485 )),
486 LogicalPlan::CallSubquery { input, body } => {
487 Box::new(CallSubqueryOp::new(child!(input), (**body).clone()))
488 }
489 LogicalPlan::OptionalApply {
490 input,
491 body,
492 null_vars,
493 } => Box::new(OptionalApplyOp::new(
494 child!(input),
495 (**body).clone(),
496 null_vars.clone(),
497 )),
498 LogicalPlan::ProcedureCall {
499 input,
500 qualified_name,
501 args,
502 yield_spec,
503 standalone,
504 } => Box::new(ProcedureCallOp::new(
505 input.as_ref().map(|p| child!(p)),
506 qualified_name.clone(),
507 args.clone(),
508 yield_spec.clone(),
509 *standalone,
510 )),
511 LogicalPlan::SeedRow => match seed {
512 Some(r) => Box::new(SeededRowOp {
513 row: Some(r.clone()),
514 }),
515 None => Box::new(SeedRowOp { done: false }),
516 },
517 LogicalPlan::NodeScanAll { var } => Box::new(NodeScanAllOp::new(var.clone())),
518 LogicalPlan::NodeScanByLabels { var, labels } => {
519 Box::new(NodeScanByLabelsOp::new(var.clone(), labels.clone()))
520 }
521 LogicalPlan::EdgeExpand {
522 input,
523 src_var,
524 edge_var,
525 dst_var,
526 dst_labels,
527 edge_properties,
528 edge_types,
529 direction,
530 edge_constraint_var,
531 } => Box::new(EdgeExpandOp::new(
532 child!(input),
533 src_var.clone(),
534 edge_var.clone(),
535 dst_var.clone(),
536 dst_labels.clone(),
537 edge_properties.clone(),
538 edge_types.clone(),
539 *direction,
540 edge_constraint_var.clone(),
541 )),
542 LogicalPlan::OptionalEdgeExpand {
543 input,
544 src_var,
545 edge_var,
546 dst_var,
547 dst_labels,
548 dst_properties,
549 edge_types,
550 direction,
551 dst_constraint_var,
552 edge_constraint_var,
553 } => Box::new(OptionalEdgeExpandOp::new(
554 child!(input),
555 src_var.clone(),
556 edge_var.clone(),
557 dst_var.clone(),
558 dst_labels.clone(),
559 dst_properties.clone(),
560 edge_types.clone(),
561 *direction,
562 dst_constraint_var.clone(),
563 edge_constraint_var.clone(),
564 )),
565 LogicalPlan::VarLengthExpand {
566 input,
567 src_var,
568 edge_var,
569 dst_var,
570 dst_labels,
571 edge_types,
572 edge_properties,
573 direction,
574 min_hops,
575 max_hops,
576 path_var,
577 optional,
578 dst_constraint_var,
579 bound_edge_list_var,
580 excluded_edge_vars,
581 } => Box::new(VarLengthExpandOp::new(
582 child!(input),
583 src_var.clone(),
584 edge_var.clone(),
585 dst_var.clone(),
586 dst_labels.clone(),
587 edge_types.clone(),
588 edge_properties.clone(),
589 *direction,
590 *min_hops,
591 *max_hops,
592 path_var.clone(),
593 *optional,
594 dst_constraint_var.clone(),
595 bound_edge_list_var.clone(),
596 excluded_edge_vars.clone(),
597 )),
598 LogicalPlan::Filter { input, predicate } => {
599 Box::new(FilterOp::new(child!(input), predicate.clone()))
600 }
601 LogicalPlan::Project { input, items } => {
602 Box::new(ProjectOp::new(child!(input), items.clone()))
603 }
604 LogicalPlan::Aggregate {
605 input,
606 group_keys,
607 aggregates,
608 } => Box::new(AggregateOp::new(
609 child!(input),
610 group_keys.clone(),
611 aggregates.clone(),
612 )),
613 LogicalPlan::Identity { input } => Box::new(IdentityOp::new(child!(input))),
614 LogicalPlan::CoalesceNullRow { input, null_vars } => {
615 Box::new(CoalesceNullRowOp::new(child!(input), null_vars.clone()))
616 }
617 LogicalPlan::Distinct { input } => Box::new(DistinctOp::new(child!(input))),
618 LogicalPlan::OrderBy { input, sort_items } => {
619 Box::new(OrderByOp::new(child!(input), sort_items.clone()))
620 }
621 LogicalPlan::Skip { input, count } => Box::new(SkipOp::new(child!(input), count.clone())),
622 LogicalPlan::Limit { input, count } => Box::new(LimitOp::new(child!(input), count.clone())),
623 LogicalPlan::MergeNode {
624 input,
625 var,
626 labels,
627 properties,
628 on_create,
629 on_match,
630 } => Box::new(MergeNodeOp::new(
631 input.as_ref().map(|p| child!(p)),
632 var.clone(),
633 labels.clone(),
634 properties.clone(),
635 on_create.clone(),
636 on_match.clone(),
637 )),
638 LogicalPlan::MergeEdge {
639 input,
640 edge_var,
641 src_var,
642 dst_var,
643 edge_type,
644 undirected,
645 properties,
646 on_create,
647 on_match,
648 } => Box::new(MergeEdgeOp::new(
649 child!(input),
650 edge_var.clone(),
651 src_var.clone(),
652 dst_var.clone(),
653 edge_type.clone(),
654 *undirected,
655 properties.clone(),
656 on_create.clone(),
657 on_match.clone(),
658 )),
659 LogicalPlan::Unwind { var, expr } => Box::new(UnwindOp::new(var.clone(), expr.clone())),
660 LogicalPlan::UnwindChain { input, var, expr } => {
661 Box::new(UnwindChainOp::new(child!(input), var.clone(), expr.clone()))
662 }
663 LogicalPlan::IndexSeek {
664 var,
665 label,
666 property,
667 value,
668 } => Box::new(IndexSeekOp::new(
669 var.clone(),
670 label.clone(),
671 property.clone(),
672 value.clone(),
673 )),
674 LogicalPlan::Union { branches, all } => {
679 let branch_ops: Vec<Box<dyn Operator>> = branches.iter().map(|b| child!(b)).collect();
680 Box::new(UnionOp::new(branch_ops, *all))
681 }
682 LogicalPlan::BindPath {
683 input,
684 path_var,
685 node_vars,
686 edge_vars,
687 } => Box::new(BindPathOp::new(
688 child!(input),
689 path_var.clone(),
690 node_vars.clone(),
691 edge_vars.clone(),
692 )),
693 LogicalPlan::ShortestPath {
694 input,
695 src_var,
696 dst_var,
697 path_var,
698 edge_types,
699 direction,
700 max_hops,
701 kind,
702 } => Box::new(ShortestPathOp::new(
703 child!(input),
704 src_var.clone(),
705 dst_var.clone(),
706 path_var.clone(),
707 edge_types.clone(),
708 *direction,
709 *max_hops,
710 *kind,
711 )),
712 LogicalPlan::CreatePropertyIndex { .. }
713 | LogicalPlan::DropPropertyIndex { .. }
714 | LogicalPlan::ShowPropertyIndexes
715 | LogicalPlan::CreatePropertyConstraint { .. }
716 | LogicalPlan::DropPropertyConstraint { .. }
717 | LogicalPlan::ShowPropertyConstraints => {
718 panic!("schema DDL must be dispatched via try_execute_ddl before build_op")
719 }
720 }
721}
722
723struct UnwindOp {
724 var: String,
725 expr: Expr,
726 items: Option<Vec<Value>>,
727 cursor: usize,
728}
729
730impl UnwindOp {
731 fn new(var: String, expr: Expr) -> Self {
732 Self {
733 var,
734 expr,
735 items: None,
736 cursor: 0,
737 }
738 }
739}
740
741impl Operator for UnwindOp {
742 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
743 if self.items.is_none() {
744 let empty = Row::new();
745 let ectx = EvalCtx {
746 row: &empty,
747 params: ctx.params,
748 reader: ctx.store,
749 procedures: ctx.procedures,
750 outer_rows: ctx.outer_rows,
751 tombstones: ctx.tombstones,
752 };
753 let val = eval_expr(&self.expr, &ectx)?;
754 self.items = Some(coerce_unwind_list(val)?);
755 }
756 let items = self.items.as_ref().unwrap();
757 if self.cursor < items.len() {
758 let v = items[self.cursor].clone();
759 self.cursor += 1;
760 let mut row = Row::new();
761 row.insert(self.var.clone(), v);
762 Ok(Some(row))
763 } else {
764 Ok(None)
765 }
766 }
767}
768
769struct UnwindChainOp {
775 input: Box<dyn Operator>,
776 var: String,
777 expr: Expr,
778 current_row: Option<Row>,
779 items: Vec<Value>,
780 cursor: usize,
781}
782
783impl UnwindChainOp {
784 fn new(input: Box<dyn Operator>, var: String, expr: Expr) -> Self {
785 Self {
786 input,
787 var,
788 expr,
789 current_row: None,
790 items: Vec::new(),
791 cursor: 0,
792 }
793 }
794}
795
796impl Operator for UnwindChainOp {
797 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
798 loop {
799 if let Some(base) = &self.current_row {
800 if self.cursor < self.items.len() {
801 let v = self.items[self.cursor].clone();
802 self.cursor += 1;
803 let mut row = base.clone();
804 row.insert(self.var.clone(), v);
805 return Ok(Some(row));
806 }
807 self.current_row = None;
808 self.items.clear();
809 self.cursor = 0;
810 }
811 let base = match self.input.next(ctx)? {
812 Some(r) => r,
813 None => return Ok(None),
814 };
815 let ectx = EvalCtx {
816 row: &base,
817 params: ctx.params,
818 reader: ctx.store,
819 procedures: ctx.procedures,
820 outer_rows: ctx.outer_rows,
821 tombstones: ctx.tombstones,
822 };
823 let val = eval_expr(&self.expr, &ectx)?;
824 self.items = coerce_unwind_list(val)?;
825 self.current_row = Some(base);
826 }
827 }
828}
829
830fn coerce_unwind_list(val: Value) -> Result<Vec<Value>> {
835 match val {
836 Value::List(items) => Ok(items),
837 Value::Property(Property::List(props)) => {
838 Ok(props.into_iter().map(Value::Property).collect())
839 }
840 Value::Null => Ok(Vec::new()),
841 _ => Err(Error::TypeMismatch),
842 }
843}
844
845struct CreatePathOp {
846 input: Option<Box<dyn Operator>>,
847 nodes: Vec<CreateNodeSpec>,
848 edges: Vec<CreateEdgeSpec>,
849 done: bool,
850 buffered: Option<Vec<Row>>,
851 cursor: usize,
852}
853
854impl CreatePathOp {
855 fn new(
856 input: Option<Box<dyn Operator>>,
857 nodes: Vec<CreateNodeSpec>,
858 edges: Vec<CreateEdgeSpec>,
859 ) -> Self {
860 Self {
861 input,
862 nodes,
863 edges,
864 done: false,
865 buffered: None,
866 cursor: 0,
867 }
868 }
869
870 fn apply(&self, ctx: &ExecCtx, row: &Row) -> Result<Row> {
871 let mut out = row.clone();
872 let mut node_ids: Vec<NodeId> = Vec::with_capacity(self.nodes.len());
873 for spec in &self.nodes {
874 match spec {
875 CreateNodeSpec::New {
876 var,
877 labels,
878 properties,
879 } => {
880 let mut node = Node::new();
881 for label in labels {
882 node.labels.push(label.clone());
883 }
884 for (k, expr) in properties {
892 let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
893 let prop = value_to_property(value)?;
894 if matches!(prop, Property::Null) {
895 continue;
896 }
897 node.properties.insert(k.clone(), prop);
898 }
899 ctx.writer.put_node(&node)?;
900 node_ids.push(node.id);
901 if let Some(v) = var {
902 out.insert(v.clone(), Value::Node(node));
903 }
904 }
905 CreateNodeSpec::Reference(name) => {
906 let id = match out.get(name) {
907 Some(Value::Node(n)) => n.id,
908 _ => return Err(Error::UnboundVariable(name.clone())),
909 };
910 node_ids.push(id);
911 }
912 }
913 }
914 for spec in &self.edges {
915 let src = node_ids[spec.src_idx];
916 let dst = node_ids[spec.dst_idx];
917 let mut edge = Edge::new(spec.edge_type.clone(), src, dst);
918 for (k, expr) in &spec.properties {
919 let value = eval_expr(expr, &ctx.eval_ctx(&out))?;
920 let prop = value_to_property(value)?;
921 if matches!(prop, Property::Null) {
922 continue;
923 }
924 edge.properties.insert(k.clone(), prop);
925 }
926 ctx.writer.put_edge(&edge)?;
927 if let Some(v) = &spec.var {
928 out.insert(v.clone(), Value::Edge(edge));
929 }
930 }
931 Ok(out)
932 }
933}
934
935impl Operator for CreatePathOp {
936 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
937 if self.input.is_some() {
938 if let Some(buffered) = self.buffered.as_mut() {
942 if self.cursor < buffered.len() {
943 let row = buffered[self.cursor].clone();
944 self.cursor += 1;
945 return Ok(Some(self.apply(ctx, &row)?));
946 }
947 return Ok(None);
948 }
949 let mut rows: Vec<Row> = Vec::new();
950 {
951 let input = self.input.as_mut().unwrap();
952 while let Some(row) = input.next(ctx)? {
953 rows.push(row);
954 }
955 }
956 self.buffered = Some(rows);
957 self.cursor = 0;
958 self.next(ctx)
960 } else {
961 if self.done {
962 return Ok(None);
963 }
964 self.done = true;
965 let empty = Row::new();
966 Ok(Some(self.apply(ctx, &empty)?))
967 }
968 }
969}
970
971struct CartesianProductOp {
972 left: Box<dyn Operator>,
973 right_plan: LogicalPlan,
974 left_row: Option<Row>,
975 right_op: Option<Box<dyn Operator>>,
976}
977
978impl CartesianProductOp {
979 fn new(left: Box<dyn Operator>, right_plan: LogicalPlan) -> Self {
980 Self {
981 left,
982 right_plan,
983 left_row: None,
984 right_op: None,
985 }
986 }
987}
988
989impl Operator for CartesianProductOp {
990 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
991 loop {
992 if self.left_row.is_none() {
993 match self.left.next(ctx)? {
994 None => return Ok(None),
995 Some(row) => {
996 self.left_row = Some(row);
997 self.right_op = Some(build_op(&self.right_plan));
998 }
999 }
1000 }
1001 let right_op = self.right_op.as_mut().expect("right_op set");
1002 let left_ref = self.left_row.as_ref().unwrap();
1007 let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1008 stacked.push(left_ref);
1009 stacked.extend_from_slice(ctx.outer_rows);
1010 let inner_ctx = ExecCtx {
1011 store: ctx.store,
1012 writer: ctx.writer,
1013 params: ctx.params,
1014 procedures: ctx.procedures,
1015 outer_rows: &stacked,
1016 tombstones: ctx.tombstones,
1017 };
1018 match right_op.next(&inner_ctx)? {
1019 Some(right_row) => {
1020 let mut combined = left_ref.clone();
1021 for (k, v) in right_row {
1022 combined.insert(k, v);
1023 }
1024 return Ok(Some(combined));
1025 }
1026 None => {
1027 self.left_row = None;
1028 self.right_op = None;
1029 }
1030 }
1031 }
1032 }
1033}
1034
1035struct DeleteOp {
1036 input: Box<dyn Operator>,
1037 detach: bool,
1038 #[allow(dead_code)]
1039 vars: Vec<String>,
1040 exprs: Vec<Expr>,
1041 buffered: Option<Vec<Row>>,
1049 cursor: usize,
1050}
1051
1052impl DeleteOp {
1053 fn new(input: Box<dyn Operator>, detach: bool, vars: Vec<String>, exprs: Vec<Expr>) -> Self {
1054 Self {
1055 input,
1056 detach,
1057 vars,
1058 exprs,
1059 buffered: None,
1060 cursor: 0,
1061 }
1062 }
1063
1064 fn apply_deletes(
1074 &self,
1075 ctx: &ExecCtx,
1076 row: &Row,
1077 seen_edges: &mut HashSet<meshdb_core::EdgeId>,
1078 seen_nodes: &mut HashSet<meshdb_core::NodeId>,
1079 ) -> Result<()> {
1080 let mut edge_ids: Vec<meshdb_core::EdgeId> = Vec::new();
1081 let mut node_ids: Vec<meshdb_core::NodeId> = Vec::new();
1082 for expr in &self.exprs {
1083 let v = eval_expr(expr, &ctx.eval_ctx(row))?;
1084 match v {
1085 Value::Node(n) => node_ids.push(n.id),
1086 Value::Edge(e) => edge_ids.push(e.id),
1087 Value::Path { nodes, edges } => {
1088 for e in edges {
1089 edge_ids.push(e.id);
1090 }
1091 for n in nodes {
1092 node_ids.push(n.id);
1093 }
1094 }
1095 Value::Null | Value::Property(Property::Null) => continue,
1096 _ => return Err(Error::TypeMismatch),
1097 }
1098 }
1099 for eid in &edge_ids {
1100 if seen_edges.insert(*eid) {
1101 ctx.writer.delete_edge(*eid)?;
1102 ctx.tombstones.edges.borrow_mut().insert(*eid);
1103 }
1104 }
1105 for nid in &node_ids {
1106 if !seen_nodes.insert(*nid) {
1107 continue;
1108 }
1109 if self.detach {
1110 for (eid, _) in ctx.store.outgoing(*nid)? {
1115 ctx.tombstones.edges.borrow_mut().insert(eid);
1116 }
1117 for (eid, _) in ctx.store.incoming(*nid)? {
1118 ctx.tombstones.edges.borrow_mut().insert(eid);
1119 }
1120 ctx.writer.detach_delete_node(*nid)?;
1121 } else {
1122 let out = ctx.store.outgoing(*nid)?;
1123 let inc = ctx.store.incoming(*nid)?;
1124 let still_attached = out
1125 .iter()
1126 .chain(inc.iter())
1127 .any(|(eid, _)| !seen_edges.contains(eid));
1128 if still_attached {
1129 return Err(Error::CannotDeleteAttachedNode);
1130 }
1131 ctx.writer.detach_delete_node(*nid)?;
1132 }
1133 ctx.tombstones.nodes.borrow_mut().insert(*nid);
1134 }
1135 Ok(())
1136 }
1137}
1138
1139impl Operator for DeleteOp {
1140 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1141 if self.buffered.is_none() {
1149 let mut rows: Vec<Row> = Vec::new();
1150 while let Some(row) = self.input.next(ctx)? {
1151 rows.push(row);
1152 }
1153 let mut seen_edges: HashSet<meshdb_core::EdgeId> = HashSet::new();
1154 let mut seen_nodes: HashSet<meshdb_core::NodeId> = HashSet::new();
1155 for row in &rows {
1156 self.apply_deletes(ctx, row, &mut seen_edges, &mut seen_nodes)?;
1157 }
1158 self.buffered = Some(rows);
1159 self.cursor = 0;
1160 }
1161 let rows = self.buffered.as_ref().unwrap();
1162 if self.cursor < rows.len() {
1163 let row = rows[self.cursor].clone();
1164 self.cursor += 1;
1165 return Ok(Some(row));
1166 }
1167 Ok(None)
1168 }
1169}
1170
1171struct SetPropertyOp {
1172 input: Box<dyn Operator>,
1173 assignments: Vec<SetAssignment>,
1174}
1175
1176impl SetPropertyOp {
1177 fn new(input: Box<dyn Operator>, assignments: Vec<SetAssignment>) -> Self {
1178 Self { input, assignments }
1179 }
1180}
1181
1182impl Operator for SetPropertyOp {
1183 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1184 match self.input.next(ctx)? {
1185 None => Ok(None),
1186 Some(mut row) => {
1187 enum Action {
1189 SetKey {
1190 var: String,
1191 key: String,
1192 prop: Property,
1193 },
1194 AddLabels {
1195 var: String,
1196 labels: Vec<String>,
1197 },
1198 Replace {
1199 var: String,
1200 props: Vec<(String, Property)>,
1201 },
1202 Merge {
1203 var: String,
1204 props: Vec<(String, Property)>,
1205 },
1206 }
1207 let mut actions: Vec<Action> = Vec::with_capacity(self.assignments.len());
1208 for a in &self.assignments {
1209 match a {
1210 SetAssignment::Property { var, key, value } => {
1211 let evaluated = eval_expr(value, &ctx.eval_ctx(&row))?;
1212 let prop = value_to_property(evaluated)?;
1213 actions.push(Action::SetKey {
1214 var: var.clone(),
1215 key: key.clone(),
1216 prop,
1217 });
1218 }
1219 SetAssignment::Labels { var, labels } => {
1220 actions.push(Action::AddLabels {
1221 var: var.clone(),
1222 labels: labels.clone(),
1223 });
1224 }
1225 SetAssignment::Replace { var, properties } => {
1226 let props = properties
1231 .iter()
1232 .map(|(k, expr)| {
1233 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1234 Ok((k.clone(), value_to_property(v)?))
1235 })
1236 .collect::<Result<Vec<(String, Property)>>>()?;
1237 actions.push(Action::Replace {
1238 var: var.clone(),
1239 props,
1240 });
1241 }
1242 SetAssignment::Merge { var, properties } => {
1243 let props = properties
1244 .iter()
1245 .map(|(k, expr)| {
1246 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
1247 Ok((k.clone(), value_to_property(v)?))
1248 })
1249 .collect::<Result<Vec<(String, Property)>>>()?;
1250 actions.push(Action::Merge {
1251 var: var.clone(),
1252 props,
1253 });
1254 }
1255 SetAssignment::ReplaceFromExpr {
1256 var,
1257 source,
1258 replace,
1259 } => {
1260 let v = eval_expr(source, &ctx.eval_ctx(&row))?;
1261 let props = extract_property_map(&v)?;
1262 if *replace {
1263 actions.push(Action::Replace {
1264 var: var.clone(),
1265 props,
1266 });
1267 } else {
1268 actions.push(Action::Merge {
1269 var: var.clone(),
1270 props,
1271 });
1272 }
1273 }
1274 }
1275 }
1276
1277 let mut updated_nodes: HashSet<String> = HashSet::new();
1279 let mut updated_edges: HashSet<String> = HashSet::new();
1280 for action in actions {
1281 match action {
1282 Action::SetKey { var, key, prop } => match row.get_mut(&var) {
1283 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1287 continue
1288 }
1289 Some(Value::Node(n)) => {
1292 if matches!(prop, Property::Null) {
1293 n.properties.remove(&key);
1294 } else {
1295 n.properties.insert(key, prop);
1296 }
1297 updated_nodes.insert(var);
1298 }
1299 Some(Value::Edge(e)) => {
1300 if matches!(prop, Property::Null) {
1301 e.properties.remove(&key);
1302 } else {
1303 e.properties.insert(key, prop);
1304 }
1305 updated_edges.insert(var);
1306 }
1307 _ => return Err(Error::UnboundVariable(var)),
1308 },
1309 Action::AddLabels { var, labels } => match row.get_mut(&var) {
1310 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1311 continue
1312 }
1313 Some(Value::Node(n)) => {
1314 for label in labels {
1315 if !n.labels.contains(&label) {
1316 n.labels.push(label);
1317 }
1318 }
1319 updated_nodes.insert(var);
1320 }
1321 _ => return Err(Error::UnboundVariable(var)),
1322 },
1323 Action::Replace { var, props } => match row.get_mut(&var) {
1324 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1325 continue
1326 }
1327 Some(Value::Node(n)) => {
1328 n.properties.clear();
1329 for (k, v) in props {
1330 if !matches!(v, Property::Null) {
1331 n.properties.insert(k, v);
1332 }
1333 }
1334 updated_nodes.insert(var);
1335 }
1336 Some(Value::Edge(e)) => {
1337 e.properties.clear();
1338 for (k, v) in props {
1339 if !matches!(v, Property::Null) {
1340 e.properties.insert(k, v);
1341 }
1342 }
1343 updated_edges.insert(var);
1344 }
1345 _ => return Err(Error::UnboundVariable(var)),
1346 },
1347 Action::Merge { var, props } => match row.get_mut(&var) {
1348 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1349 continue
1350 }
1351 Some(Value::Node(n)) => {
1352 for (k, v) in props {
1353 if matches!(v, Property::Null) {
1354 n.properties.remove(&k);
1355 } else {
1356 n.properties.insert(k, v);
1357 }
1358 }
1359 updated_nodes.insert(var);
1360 }
1361 Some(Value::Edge(e)) => {
1362 for (k, v) in props {
1363 if matches!(v, Property::Null) {
1364 e.properties.remove(&k);
1365 } else {
1366 e.properties.insert(k, v);
1367 }
1368 }
1369 updated_edges.insert(var);
1370 }
1371 _ => return Err(Error::UnboundVariable(var)),
1372 },
1373 }
1374 }
1375
1376 for var in &updated_nodes {
1378 if let Some(Value::Node(n)) = row.get(var) {
1379 ctx.writer.put_node(n)?;
1380 }
1381 }
1382 for var in &updated_edges {
1383 if let Some(Value::Edge(e)) = row.get(var) {
1384 ctx.writer.put_edge(e)?;
1385 }
1386 }
1387
1388 Ok(Some(row))
1389 }
1390 }
1391 }
1392}
1393
1394struct RemoveOp {
1395 input: Box<dyn Operator>,
1396 items: Vec<RemoveSpec>,
1397}
1398
1399impl RemoveOp {
1400 fn new(input: Box<dyn Operator>, items: Vec<RemoveSpec>) -> Self {
1401 Self { input, items }
1402 }
1403}
1404
1405impl Operator for RemoveOp {
1406 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1407 match self.input.next(ctx)? {
1408 None => Ok(None),
1409 Some(mut row) => {
1410 let mut updated_nodes: HashSet<String> = HashSet::new();
1411 let mut updated_edges: HashSet<String> = HashSet::new();
1412 for item in &self.items {
1413 match item {
1414 RemoveSpec::Property { var, key } => match row.get_mut(var) {
1415 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1418 continue
1419 }
1420 Some(Value::Node(n)) => {
1421 n.properties.remove(key);
1422 updated_nodes.insert(var.clone());
1423 }
1424 Some(Value::Edge(e)) => {
1425 e.properties.remove(key);
1426 updated_edges.insert(var.clone());
1427 }
1428 _ => return Err(Error::UnboundVariable(var.clone())),
1429 },
1430 RemoveSpec::Labels { var, labels } => match row.get_mut(var) {
1431 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
1432 continue
1433 }
1434 Some(Value::Node(n)) => {
1435 n.labels.retain(|l| !labels.contains(l));
1436 updated_nodes.insert(var.clone());
1437 }
1438 _ => return Err(Error::UnboundVariable(var.clone())),
1439 },
1440 }
1441 }
1442 for var in &updated_nodes {
1443 if let Some(Value::Node(n)) = row.get(var) {
1444 ctx.writer.put_node(n)?;
1445 }
1446 }
1447 for var in &updated_edges {
1448 if let Some(Value::Edge(e)) = row.get(var) {
1449 ctx.writer.put_edge(e)?;
1450 }
1451 }
1452 Ok(Some(row))
1453 }
1454 }
1455 }
1456}
1457
1458struct LoadCsvOp {
1459 input: Option<Box<dyn Operator>>,
1460 path_expr: Expr,
1461 var: String,
1462 with_headers: bool,
1463 rows: Option<Vec<Value>>,
1464 cursor: usize,
1465}
1466
1467impl LoadCsvOp {
1468 fn new(
1469 input: Option<Box<dyn Operator>>,
1470 path_expr: Expr,
1471 var: String,
1472 with_headers: bool,
1473 ) -> Self {
1474 Self {
1475 input,
1476 path_expr,
1477 var,
1478 with_headers,
1479 rows: None,
1480 cursor: 0,
1481 }
1482 }
1483
1484 fn load(&mut self, ctx: &ExecCtx, base_row: &Row) -> Result<()> {
1485 let ectx = ctx.eval_ctx(base_row);
1486 let path_val = eval_expr(&self.path_expr, &ectx)?;
1487 let path = match path_val {
1488 Value::Property(Property::String(s)) => s,
1489 _ => return Err(Error::TypeMismatch),
1490 };
1491 let content = std::fs::read_to_string(&path).map_err(|e| {
1492 Error::Unsupported(format!("LOAD CSV: cannot read file '{}': {}", path, e))
1493 })?;
1494 let mut lines = content.lines();
1495 let headers: Option<Vec<String>> = if self.with_headers {
1496 lines
1497 .next()
1498 .map(|h| h.split(',').map(|s| s.trim().to_string()).collect())
1499 } else {
1500 None
1501 };
1502 let mut csv_rows = Vec::new();
1503 for line in lines {
1504 if line.trim().is_empty() {
1505 continue;
1506 }
1507 let fields: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
1508 if let Some(hdrs) = &headers {
1509 let mut map = std::collections::HashMap::new();
1510 for (i, h) in hdrs.iter().enumerate() {
1511 let val = fields.get(i).cloned().unwrap_or_default();
1512 map.insert(h.clone(), Property::String(val));
1513 }
1514 csv_rows.push(Value::Property(Property::Map(map)));
1515 } else {
1516 let list: Vec<Value> = fields
1517 .into_iter()
1518 .map(|f| Value::Property(Property::String(f)))
1519 .collect();
1520 csv_rows.push(Value::List(list));
1521 }
1522 }
1523 self.rows = Some(csv_rows);
1524 self.cursor = 0;
1525 Ok(())
1526 }
1527}
1528
1529impl Operator for LoadCsvOp {
1530 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1531 if self.rows.is_none() {
1532 let base = if let Some(input) = &mut self.input {
1533 match input.next(ctx)? {
1534 Some(r) => r,
1535 None => return Ok(None),
1536 }
1537 } else {
1538 Row::new()
1539 };
1540 self.load(ctx, &base)?;
1541 }
1542 let rows = self.rows.as_ref().unwrap();
1543 if self.cursor < rows.len() {
1544 let val = rows[self.cursor].clone();
1545 self.cursor += 1;
1546 let mut row = Row::new();
1547 row.insert(self.var.clone(), val);
1548 Ok(Some(row))
1549 } else {
1550 Ok(None)
1551 }
1552 }
1553}
1554
1555struct ForeachOp {
1556 input: Box<dyn Operator>,
1557 var: String,
1558 list_expr: Expr,
1559 set_assignments: Vec<SetAssignment>,
1560 remove_items: Vec<RemoveSpec>,
1561}
1562
1563impl ForeachOp {
1564 fn new(
1565 input: Box<dyn Operator>,
1566 var: String,
1567 list_expr: Expr,
1568 set_assignments: Vec<SetAssignment>,
1569 remove_items: Vec<RemoveSpec>,
1570 ) -> Self {
1571 Self {
1572 input,
1573 var,
1574 list_expr,
1575 set_assignments,
1576 remove_items,
1577 }
1578 }
1579}
1580
1581impl Operator for ForeachOp {
1582 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1583 let Some(row) = self.input.next(ctx)? else {
1584 return Ok(None);
1585 };
1586 let ectx = ctx.eval_ctx(&row);
1587 let list_val = eval_expr(&self.list_expr, &ectx)?;
1588 let items = match list_val {
1589 Value::List(items) => items,
1590 Value::Property(Property::List(props)) => {
1591 props.into_iter().map(Value::Property).collect()
1592 }
1593 Value::Null | Value::Property(Property::Null) => Vec::new(),
1594 _ => return Err(Error::TypeMismatch),
1595 };
1596 for item in items {
1597 let mut scratch = row.clone();
1598 scratch.insert(self.var.clone(), item);
1599 for a in &self.set_assignments {
1600 match a {
1601 SetAssignment::Property { var, key, value } => {
1602 let evaluated = eval_expr(value, &ctx.eval_ctx(&scratch))?;
1603 let prop = value_to_property(evaluated)?;
1604 match scratch.get_mut(var) {
1605 Some(Value::Node(n)) => {
1606 n.properties.insert(key.clone(), prop);
1607 }
1608 Some(Value::Edge(e)) => {
1609 e.properties.insert(key.clone(), prop);
1610 }
1611 _ => return Err(Error::UnboundVariable(var.clone())),
1612 }
1613 }
1614 SetAssignment::Labels { var, labels } => {
1615 if let Some(Value::Node(n)) = scratch.get_mut(var) {
1616 for l in labels {
1617 if !n.labels.contains(l) {
1618 n.labels.push(l.clone());
1619 }
1620 }
1621 }
1622 }
1623 _ => {}
1624 }
1625 }
1626 for ri in &self.remove_items {
1627 match ri {
1628 RemoveSpec::Property { var, key } => {
1629 if let Some(Value::Node(n)) = scratch.get_mut(var) {
1630 n.properties.remove(key);
1631 } else if let Some(Value::Edge(e)) = scratch.get_mut(var) {
1632 e.properties.remove(key);
1633 }
1634 }
1635 RemoveSpec::Labels { var, labels } => {
1636 if let Some(Value::Node(n)) = scratch.get_mut(var) {
1637 n.labels.retain(|l| !labels.contains(l));
1638 }
1639 }
1640 }
1641 }
1642 for (_, val) in scratch.iter() {
1644 match val {
1645 Value::Node(n) => ctx.writer.put_node(n)?,
1646 Value::Edge(e) => ctx.writer.put_edge(e)?,
1647 _ => {}
1648 }
1649 }
1650 }
1651 Ok(Some(row))
1652 }
1653}
1654
1655struct SeedRowOp {
1656 done: bool,
1657}
1658
1659impl Operator for SeedRowOp {
1660 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1661 if self.done {
1662 return Ok(None);
1663 }
1664 self.done = true;
1665 Ok(Some(Row::new()))
1666 }
1667}
1668
1669struct SeededRowOp {
1670 row: Option<Row>,
1671}
1672
1673impl Operator for SeededRowOp {
1674 fn next(&mut self, _ctx: &ExecCtx) -> Result<Option<Row>> {
1675 Ok(self.row.take())
1676 }
1677}
1678
1679struct CallSubqueryOp {
1680 input: Box<dyn Operator>,
1681 body_plan: LogicalPlan,
1682 pending: Vec<Row>,
1683 pending_idx: usize,
1684}
1685
1686impl CallSubqueryOp {
1687 fn new(input: Box<dyn Operator>, body_plan: LogicalPlan) -> Self {
1688 Self {
1689 input,
1690 body_plan,
1691 pending: Vec::new(),
1692 pending_idx: 0,
1693 }
1694 }
1695}
1696
1697impl Operator for CallSubqueryOp {
1698 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1699 loop {
1700 if self.pending_idx < self.pending.len() {
1701 let row = self.pending[self.pending_idx].clone();
1702 self.pending_idx += 1;
1703 return Ok(Some(row));
1704 }
1705 let outer_row = match self.input.next(ctx)? {
1706 Some(r) => r,
1707 None => return Ok(None),
1708 };
1709 let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1710 let mut results = Vec::new();
1711 while let Some(body_row) = body_op.next(ctx)? {
1712 let mut merged = outer_row.clone();
1713 for (k, v) in body_row {
1714 merged.insert(k, v);
1715 }
1716 results.push(merged);
1717 }
1718 if results.is_empty() {
1719 continue;
1720 }
1721 self.pending = results;
1722 self.pending_idx = 0;
1723 }
1724 }
1725}
1726
1727struct OptionalApplyOp {
1734 input: Box<dyn Operator>,
1735 body_plan: LogicalPlan,
1736 null_vars: Vec<String>,
1737 pending: Vec<Row>,
1738 pending_idx: usize,
1739}
1740
1741impl OptionalApplyOp {
1742 fn new(input: Box<dyn Operator>, body_plan: LogicalPlan, null_vars: Vec<String>) -> Self {
1743 Self {
1744 input,
1745 body_plan,
1746 null_vars,
1747 pending: Vec::new(),
1748 pending_idx: 0,
1749 }
1750 }
1751}
1752
1753impl Operator for OptionalApplyOp {
1754 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
1755 loop {
1756 if self.pending_idx < self.pending.len() {
1757 let row = self.pending[self.pending_idx].clone();
1758 self.pending_idx += 1;
1759 return Ok(Some(row));
1760 }
1761 let outer_row = match self.input.next(ctx)? {
1762 Some(r) => r,
1763 None => return Ok(None),
1764 };
1765 let mut body_op = build_op_inner(&self.body_plan, Some(&outer_row));
1766 let mut stacked: Vec<&Row> = Vec::with_capacity(ctx.outer_rows.len() + 1);
1770 stacked.push(&outer_row);
1771 stacked.extend_from_slice(ctx.outer_rows);
1772 let inner_ctx = ExecCtx {
1773 store: ctx.store,
1774 writer: ctx.writer,
1775 params: ctx.params,
1776 procedures: ctx.procedures,
1777 outer_rows: &stacked,
1778 tombstones: ctx.tombstones,
1779 };
1780 let mut results = Vec::new();
1781 while let Some(body_row) = body_op.next(&inner_ctx)? {
1782 let mut merged = outer_row.clone();
1783 for (k, v) in body_row {
1784 merged.insert(k, v);
1785 }
1786 results.push(merged);
1787 }
1788 if results.is_empty() {
1789 let mut fallback = outer_row;
1790 for v in &self.null_vars {
1791 fallback.insert(v.clone(), Value::Null);
1792 }
1793 return Ok(Some(fallback));
1794 }
1795 self.pending = results;
1796 self.pending_idx = 0;
1797 }
1798 }
1799}
1800
1801struct ProcedureCallOp {
1820 input: Option<Box<dyn Operator>>,
1821 qualified_name: Vec<String>,
1822 args: Option<Vec<Expr>>,
1823 yield_spec: Option<YieldSpec>,
1824 standalone: bool,
1825 buffered: Vec<Row>,
1826 buffered_idx: usize,
1827 done: bool,
1830}
1831
1832impl ProcedureCallOp {
1833 fn new(
1834 input: Option<Box<dyn Operator>>,
1835 qualified_name: Vec<String>,
1836 args: Option<Vec<Expr>>,
1837 yield_spec: Option<YieldSpec>,
1838 standalone: bool,
1839 ) -> Self {
1840 Self {
1841 input,
1842 qualified_name,
1843 args,
1844 yield_spec,
1845 standalone,
1846 buffered: Vec::new(),
1847 buffered_idx: 0,
1848 done: false,
1849 }
1850 }
1851
1852 fn resolve_projection(
1858 &self,
1859 proc: &crate::procedures::Procedure,
1860 ) -> Result<Vec<(String, String)>> {
1861 match &self.yield_spec {
1862 None => {
1863 if !self.standalone {
1864 if proc.outputs.is_empty() {
1873 return Ok(Vec::new());
1874 }
1875 return Err(Error::Procedure(format!(
1876 "procedure '{}' has outputs but no YIELD clause",
1877 self.qualified_name.join(".")
1878 )));
1879 }
1880 Ok(proc
1881 .outputs
1882 .iter()
1883 .map(|o| (o.name.clone(), o.name.clone()))
1884 .collect())
1885 }
1886 Some(YieldSpec::Star) => {
1887 if !self.standalone {
1888 return Err(Error::Procedure(
1889 "YIELD * is only allowed on standalone CALL".into(),
1890 ));
1891 }
1892 Ok(proc
1893 .outputs
1894 .iter()
1895 .map(|o| (o.name.clone(), o.name.clone()))
1896 .collect())
1897 }
1898 Some(YieldSpec::Items(items)) => {
1899 let mut projection = Vec::with_capacity(items.len());
1900 let mut seen_aliases: std::collections::HashSet<String> =
1901 std::collections::HashSet::new();
1902 for yi in items {
1903 if !proc.outputs.iter().any(|o| o.name == yi.column) {
1904 return Err(Error::Procedure(format!(
1905 "procedure '{}' has no output column '{}'",
1906 self.qualified_name.join("."),
1907 yi.column
1908 )));
1909 }
1910 let alias = yi.alias.clone().unwrap_or_else(|| yi.column.clone());
1911 if !seen_aliases.insert(alias.clone()) {
1912 return Err(Error::Procedure(format!(
1913 "variable '{alias}' already bound by YIELD"
1914 )));
1915 }
1916 projection.push((yi.column.clone(), alias));
1917 }
1918 Ok(projection)
1919 }
1920 }
1921 }
1922
1923 fn evaluate_args(
1930 &self,
1931 ctx: &ExecCtx,
1932 row: &Row,
1933 proc: &crate::procedures::Procedure,
1934 ) -> Result<Vec<Value>> {
1935 match &self.args {
1936 Some(exprs) => {
1937 if exprs.len() != proc.inputs.len() {
1938 return Err(Error::Procedure(format!(
1939 "procedure '{}' expects {} argument(s), got {}",
1940 self.qualified_name.join("."),
1941 proc.inputs.len(),
1942 exprs.len()
1943 )));
1944 }
1945 let eval_ctx = ctx.eval_ctx(row);
1946 let mut values = Vec::with_capacity(exprs.len());
1947 for (expr, spec) in exprs.iter().zip(proc.inputs.iter()) {
1948 let v = eval_expr(expr, &eval_ctx)?;
1949 if !spec.ty.accepts(&v) {
1950 return Err(Error::Procedure(format!(
1951 "argument '{}' has wrong type for procedure '{}'",
1952 spec.name,
1953 self.qualified_name.join(".")
1954 )));
1955 }
1956 values.push(coerce_arg(v, spec.ty));
1957 }
1958 Ok(values)
1959 }
1960 None => {
1961 if !self.standalone {
1963 return Err(Error::Procedure(
1964 "in-query CALL requires explicit argument list".into(),
1965 ));
1966 }
1967 let mut values = Vec::with_capacity(proc.inputs.len());
1968 for spec in &proc.inputs {
1969 let v = ctx.params.get(&spec.name).cloned().ok_or_else(|| {
1970 Error::Procedure(format!(
1971 "missing parameter ${} for procedure '{}'",
1972 spec.name,
1973 self.qualified_name.join(".")
1974 ))
1975 })?;
1976 if !spec.ty.accepts(&v) {
1977 return Err(Error::Procedure(format!(
1978 "parameter '{}' has wrong type",
1979 spec.name
1980 )));
1981 }
1982 values.push(coerce_arg(v, spec.ty));
1983 }
1984 Ok(values)
1985 }
1986 }
1987 }
1988
1989 fn invoke_once(
1994 &self,
1995 ctx: &ExecCtx,
1996 input_row: &Row,
1997 proc: &crate::procedures::Procedure,
1998 projection: &[(String, String)],
1999 out: &mut Vec<Row>,
2000 ) -> Result<()> {
2001 if proc.outputs.is_empty() {
2005 if !self.standalone {
2006 out.push(input_row.clone());
2007 }
2008 return Ok(());
2009 }
2010 let args = self.evaluate_args(ctx, input_row, proc)?;
2011 let rows = proc.resolve_rows(ctx.store)?;
2012 for proc_row in &rows {
2013 if !proc.row_matches(proc_row, &args) {
2014 continue;
2015 }
2016 let mut merged = if self.standalone {
2017 Row::new()
2018 } else {
2019 input_row.clone()
2020 };
2021 for (src, alias) in projection {
2022 let v = proc_row.get(src).cloned().unwrap_or(Value::Null);
2023 merged.insert(alias.clone(), v);
2024 }
2025 out.push(merged);
2026 }
2027 Ok(())
2028 }
2029}
2030
2031fn coerce_arg(v: Value, ty: crate::procedures::ProcType) -> Value {
2036 use crate::procedures::ProcType;
2037 if matches!(ty, ProcType::Float) {
2038 if let Value::Property(Property::Int64(n)) = v {
2039 return Value::Property(Property::Float64(n as f64));
2040 }
2041 }
2042 v
2043}
2044
2045impl Operator for ProcedureCallOp {
2046 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2047 loop {
2048 if self.buffered_idx < self.buffered.len() {
2049 let row = self.buffered[self.buffered_idx].clone();
2050 self.buffered_idx += 1;
2051 return Ok(Some(row));
2052 }
2053 self.buffered.clear();
2054 self.buffered_idx = 0;
2055
2056 let proc = match ctx.procedures.get(&self.qualified_name) {
2057 Some(p) => p,
2058 None => {
2059 return Err(Error::Procedure(format!(
2060 "procedure '{}' not found",
2061 self.qualified_name.join(".")
2062 )));
2063 }
2064 };
2065 let projection = self.resolve_projection(proc)?;
2066
2067 let input_row = match &mut self.input {
2068 Some(inp) => match inp.next(ctx)? {
2069 Some(r) => r,
2070 None => return Ok(None),
2071 },
2072 None => {
2073 if self.done {
2074 return Ok(None);
2075 }
2076 self.done = true;
2077 Row::new()
2078 }
2079 };
2080
2081 let mut produced = Vec::new();
2082 self.invoke_once(ctx, &input_row, proc, &projection, &mut produced)?;
2083 if produced.is_empty() {
2084 if self.input.is_some() {
2085 continue;
2086 }
2087 return Ok(None);
2088 }
2089 self.buffered = produced;
2090 }
2091 }
2092}
2093
2094fn extract_property_map(v: &Value) -> Result<Vec<(String, Property)>> {
2100 match v {
2101 Value::Node(n) => Ok(n.properties.clone().into_iter().collect()),
2102 Value::Edge(e) => Ok(e.properties.clone().into_iter().collect()),
2103 Value::Map(pairs) => pairs
2104 .iter()
2105 .map(|(k, vv)| Ok((k.clone(), value_to_property(vv.clone())?)))
2106 .collect(),
2107 Value::Property(Property::Map(entries)) => Ok(entries
2108 .iter()
2109 .map(|(k, p)| (k.clone(), p.clone()))
2110 .collect()),
2111 Value::Null | Value::Property(Property::Null) => Ok(Vec::new()),
2112 _ => Err(Error::InvalidSetValue),
2113 }
2114}
2115
2116fn value_to_property(v: Value) -> Result<Property> {
2117 match v {
2118 Value::Property(Property::Map(_)) => Err(Error::InvalidSetValue),
2119 Value::Property(p) => Ok(p),
2120 Value::Null => Ok(Property::Null),
2121 Value::List(items) => {
2122 let props: Vec<Property> = items
2123 .into_iter()
2124 .map(value_to_property)
2125 .collect::<Result<_>>()?;
2126 Ok(Property::List(props))
2127 }
2128 Value::Map(_) | Value::Node(_) | Value::Edge(_) | Value::Path { .. } => {
2132 Err(Error::InvalidSetValue)
2133 }
2134 }
2135}
2136
2137struct NodeScanAllOp {
2138 var: String,
2139 ids: Option<Vec<NodeId>>,
2140 cursor: usize,
2141}
2142
2143impl NodeScanAllOp {
2144 fn new(var: String) -> Self {
2145 Self {
2146 var,
2147 ids: None,
2148 cursor: 0,
2149 }
2150 }
2151}
2152
2153impl Operator for NodeScanAllOp {
2154 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2155 if self.ids.is_none() {
2156 self.ids = Some(ctx.store.all_node_ids()?);
2157 }
2158 let ids = self.ids.as_ref().unwrap();
2159 while self.cursor < ids.len() {
2160 let id = ids[self.cursor];
2161 self.cursor += 1;
2162 if let Some(node) = ctx.store.get_node(id)? {
2163 let mut row = Row::new();
2164 row.insert(self.var.clone(), Value::Node(node));
2165 return Ok(Some(row));
2166 }
2167 }
2168 Ok(None)
2169 }
2170}
2171
2172struct NodeScanByLabelsOp {
2173 var: String,
2174 labels: Vec<String>,
2175 ids: Option<Vec<NodeId>>,
2176 cursor: usize,
2177}
2178
2179impl NodeScanByLabelsOp {
2180 fn new(var: String, labels: Vec<String>) -> Self {
2181 Self {
2182 var,
2183 labels,
2184 ids: None,
2185 cursor: 0,
2186 }
2187 }
2188}
2189
2190impl Operator for NodeScanByLabelsOp {
2191 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2192 if self.ids.is_none() {
2193 let primary = self
2195 .labels
2196 .first()
2197 .expect("NodeScanByLabels must have at least one label");
2198 self.ids = Some(ctx.store.nodes_by_label(primary)?);
2199 }
2200 let ids = self.ids.as_ref().unwrap();
2201 while self.cursor < ids.len() {
2202 let id = ids[self.cursor];
2203 self.cursor += 1;
2204 if let Some(node) = ctx.store.get_node(id)? {
2205 if has_all_labels(&node, &self.labels) {
2206 let mut row = Row::new();
2207 row.insert(self.var.clone(), Value::Node(node));
2208 return Ok(Some(row));
2209 }
2210 }
2211 }
2212 Ok(None)
2213 }
2214}
2215
2216fn has_all_labels(node: &Node, labels: &[String]) -> bool {
2217 labels.iter().all(|l| node.labels.contains(l))
2218}
2219
2220struct IndexSeekOp {
2232 var: String,
2233 label: String,
2234 property: String,
2235 value_expr: Expr,
2236 results: Option<Vec<NodeId>>,
2237 cursor: usize,
2238}
2239
2240impl IndexSeekOp {
2241 fn new(var: String, label: String, property: String, value_expr: Expr) -> Self {
2242 Self {
2243 var,
2244 label,
2245 property,
2246 value_expr,
2247 results: None,
2248 cursor: 0,
2249 }
2250 }
2251}
2252
2253impl Operator for IndexSeekOp {
2254 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2255 if self.results.is_none() {
2256 let empty = Row::new();
2257 let value = eval_expr(&self.value_expr, &ctx.eval_ctx(&empty))?;
2258 let property = match value {
2259 Value::Property(p) => p,
2260 Value::Null => Property::Null,
2261 Value::Node(_)
2262 | Value::Edge(_)
2263 | Value::List(_)
2264 | Value::Map(_)
2265 | Value::Path { .. } => {
2266 return Err(Error::InvalidSetValue);
2267 }
2268 };
2269 let ids = ctx
2270 .store
2271 .nodes_by_property(&self.label, &self.property, &property)?;
2272 self.results = Some(ids);
2273 }
2274 let ids = self.results.as_ref().unwrap();
2275 while self.cursor < ids.len() {
2276 let id = ids[self.cursor];
2277 self.cursor += 1;
2278 if let Some(node) = ctx.store.get_node(id)? {
2279 let mut row = Row::new();
2280 row.insert(self.var.clone(), Value::Node(node));
2281 return Ok(Some(row));
2282 }
2283 }
2284 Ok(None)
2285 }
2286}
2287
2288fn matches_pattern_props(node: &Node, props: &[(String, Property)]) -> bool {
2289 props.iter().all(|(k, v)| {
2290 node.properties
2291 .get(k)
2292 .map(|stored| stored == v)
2293 .unwrap_or(false)
2294 })
2295}
2296
2297struct MergeNodeOp {
2298 var: String,
2299 labels: Vec<String>,
2300 properties: Vec<(String, Expr)>,
2304 on_create: Vec<SetAssignment>,
2309 on_match: Vec<SetAssignment>,
2313 input: Option<Box<dyn Operator>>,
2320 merged_nodes: Vec<Node>,
2327 merge_done: bool,
2331 current_input_row: Option<Row>,
2335 cursor: usize,
2336}
2337
2338impl MergeNodeOp {
2339 fn new(
2340 input: Option<Box<dyn Operator>>,
2341 var: String,
2342 labels: Vec<String>,
2343 properties: Vec<(String, Expr)>,
2344 on_create: Vec<SetAssignment>,
2345 on_match: Vec<SetAssignment>,
2346 ) -> Self {
2347 Self {
2348 var,
2349 labels,
2350 properties,
2351 on_create,
2352 on_match,
2353 input,
2354 merged_nodes: Vec::new(),
2355 merge_done: false,
2356 current_input_row: None,
2357 cursor: 0,
2358 }
2359 }
2360
2361 fn run_merge_for(&mut self, ctx: &ExecCtx, base: &Row) -> Result<Vec<Node>> {
2373 let resolved_props: Vec<(String, Property)> = self
2374 .properties
2375 .iter()
2376 .map(|(k, expr)| {
2377 let v = eval_expr(expr, &ctx.eval_ctx(base))?;
2378 Ok((k.clone(), value_to_property(v)?))
2379 })
2380 .collect::<Result<Vec<_>>>()?;
2381
2382 let candidate_ids: Vec<NodeId> = if let Some(primary) = self.labels.first() {
2383 ctx.store.nodes_by_label(primary)?
2384 } else {
2385 ctx.store.all_node_ids()?
2386 };
2387 let mut merged_nodes: Vec<Node> = Vec::new();
2388 for id in candidate_ids {
2389 if let Some(node) = ctx.store.get_node(id)? {
2390 if has_all_labels(&node, &self.labels)
2391 && matches_pattern_props(&node, &resolved_props)
2392 {
2393 merged_nodes.push(node);
2394 }
2395 }
2396 }
2397
2398 if merged_nodes.is_empty() {
2399 let mut node = Node::new();
2400 for label in &self.labels {
2401 node.labels.push(label.clone());
2402 }
2403 for (k, prop) in resolved_props {
2404 node.properties.insert(k, prop);
2405 }
2406 apply_merge_actions(&mut node, &self.on_create, &self.var, ctx, base)?;
2407 ctx.writer.put_node(&node)?;
2408 merged_nodes.push(node);
2409 } else if !self.on_match.is_empty() {
2410 for node in merged_nodes.iter_mut() {
2411 apply_merge_actions(node, &self.on_match, &self.var, ctx, base)?;
2412 ctx.writer.put_node(node)?;
2413 }
2414 }
2415 Ok(merged_nodes)
2416 }
2417}
2418
2419impl Operator for MergeNodeOp {
2420 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2421 if self.input.is_none() {
2426 if !self.merge_done {
2427 let empty = Row::new();
2428 let nodes = self.run_merge_for(ctx, &empty)?;
2429 self.merged_nodes = nodes;
2430 self.merge_done = true;
2431 }
2432 if self.cursor < self.merged_nodes.len() {
2433 let node = self.merged_nodes[self.cursor].clone();
2434 self.cursor += 1;
2435 let mut row = Row::new();
2436 row.insert(self.var.clone(), Value::Node(node));
2437 return Ok(Some(row));
2438 }
2439 return Ok(None);
2440 }
2441
2442 loop {
2449 if let Some(base) = self.current_input_row.as_ref() {
2450 if self.cursor < self.merged_nodes.len() {
2451 let node = self.merged_nodes[self.cursor].clone();
2452 self.cursor += 1;
2453 let mut row = base.clone();
2454 row.insert(self.var.clone(), Value::Node(node));
2455 return Ok(Some(row));
2456 }
2457 }
2458 match self.input.as_mut().unwrap().next(ctx)? {
2459 None => return Ok(None),
2460 Some(row) => {
2461 let nodes = self.run_merge_for(ctx, &row)?;
2462 self.merged_nodes = nodes;
2463 self.cursor = 0;
2464 self.current_input_row = Some(row);
2465 }
2466 }
2467 }
2468 }
2469}
2470
2471struct MergeEdgeOp {
2490 input: Box<dyn Operator>,
2491 edge_var: String,
2492 src_var: String,
2493 dst_var: String,
2494 edge_type: String,
2495 undirected: bool,
2496 properties: Vec<(String, Expr)>,
2500 on_create: Vec<SetAssignment>,
2501 on_match: Vec<SetAssignment>,
2502 pending: std::collections::VecDeque<Row>,
2509}
2510
2511impl MergeEdgeOp {
2512 #[allow(clippy::too_many_arguments)]
2513 fn new(
2514 input: Box<dyn Operator>,
2515 edge_var: String,
2516 src_var: String,
2517 dst_var: String,
2518 edge_type: String,
2519 undirected: bool,
2520 properties: Vec<(String, Expr)>,
2521 on_create: Vec<SetAssignment>,
2522 on_match: Vec<SetAssignment>,
2523 ) -> Self {
2524 Self {
2525 input,
2526 edge_var,
2527 src_var,
2528 dst_var,
2529 edge_type,
2530 undirected,
2531 properties,
2532 on_create,
2533 on_match,
2534 pending: std::collections::VecDeque::new(),
2535 }
2536 }
2537}
2538
2539impl Operator for MergeEdgeOp {
2540 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
2541 loop {
2542 if let Some(row) = self.pending.pop_front() {
2543 return Ok(Some(row));
2544 }
2545 let Some(row) = self.input.next(ctx)? else {
2546 return Ok(None);
2547 };
2548 let src_node = match row.get(&self.src_var) {
2553 Some(Value::Node(n)) => n.clone(),
2554 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
2555 };
2556 let dst_node = match row.get(&self.dst_var) {
2557 Some(Value::Node(n)) => n.clone(),
2558 _ => return Err(Error::UnboundVariable(self.dst_var.clone())),
2559 };
2560
2561 let required_props: Vec<(String, Property)> = self
2565 .properties
2566 .iter()
2567 .map(|(k, expr)| {
2568 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
2569 Ok((k.clone(), value_to_property(v)?))
2570 })
2571 .collect::<Result<Vec<_>>>()?;
2572 let edge_matches = |edge: &Edge| -> bool {
2573 required_props.iter().all(|(k, want)| {
2574 edge.properties
2575 .get(k)
2576 .map(|have| have == want)
2577 .unwrap_or(false)
2578 })
2579 };
2580
2581 let mut matched: Vec<Edge> = Vec::new();
2588 for (edge_id, neighbor_id) in ctx.store.outgoing(src_node.id)? {
2589 if neighbor_id != dst_node.id {
2590 continue;
2591 }
2592 if let Some(edge) = ctx.store.get_edge(edge_id)? {
2593 if edge.edge_type == self.edge_type && edge_matches(&edge) {
2594 matched.push(edge);
2595 }
2596 }
2597 }
2598 if self.undirected {
2599 for (edge_id, neighbor_id) in ctx.store.incoming(src_node.id)? {
2600 if neighbor_id != dst_node.id {
2601 continue;
2602 }
2603 if let Some(edge) = ctx.store.get_edge(edge_id)? {
2604 if edge.edge_type == self.edge_type && edge_matches(&edge) {
2605 matched.push(edge);
2606 }
2607 }
2608 }
2609 }
2610
2611 if matched.is_empty() {
2612 let mut new_edge = Edge::new(&self.edge_type, src_node.id, dst_node.id);
2613 for (k, p) in &required_props {
2614 new_edge.properties.insert(k.clone(), p.clone());
2615 }
2616 let mut row_out = row.clone();
2617 apply_merge_edge_actions(
2618 &mut new_edge,
2619 &self.on_create,
2620 &self.edge_var,
2621 ctx,
2622 &mut row_out,
2623 )?;
2624 ctx.writer.put_edge(&new_edge)?;
2625 row_out.insert(self.edge_var.clone(), Value::Edge(new_edge));
2626 self.pending.push_back(row_out);
2627 } else {
2628 for mut existing in matched {
2629 let mut row_out = row.clone();
2630 if !self.on_match.is_empty() {
2631 apply_merge_edge_actions(
2632 &mut existing,
2633 &self.on_match,
2634 &self.edge_var,
2635 ctx,
2636 &mut row_out,
2637 )?;
2638 ctx.writer.put_edge(&existing)?;
2639 }
2640 row_out.insert(self.edge_var.clone(), Value::Edge(existing));
2641 self.pending.push_back(row_out);
2642 }
2643 }
2644 }
2645 }
2646}
2647
2648fn apply_merge_edge_actions(
2658 edge: &mut Edge,
2659 actions: &[SetAssignment],
2660 var: &str,
2661 exec_ctx: &ExecCtx,
2662 outer: &mut Row,
2663) -> Result<()> {
2664 if actions.is_empty() {
2665 return Ok(());
2666 }
2667 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2670 for action in actions {
2671 match action {
2672 SetAssignment::Property {
2673 var: target,
2674 key,
2675 value,
2676 } => {
2677 let sub_ctx = exec_ctx.eval_ctx(outer);
2678 let evaluated = eval_expr(value, &sub_ctx)?;
2679 let prop = value_to_property(evaluated)?;
2680 if target == var {
2681 if matches!(prop, Property::Null) {
2682 edge.properties.remove(key);
2683 } else {
2684 edge.properties.insert(key.clone(), prop);
2685 }
2686 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2687 } else {
2688 apply_set_prop_to_outer(outer, exec_ctx, target, key, prop)?;
2689 }
2690 }
2691 SetAssignment::Merge {
2692 var: target,
2693 properties,
2694 } => {
2695 let sub_ctx = exec_ctx.eval_ctx(outer);
2696 let resolved: Vec<(String, Property)> = properties
2697 .iter()
2698 .map(|(k, expr)| {
2699 let v = eval_expr(expr, &sub_ctx)?;
2700 Ok((k.clone(), value_to_property(v)?))
2701 })
2702 .collect::<Result<Vec<_>>>()?;
2703 if target == var {
2704 for (k, p) in resolved {
2705 edge.properties.insert(k, p);
2706 }
2707 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2708 } else {
2709 apply_set_map_to_outer(outer, exec_ctx, target, resolved, false)?;
2710 }
2711 }
2712 SetAssignment::Replace {
2713 var: target,
2714 properties,
2715 } => {
2716 let sub_ctx = exec_ctx.eval_ctx(outer);
2717 let resolved: Vec<(String, Property)> = properties
2718 .iter()
2719 .map(|(k, expr)| {
2720 let v = eval_expr(expr, &sub_ctx)?;
2721 Ok((k.clone(), value_to_property(v)?))
2722 })
2723 .collect::<Result<Vec<_>>>()?;
2724 if target == var {
2725 edge.properties.clear();
2726 for (k, p) in resolved {
2727 edge.properties.insert(k, p);
2728 }
2729 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2730 } else {
2731 apply_set_map_to_outer(outer, exec_ctx, target, resolved, true)?;
2732 }
2733 }
2734 SetAssignment::Labels {
2735 var: target,
2736 labels,
2737 } => {
2738 if target == var {
2739 return Err(Error::UnboundVariable(target.clone()));
2741 }
2742 apply_set_labels_to_outer(outer, exec_ctx, target, labels)?;
2743 }
2744 SetAssignment::ReplaceFromExpr {
2745 var: target,
2746 source,
2747 replace,
2748 } => {
2749 let sub_ctx = exec_ctx.eval_ctx(outer);
2750 let v = eval_expr(source, &sub_ctx)?;
2751 let props = extract_property_map(&v)?;
2752 if target == var {
2753 if *replace {
2754 edge.properties.clear();
2755 }
2756 for (k, p) in props {
2757 edge.properties.insert(k, p);
2758 }
2759 outer.insert(var.to_string(), Value::Edge(edge.clone()));
2760 } else {
2761 apply_set_map_to_outer(outer, exec_ctx, target, props, *replace)?;
2762 }
2763 }
2764 }
2765 }
2766 Ok(())
2767}
2768
2769fn apply_set_prop_to_outer(
2774 outer: &mut Row,
2775 exec_ctx: &ExecCtx,
2776 target: &str,
2777 key: &str,
2778 prop: Property,
2779) -> Result<()> {
2780 match outer.get_mut(target) {
2781 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => {
2782 return Ok(());
2785 }
2786 Some(Value::Node(n)) => {
2787 if matches!(prop, Property::Null) {
2788 n.properties.remove(key);
2789 } else {
2790 n.properties.insert(key.to_string(), prop);
2791 }
2792 exec_ctx.writer.put_node(n)?;
2793 }
2794 Some(Value::Edge(e)) => {
2795 if matches!(prop, Property::Null) {
2796 e.properties.remove(key);
2797 } else {
2798 e.properties.insert(key.to_string(), prop);
2799 }
2800 exec_ctx.writer.put_edge(e)?;
2801 }
2802 _ => return Err(Error::UnboundVariable(target.to_string())),
2803 }
2804 Ok(())
2805}
2806
2807fn apply_set_map_to_outer(
2811 outer: &mut Row,
2812 exec_ctx: &ExecCtx,
2813 target: &str,
2814 props: Vec<(String, Property)>,
2815 replace: bool,
2816) -> Result<()> {
2817 match outer.get_mut(target) {
2818 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
2819 Some(Value::Node(n)) => {
2820 if replace {
2821 n.properties.clear();
2822 }
2823 for (k, p) in props {
2824 if replace || !matches!(p, Property::Null) {
2825 n.properties.insert(k, p);
2826 } else {
2827 n.properties.remove(&k);
2828 }
2829 }
2830 exec_ctx.writer.put_node(n)?;
2831 Ok(())
2832 }
2833 Some(Value::Edge(e)) => {
2834 if replace {
2835 e.properties.clear();
2836 }
2837 for (k, p) in props {
2838 if replace || !matches!(p, Property::Null) {
2839 e.properties.insert(k, p);
2840 } else {
2841 e.properties.remove(&k);
2842 }
2843 }
2844 exec_ctx.writer.put_edge(e)?;
2845 Ok(())
2846 }
2847 _ => Err(Error::UnboundVariable(target.to_string())),
2848 }
2849}
2850
2851fn apply_set_labels_to_outer(
2853 outer: &mut Row,
2854 exec_ctx: &ExecCtx,
2855 target: &str,
2856 labels: &[String],
2857) -> Result<()> {
2858 match outer.get_mut(target) {
2859 Some(Value::Null) | Some(Value::Property(Property::Null)) | None => Ok(()),
2860 Some(Value::Node(n)) => {
2861 for label in labels {
2862 if !n.labels.contains(label) {
2863 n.labels.push(label.clone());
2864 }
2865 }
2866 exec_ctx.writer.put_node(n)?;
2867 Ok(())
2868 }
2869 _ => Err(Error::UnboundVariable(target.to_string())),
2870 }
2871}
2872
2873fn apply_merge_actions(
2882 node: &mut Node,
2883 actions: &[SetAssignment],
2884 var: &str,
2885 exec_ctx: &ExecCtx,
2886 base_row: &Row,
2887) -> Result<()> {
2888 if actions.is_empty() {
2889 return Ok(());
2890 }
2891 let mut row = base_row.clone();
2894 row.insert(var.to_string(), Value::Node(node.clone()));
2895 for action in actions {
2896 let sub_ctx = exec_ctx.eval_ctx(&row);
2897 match action {
2898 SetAssignment::Property {
2899 var: target,
2900 key,
2901 value,
2902 } => {
2903 if target != var {
2904 return Err(Error::UnboundVariable(target.clone()));
2905 }
2906 let evaluated = eval_expr(value, &sub_ctx)?;
2907 let prop = value_to_property(evaluated)?;
2908 node.properties.insert(key.clone(), prop);
2909 row.insert(var.to_string(), Value::Node(node.clone()));
2910 }
2911 SetAssignment::Labels {
2912 var: target,
2913 labels,
2914 } => {
2915 if target != var {
2916 return Err(Error::UnboundVariable(target.clone()));
2917 }
2918 for label in labels {
2919 if !node.labels.contains(label) {
2920 node.labels.push(label.clone());
2921 }
2922 }
2923 row.insert(var.to_string(), Value::Node(node.clone()));
2924 }
2925 SetAssignment::Replace {
2926 var: target,
2927 properties,
2928 } => {
2929 if target != var {
2930 return Err(Error::UnboundVariable(target.clone()));
2931 }
2932 let resolved: Vec<(String, Property)> = properties
2933 .iter()
2934 .map(|(k, expr)| {
2935 let v = eval_expr(expr, &sub_ctx)?;
2936 Ok((k.clone(), value_to_property(v)?))
2937 })
2938 .collect::<Result<Vec<_>>>()?;
2939 node.properties.clear();
2940 for (k, p) in resolved {
2941 node.properties.insert(k, p);
2942 }
2943 row.insert(var.to_string(), Value::Node(node.clone()));
2944 }
2945 SetAssignment::Merge {
2946 var: target,
2947 properties,
2948 } => {
2949 if target != var {
2950 return Err(Error::UnboundVariable(target.clone()));
2951 }
2952 let resolved: Vec<(String, Property)> = properties
2953 .iter()
2954 .map(|(k, expr)| {
2955 let v = eval_expr(expr, &sub_ctx)?;
2956 Ok((k.clone(), value_to_property(v)?))
2957 })
2958 .collect::<Result<Vec<_>>>()?;
2959 for (k, p) in resolved {
2960 node.properties.insert(k, p);
2961 }
2962 row.insert(var.to_string(), Value::Node(node.clone()));
2963 }
2964 SetAssignment::ReplaceFromExpr {
2965 var: target,
2966 source,
2967 replace,
2968 } => {
2969 if target != var {
2970 return Err(Error::UnboundVariable(target.clone()));
2971 }
2972 let v = eval_expr(source, &sub_ctx)?;
2973 let props = extract_property_map(&v)?;
2974 if *replace {
2975 node.properties.clear();
2976 }
2977 for (k, p) in props {
2978 node.properties.insert(k, p);
2979 }
2980 row.insert(var.to_string(), Value::Node(node.clone()));
2981 }
2982 }
2983 }
2984 Ok(())
2985}
2986
2987struct EdgeExpandOp {
2988 input: Box<dyn Operator>,
2989 src_var: String,
2990 edge_var: Option<String>,
2991 dst_var: String,
2992 dst_labels: Vec<String>,
2993 edge_properties: Vec<(String, Expr)>,
2994 edge_types: Vec<String>,
2995 direction: Direction,
2996 edge_constraint_var: Option<String>,
3002 current_row: Option<Row>,
3003 pending: Vec<(EdgeId, NodeId)>,
3004 pending_idx: usize,
3005}
3006
3007impl EdgeExpandOp {
3008 #[allow(clippy::too_many_arguments)]
3009 fn new(
3010 input: Box<dyn Operator>,
3011 src_var: String,
3012 edge_var: Option<String>,
3013 dst_var: String,
3014 dst_labels: Vec<String>,
3015 edge_properties: Vec<(String, Expr)>,
3016 edge_types: Vec<String>,
3017 direction: Direction,
3018 edge_constraint_var: Option<String>,
3019 ) -> Self {
3020 Self {
3021 input,
3022 src_var,
3023 edge_var,
3024 dst_var,
3025 dst_labels,
3026 edge_properties,
3027 edge_types,
3028 direction,
3029 edge_constraint_var,
3030 current_row: None,
3031 pending: Vec::new(),
3032 pending_idx: 0,
3033 }
3034 }
3035}
3036
3037impl Operator for EdgeExpandOp {
3038 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3039 loop {
3040 while self.pending_idx < self.pending.len() {
3041 let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3042 self.pending_idx += 1;
3043
3044 let edge = match ctx.store.get_edge(edge_id)? {
3045 Some(e) => e,
3046 None => continue,
3047 };
3048 if !self.edge_types.is_empty()
3049 && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3050 {
3051 continue;
3052 }
3053 if let Some(constraint_var) = &self.edge_constraint_var {
3061 let base = self
3062 .current_row
3063 .as_ref()
3064 .expect("pending edges without source row");
3065 let expected = match ctx.lookup_binding(base, constraint_var) {
3066 Some(Value::Edge(e)) => Some(e.id),
3067 _ => None,
3068 };
3069 match expected {
3070 Some(id) if id != edge.id => continue,
3071 None => continue,
3072 _ => {}
3073 }
3074 }
3075 if !self.edge_properties.is_empty() {
3080 let base = self
3081 .current_row
3082 .as_ref()
3083 .expect("pending edges without source row");
3084 let ectx = ctx.eval_ctx(base);
3085 let mut ok = true;
3086 for (key, expr) in &self.edge_properties {
3087 let expected = eval_expr(expr, &ectx)?;
3088 let actual = match edge.properties.get(key) {
3089 Some(v) => Value::Property(v.clone()),
3090 None => {
3091 ok = false;
3092 break;
3093 }
3094 };
3095 if !values_equal(&actual, &expected) {
3096 ok = false;
3097 break;
3098 }
3099 }
3100 if !ok {
3101 continue;
3102 }
3103 }
3104
3105 let neighbor = match ctx.store.get_node(neighbor_id)? {
3106 Some(n) => n,
3107 None => continue,
3108 };
3109 if !has_all_labels(&neighbor, &self.dst_labels) {
3110 continue;
3111 }
3112
3113 let base = self
3114 .current_row
3115 .as_ref()
3116 .expect("pending edges without source row");
3117 let mut out = base.clone();
3118 if let Some(ev) = &self.edge_var {
3119 out.insert(ev.clone(), Value::Edge(edge));
3120 }
3121 out.insert(self.dst_var.clone(), Value::Node(neighbor));
3122 return Ok(Some(out));
3123 }
3124
3125 match self.input.next(ctx)? {
3126 None => return Ok(None),
3127 Some(row) => {
3128 let src_id = match row.get(&self.src_var) {
3129 Some(Value::Node(n)) => n.id,
3130 Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
3135 continue
3136 }
3137 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3138 };
3139 self.pending = match self.direction {
3140 Direction::Outgoing => ctx.store.outgoing(src_id)?,
3141 Direction::Incoming => ctx.store.incoming(src_id)?,
3142 Direction::Both => {
3143 let mut all = ctx.store.outgoing(src_id)?;
3149 let mut seen: std::collections::HashSet<EdgeId> =
3150 all.iter().map(|(e, _)| *e).collect();
3151 for (e, n) in ctx.store.incoming(src_id)? {
3152 if seen.insert(e) {
3153 all.push((e, n));
3154 }
3155 }
3156 all
3157 }
3158 };
3159 self.pending_idx = 0;
3160 self.current_row = Some(row);
3161 }
3162 }
3163 }
3164 }
3165}
3166
3167struct OptionalEdgeExpandOp {
3182 input: Box<dyn Operator>,
3183 src_var: String,
3184 edge_var: Option<String>,
3185 dst_var: String,
3186 dst_labels: Vec<String>,
3187 dst_properties: Vec<(String, Expr)>,
3188 edge_types: Vec<String>,
3189 direction: Direction,
3190 dst_constraint_var: Option<String>,
3196 edge_constraint_var: Option<String>,
3201 current_row: Option<Row>,
3202 pending: Vec<(EdgeId, NodeId)>,
3203 pending_idx: usize,
3204 yielded_for_current: bool,
3205}
3206
3207impl OptionalEdgeExpandOp {
3208 #[allow(clippy::too_many_arguments)]
3209 fn new(
3210 input: Box<dyn Operator>,
3211 src_var: String,
3212 edge_var: Option<String>,
3213 dst_var: String,
3214 dst_labels: Vec<String>,
3215 dst_properties: Vec<(String, Expr)>,
3216 edge_types: Vec<String>,
3217 direction: Direction,
3218 dst_constraint_var: Option<String>,
3219 edge_constraint_var: Option<String>,
3220 ) -> Self {
3221 Self {
3222 input,
3223 src_var,
3224 edge_var,
3225 dst_var,
3226 dst_labels,
3227 dst_properties,
3228 edge_types,
3229 direction,
3230 dst_constraint_var,
3231 edge_constraint_var,
3232 current_row: None,
3233 pending: Vec::new(),
3234 pending_idx: 0,
3235 yielded_for_current: false,
3236 }
3237 }
3238}
3239
3240impl Operator for OptionalEdgeExpandOp {
3241 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3242 loop {
3243 while self.pending_idx < self.pending.len() {
3244 let (edge_id, neighbor_id) = self.pending[self.pending_idx];
3245 self.pending_idx += 1;
3246
3247 let edge = match ctx.store.get_edge(edge_id)? {
3248 Some(e) => e,
3249 None => continue,
3250 };
3251 if !self.edge_types.is_empty()
3252 && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3253 {
3254 continue;
3255 }
3256 if let Some(constraint_var) = &self.edge_constraint_var {
3262 let base = self
3263 .current_row
3264 .as_ref()
3265 .expect("pending without source row");
3266 let expected = match ctx.lookup_binding(base, constraint_var) {
3267 Some(Value::Edge(e)) => Some(e.id),
3268 _ => None,
3269 };
3270 match expected {
3271 Some(id) if id != edge.id => continue,
3272 None => continue,
3273 _ => {}
3274 }
3275 }
3276
3277 let neighbor = match ctx.store.get_node(neighbor_id)? {
3278 Some(n) => n,
3279 None => continue,
3280 };
3281 if !has_all_labels(&neighbor, &self.dst_labels) {
3282 continue;
3283 }
3284 if let Some(constraint_var) = &self.dst_constraint_var {
3291 let base = self
3292 .current_row
3293 .as_ref()
3294 .expect("pending without source row");
3295 let bound_id = match base.get(constraint_var) {
3296 Some(Value::Node(n)) => Some(n.id),
3297 Some(Value::Null)
3298 | Some(Value::Property(meshdb_core::Property::Null))
3299 | None => None,
3300 _ => None,
3301 };
3302 match bound_id {
3303 Some(id) if id != neighbor.id => continue,
3304 None => continue,
3305 _ => {}
3306 }
3307 }
3308 if !self.dst_properties.is_empty() {
3309 let base = self
3310 .current_row
3311 .as_ref()
3312 .expect("pending without source row");
3313 let ectx = ctx.eval_ctx(base);
3314 let mut props_ok = true;
3315 for (key, expr) in &self.dst_properties {
3316 let expected = eval_expr(expr, &ectx)?;
3317 let actual = neighbor
3318 .properties
3319 .get(key)
3320 .cloned()
3321 .map(Value::Property)
3322 .unwrap_or(Value::Null);
3323 if !values_equal(&expected, &actual) {
3324 props_ok = false;
3325 break;
3326 }
3327 }
3328 if !props_ok {
3329 continue;
3330 }
3331 }
3332
3333 let base = self
3334 .current_row
3335 .as_ref()
3336 .expect("pending edges without source row");
3337 let mut out = base.clone();
3338 if let Some(ev) = &self.edge_var {
3339 out.insert(ev.clone(), Value::Edge(edge));
3340 }
3341 out.insert(self.dst_var.clone(), Value::Node(neighbor));
3342 self.yielded_for_current = true;
3343 return Ok(Some(out));
3344 }
3345
3346 if let Some(base) = self.current_row.take() {
3356 if !self.yielded_for_current {
3357 let mut out = base;
3358 if let Some(ev) = &self.edge_var {
3359 let preserve = self
3360 .edge_constraint_var
3361 .as_ref()
3362 .map(|c| c == ev)
3363 .unwrap_or(false);
3364 if !preserve {
3365 out.insert(ev.clone(), Value::Null);
3366 }
3367 }
3368 let preserve_dst = self
3369 .dst_constraint_var
3370 .as_ref()
3371 .map(|c| c == &self.dst_var)
3372 .unwrap_or(false);
3373 if !preserve_dst {
3374 out.insert(self.dst_var.clone(), Value::Null);
3375 }
3376 self.yielded_for_current = true;
3377 return Ok(Some(out));
3378 }
3379 }
3380
3381 match self.input.next(ctx)? {
3382 None => return Ok(None),
3383 Some(row) => {
3384 let src_id = match row.get(&self.src_var) {
3385 Some(Value::Node(n)) => n.id,
3386 Some(Value::Null) => {
3393 self.pending = Vec::new();
3394 self.pending_idx = 0;
3395 self.yielded_for_current = false;
3396 self.current_row = Some(row);
3397 continue;
3398 }
3399 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3400 };
3401 self.pending = match self.direction {
3402 Direction::Outgoing => ctx.store.outgoing(src_id)?,
3403 Direction::Incoming => ctx.store.incoming(src_id)?,
3404 Direction::Both => {
3405 let mut all = ctx.store.outgoing(src_id)?;
3411 let mut seen: std::collections::HashSet<EdgeId> =
3412 all.iter().map(|(e, _)| *e).collect();
3413 for (e, n) in ctx.store.incoming(src_id)? {
3414 if seen.insert(e) {
3415 all.push((e, n));
3416 }
3417 }
3418 all
3419 }
3420 };
3421 self.pending_idx = 0;
3422 self.yielded_for_current = false;
3423 self.current_row = Some(row);
3424 }
3425 }
3426 }
3427 }
3428}
3429
3430struct VarLengthExpandOp {
3431 input: Box<dyn Operator>,
3432 src_var: String,
3433 edge_var: Option<String>,
3434 dst_var: String,
3435 dst_labels: Vec<String>,
3436 edge_types: Vec<String>,
3437 edge_properties: Vec<(String, Expr)>,
3443 direction: Direction,
3444 min_hops: u64,
3445 max_hops: u64,
3446 path_var: Option<String>,
3447 optional: bool,
3453 dst_constraint_var: Option<String>,
3460 bound_edge_list_var: Option<String>,
3464 excluded_edge_vars: Vec<String>,
3472 current_row: Option<Row>,
3473 pending_paths: Vec<Vec<Edge>>,
3474 pending_node_paths: Vec<Vec<NodeId>>,
3475 pending_targets: Vec<NodeId>,
3476 pending_idx: usize,
3477}
3478
3479impl VarLengthExpandOp {
3480 #[allow(clippy::too_many_arguments)]
3481 fn new(
3482 input: Box<dyn Operator>,
3483 src_var: String,
3484 edge_var: Option<String>,
3485 dst_var: String,
3486 dst_labels: Vec<String>,
3487 edge_types: Vec<String>,
3488 edge_properties: Vec<(String, Expr)>,
3489 direction: Direction,
3490 min_hops: u64,
3491 max_hops: u64,
3492 path_var: Option<String>,
3493 optional: bool,
3494 dst_constraint_var: Option<String>,
3495 bound_edge_list_var: Option<String>,
3496 excluded_edge_vars: Vec<String>,
3497 ) -> Self {
3498 Self {
3499 input,
3500 src_var,
3501 edge_var,
3502 dst_var,
3503 dst_labels,
3504 edge_types,
3505 edge_properties,
3506 direction,
3507 min_hops,
3508 max_hops,
3509 path_var,
3510 optional,
3511 dst_constraint_var,
3512 bound_edge_list_var,
3513 excluded_edge_vars,
3514 current_row: None,
3515 pending_paths: Vec::new(),
3516 pending_node_paths: Vec::new(),
3517 pending_targets: Vec::new(),
3518 pending_idx: 0,
3519 }
3520 }
3521
3522 fn enumerate(
3523 &self,
3524 ctx: &ExecCtx,
3525 start: NodeId,
3526 input_row: &Row,
3527 ) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
3528 let mut paths: Vec<Vec<Edge>> = Vec::new();
3529 let mut node_paths: Vec<Vec<NodeId>> = Vec::new();
3530 let mut targets: Vec<NodeId> = Vec::new();
3531 let mut edge_buf: Vec<Edge> = Vec::new();
3532 let mut node_buf: Vec<NodeId> = vec![start];
3533 let mut used: HashSet<EdgeId> = HashSet::new();
3540 for var in &self.excluded_edge_vars {
3541 match ctx.lookup_binding(input_row, var) {
3542 Some(Value::Edge(e)) => {
3543 used.insert(e.id);
3544 }
3545 Some(Value::List(items)) => {
3546 for item in items {
3547 if let Value::Edge(e) = item {
3548 used.insert(e.id);
3549 }
3550 }
3551 }
3552 _ => {}
3553 }
3554 }
3555 let expected_edge_props: Vec<(String, Value)> = if self.edge_properties.is_empty() {
3559 Vec::new()
3560 } else {
3561 let ectx = ctx.eval_ctx(input_row);
3562 self.edge_properties
3563 .iter()
3564 .map(|(k, expr)| eval_expr(expr, &ectx).map(|v| (k.clone(), v)))
3565 .collect::<Result<Vec<_>>>()?
3566 };
3567 self.dfs(
3568 ctx,
3569 start,
3570 &expected_edge_props,
3571 &mut edge_buf,
3572 &mut node_buf,
3573 &mut used,
3574 &mut paths,
3575 &mut node_paths,
3576 &mut targets,
3577 )?;
3578 Ok((paths, node_paths, targets))
3579 }
3580
3581 #[allow(clippy::too_many_arguments)]
3582 fn dfs(
3583 &self,
3584 ctx: &ExecCtx,
3585 current_node: NodeId,
3586 expected_edge_props: &[(String, Value)],
3587 edge_buf: &mut Vec<Edge>,
3588 node_buf: &mut Vec<NodeId>,
3589 used: &mut HashSet<EdgeId>,
3590 out_paths: &mut Vec<Vec<Edge>>,
3591 out_node_paths: &mut Vec<Vec<NodeId>>,
3592 out_targets: &mut Vec<NodeId>,
3593 ) -> Result<()> {
3594 let depth = edge_buf.len() as u64;
3595
3596 if depth >= self.min_hops && depth <= self.max_hops {
3597 let terminal_ok = match ctx.store.get_node(current_node)? {
3598 Some(node) => has_all_labels(&node, &self.dst_labels),
3599 None => false,
3600 };
3601 if terminal_ok {
3602 out_paths.push(edge_buf.clone());
3603 out_node_paths.push(node_buf.clone());
3604 out_targets.push(current_node);
3605 }
3606 }
3607
3608 if depth >= self.max_hops {
3609 return Ok(());
3610 }
3611
3612 let neighbors = match self.direction {
3613 Direction::Outgoing => ctx.store.outgoing(current_node)?,
3614 Direction::Incoming => ctx.store.incoming(current_node)?,
3615 Direction::Both => {
3616 let mut all = ctx.store.outgoing(current_node)?;
3617 all.extend(ctx.store.incoming(current_node)?);
3618 all
3619 }
3620 };
3621
3622 for (eid, neighbor_id) in neighbors {
3623 if used.contains(&eid) {
3624 continue;
3625 }
3626 let edge = match ctx.store.get_edge(eid)? {
3627 Some(e) => e,
3628 None => continue,
3629 };
3630 if !self.edge_types.is_empty() && !self.edge_types.iter().any(|t| t == &edge.edge_type)
3631 {
3632 continue;
3633 }
3634 if !expected_edge_props.is_empty() {
3639 let mut ok = true;
3640 for (key, expected) in expected_edge_props {
3641 let actual = match edge.properties.get(key) {
3642 Some(v) => Value::Property(v.clone()),
3643 None => {
3644 ok = false;
3645 break;
3646 }
3647 };
3648 if !values_equal(&actual, expected) {
3649 ok = false;
3650 break;
3651 }
3652 }
3653 if !ok {
3654 continue;
3655 }
3656 }
3657 used.insert(eid);
3658 edge_buf.push(edge);
3659 node_buf.push(neighbor_id);
3660 self.dfs(
3661 ctx,
3662 neighbor_id,
3663 expected_edge_props,
3664 edge_buf,
3665 node_buf,
3666 used,
3667 out_paths,
3668 out_node_paths,
3669 out_targets,
3670 )?;
3671 edge_buf.pop();
3672 node_buf.pop();
3673 used.remove(&eid);
3674 }
3675
3676 Ok(())
3677 }
3678}
3679
3680fn replay_edge_list(
3698 ctx: &ExecCtx,
3699 row: &Row,
3700 list_var: &str,
3701 src_id: Option<NodeId>,
3702 direction: Direction,
3703 edge_types: &[String],
3704) -> Result<(Vec<Vec<Edge>>, Vec<Vec<NodeId>>, Vec<NodeId>)> {
3705 let start = match src_id {
3706 Some(id) => id,
3707 None => return Ok((Vec::new(), Vec::new(), Vec::new())),
3708 };
3709 let list = match ctx.lookup_binding(row, list_var) {
3710 Some(Value::List(items)) => items.clone(),
3711 Some(Value::Property(meshdb_core::Property::List(items))) => items
3712 .iter()
3713 .cloned()
3714 .map(Value::Property)
3715 .collect::<Vec<_>>(),
3716 _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
3717 };
3718 let mut edge_buf: Vec<Edge> = Vec::with_capacity(list.len());
3719 let mut node_buf: Vec<NodeId> = Vec::with_capacity(list.len() + 1);
3720 node_buf.push(start);
3721 let mut current = start;
3722 for item in list {
3723 let edge = match item {
3724 Value::Edge(e) => e,
3725 _ => return Ok((Vec::new(), Vec::new(), Vec::new())),
3726 };
3727 if !edge_types.is_empty() && !edge_types.iter().any(|t| t == &edge.edge_type) {
3728 return Ok((Vec::new(), Vec::new(), Vec::new()));
3729 }
3730 let next_node = match direction {
3731 Direction::Outgoing => {
3732 if edge.source != current {
3733 return Ok((Vec::new(), Vec::new(), Vec::new()));
3734 }
3735 edge.target
3736 }
3737 Direction::Incoming => {
3738 if edge.target != current {
3739 return Ok((Vec::new(), Vec::new(), Vec::new()));
3740 }
3741 edge.source
3742 }
3743 Direction::Both => {
3744 if edge.source == current {
3745 edge.target
3746 } else if edge.target == current {
3747 edge.source
3748 } else {
3749 return Ok((Vec::new(), Vec::new(), Vec::new()));
3750 }
3751 }
3752 };
3753 if ctx.store.get_node(next_node)?.is_none() {
3757 return Ok((Vec::new(), Vec::new(), Vec::new()));
3758 }
3759 edge_buf.push(edge);
3760 node_buf.push(next_node);
3761 current = next_node;
3762 }
3763 Ok((vec![edge_buf], vec![node_buf], vec![current]))
3764}
3765
3766impl Operator for VarLengthExpandOp {
3767 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3768 loop {
3769 while self.pending_idx < self.pending_paths.len() {
3770 let i = self.pending_idx;
3771 self.pending_idx += 1;
3772
3773 let target_id = self.pending_targets[i];
3774 let target = match ctx.store.get_node(target_id)? {
3775 Some(n) => n,
3776 None => continue,
3777 };
3778
3779 let base = self
3780 .current_row
3781 .as_ref()
3782 .expect("pending without source row");
3783 let mut out = base.clone();
3784 out.insert(self.dst_var.clone(), Value::Node(target.clone()));
3785 if let Some(ev) = &self.edge_var {
3786 let edges: Vec<Value> = self.pending_paths[i]
3787 .iter()
3788 .cloned()
3789 .map(Value::Edge)
3790 .collect();
3791 out.insert(ev.clone(), Value::List(edges));
3792 }
3793 if let Some(pv) = &self.path_var {
3794 let mut nodes = Vec::with_capacity(self.pending_node_paths[i].len());
3795 for nid in &self.pending_node_paths[i] {
3796 match ctx.store.get_node(*nid)? {
3797 Some(n) => nodes.push(n),
3798 None => continue,
3799 }
3800 }
3801 let edges = self.pending_paths[i].clone();
3802 out.insert(pv.clone(), Value::Path { nodes, edges });
3803 }
3804 return Ok(Some(out));
3805 }
3806
3807 match self.input.next(ctx)? {
3808 None => return Ok(None),
3809 Some(row) => {
3810 let src_id = match row.get(&self.src_var) {
3811 Some(Value::Node(n)) => Some(n.id),
3812 Some(Value::Null) | Some(Value::Property(meshdb_core::Property::Null)) => {
3818 None
3819 }
3820 _ => return Err(Error::UnboundVariable(self.src_var.clone())),
3821 };
3822 let (mut paths, mut node_paths, mut targets) =
3830 if let Some(list_var) = &self.bound_edge_list_var {
3831 replay_edge_list(
3832 ctx,
3833 &row,
3834 list_var,
3835 src_id,
3836 self.direction,
3837 &self.edge_types,
3838 )?
3839 } else {
3840 match src_id {
3841 Some(id) => self.enumerate(ctx, id, &row)?,
3842 None => (Vec::new(), Vec::new(), Vec::new()),
3843 }
3844 };
3845 if let Some(constraint_var) = &self.dst_constraint_var {
3852 let target_id = match row.get(constraint_var) {
3853 Some(Value::Node(n)) => Some(n.id),
3854 _ => None,
3855 };
3856 match target_id {
3857 Some(id) => {
3858 let mut kept_paths = Vec::new();
3859 let mut kept_node_paths = Vec::new();
3860 let mut kept_targets = Vec::new();
3861 for ((p, np), t) in paths
3862 .drain(..)
3863 .zip(node_paths.drain(..))
3864 .zip(targets.drain(..))
3865 {
3866 if t == id {
3867 kept_paths.push(p);
3868 kept_node_paths.push(np);
3869 kept_targets.push(t);
3870 }
3871 }
3872 paths = kept_paths;
3873 node_paths = kept_node_paths;
3874 targets = kept_targets;
3875 }
3876 None => {
3877 paths.clear();
3878 node_paths.clear();
3879 targets.clear();
3880 }
3881 }
3882 }
3883 if paths.is_empty() && self.optional {
3884 let mut out = row;
3889 if let Some(ev) = &self.edge_var {
3890 out.insert(ev.clone(), Value::Null);
3891 }
3892 out.insert(self.dst_var.clone(), Value::Null);
3893 if let Some(pv) = &self.path_var {
3894 out.insert(pv.clone(), Value::Null);
3895 }
3896 return Ok(Some(out));
3897 }
3898 self.pending_paths = paths;
3899 self.pending_node_paths = node_paths;
3900 self.pending_targets = targets;
3901 self.pending_idx = 0;
3902 self.current_row = Some(row);
3903 }
3904 }
3905 }
3906 }
3907}
3908
3909struct FilterOp {
3910 input: Box<dyn Operator>,
3911 predicate: Expr,
3912}
3913
3914impl FilterOp {
3915 fn new(input: Box<dyn Operator>, predicate: Expr) -> Self {
3916 Self { input, predicate }
3917 }
3918}
3919
3920impl Operator for FilterOp {
3921 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3922 while let Some(row) = self.input.next(ctx)? {
3923 let v = match eval_expr(&self.predicate, &ctx.eval_ctx(&row)) {
3924 Ok(v) => v,
3925 Err(Error::TypeMismatch) | Err(Error::NotBoolean) => Value::Null,
3928 Err(e) => return Err(e),
3929 };
3930 if to_bool(&v).unwrap_or(false) {
3931 return Ok(Some(row));
3932 }
3933 }
3934 Ok(None)
3935 }
3936}
3937
3938struct IdentityOp {
3941 input: Box<dyn Operator>,
3942}
3943
3944impl IdentityOp {
3945 fn new(input: Box<dyn Operator>) -> Self {
3946 Self { input }
3947 }
3948}
3949
3950impl Operator for IdentityOp {
3951 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3952 self.input.next(ctx)
3953 }
3954}
3955
3956struct CoalesceNullRowOp {
3962 input: Box<dyn Operator>,
3963 null_vars: Vec<String>,
3964 produced_any: bool,
3965 done: bool,
3966}
3967
3968impl CoalesceNullRowOp {
3969 fn new(input: Box<dyn Operator>, null_vars: Vec<String>) -> Self {
3970 Self {
3971 input,
3972 null_vars,
3973 produced_any: false,
3974 done: false,
3975 }
3976 }
3977}
3978
3979impl Operator for CoalesceNullRowOp {
3980 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
3981 if self.done {
3982 return Ok(None);
3983 }
3984 match self.input.next(ctx)? {
3985 Some(row) => {
3986 self.produced_any = true;
3987 Ok(Some(row))
3988 }
3989 None => {
3990 self.done = true;
3991 if self.produced_any {
3992 Ok(None)
3993 } else {
3994 let mut row = Row::new();
3995 for v in &self.null_vars {
3996 row.insert(v.clone(), Value::Null);
3997 }
3998 Ok(Some(row))
3999 }
4000 }
4001 }
4002 }
4003}
4004
4005struct ProjectOp {
4006 input: Box<dyn Operator>,
4007 items: Vec<ReturnItem>,
4008}
4009
4010impl ProjectOp {
4011 fn new(input: Box<dyn Operator>, items: Vec<ReturnItem>) -> Self {
4012 Self { input, items }
4013 }
4014}
4015
4016impl Operator for ProjectOp {
4017 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4018 match self.input.next(ctx)? {
4019 Some(row) => {
4020 let mut out = Row::new();
4021 for (i, item) in self.items.iter().enumerate() {
4022 let name = item.alias.clone().unwrap_or_else(|| {
4023 item.raw_text
4024 .clone()
4025 .unwrap_or_else(|| default_name(&item.expr, i))
4026 });
4027 let value = eval_expr(&item.expr, &ctx.eval_ctx(&row))?;
4028 out.insert(name, value);
4029 }
4030 Ok(Some(out))
4031 }
4032 None => Ok(None),
4033 }
4034 }
4035}
4036
4037fn default_name(expr: &Expr, idx: usize) -> String {
4038 render_expr_name(expr).unwrap_or_else(|| format!("col{}", idx))
4039}
4040
4041fn render_expr_name(expr: &Expr) -> Option<String> {
4042 Some(match expr {
4043 Expr::Identifier(s) => s.clone(),
4044 Expr::Property { var, key } => format!("{var}.{key}"),
4045 Expr::PropertyAccess { base, key } => {
4046 if matches!(
4050 base.as_ref(),
4051 Expr::IndexAccess { .. } | Expr::SliceAccess { .. }
4052 ) {
4053 format!("({}).{key}", render_expr_name(base)?)
4054 } else {
4055 format!("{}.{key}", render_expr_name(base)?)
4056 }
4057 }
4058 Expr::Parameter(name) => format!("${name}"),
4059 Expr::Literal(Literal::String(s)) => format!("'{s}'"),
4060 Expr::Literal(Literal::Integer(i)) => i.to_string(),
4061 Expr::Literal(Literal::Float(f)) => f.to_string(),
4062 Expr::Literal(Literal::Boolean(b)) => b.to_string(),
4063 Expr::Literal(Literal::Null) => "NULL".into(),
4064 Expr::Call { name, args } => {
4065 let arg_str = match args {
4066 CallArgs::Star => "*".into(),
4067 CallArgs::Exprs(es) | CallArgs::DistinctExprs(es) => {
4068 let prefix = if matches!(args, CallArgs::DistinctExprs(_)) {
4069 "DISTINCT "
4070 } else {
4071 ""
4072 };
4073 let inner: Vec<String> = es.iter().filter_map(render_expr_name).collect();
4074 if inner.len() != es.len() {
4075 return None;
4076 }
4077 format!("{prefix}{}", inner.join(", "))
4078 }
4079 };
4080 format!("{name}({arg_str})")
4081 }
4082 Expr::BinaryOp { op, left, right } => {
4083 let op_str = match op {
4084 BinaryOp::Add => " + ",
4085 BinaryOp::Sub => " - ",
4086 BinaryOp::Mul => " * ",
4087 BinaryOp::Div => " / ",
4088 BinaryOp::Mod => " % ",
4089 BinaryOp::Pow => " ^ ",
4090 };
4091 format!(
4092 "{}{op_str}{}",
4093 render_expr_name(left)?,
4094 render_expr_name(right)?
4095 )
4096 }
4097 Expr::UnaryOp { op, operand } => {
4098 let op_str = match op {
4099 UnaryOp::Neg => "-",
4100 };
4101 format!("{op_str}{}", render_expr_name(operand)?)
4102 }
4103 Expr::Not(inner) => format!("NOT {}", render_expr_name(inner)?),
4104 Expr::IsNull { negated, inner } => {
4105 if *negated {
4106 format!("{} IS NOT NULL", render_expr_name(inner)?)
4107 } else {
4108 format!("{} IS NULL", render_expr_name(inner)?)
4109 }
4110 }
4111 Expr::Compare { op, left, right } => {
4112 let op_str = match op {
4113 CompareOp::Eq => " = ",
4114 CompareOp::Ne => " <> ",
4115 CompareOp::Lt => " < ",
4116 CompareOp::Le => " <= ",
4117 CompareOp::Gt => " > ",
4118 CompareOp::Ge => " >= ",
4119 CompareOp::StartsWith => " STARTS WITH ",
4120 CompareOp::EndsWith => " ENDS WITH ",
4121 CompareOp::Contains => " CONTAINS ",
4122 CompareOp::RegexMatch => " =~ ",
4123 };
4124 format!(
4125 "{}{op_str}{}",
4126 render_expr_name(left)?,
4127 render_expr_name(right)?
4128 )
4129 }
4130 Expr::List(items) => {
4131 let inner: Vec<String> = items.iter().filter_map(render_expr_name).collect();
4132 if inner.len() != items.len() {
4133 return None;
4134 }
4135 format!("[{}]", inner.join(", "))
4136 }
4137 Expr::Map(entries) => {
4138 let inner: Vec<String> = entries
4139 .iter()
4140 .map(|(k, v)| render_expr_name(v).map(|vn| format!("{k}: {vn}")))
4141 .collect::<Option<Vec<_>>>()?;
4142 format!("{{{}}}", inner.join(", "))
4143 }
4144 Expr::IndexAccess { base, index } => {
4145 format!("{}[{}]", render_expr_name(base)?, render_expr_name(index)?)
4146 }
4147 Expr::InList { element, list } => {
4148 format!(
4149 "{} IN {}",
4150 render_expr_name(element)?,
4151 render_expr_name(list)?
4152 )
4153 }
4154 Expr::HasLabels { expr, labels } => {
4155 let mut s = format!("({}", render_expr_name(expr)?);
4156 for l in labels {
4157 s.push(':');
4158 s.push_str(l);
4159 }
4160 s.push(')');
4161 s
4162 }
4163 _ => return None,
4164 })
4165}
4166
4167struct DistinctOp {
4168 input: Box<dyn Operator>,
4169 seen: HashSet<String>,
4170}
4171
4172impl DistinctOp {
4173 fn new(input: Box<dyn Operator>) -> Self {
4174 Self {
4175 input,
4176 seen: HashSet::new(),
4177 }
4178 }
4179}
4180
4181impl Operator for DistinctOp {
4182 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4183 while let Some(row) = self.input.next(ctx)? {
4184 let key = row_key(&row);
4185 if self.seen.insert(key) {
4186 return Ok(Some(row));
4187 }
4188 }
4189 Ok(None)
4190 }
4191}
4192
4193struct BindPathOp {
4209 input: Box<dyn Operator>,
4210 path_var: String,
4211 node_vars: Vec<String>,
4212 edge_vars: Vec<String>,
4213}
4214
4215impl BindPathOp {
4216 fn new(
4217 input: Box<dyn Operator>,
4218 path_var: String,
4219 node_vars: Vec<String>,
4220 edge_vars: Vec<String>,
4221 ) -> Self {
4222 Self {
4223 input,
4224 path_var,
4225 node_vars,
4226 edge_vars,
4227 }
4228 }
4229}
4230
4231impl Operator for BindPathOp {
4232 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4233 let Some(mut row) = self.input.next(ctx)? else {
4234 return Ok(None);
4235 };
4236 let mut nodes: Vec<meshdb_core::Node> = Vec::new();
4240 let mut edges: Vec<meshdb_core::Edge> = Vec::new();
4241 let mut abort = false;
4242 if let Some(Value::Node(n)) = row.get(&self.node_vars[0]) {
4249 nodes.push(n.clone());
4250 } else {
4251 abort = true;
4252 }
4253 if !abort {
4254 for (i, ev) in self.edge_vars.iter().enumerate() {
4255 match row.get(ev) {
4256 Some(Value::Edge(e)) => {
4257 edges.push(e.clone());
4258 match row.get(&self.node_vars[i + 1]) {
4259 Some(Value::Node(n)) => nodes.push(n.clone()),
4260 _ => {
4261 abort = true;
4262 break;
4263 }
4264 }
4265 }
4266 Some(Value::Path {
4267 nodes: sub_nodes,
4268 edges: sub_edges,
4269 }) => {
4270 edges.extend(sub_edges.iter().cloned());
4276 if sub_nodes.len() > 1 {
4277 nodes.extend(sub_nodes[1..].iter().cloned());
4278 }
4279 }
4280 _ => {
4281 abort = true;
4282 break;
4283 }
4284 }
4285 }
4286 }
4287 if abort {
4288 row.insert(self.path_var.clone(), Value::Null);
4289 } else {
4290 row.insert(self.path_var.clone(), Value::Path { nodes, edges });
4291 }
4292 Ok(Some(row))
4293 }
4294}
4295
4296struct ShortestPathOp {
4315 input: Box<dyn Operator>,
4316 src_var: String,
4317 dst_var: String,
4318 path_var: String,
4319 edge_types: Vec<String>,
4320 direction: meshdb_cypher::Direction,
4321 max_hops: u64,
4322 kind: meshdb_cypher::ShortestKind,
4323 pending: std::collections::VecDeque<(Row, Value)>,
4330}
4331
4332impl ShortestPathOp {
4333 #[allow(clippy::too_many_arguments)]
4334 fn new(
4335 input: Box<dyn Operator>,
4336 src_var: String,
4337 dst_var: String,
4338 path_var: String,
4339 edge_types: Vec<String>,
4340 direction: meshdb_cypher::Direction,
4341 max_hops: u64,
4342 kind: meshdb_cypher::ShortestKind,
4343 ) -> Self {
4344 Self {
4345 input,
4346 src_var,
4347 dst_var,
4348 path_var,
4349 edge_types,
4350 direction,
4351 max_hops,
4352 kind,
4353 pending: std::collections::VecDeque::new(),
4354 }
4355 }
4356}
4357
4358impl Operator for ShortestPathOp {
4359 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4360 loop {
4361 if let Some((mut row, path)) = self.pending.pop_front() {
4366 row.insert(self.path_var.clone(), path);
4367 return Ok(Some(row));
4368 }
4369 let Some(row) = self.input.next(ctx)? else {
4370 return Ok(None);
4371 };
4372 let src = match row.get(&self.src_var) {
4373 Some(Value::Node(n)) => n.clone(),
4374 _ => continue,
4375 };
4376 let dst = match row.get(&self.dst_var) {
4377 Some(Value::Node(n)) => n.clone(),
4378 _ => continue,
4379 };
4380 let paths = bfs_shortest_paths(
4381 &src,
4382 &dst,
4383 &self.edge_types,
4384 self.direction,
4385 self.max_hops,
4386 self.kind,
4387 ctx.store,
4388 )?;
4389 if paths.is_empty() {
4390 continue;
4392 }
4393 for path in paths {
4394 self.pending.push_back((row.clone(), path));
4395 }
4396 }
4397 }
4398}
4399
4400fn bfs_shortest_paths(
4419 src: &Node,
4420 dst: &Node,
4421 edge_types: &[String],
4422 direction: meshdb_cypher::Direction,
4423 max_hops: u64,
4424 kind: meshdb_cypher::ShortestKind,
4425 reader: &dyn crate::reader::GraphReader,
4426) -> Result<Vec<Value>> {
4427 use meshdb_cypher::Direction;
4428
4429 if src.id == dst.id {
4430 return Ok(vec![Value::Path {
4431 nodes: vec![src.clone()],
4432 edges: vec![],
4433 }]);
4434 }
4435
4436 let mut dist: HashMap<NodeId, u64> = HashMap::new();
4442 dist.insert(src.id, 0);
4443 let mut parents: HashMap<NodeId, Vec<(NodeId, EdgeId)>> = HashMap::new();
4444
4445 let mut frontier: Vec<NodeId> = vec![src.id];
4446 let mut depth: u64 = 0;
4447 let mut found = false;
4448
4449 while !frontier.is_empty() && depth < max_hops && !found {
4450 let mut next_frontier: Vec<NodeId> = Vec::new();
4451 for node_id in &frontier {
4452 let neighbors = match direction {
4453 Direction::Outgoing => reader.outgoing(*node_id)?,
4454 Direction::Incoming => reader.incoming(*node_id)?,
4455 Direction::Both => {
4456 let mut out = reader.outgoing(*node_id)?;
4457 out.extend(reader.incoming(*node_id)?);
4458 out
4459 }
4460 };
4461 for (edge_id, neighbor_id) in neighbors {
4462 if !edge_types.is_empty() {
4465 let edge = match reader.get_edge(edge_id)? {
4466 Some(e) => e,
4467 None => continue,
4468 };
4469 if !edge_types.iter().any(|t| t == &edge.edge_type) {
4470 continue;
4471 }
4472 }
4473 match dist.get(&neighbor_id) {
4474 Some(&d) if d == depth + 1 => {
4475 parents
4481 .entry(neighbor_id)
4482 .or_default()
4483 .push((*node_id, edge_id));
4484 }
4485 Some(_) => {
4486 }
4490 None => {
4491 dist.insert(neighbor_id, depth + 1);
4492 parents
4493 .entry(neighbor_id)
4494 .or_default()
4495 .push((*node_id, edge_id));
4496 if neighbor_id == dst.id {
4497 found = true;
4498 } else {
4499 next_frontier.push(neighbor_id);
4500 }
4501 }
4502 }
4503 }
4504 }
4505 depth += 1;
4506 if !found {
4507 frontier = next_frontier;
4508 }
4509 }
4510
4511 if !found {
4512 return Ok(Vec::new());
4513 }
4514
4515 let mut out: Vec<Value> = Vec::new();
4519 let mut nodes_rev: Vec<Node> = Vec::new();
4520 let mut edges_rev: Vec<Edge> = Vec::new();
4521 let only_first = matches!(kind, meshdb_cypher::ShortestKind::Shortest);
4522 collect_shortest_paths(
4523 src,
4524 dst,
4525 &parents,
4526 reader,
4527 &mut nodes_rev,
4528 &mut edges_rev,
4529 &mut out,
4530 only_first,
4531 )?;
4532 Ok(out)
4533}
4534
4535#[allow(clippy::too_many_arguments)]
4547fn collect_shortest_paths(
4548 src: &Node,
4549 current: &Node,
4550 parents: &HashMap<NodeId, Vec<(NodeId, EdgeId)>>,
4551 reader: &dyn crate::reader::GraphReader,
4552 nodes_rev: &mut Vec<Node>,
4553 edges_rev: &mut Vec<Edge>,
4554 out: &mut Vec<Value>,
4555 only_first: bool,
4556) -> Result<()> {
4557 if current.id == src.id {
4558 let mut nodes: Vec<Node> = Vec::with_capacity(nodes_rev.len() + 1);
4563 nodes.push(src.clone());
4564 nodes.extend(nodes_rev.iter().rev().cloned());
4565 let edges: Vec<Edge> = edges_rev.iter().rev().cloned().collect();
4566 out.push(Value::Path { nodes, edges });
4567 return Ok(());
4568 }
4569 let Some(parent_edges) = parents.get(¤t.id) else {
4570 return Ok(());
4574 };
4575 for (parent_id, edge_id) in parent_edges {
4576 if only_first && !out.is_empty() {
4577 return Ok(());
4578 }
4579 let edge = reader
4580 .get_edge(*edge_id)?
4581 .expect("BFS inserted this edge id; it must still exist");
4582 let parent_node = reader
4583 .get_node(*parent_id)?
4584 .expect("BFS visited this node id; it must still exist");
4585 nodes_rev.push(current.clone());
4586 edges_rev.push(edge);
4587 collect_shortest_paths(
4588 src,
4589 &parent_node,
4590 parents,
4591 reader,
4592 nodes_rev,
4593 edges_rev,
4594 out,
4595 only_first,
4596 )?;
4597 nodes_rev.pop();
4598 edges_rev.pop();
4599 }
4600 Ok(())
4601}
4602
4603struct UnionOp {
4612 branches: Vec<Box<dyn Operator>>,
4613 current: usize,
4614 seen: Option<HashSet<String>>,
4615}
4616
4617impl UnionOp {
4618 fn new(branches: Vec<Box<dyn Operator>>, all: bool) -> Self {
4619 Self {
4620 branches,
4621 current: 0,
4622 seen: if all { None } else { Some(HashSet::new()) },
4623 }
4624 }
4625}
4626
4627impl Operator for UnionOp {
4628 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4629 while self.current < self.branches.len() {
4630 match self.branches[self.current].next(ctx)? {
4631 Some(row) => {
4632 if let Some(seen) = self.seen.as_mut() {
4633 let key = row_key(&row);
4634 if !seen.insert(key) {
4635 continue;
4636 }
4637 }
4638 return Ok(Some(row));
4639 }
4640 None => {
4641 self.current += 1;
4642 }
4643 }
4644 }
4645 Ok(None)
4646 }
4647}
4648
4649struct OrderByOp {
4650 input: Box<dyn Operator>,
4651 sort_items: Vec<SortItem>,
4652 sorted: Option<Vec<Row>>,
4653 cursor: usize,
4654}
4655
4656impl OrderByOp {
4657 fn new(input: Box<dyn Operator>, sort_items: Vec<SortItem>) -> Self {
4658 Self {
4659 input,
4660 sort_items,
4661 sorted: None,
4662 cursor: 0,
4663 }
4664 }
4665}
4666
4667impl Operator for OrderByOp {
4668 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4669 if self.sorted.is_none() {
4670 let mut rows: Vec<Row> = Vec::new();
4671 while let Some(row) = self.input.next(ctx)? {
4672 rows.push(row);
4673 }
4674 let mut keyed: Vec<(Vec<Value>, Row)> = Vec::with_capacity(rows.len());
4675 for row in rows {
4676 let mut keys = Vec::with_capacity(self.sort_items.len());
4677 for item in &self.sort_items {
4678 keys.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
4679 }
4680 keyed.push((keys, row));
4681 }
4682 let descs: Vec<bool> = self.sort_items.iter().map(|s| s.descending).collect();
4683 keyed.sort_by(|a, b| {
4684 for (i, (va, vb)) in a.0.iter().zip(b.0.iter()).enumerate() {
4685 let ord = compare_values(va, vb);
4686 let ord = if descs[i] { ord.reverse() } else { ord };
4687 if ord != Ordering::Equal {
4688 return ord;
4689 }
4690 }
4691 Ordering::Equal
4692 });
4693 self.sorted = Some(keyed.into_iter().map(|(_, r)| r).collect());
4694 }
4695 let rows = self.sorted.as_ref().unwrap();
4696 if self.cursor < rows.len() {
4697 let row = rows[self.cursor].clone();
4698 self.cursor += 1;
4699 Ok(Some(row))
4700 } else {
4701 Ok(None)
4702 }
4703 }
4704}
4705
4706struct AggregateOp {
4707 input: Box<dyn Operator>,
4708 group_keys: Vec<ReturnItem>,
4709 aggregates: Vec<AggregateSpec>,
4710 results: Option<Vec<Row>>,
4711 cursor: usize,
4712}
4713
4714impl AggregateOp {
4715 fn new(
4716 input: Box<dyn Operator>,
4717 group_keys: Vec<ReturnItem>,
4718 aggregates: Vec<AggregateSpec>,
4719 ) -> Self {
4720 Self {
4721 input,
4722 group_keys,
4723 aggregates,
4724 results: None,
4725 cursor: 0,
4726 }
4727 }
4728
4729 fn compute(&mut self, ctx: &ExecCtx) -> Result<()> {
4730 let mut groups: HashMap<String, GroupState> = HashMap::new();
4731 let mut order: Vec<String> = Vec::new();
4732
4733 let mut saw_any = false;
4736
4737 while let Some(row) = self.input.next(ctx)? {
4738 saw_any = true;
4739 let mut key_values = Vec::with_capacity(self.group_keys.len());
4740 for item in &self.group_keys {
4741 key_values.push(eval_expr(&item.expr, &ctx.eval_ctx(&row))?);
4742 }
4743 let mut hash_key = String::new();
4744 for v in &key_values {
4745 hash_key.push_str(&value_key(v));
4746 hash_key.push('|');
4747 }
4748 let entry = groups.entry(hash_key.clone()).or_insert_with(|| {
4749 order.push(hash_key.clone());
4750 GroupState {
4751 key_values: key_values.clone(),
4752 agg_states: self
4753 .aggregates
4754 .iter()
4755 .map(|a| AggState::initial(a.function))
4756 .collect(),
4757 distinct_seen: self.aggregates.iter().map(|_| None).collect(),
4758 }
4759 });
4760 for (i, spec) in self.aggregates.iter().enumerate() {
4761 if let AggregateArg::DistinctExpr(expr) = &spec.arg {
4762 let v = eval_expr(expr, &ctx.eval_ctx(&row))?;
4763 if matches!(v, Value::Null) {
4764 continue;
4765 }
4766 let key = value_key(&v);
4767 let seen = entry.distinct_seen[i].get_or_insert_with(HashSet::new);
4768 if !seen.insert(key) {
4769 continue;
4770 }
4771 }
4772 entry.agg_states[i].update(&spec.arg, &ctx.eval_ctx(&row))?;
4773 if let Some(extra_expr) = &spec.extra_arg {
4777 let need_resolve = matches!(
4778 &entry.agg_states[i],
4779 AggState::PercentileDisc {
4780 percentile: None,
4781 ..
4782 } | AggState::PercentileCont {
4783 percentile: None,
4784 ..
4785 }
4786 );
4787 if need_resolve {
4788 let pv = eval_expr(extra_expr, &ctx.eval_ctx(&row))?;
4789 let p = match pv {
4790 Value::Property(Property::Float64(f)) => f,
4791 Value::Property(Property::Int64(i)) => i as f64,
4792 _ => 0.0,
4793 };
4794 if !(0.0..=1.0).contains(&p) || p.is_nan() {
4798 return Err(Error::Procedure(format!("percentile out of range: {p}")));
4799 }
4800 match &mut entry.agg_states[i] {
4801 AggState::PercentileDisc { percentile, .. }
4802 | AggState::PercentileCont { percentile, .. } => {
4803 *percentile = Some(p);
4804 }
4805 _ => {}
4806 }
4807 }
4808 }
4809 }
4810 }
4811
4812 let mut out = Vec::new();
4813 if !saw_any && self.group_keys.is_empty() && !self.aggregates.is_empty() {
4814 let mut row = Row::new();
4816 for spec in &self.aggregates {
4817 row.insert(
4818 spec.alias.clone(),
4819 AggState::initial(spec.function).finalize(),
4820 );
4821 }
4822 out.push(row);
4823 } else {
4824 for key in order {
4825 let state = groups.remove(&key).unwrap();
4826 let mut row = Row::new();
4827 for (i, item) in self.group_keys.iter().enumerate() {
4828 let name = item
4829 .alias
4830 .clone()
4831 .unwrap_or_else(|| default_name(&item.expr, i));
4832 row.insert(name, state.key_values[i].clone());
4833 }
4834 for (i, spec) in self.aggregates.iter().enumerate() {
4835 row.insert(spec.alias.clone(), state.agg_states[i].finalize());
4836 }
4837 out.push(row);
4838 }
4839 }
4840 self.results = Some(out);
4841 Ok(())
4842 }
4843}
4844
4845impl Operator for AggregateOp {
4846 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
4847 if self.results.is_none() {
4848 self.compute(ctx)?;
4849 }
4850 let rows = self.results.as_ref().unwrap();
4851 if self.cursor < rows.len() {
4852 let row = rows[self.cursor].clone();
4853 self.cursor += 1;
4854 Ok(Some(row))
4855 } else {
4856 Ok(None)
4857 }
4858 }
4859}
4860
4861struct GroupState {
4862 key_values: Vec<Value>,
4863 agg_states: Vec<AggState>,
4864 distinct_seen: Vec<Option<HashSet<String>>>,
4865}
4866
4867enum AggState {
4868 Count(i64),
4869 Sum {
4870 int_part: i64,
4871 float_part: f64,
4872 is_float: bool,
4873 },
4874 Avg {
4875 total: f64,
4876 count: i64,
4877 },
4878 Min(Option<Value>),
4879 Max(Option<Value>),
4880 Collect(Vec<Value>),
4881 StDev {
4882 sum: f64,
4883 sum_sq: f64,
4884 count: i64,
4885 },
4886 StDevP {
4887 sum: f64,
4888 sum_sq: f64,
4889 count: i64,
4890 },
4891 PercentileDisc {
4892 items: Vec<Value>,
4893 percentile: Option<f64>,
4894 },
4895 PercentileCont {
4896 items: Vec<Value>,
4897 percentile: Option<f64>,
4898 },
4899}
4900
4901impl AggState {
4902 fn initial(func: AggregateFn) -> Self {
4903 match func {
4904 AggregateFn::Count => AggState::Count(0),
4905 AggregateFn::Sum => AggState::Sum {
4906 int_part: 0,
4907 float_part: 0.0,
4908 is_float: false,
4909 },
4910 AggregateFn::Avg => AggState::Avg {
4911 total: 0.0,
4912 count: 0,
4913 },
4914 AggregateFn::Min => AggState::Min(None),
4915 AggregateFn::Max => AggState::Max(None),
4916 AggregateFn::Collect => AggState::Collect(Vec::new()),
4917 AggregateFn::StDev => AggState::StDev {
4918 sum: 0.0,
4919 sum_sq: 0.0,
4920 count: 0,
4921 },
4922 AggregateFn::StDevP => AggState::StDevP {
4923 sum: 0.0,
4924 sum_sq: 0.0,
4925 count: 0,
4926 },
4927 AggregateFn::PercentileDisc => AggState::PercentileDisc {
4928 items: Vec::new(),
4929 percentile: None,
4930 },
4931 AggregateFn::PercentileCont => AggState::PercentileCont {
4932 items: Vec::new(),
4933 percentile: None,
4934 },
4935 }
4936 }
4937
4938 fn update(&mut self, arg: &AggregateArg, ctx: &EvalCtx) -> Result<()> {
4939 match self {
4940 AggState::Count(c) => match arg {
4941 AggregateArg::Star => *c += 1,
4942 AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => {
4943 if !matches!(eval_expr(e, ctx)?, Value::Null) {
4944 *c += 1;
4945 }
4946 }
4947 },
4948 AggState::Sum {
4949 int_part,
4950 float_part,
4951 is_float,
4952 } => {
4953 let v = expr_arg_value(arg, ctx)?;
4954 match v {
4955 Value::Null => {}
4956 Value::Property(Property::Int64(i)) => *int_part += i,
4957 Value::Property(Property::Float64(f)) => {
4958 *float_part += f;
4959 *is_float = true;
4960 }
4961 _ => return Err(Error::AggregateTypeError),
4962 }
4963 }
4964 AggState::Avg { total, count } => {
4965 let v = expr_arg_value(arg, ctx)?;
4966 match v {
4967 Value::Null => {}
4968 Value::Property(Property::Int64(i)) => {
4969 *total += i as f64;
4970 *count += 1;
4971 }
4972 Value::Property(Property::Float64(f)) => {
4973 *total += f;
4974 *count += 1;
4975 }
4976 _ => return Err(Error::AggregateTypeError),
4977 }
4978 }
4979 AggState::Min(slot) => {
4980 let v = expr_arg_value(arg, ctx)?;
4987 if matches!(v, Value::Null | Value::Property(Property::Null)) {
4988 } else {
4990 match slot {
4991 None => *slot = Some(v),
4992 Some(cur) => {
4993 if compare_values(&v, cur) == Ordering::Less {
4994 *cur = v;
4995 }
4996 }
4997 }
4998 }
4999 }
5000 AggState::Max(slot) => {
5001 let v = expr_arg_value(arg, ctx)?;
5002 if matches!(v, Value::Null | Value::Property(Property::Null)) {
5003 } else {
5005 match slot {
5006 None => *slot = Some(v),
5007 Some(cur) => {
5008 if compare_values(&v, cur) == Ordering::Greater {
5009 *cur = v;
5010 }
5011 }
5012 }
5013 }
5014 }
5015 AggState::Collect(items) => {
5016 let v = expr_arg_value(arg, ctx)?;
5017 if !matches!(v, Value::Null) {
5018 items.push(v);
5019 }
5020 }
5021 AggState::PercentileDisc { items, .. } | AggState::PercentileCont { items, .. } => {
5022 let v = expr_arg_value(arg, ctx)?;
5023 if !matches!(v, Value::Null) {
5024 items.push(v);
5025 }
5026 }
5027 AggState::StDev { sum, sum_sq, count } | AggState::StDevP { sum, sum_sq, count } => {
5028 let v = expr_arg_value(arg, ctx)?;
5029 match v {
5030 Value::Null => {}
5031 Value::Property(Property::Int64(i)) => {
5032 let f = i as f64;
5033 *sum += f;
5034 *sum_sq += f * f;
5035 *count += 1;
5036 }
5037 Value::Property(Property::Float64(f)) => {
5038 *sum += f;
5039 *sum_sq += f * f;
5040 *count += 1;
5041 }
5042 _ => return Err(Error::AggregateTypeError),
5043 }
5044 }
5045 }
5046 Ok(())
5047 }
5048
5049 fn finalize(&self) -> Value {
5050 match self {
5051 AggState::Count(c) => Value::Property(Property::Int64(*c)),
5052 AggState::Sum {
5053 int_part,
5054 float_part,
5055 is_float,
5056 } => {
5057 if *is_float {
5058 Value::Property(Property::Float64(*float_part + *int_part as f64))
5059 } else {
5060 Value::Property(Property::Int64(*int_part))
5061 }
5062 }
5063 AggState::Avg { total, count } => {
5064 if *count == 0 {
5065 Value::Null
5066 } else {
5067 Value::Property(Property::Float64(*total / *count as f64))
5068 }
5069 }
5070 AggState::Min(slot) | AggState::Max(slot) => match slot {
5071 Some(v) => v.clone(),
5072 None => Value::Null,
5073 },
5074 AggState::Collect(items) => Value::List(items.clone()),
5075 AggState::StDevP { sum, sum_sq, count } => {
5076 if *count == 0 {
5077 Value::Property(Property::Float64(0.0))
5078 } else {
5079 let n = *count as f64;
5080 let variance = *sum_sq / n - (*sum / n).powi(2);
5081 Value::Property(Property::Float64(variance.max(0.0).sqrt()))
5082 }
5083 }
5084 AggState::StDev { sum, sum_sq, count } => {
5085 if *count < 2 {
5086 Value::Property(Property::Float64(0.0))
5087 } else {
5088 let n = *count as f64;
5089 let variance = (*sum_sq - *sum * *sum / n) / (n - 1.0);
5090 Value::Property(Property::Float64(variance.max(0.0).sqrt()))
5091 }
5092 }
5093 AggState::PercentileDisc { items, percentile } => {
5094 percentile_disc(items, percentile.unwrap_or(0.0))
5095 }
5096 AggState::PercentileCont { items, percentile } => {
5097 percentile_cont(items, percentile.unwrap_or(0.0))
5098 }
5099 }
5100 }
5101}
5102
5103fn expr_arg_value(arg: &AggregateArg, ctx: &EvalCtx) -> Result<Value> {
5104 match arg {
5105 AggregateArg::Star => Err(Error::AggregateTypeError),
5106 AggregateArg::Expr(e) | AggregateArg::DistinctExpr(e) => eval_expr(e, ctx),
5107 }
5108}
5109
5110fn value_to_f64(v: &Value) -> f64 {
5114 match v {
5115 Value::Property(Property::Int64(i)) => *i as f64,
5116 Value::Property(Property::Float64(f)) => *f,
5117 _ => f64::NAN,
5118 }
5119}
5120
5121fn percentile_disc(items: &[Value], p: f64) -> Value {
5126 let mut nums: Vec<(f64, Value)> = items
5127 .iter()
5128 .map(|v| (value_to_f64(v), v.clone()))
5129 .filter(|(f, _)| !f.is_nan())
5130 .collect();
5131 if nums.is_empty() {
5132 return Value::Null;
5133 }
5134 nums.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
5135 let p = p.clamp(0.0, 1.0);
5136 let n = nums.len();
5137 let idx = ((p * n as f64).ceil() as isize - 1).max(0) as usize;
5139 nums[idx.min(n - 1)].1.clone()
5140}
5141
5142fn percentile_cont(items: &[Value], p: f64) -> Value {
5146 let mut nums: Vec<f64> = items
5147 .iter()
5148 .map(value_to_f64)
5149 .filter(|f| !f.is_nan())
5150 .collect();
5151 if nums.is_empty() {
5152 return Value::Null;
5153 }
5154 nums.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
5155 let p = p.clamp(0.0, 1.0);
5156 let n = nums.len();
5157 if n == 1 {
5158 return Value::Property(Property::Float64(nums[0]));
5159 }
5160 let pos = p * (n as f64 - 1.0);
5161 let lo = pos.floor() as usize;
5162 let hi = pos.ceil() as usize;
5163 let frac = pos - lo as f64;
5164 let v = nums[lo] + (nums[hi] - nums[lo]) * frac;
5165 Value::Property(Property::Float64(v))
5166}
5167
5168struct SkipOp {
5169 input: Box<dyn Operator>,
5170 count_expr: Expr,
5171 remaining: Option<i64>,
5172}
5173
5174impl SkipOp {
5175 fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5176 Self {
5177 input,
5178 count_expr,
5179 remaining: None,
5180 }
5181 }
5182}
5183
5184impl Operator for SkipOp {
5185 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5186 if self.remaining.is_none() {
5187 let empty = Row::new();
5188 let ectx = ctx.eval_ctx(&empty);
5189 let val = eval_expr(&self.count_expr, &ectx)?;
5190 self.remaining = Some(expr_to_count(val)?);
5191 }
5192 let rem = self.remaining.as_mut().unwrap();
5193 while *rem > 0 {
5194 if self.input.next(ctx)?.is_none() {
5195 return Ok(None);
5196 }
5197 *rem -= 1;
5198 }
5199 self.input.next(ctx)
5200 }
5201}
5202
5203struct LimitOp {
5204 input: Box<dyn Operator>,
5205 count_expr: Expr,
5206 remaining: Option<i64>,
5207}
5208
5209impl LimitOp {
5210 fn new(input: Box<dyn Operator>, count_expr: Expr) -> Self {
5211 Self {
5212 input,
5213 count_expr,
5214 remaining: None,
5215 }
5216 }
5217}
5218
5219impl Operator for LimitOp {
5220 fn next(&mut self, ctx: &ExecCtx) -> Result<Option<Row>> {
5221 if self.remaining.is_none() {
5222 let empty = Row::new();
5223 let ectx = ctx.eval_ctx(&empty);
5224 let val = eval_expr(&self.count_expr, &ectx)?;
5225 self.remaining = Some(expr_to_count(val)?);
5226 }
5227 let rem = self.remaining.as_mut().unwrap();
5228 if *rem <= 0 {
5229 return Ok(None);
5230 }
5231 match self.input.next(ctx)? {
5232 Some(row) => {
5233 *rem -= 1;
5234 Ok(Some(row))
5235 }
5236 None => Ok(None),
5237 }
5238 }
5239}
5240
5241fn expr_to_count(val: Value) -> Result<i64> {
5242 match val {
5243 Value::Null | Value::Property(Property::Null) => Ok(0),
5244 Value::Property(Property::Int64(n)) if n >= 0 => Ok(n),
5245 _ => Err(Error::TypeMismatch),
5250 }
5251}