1use super::{
9 ConstraintValidator, ExpressionPredicate, Operator, OperatorResult, PropertySource,
10 SessionContext,
11};
12use crate::execution::chunk::{DataChunk, DataChunkBuilder};
13use crate::graph::{GraphStore, GraphStoreMut, GraphStoreSearch};
14use grafeo_common::types::{
15 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
16};
17use std::sync::Arc;
18
19pub struct MergeConfig {
21 pub variable: String,
23 pub labels: Vec<String>,
25 pub match_properties: Vec<(String, PropertySource)>,
27 pub on_create_properties: Vec<(String, PropertySource)>,
29 pub on_match_properties: Vec<(String, PropertySource)>,
31 pub output_schema: Vec<LogicalType>,
33 pub output_column: usize,
35 pub bound_variable_column: Option<usize>,
39}
40
41pub struct MergeOperator {
49 store: Arc<dyn GraphStoreMut>,
51 input: Option<Box<dyn Operator>>,
53 config: MergeConfig,
55 executed: bool,
57 viewing_epoch: Option<EpochId>,
59 transaction_id: Option<TransactionId>,
61 validator: Option<Arc<dyn ConstraintValidator>>,
63 search_store: Option<Arc<dyn GraphStoreSearch>>,
67 session_context: SessionContext,
69}
70
71impl MergeOperator {
72 pub fn new(
74 store: Arc<dyn GraphStoreMut>,
75 input: Option<Box<dyn Operator>>,
76 config: MergeConfig,
77 ) -> Self {
78 Self {
79 store,
80 input,
81 config,
82 executed: false,
83 viewing_epoch: None,
84 transaction_id: None,
85 validator: None,
86 search_store: None,
87 session_context: SessionContext::default(),
88 }
89 }
90
91 #[must_use]
93 pub fn variable(&self) -> &str {
94 &self.config.variable
95 }
96
97 pub fn with_transaction_context(
99 mut self,
100 epoch: EpochId,
101 transaction_id: Option<TransactionId>,
102 ) -> Self {
103 self.viewing_epoch = Some(epoch);
104 self.transaction_id = transaction_id;
105 self
106 }
107
108 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
110 self.validator = Some(validator);
111 self
112 }
113
114 #[must_use]
117 pub fn with_search_store(mut self, search_store: Arc<dyn GraphStoreSearch>) -> Self {
118 self.search_store = Some(search_store);
119 self
120 }
121
122 #[must_use]
124 pub fn with_session_context(mut self, context: SessionContext) -> Self {
125 self.session_context = context;
126 self
127 }
128
129 fn resolve_properties(
135 props: &[(String, PropertySource)],
136 chunk: Option<&DataChunk>,
137 row: usize,
138 store: &dyn GraphStore,
139 ) -> Vec<(String, Value)> {
140 props
141 .iter()
142 .map(|(name, source)| {
143 let value = if let Some(chunk) = chunk {
144 source.resolve(chunk, row, store)
145 } else {
146 match source {
148 PropertySource::Constant(v) => v.clone(),
149 _ => Value::Null,
150 }
151 };
152 (name.clone(), value)
153 })
154 .collect()
155 }
156
157 fn has_expression_source(props: &[(String, PropertySource)]) -> bool {
160 props
161 .iter()
162 .any(|(_, src)| matches!(src, PropertySource::Expression { .. }))
163 }
164
165 fn build_augmented_node_chunk(
171 &self,
172 chunk: Option<&DataChunk>,
173 row: usize,
174 merged_node: NodeId,
175 ) -> DataChunk {
176 let mut builder = DataChunkBuilder::with_capacity(&self.config.output_schema, 1);
177 if let Some(input) = chunk {
178 for col_idx in 0..input.column_count() {
179 let val = input
180 .column(col_idx)
181 .and_then(|c| c.get_value(row))
182 .unwrap_or(Value::Null);
183 if let Some(dst) = builder.column_mut(col_idx) {
184 dst.push_value(val);
185 }
186 }
187 }
188 if let Some(dst) = builder.column_mut(self.config.output_column) {
189 dst.push_node_id(merged_node);
190 }
191 builder.advance_row();
192 builder.finish()
193 }
194
195 fn resolve_action_properties(
202 &self,
203 props: &[(String, PropertySource)],
204 chunk: Option<&DataChunk>,
205 row: usize,
206 merged_node: NodeId,
207 ) -> Result<Vec<(String, Value)>, super::OperatorError> {
208 if !Self::has_expression_source(props) {
209 return Ok(Self::resolve_properties(
212 props,
213 chunk,
214 row,
215 self.store.as_ref(),
216 ));
217 }
218
219 let augmented = self.build_augmented_node_chunk(chunk, row, merged_node);
220 let mut out = Vec::with_capacity(props.len());
221 for (name, source) in props {
222 let value = match source {
223 PropertySource::Expression {
224 expr,
225 variable_columns,
226 } => {
227 let search_store = self.search_store.as_ref().ok_or_else(|| {
228 super::OperatorError::Execution(
229 "MERGE expression source requires search store; planner did not attach one"
230 .to_string(),
231 )
232 })?;
233 let mut predicate = ExpressionPredicate::new(
234 (**expr).clone(),
235 variable_columns.clone(),
236 Arc::clone(search_store),
237 )
238 .with_session_context(self.session_context.clone());
239 if let Some(epoch) = self.viewing_epoch {
240 predicate = predicate.with_transaction_context(epoch, self.transaction_id);
241 }
242 predicate.eval_at(&augmented, 0).unwrap_or(Value::Null)
243 }
244 _ => source.resolve(&augmented, 0, self.store.as_ref()),
245 };
246 out.push((name.clone(), value));
247 }
248 Ok(out)
249 }
250
251 fn find_matching_node(&self, resolved_match_props: &[(String, Value)]) -> Option<NodeId> {
253 let use_index = resolved_match_props
256 .iter()
257 .any(|(k, v)| !v.is_null() && self.store.has_property_index(k));
258
259 let candidates: Vec<NodeId> = if use_index {
260 let conditions: Vec<(&str, Value)> = resolved_match_props
261 .iter()
262 .filter(|(_, v)| !v.is_null())
263 .map(|(k, v)| (k.as_str(), v.clone()))
264 .collect();
265 self.store.find_nodes_by_properties(&conditions)
266 } else if let Some(first_label) = self.config.labels.first() {
267 self.store.nodes_by_label(first_label)
268 } else {
269 self.store.node_ids()
270 };
271
272 for node_id in candidates {
273 let node_opt = match (self.viewing_epoch, self.transaction_id) {
280 (Some(epoch), Some(tid)) => self.store.get_node_versioned(node_id, epoch, tid),
281 _ => self.store.get_node(node_id),
282 };
283 let Some(node) = node_opt else { continue };
284
285 let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
286 if !has_all_labels {
287 continue;
288 }
289
290 let has_all_props = resolved_match_props.iter().all(|(key, expected_value)| {
291 let prop = node.properties.get(&PropertyKey::new(key.as_str()));
292 if expected_value.is_null() {
293 prop.map_or(true, |v| v.is_null())
295 } else {
296 prop.is_some_and(|v| v == expected_value)
297 }
298 });
299
300 if has_all_props {
301 return Some(node_id);
302 }
303 }
304
305 None
306 }
307
308 fn merge_node_props(
311 resolved_match_props: &[(String, Value)],
312 resolved_create_props: &[(String, Value)],
313 ) -> Vec<(String, Value)> {
314 let mut merged: Vec<(String, Value)> = resolved_match_props.to_vec();
315 for (k, v) in resolved_create_props {
316 if let Some(existing) = merged.iter_mut().find(|(key, _)| key == k) {
317 existing.1 = v.clone();
318 } else {
319 merged.push((k.clone(), v.clone()));
320 }
321 }
322 merged
323 }
324
325 fn write_node_props(&self, id: NodeId, props: &[(PropertyKey, Value)]) {
331 if let Some(tid) = self.transaction_id {
332 for (key, value) in props {
333 self.store
334 .set_node_property_versioned(id, key.as_str(), value.clone(), tid);
335 }
336 } else {
337 for (key, value) in props {
338 self.store
339 .set_node_property(id, key.as_str(), value.clone());
340 }
341 }
342 }
343
344 fn store_create_node(&self, label_refs: &[&str]) -> NodeId {
351 let epoch = self
352 .viewing_epoch
353 .unwrap_or_else(|| self.store.current_epoch());
354 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
355 self.store.create_node_versioned(label_refs, epoch, tx)
356 }
357
358 fn create_node(
360 &self,
361 resolved_match_props: &[(String, Value)],
362 resolved_create_props: &[(String, Value)],
363 ) -> Result<NodeId, super::OperatorError> {
364 let all_props = Self::merge_node_props(resolved_match_props, resolved_create_props);
365
366 if let Some(ref validator) = self.validator {
368 validator.validate_node_labels_allowed(&self.config.labels)?;
369 for (name, value) in &all_props {
370 validator.validate_node_property(&self.config.labels, name, value)?;
371 validator.check_unique_node_property(&self.config.labels, name, value)?;
372 }
373 validator.validate_node_complete(&self.config.labels, &all_props)?;
374 }
375
376 let prop_pairs: Vec<(PropertyKey, Value)> = all_props
377 .into_iter()
378 .map(|(k, v)| (PropertyKey::new(k.as_str()), v))
379 .collect();
380
381 let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
382 let id = self.store_create_node(&labels);
383 self.write_node_props(id, &prop_pairs);
384 Ok(id)
385 }
386
387 fn create_node_phase_one(
399 &self,
400 resolved_match_props: &[(String, Value)],
401 ) -> Result<NodeId, super::OperatorError> {
402 if let Some(ref validator) = self.validator {
403 validator.validate_node_labels_allowed(&self.config.labels)?;
404 for (name, value) in resolved_match_props {
405 validator.validate_node_property(&self.config.labels, name, value)?;
406 validator.check_unique_node_property(&self.config.labels, name, value)?;
407 }
408 }
409
410 let prop_pairs: Vec<(PropertyKey, Value)> = resolved_match_props
411 .iter()
412 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
413 .collect();
414
415 let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
416 let id = self.store_create_node(&labels);
417 self.write_node_props(id, &prop_pairs);
418 Ok(id)
419 }
420
421 fn validate_on_create_phase_two(
429 &self,
430 resolved_match_props: &[(String, Value)],
431 resolved_create_props: &[(String, Value)],
432 ) -> Result<(), super::OperatorError> {
433 let Some(ref validator) = self.validator else {
434 return Ok(());
435 };
436 for (name, value) in resolved_create_props {
437 validator.validate_node_property(&self.config.labels, name, value)?;
438 validator.check_unique_node_property(&self.config.labels, name, value)?;
439 }
440 let all_props = Self::merge_node_props(resolved_match_props, resolved_create_props);
441 validator.validate_node_complete(&self.config.labels, &all_props)?;
442 Ok(())
443 }
444
445 fn merge_node_for_row(
447 &self,
448 chunk: Option<&DataChunk>,
449 row: usize,
450 ) -> Result<NodeId, super::OperatorError> {
451 let store_ref: &dyn GraphStore = self.store.as_ref();
452 let resolved_match =
455 Self::resolve_properties(&self.config.match_properties, chunk, row, store_ref);
456
457 if let Some(existing_id) = self.find_matching_node(&resolved_match) {
458 let resolved_on_match = self.resolve_action_properties(
461 &self.config.on_match_properties,
462 chunk,
463 row,
464 existing_id,
465 )?;
466 self.apply_on_match(existing_id, &resolved_on_match)?;
467 Ok(existing_id)
468 } else if Self::has_expression_source(&self.config.on_create_properties) {
469 let new_id = self.create_node_phase_one(&resolved_match)?;
479 let resolved_on_create = self.resolve_action_properties(
480 &self.config.on_create_properties,
481 chunk,
482 row,
483 new_id,
484 )?;
485 self.validate_on_create_phase_two(&resolved_match, &resolved_on_create)?;
486 self.apply_on_match(new_id, &resolved_on_create)?;
487 Ok(new_id)
488 } else {
489 let resolved_on_create =
491 Self::resolve_properties(&self.config.on_create_properties, chunk, row, store_ref);
492 self.create_node(&resolved_match, &resolved_on_create)
493 }
494 }
495
496 fn apply_on_match(
498 &self,
499 node_id: NodeId,
500 resolved_on_match: &[(String, Value)],
501 ) -> Result<(), super::OperatorError> {
502 for (key, value) in resolved_on_match {
503 if let Some(ref validator) = self.validator {
504 validator.validate_node_property(&self.config.labels, key, value)?;
505 }
506 if let Some(tid) = self.transaction_id {
507 self.store
508 .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
509 } else {
510 self.store
511 .set_node_property(node_id, key.as_str(), value.clone());
512 }
513 }
514 Ok(())
515 }
516}
517
518impl Operator for MergeOperator {
519 fn next(&mut self) -> OperatorResult {
520 if let Some(ref mut input) = self.input {
523 if let Some(chunk) = input.next()? {
524 let mut builder =
525 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
526
527 for row in chunk.selected_indices() {
528 if let Some(bound_col) = self.config.bound_variable_column {
530 let is_null = chunk.column(bound_col).map_or(true, |col| col.is_null(row));
531 if is_null {
532 return Err(super::OperatorError::TypeMismatch {
533 expected: format!(
534 "non-null node for MERGE variable '{}'",
535 self.config.variable
536 ),
537 found: "NULL".to_string(),
538 });
539 }
540 }
541
542 let node_id = self.merge_node_for_row(Some(&chunk), row)?;
544
545 for col_idx in 0..chunk.column_count() {
547 if let (Some(src), Some(dst)) =
548 (chunk.column(col_idx), builder.column_mut(col_idx))
549 {
550 if let Some(val) = src.get_value(row) {
551 dst.push_value(val);
552 } else {
553 dst.push_value(Value::Null);
554 }
555 }
556 }
557
558 if let Some(dst) = builder.column_mut(self.config.output_column) {
560 dst.push_node_id(node_id);
561 }
562
563 builder.advance_row();
564 }
565
566 return Ok(Some(builder.finish()));
567 }
568 return Ok(None);
569 }
570
571 if self.executed {
573 return Ok(None);
574 }
575 self.executed = true;
576
577 let node_id = self.merge_node_for_row(None, 0)?;
578
579 let mut builder = DataChunkBuilder::new(&self.config.output_schema);
580 if let Some(dst) = builder.column_mut(self.config.output_column) {
581 dst.push_node_id(node_id);
582 }
583 builder.advance_row();
584
585 Ok(Some(builder.finish()))
586 }
587
588 fn reset(&mut self) {
589 self.executed = false;
590 if let Some(ref mut input) = self.input {
591 input.reset();
592 }
593 }
594
595 fn name(&self) -> &'static str {
596 "Merge"
597 }
598
599 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
600 self
601 }
602}
603
604pub struct MergeRelationshipConfig {
606 pub source_column: usize,
608 pub target_column: usize,
610 pub source_variable: String,
612 pub target_variable: String,
614 pub edge_type: String,
616 pub match_properties: Vec<(String, PropertySource)>,
618 pub on_create_properties: Vec<(String, PropertySource)>,
620 pub on_match_properties: Vec<(String, PropertySource)>,
622 pub output_schema: Vec<LogicalType>,
624 pub edge_output_column: usize,
626}
627
628pub struct MergeRelationshipOperator {
635 store: Arc<dyn GraphStoreMut>,
637 input: Box<dyn Operator>,
639 config: MergeRelationshipConfig,
641 viewing_epoch: Option<EpochId>,
643 transaction_id: Option<TransactionId>,
645 validator: Option<Arc<dyn ConstraintValidator>>,
647 search_store: Option<Arc<dyn GraphStoreSearch>>,
649 session_context: SessionContext,
651}
652
653impl MergeRelationshipOperator {
654 pub fn new(
656 store: Arc<dyn GraphStoreMut>,
657 input: Box<dyn Operator>,
658 config: MergeRelationshipConfig,
659 ) -> Self {
660 Self {
661 store,
662 input,
663 config,
664 viewing_epoch: None,
665 transaction_id: None,
666 validator: None,
667 search_store: None,
668 session_context: SessionContext::default(),
669 }
670 }
671
672 pub fn with_transaction_context(
674 mut self,
675 epoch: EpochId,
676 transaction_id: Option<TransactionId>,
677 ) -> Self {
678 self.viewing_epoch = Some(epoch);
679 self.transaction_id = transaction_id;
680 self
681 }
682
683 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
685 self.validator = Some(validator);
686 self
687 }
688
689 #[must_use]
691 pub fn with_search_store(mut self, search_store: Arc<dyn GraphStoreSearch>) -> Self {
692 self.search_store = Some(search_store);
693 self
694 }
695
696 #[must_use]
698 pub fn with_session_context(mut self, context: SessionContext) -> Self {
699 self.session_context = context;
700 self
701 }
702
703 fn build_augmented_edge_chunk(
706 &self,
707 chunk: &DataChunk,
708 row: usize,
709 merged_edge: EdgeId,
710 ) -> DataChunk {
711 let mut builder = DataChunkBuilder::with_capacity(&self.config.output_schema, 1);
712 for col_idx in 0..chunk.column_count() {
713 let val = chunk
714 .column(col_idx)
715 .and_then(|c| c.get_value(row))
716 .unwrap_or(Value::Null);
717 if let Some(dst) = builder.column_mut(col_idx) {
718 dst.push_value(val);
719 }
720 }
721 if let Some(dst) = builder.column_mut(self.config.edge_output_column) {
722 dst.push_edge_id(merged_edge);
723 }
724 builder.advance_row();
725 builder.finish()
726 }
727
728 fn resolve_action_properties(
732 &self,
733 props: &[(String, PropertySource)],
734 chunk: &DataChunk,
735 row: usize,
736 merged_edge: EdgeId,
737 ) -> Result<Vec<(String, Value)>, super::OperatorError> {
738 if !MergeOperator::has_expression_source(props) {
739 return Ok(MergeOperator::resolve_properties(
740 props,
741 Some(chunk),
742 row,
743 self.store.as_ref(),
744 ));
745 }
746
747 let augmented = self.build_augmented_edge_chunk(chunk, row, merged_edge);
748 let mut out = Vec::with_capacity(props.len());
749 for (name, source) in props {
750 let value = match source {
751 PropertySource::Expression {
752 expr,
753 variable_columns,
754 } => {
755 let search_store = self.search_store.as_ref().ok_or_else(|| {
756 super::OperatorError::Execution(
757 "MERGE expression source requires search store; planner did not attach one"
758 .to_string(),
759 )
760 })?;
761 let mut predicate = ExpressionPredicate::new(
762 (**expr).clone(),
763 variable_columns.clone(),
764 Arc::clone(search_store),
765 )
766 .with_session_context(self.session_context.clone());
767 if let Some(epoch) = self.viewing_epoch {
768 predicate = predicate.with_transaction_context(epoch, self.transaction_id);
769 }
770 predicate.eval_at(&augmented, 0).unwrap_or(Value::Null)
771 }
772 _ => source.resolve(&augmented, 0, self.store.as_ref()),
773 };
774 out.push((name.clone(), value));
775 }
776 Ok(out)
777 }
778
779 fn find_matching_edge(
781 &self,
782 src: NodeId,
783 dst: NodeId,
784 resolved_match_props: &[(String, Value)],
785 ) -> Option<EdgeId> {
786 use crate::graph::Direction;
787
788 for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
789 if target != dst {
790 continue;
791 }
792
793 if let Some(edge) = self.store.get_edge(edge_id) {
794 if edge.edge_type.as_str() != self.config.edge_type {
795 continue;
796 }
797
798 let has_all_props = resolved_match_props
799 .iter()
800 .all(|(key, expected)| edge.get_property(key).is_some_and(|v| v == expected));
801
802 if has_all_props {
803 return Some(edge_id);
804 }
805 }
806 }
807
808 None
809 }
810
811 fn store_create_edge(&self, src: NodeId, dst: NodeId) -> EdgeId {
815 let epoch = self
816 .viewing_epoch
817 .unwrap_or_else(|| self.store.current_epoch());
818 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
819 self.store
820 .create_edge_versioned(src, dst, &self.config.edge_type, epoch, tx)
821 }
822
823 fn write_edge_props(&self, id: EdgeId, props: &[(PropertyKey, Value)]) {
827 if let Some(tid) = self.transaction_id {
828 for (key, value) in props {
829 self.store
830 .set_edge_property_versioned(id, key.as_str(), value.clone(), tid);
831 }
832 } else {
833 for (key, value) in props {
834 self.store
835 .set_edge_property(id, key.as_str(), value.clone());
836 }
837 }
838 }
839
840 fn create_edge(
842 &self,
843 src: NodeId,
844 dst: NodeId,
845 resolved_match_props: &[(String, Value)],
846 resolved_create_props: &[(String, Value)],
847 ) -> Result<EdgeId, super::OperatorError> {
848 let all_props =
849 MergeOperator::merge_node_props(resolved_match_props, resolved_create_props);
850
851 if let Some(ref validator) = self.validator {
853 validator.validate_edge_type_allowed(&self.config.edge_type)?;
854 for (name, value) in &all_props {
855 validator.validate_edge_property(&self.config.edge_type, name, value)?;
856 }
857 validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
858 }
859
860 let prop_pairs: Vec<(PropertyKey, Value)> = all_props
861 .into_iter()
862 .map(|(k, v)| (PropertyKey::new(k.as_str()), v))
863 .collect();
864
865 let id = self.store_create_edge(src, dst);
866 self.write_edge_props(id, &prop_pairs);
867 Ok(id)
868 }
869
870 fn create_edge_phase_one(
879 &self,
880 src: NodeId,
881 dst: NodeId,
882 resolved_match_props: &[(String, Value)],
883 ) -> Result<EdgeId, super::OperatorError> {
884 if let Some(ref validator) = self.validator {
885 validator.validate_edge_type_allowed(&self.config.edge_type)?;
886 for (name, value) in resolved_match_props {
887 validator.validate_edge_property(&self.config.edge_type, name, value)?;
888 }
889 }
890
891 let prop_pairs: Vec<(PropertyKey, Value)> = resolved_match_props
892 .iter()
893 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
894 .collect();
895
896 let id = self.store_create_edge(src, dst);
897 self.write_edge_props(id, &prop_pairs);
898 Ok(id)
899 }
900
901 fn validate_on_create_edge_phase_two(
905 &self,
906 resolved_match_props: &[(String, Value)],
907 resolved_create_props: &[(String, Value)],
908 ) -> Result<(), super::OperatorError> {
909 let Some(ref validator) = self.validator else {
910 return Ok(());
911 };
912 for (name, value) in resolved_create_props {
913 validator.validate_edge_property(&self.config.edge_type, name, value)?;
914 }
915 let all_props =
916 MergeOperator::merge_node_props(resolved_match_props, resolved_create_props);
917 validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
918 Ok(())
919 }
920
921 fn apply_on_match_edge(
923 &self,
924 edge_id: EdgeId,
925 resolved_on_match: &[(String, Value)],
926 ) -> Result<(), super::OperatorError> {
927 for (key, value) in resolved_on_match {
928 if let Some(ref validator) = self.validator {
929 validator.validate_edge_property(&self.config.edge_type, key, value)?;
930 }
931 if let Some(tid) = self.transaction_id {
932 self.store
933 .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
934 } else {
935 self.store
936 .set_edge_property(edge_id, key.as_str(), value.clone());
937 }
938 }
939 Ok(())
940 }
941}
942
943impl Operator for MergeRelationshipOperator {
944 fn next(&mut self) -> OperatorResult {
945 use super::OperatorError;
946
947 if let Some(chunk) = self.input.next()? {
948 let mut builder =
949 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
950
951 for row in chunk.selected_indices() {
952 let src_val = chunk
953 .column(self.config.source_column)
954 .and_then(|c| c.get_node_id(row))
955 .ok_or_else(|| OperatorError::TypeMismatch {
956 expected: format!(
957 "non-null node for MERGE variable '{}'",
958 self.config.source_variable
959 ),
960 found: "NULL".to_string(),
961 })?;
962
963 let dst_val = chunk
964 .column(self.config.target_column)
965 .and_then(|c| c.get_node_id(row))
966 .ok_or_else(|| OperatorError::TypeMismatch {
967 expected: format!(
968 "non-null node for MERGE variable '{}'",
969 self.config.target_variable
970 ),
971 found: "None".to_string(),
972 })?;
973
974 let store_ref: &dyn GraphStore = self.store.as_ref();
975 let resolved_match = MergeOperator::resolve_properties(
976 &self.config.match_properties,
977 Some(&chunk),
978 row,
979 store_ref,
980 );
981
982 let edge_id = if let Some(existing) =
983 self.find_matching_edge(src_val, dst_val, &resolved_match)
984 {
985 let resolved_on_match = self.resolve_action_properties(
986 &self.config.on_match_properties,
987 &chunk,
988 row,
989 existing,
990 )?;
991 self.apply_on_match_edge(existing, &resolved_on_match)?;
992 existing
993 } else if MergeOperator::has_expression_source(&self.config.on_create_properties) {
994 let new_id = self.create_edge_phase_one(src_val, dst_val, &resolved_match)?;
1000 let resolved_on_create = self.resolve_action_properties(
1001 &self.config.on_create_properties,
1002 &chunk,
1003 row,
1004 new_id,
1005 )?;
1006 self.validate_on_create_edge_phase_two(&resolved_match, &resolved_on_create)?;
1007 self.apply_on_match_edge(new_id, &resolved_on_create)?;
1008 new_id
1009 } else {
1010 let resolved_on_create = MergeOperator::resolve_properties(
1011 &self.config.on_create_properties,
1012 Some(&chunk),
1013 row,
1014 store_ref,
1015 );
1016 self.create_edge(src_val, dst_val, &resolved_match, &resolved_on_create)?
1017 };
1018
1019 for col_idx in 0..self.config.output_schema.len() {
1021 if col_idx == self.config.edge_output_column {
1022 if let Some(dst_col) = builder.column_mut(col_idx) {
1023 dst_col.push_edge_id(edge_id);
1024 }
1025 } else if let (Some(src_col), Some(dst_col)) =
1026 (chunk.column(col_idx), builder.column_mut(col_idx))
1027 && let Some(val) = src_col.get_value(row)
1028 {
1029 dst_col.push_value(val);
1030 }
1031 }
1032
1033 builder.advance_row();
1034 }
1035
1036 return Ok(Some(builder.finish()));
1037 }
1038
1039 Ok(None)
1040 }
1041
1042 fn reset(&mut self) {
1043 self.input.reset();
1044 }
1045
1046 fn name(&self) -> &'static str {
1047 "MergeRelationship"
1048 }
1049
1050 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1051 self
1052 }
1053}
1054
1055#[cfg(all(test, feature = "lpg"))]
1056mod tests {
1057 use super::*;
1058 use crate::graph::lpg::LpgStore;
1059
1060 fn const_props(props: Vec<(&str, Value)>) -> Vec<(String, PropertySource)> {
1061 props
1062 .into_iter()
1063 .map(|(k, v)| (k.to_string(), PropertySource::Constant(v)))
1064 .collect()
1065 }
1066
1067 #[test]
1068 fn test_merge_creates_new_node() {
1069 let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
1070
1071 let mut merge = MergeOperator::new(
1073 Arc::clone(&store),
1074 None,
1075 MergeConfig {
1076 variable: "n".to_string(),
1077 labels: vec!["Person".to_string()],
1078 match_properties: const_props(vec![("name", Value::String("Alix".into()))]),
1079 on_create_properties: vec![],
1080 on_match_properties: vec![],
1081 output_schema: vec![LogicalType::Node],
1082 output_column: 0,
1083 bound_variable_column: None,
1084 },
1085 );
1086
1087 let result = merge.next().unwrap();
1088 assert!(result.is_some());
1089
1090 let nodes = store.nodes_by_label("Person");
1092 assert_eq!(nodes.len(), 1);
1093
1094 let node = store.get_node(nodes[0]).unwrap();
1095 assert!(node.has_label("Person"));
1096 assert_eq!(
1097 node.properties.get(&PropertyKey::new("name")),
1098 Some(&Value::String("Alix".into()))
1099 );
1100 }
1101
1102 #[test]
1103 fn test_merge_matches_existing_node() {
1104 let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
1105
1106 store.create_node_with_props(
1108 &["Person"],
1109 &[(PropertyKey::new("name"), Value::String("Gus".into()))],
1110 );
1111
1112 let mut merge = MergeOperator::new(
1114 Arc::clone(&store),
1115 None,
1116 MergeConfig {
1117 variable: "n".to_string(),
1118 labels: vec!["Person".to_string()],
1119 match_properties: const_props(vec![("name", Value::String("Gus".into()))]),
1120 on_create_properties: vec![],
1121 on_match_properties: vec![],
1122 output_schema: vec![LogicalType::Node],
1123 output_column: 0,
1124 bound_variable_column: None,
1125 },
1126 );
1127
1128 let result = merge.next().unwrap();
1129 assert!(result.is_some());
1130
1131 let nodes = store.nodes_by_label("Person");
1133 assert_eq!(nodes.len(), 1);
1134 }
1135
1136 #[test]
1137 fn test_merge_with_on_create() {
1138 let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
1139
1140 let mut merge = MergeOperator::new(
1142 Arc::clone(&store),
1143 None,
1144 MergeConfig {
1145 variable: "n".to_string(),
1146 labels: vec!["Person".to_string()],
1147 match_properties: const_props(vec![("name", Value::String("Vincent".into()))]),
1148 on_create_properties: const_props(vec![("created", Value::Bool(true))]),
1149 on_match_properties: vec![],
1150 output_schema: vec![LogicalType::Node],
1151 output_column: 0,
1152 bound_variable_column: None,
1153 },
1154 );
1155
1156 let _ = merge.next().unwrap();
1157
1158 let nodes = store.nodes_by_label("Person");
1160 let node = store.get_node(nodes[0]).unwrap();
1161 assert_eq!(
1162 node.properties.get(&PropertyKey::new("name")),
1163 Some(&Value::String("Vincent".into()))
1164 );
1165 assert_eq!(
1166 node.properties.get(&PropertyKey::new("created")),
1167 Some(&Value::Bool(true))
1168 );
1169 }
1170
1171 #[test]
1172 fn test_merge_with_on_match() {
1173 let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
1174
1175 let node_id = store.create_node_with_props(
1177 &["Person"],
1178 &[(PropertyKey::new("name"), Value::String("Jules".into()))],
1179 );
1180
1181 let mut merge = MergeOperator::new(
1183 Arc::clone(&store),
1184 None,
1185 MergeConfig {
1186 variable: "n".to_string(),
1187 labels: vec!["Person".to_string()],
1188 match_properties: const_props(vec![("name", Value::String("Jules".into()))]),
1189 on_create_properties: vec![],
1190 on_match_properties: const_props(vec![("updated", Value::Bool(true))]),
1191 output_schema: vec![LogicalType::Node],
1192 output_column: 0,
1193 bound_variable_column: None,
1194 },
1195 );
1196
1197 let _ = merge.next().unwrap();
1198
1199 let node = store.get_node(node_id).unwrap();
1201 assert_eq!(
1202 node.properties.get(&PropertyKey::new("updated")),
1203 Some(&Value::Bool(true))
1204 );
1205 }
1206
1207 #[test]
1208 fn test_merge_uses_property_index() {
1209 let lpg_store = Arc::new(LpgStore::new().unwrap());
1210 lpg_store.create_property_index("name");
1211 assert!(lpg_store.has_property_index("name"));
1212
1213 let store: Arc<dyn GraphStoreMut> = lpg_store;
1215
1216 for i in 0..50u32 {
1217 store.create_node_with_props(
1218 &["Person"],
1219 &[(
1220 PropertyKey::new("name"),
1221 Value::String(format!("person_{i}").into()),
1222 )],
1223 );
1224 }
1225
1226 let target_id = store.create_node_with_props(
1227 &["Person"],
1228 &[(PropertyKey::new("name"), Value::String("Beatrix".into()))],
1229 );
1230
1231 let mut merge = MergeOperator::new(
1233 Arc::clone(&store),
1234 None,
1235 MergeConfig {
1236 variable: "n".to_string(),
1237 labels: vec!["Person".to_string()],
1238 match_properties: const_props(vec![("name", Value::String("Beatrix".into()))]),
1239 on_create_properties: vec![],
1240 on_match_properties: const_props(vec![("found", Value::Bool(true))]),
1241 output_schema: vec![LogicalType::Node],
1242 output_column: 0,
1243 bound_variable_column: None,
1244 },
1245 );
1246
1247 let result = merge.next().unwrap();
1248 assert!(result.is_some());
1249
1250 let node = store.get_node(target_id).unwrap();
1252 assert_eq!(
1253 node.properties.get(&PropertyKey::new("found")),
1254 Some(&Value::Bool(true))
1255 );
1256
1257 let persons = store.nodes_by_label("Person");
1259 assert_eq!(persons.len(), 51);
1260 }
1261
1262 #[test]
1263 fn test_merge_creates_via_index_miss() {
1264 let lpg_store = Arc::new(LpgStore::new().unwrap());
1265 lpg_store.create_property_index("name");
1266
1267 let store: Arc<dyn GraphStoreMut> = lpg_store;
1268
1269 store.create_node_with_props(
1270 &["Person"],
1271 &[(PropertyKey::new("name"), Value::String("Django".into()))],
1272 );
1273
1274 let mut merge = MergeOperator::new(
1276 Arc::clone(&store),
1277 None,
1278 MergeConfig {
1279 variable: "n".to_string(),
1280 labels: vec!["Person".to_string()],
1281 match_properties: const_props(vec![("name", Value::String("Shosanna".into()))]),
1282 on_create_properties: const_props(vec![("created", Value::Bool(true))]),
1283 on_match_properties: vec![],
1284 output_schema: vec![LogicalType::Node],
1285 output_column: 0,
1286 bound_variable_column: None,
1287 },
1288 );
1289
1290 let result = merge.next().unwrap();
1291 assert!(result.is_some());
1292
1293 let persons = store.nodes_by_label("Person");
1294 assert_eq!(persons.len(), 2);
1295
1296 let new_nodes: Vec<_> = persons
1297 .iter()
1298 .filter_map(|&id| store.get_node(id))
1299 .filter(|n| {
1300 n.properties.get(&PropertyKey::new("name"))
1301 == Some(&Value::String("Shosanna".into()))
1302 })
1303 .collect();
1304 assert_eq!(new_nodes.len(), 1);
1305 assert_eq!(
1306 new_nodes[0].properties.get(&PropertyKey::new("created")),
1307 Some(&Value::Bool(true))
1308 );
1309 }
1310
1311 #[test]
1316 fn test_merge_on_match_resolves_expression_against_merged_node() {
1317 use super::super::filter::FilterExpression;
1318 use crate::graph::lpg::LpgStore;
1319 use std::collections::HashMap;
1320
1321 let lpg = Arc::new(LpgStore::new().unwrap());
1322 let store: Arc<dyn GraphStoreMut> = Arc::clone(&lpg) as Arc<dyn GraphStoreMut>;
1323 let search: Arc<dyn GraphStoreSearch> = Arc::clone(&lpg) as Arc<dyn GraphStoreSearch>;
1324
1325 let id = store.create_node_with_props(
1327 &["Item"],
1328 &[
1329 (PropertyKey::new("val"), Value::Int64(1)),
1330 (PropertyKey::new("x"), Value::Int64(7)),
1331 ],
1332 );
1333
1334 let expr = FilterExpression::Binary {
1336 left: Box::new(FilterExpression::Property {
1337 variable: "n".to_string(),
1338 property: "x".to_string(),
1339 }),
1340 op: super::super::filter::BinaryFilterOp::Add,
1341 right: Box::new(FilterExpression::Literal(Value::Int64(5))),
1342 };
1343 let mut variable_columns = HashMap::new();
1344 variable_columns.insert("n".to_string(), 0_usize);
1347
1348 let mut merge = MergeOperator::new(
1349 Arc::clone(&store),
1350 None,
1351 MergeConfig {
1352 variable: "n".to_string(),
1353 labels: vec!["Item".to_string()],
1354 match_properties: const_props(vec![("val", Value::Int64(1))]),
1355 on_create_properties: vec![],
1356 on_match_properties: vec![(
1357 "x".to_string(),
1358 PropertySource::Expression {
1359 expr: Box::new(expr),
1360 variable_columns,
1361 },
1362 )],
1363 output_schema: vec![LogicalType::Node],
1364 output_column: 0,
1365 bound_variable_column: None,
1366 },
1367 )
1368 .with_search_store(Arc::clone(&search));
1369
1370 merge.next().unwrap();
1371
1372 let node = store.get_node(id).unwrap();
1373 assert_eq!(
1374 node.properties.get(&PropertyKey::new("x")),
1375 Some(&Value::Int64(12)),
1376 "ON MATCH expression must read the merged node, not NULL"
1377 );
1378 }
1379
1380 #[test]
1381 fn test_merge_on_create_resolves_expression_against_new_node() {
1382 use super::super::filter::FilterExpression;
1385 use crate::graph::lpg::LpgStore;
1386 use std::collections::HashMap;
1387
1388 let lpg = Arc::new(LpgStore::new().unwrap());
1389 let store: Arc<dyn GraphStoreMut> = Arc::clone(&lpg) as Arc<dyn GraphStoreMut>;
1390 let search: Arc<dyn GraphStoreSearch> = Arc::clone(&lpg) as Arc<dyn GraphStoreSearch>;
1391
1392 let coalesce = FilterExpression::FunctionCall {
1393 name: "coalesce".to_string(),
1394 args: vec![
1395 FilterExpression::Property {
1396 variable: "n".to_string(),
1397 property: "x".to_string(),
1398 },
1399 FilterExpression::Literal(Value::Int64(99)),
1400 ],
1401 };
1402 let mut variable_columns = HashMap::new();
1403 variable_columns.insert("n".to_string(), 0_usize);
1404
1405 let mut merge = MergeOperator::new(
1406 Arc::clone(&store),
1407 None,
1408 MergeConfig {
1409 variable: "n".to_string(),
1410 labels: vec!["Item".to_string()],
1411 match_properties: const_props(vec![("val", Value::Int64(1))]),
1412 on_create_properties: vec![(
1413 "x".to_string(),
1414 PropertySource::Expression {
1415 expr: Box::new(coalesce),
1416 variable_columns,
1417 },
1418 )],
1419 on_match_properties: vec![],
1420 output_schema: vec![LogicalType::Node],
1421 output_column: 0,
1422 bound_variable_column: None,
1423 },
1424 )
1425 .with_search_store(Arc::clone(&search));
1426
1427 merge.next().unwrap();
1428
1429 let nodes = store.nodes_by_label("Item");
1430 assert_eq!(nodes.len(), 1);
1431 let node = store.get_node(nodes[0]).unwrap();
1432 assert_eq!(
1433 node.properties.get(&PropertyKey::new("x")),
1434 Some(&Value::Int64(99))
1435 );
1436 }
1437
1438 use super::ConstraintValidator;
1447
1448 struct RequirePropertyValidator {
1450 required_property: &'static str,
1451 }
1452
1453 impl ConstraintValidator for RequirePropertyValidator {
1454 fn validate_node_property(
1455 &self,
1456 _labels: &[String],
1457 _key: &str,
1458 _value: &Value,
1459 ) -> Result<(), super::super::OperatorError> {
1460 Ok(())
1461 }
1462 fn validate_node_complete(
1463 &self,
1464 _labels: &[String],
1465 properties: &[(String, Value)],
1466 ) -> Result<(), super::super::OperatorError> {
1467 if !properties.iter().any(|(k, _)| k == self.required_property) {
1468 return Err(super::super::OperatorError::ConstraintViolation(format!(
1469 "missing required property '{}'",
1470 self.required_property
1471 )));
1472 }
1473 Ok(())
1474 }
1475 fn check_unique_node_property(
1476 &self,
1477 _labels: &[String],
1478 _key: &str,
1479 _value: &Value,
1480 ) -> Result<(), super::super::OperatorError> {
1481 Ok(())
1482 }
1483 fn validate_edge_property(
1484 &self,
1485 _edge_type: &str,
1486 _key: &str,
1487 _value: &Value,
1488 ) -> Result<(), super::super::OperatorError> {
1489 Ok(())
1490 }
1491 fn validate_edge_complete(
1492 &self,
1493 _edge_type: &str,
1494 properties: &[(String, Value)],
1495 ) -> Result<(), super::super::OperatorError> {
1496 if !properties.iter().any(|(k, _)| k == self.required_property) {
1497 return Err(super::super::OperatorError::ConstraintViolation(format!(
1498 "missing required edge property '{}'",
1499 self.required_property
1500 )));
1501 }
1502 Ok(())
1503 }
1504 }
1505
1506 struct RecordingUniqueValidator {
1509 seen: std::sync::Mutex<Vec<(String, Value)>>,
1510 }
1511
1512 impl RecordingUniqueValidator {
1513 fn new() -> Self {
1514 Self {
1515 seen: std::sync::Mutex::new(Vec::new()),
1516 }
1517 }
1518 }
1519
1520 impl ConstraintValidator for RecordingUniqueValidator {
1521 fn validate_node_property(
1522 &self,
1523 _labels: &[String],
1524 _key: &str,
1525 _value: &Value,
1526 ) -> Result<(), super::super::OperatorError> {
1527 Ok(())
1528 }
1529 fn validate_node_complete(
1530 &self,
1531 _labels: &[String],
1532 _properties: &[(String, Value)],
1533 ) -> Result<(), super::super::OperatorError> {
1534 Ok(())
1535 }
1536 fn check_unique_node_property(
1537 &self,
1538 _labels: &[String],
1539 key: &str,
1540 value: &Value,
1541 ) -> Result<(), super::super::OperatorError> {
1542 self.seen
1543 .lock()
1544 .unwrap()
1545 .push((key.to_string(), value.clone()));
1546 Ok(())
1547 }
1548 fn validate_edge_property(
1549 &self,
1550 _edge_type: &str,
1551 _key: &str,
1552 _value: &Value,
1553 ) -> Result<(), super::super::OperatorError> {
1554 Ok(())
1555 }
1556 fn validate_edge_complete(
1557 &self,
1558 _edge_type: &str,
1559 _properties: &[(String, Value)],
1560 ) -> Result<(), super::super::OperatorError> {
1561 Ok(())
1562 }
1563 }
1564
1565 fn coalesce_n_x_else(default: i64) -> super::super::filter::FilterExpression {
1566 use super::super::filter::FilterExpression;
1567 FilterExpression::FunctionCall {
1568 name: "coalesce".to_string(),
1569 args: vec![
1570 FilterExpression::Property {
1571 variable: "n".to_string(),
1572 property: "x".to_string(),
1573 },
1574 FilterExpression::Literal(Value::Int64(default)),
1575 ],
1576 }
1577 }
1578
1579 #[test]
1580 fn test_merge_two_phase_completeness_uses_full_property_set() {
1581 use crate::graph::lpg::LpgStore;
1586 use std::collections::HashMap;
1587
1588 let lpg = Arc::new(LpgStore::new().unwrap());
1589 let store: Arc<dyn GraphStoreMut> = Arc::clone(&lpg) as Arc<dyn GraphStoreMut>;
1590 let search: Arc<dyn GraphStoreSearch> = Arc::clone(&lpg) as Arc<dyn GraphStoreSearch>;
1591
1592 let mut variable_columns = HashMap::new();
1593 variable_columns.insert("n".to_string(), 0_usize);
1594
1595 let mut merge = MergeOperator::new(
1596 Arc::clone(&store),
1597 None,
1598 MergeConfig {
1599 variable: "n".to_string(),
1600 labels: vec!["Item".to_string()],
1601 match_properties: const_props(vec![("val", Value::Int64(1))]),
1602 on_create_properties: vec![(
1604 "x".to_string(),
1605 PropertySource::Expression {
1606 expr: Box::new(coalesce_n_x_else(99)),
1607 variable_columns,
1608 },
1609 )],
1610 on_match_properties: vec![],
1611 output_schema: vec![LogicalType::Node],
1612 output_column: 0,
1613 bound_variable_column: None,
1614 },
1615 )
1616 .with_search_store(Arc::clone(&search))
1617 .with_validator(Arc::new(RequirePropertyValidator {
1618 required_property: "x",
1619 }));
1620
1621 merge
1622 .next()
1623 .expect("MERGE must succeed because ON CREATE supplies the required property");
1624
1625 let nodes = store.nodes_by_label("Item");
1626 assert_eq!(nodes.len(), 1);
1627 let node = store.get_node(nodes[0]).unwrap();
1628 assert_eq!(
1629 node.properties.get(&PropertyKey::new("x")),
1630 Some(&Value::Int64(99)),
1631 "ON CREATE expression value must be persisted"
1632 );
1633 }
1634
1635 #[test]
1636 fn test_merge_two_phase_unique_check_runs_on_on_create_props() {
1637 use crate::graph::lpg::LpgStore;
1642 use std::collections::HashMap;
1643
1644 let lpg = Arc::new(LpgStore::new().unwrap());
1645 let store: Arc<dyn GraphStoreMut> = Arc::clone(&lpg) as Arc<dyn GraphStoreMut>;
1646 let search: Arc<dyn GraphStoreSearch> = Arc::clone(&lpg) as Arc<dyn GraphStoreSearch>;
1647
1648 let mut variable_columns = HashMap::new();
1649 variable_columns.insert("n".to_string(), 0_usize);
1650
1651 let recorder = Arc::new(RecordingUniqueValidator::new());
1652
1653 let mut merge = MergeOperator::new(
1654 Arc::clone(&store),
1655 None,
1656 MergeConfig {
1657 variable: "n".to_string(),
1658 labels: vec!["Item".to_string()],
1659 match_properties: const_props(vec![("val", Value::Int64(1))]),
1660 on_create_properties: vec![(
1661 "x".to_string(),
1662 PropertySource::Expression {
1663 expr: Box::new(coalesce_n_x_else(42)),
1664 variable_columns,
1665 },
1666 )],
1667 on_match_properties: vec![],
1668 output_schema: vec![LogicalType::Node],
1669 output_column: 0,
1670 bound_variable_column: None,
1671 },
1672 )
1673 .with_search_store(Arc::clone(&search))
1674 .with_validator(Arc::clone(&recorder) as Arc<dyn ConstraintValidator>);
1675
1676 merge.next().unwrap();
1677
1678 let seen = recorder.seen.lock().unwrap().clone();
1679 assert!(
1680 seen.iter().any(|(k, v)| k == "x" && *v == Value::Int64(42)),
1681 "uniqueness check must fire for ON CREATE expression property `x`, observed: {seen:?}"
1682 );
1683 }
1684
1685 #[test]
1686 fn test_merge_relationship_two_phase_completeness_uses_full_property_set() {
1687 use super::super::filter::FilterExpression;
1689 use crate::execution::chunk::DataChunkBuilder;
1690 use crate::graph::lpg::LpgStore;
1691 use std::collections::HashMap;
1692
1693 let lpg = Arc::new(LpgStore::new().unwrap());
1694 let store: Arc<dyn GraphStoreMut> = Arc::clone(&lpg) as Arc<dyn GraphStoreMut>;
1695 let search: Arc<dyn GraphStoreSearch> = Arc::clone(&lpg) as Arc<dyn GraphStoreSearch>;
1696
1697 let src_id = store.create_node_with_props(
1698 &["Node"],
1699 &[(PropertyKey::new("name"), Value::String("Vincent".into()))],
1700 );
1701 let dst_id = store.create_node_with_props(
1702 &["Node"],
1703 &[(PropertyKey::new("name"), Value::String("Mia".into()))],
1704 );
1705
1706 let input_schema = vec![LogicalType::Node, LogicalType::Node];
1708 let mut builder = DataChunkBuilder::with_capacity(&input_schema, 1);
1709 builder.column_mut(0).unwrap().push_node_id(src_id);
1710 builder.column_mut(1).unwrap().push_node_id(dst_id);
1711 builder.advance_row();
1712 let chunk = builder.finish();
1713
1714 struct OneShot(Option<DataChunk>);
1715 impl Operator for OneShot {
1716 fn next(&mut self) -> OperatorResult {
1717 Ok(self.0.take())
1718 }
1719 fn reset(&mut self) {}
1720 fn name(&self) -> &'static str {
1721 "OneShot"
1722 }
1723 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1724 self
1725 }
1726 }
1727
1728 let coalesce = FilterExpression::FunctionCall {
1730 name: "coalesce".to_string(),
1731 args: vec![
1732 FilterExpression::Property {
1733 variable: "r".to_string(),
1734 property: "x".to_string(),
1735 },
1736 FilterExpression::Literal(Value::Int64(7)),
1737 ],
1738 };
1739 let mut variable_columns = HashMap::new();
1740 variable_columns.insert("r".to_string(), 2_usize);
1742
1743 let mut merge_rel = MergeRelationshipOperator::new(
1744 Arc::clone(&store),
1745 Box::new(OneShot(Some(chunk))),
1746 MergeRelationshipConfig {
1747 source_column: 0,
1748 target_column: 1,
1749 source_variable: "a".to_string(),
1750 target_variable: "b".to_string(),
1751 edge_type: "KNOWS".to_string(),
1752 match_properties: vec![],
1753 on_create_properties: vec![(
1754 "x".to_string(),
1755 PropertySource::Expression {
1756 expr: Box::new(coalesce),
1757 variable_columns,
1758 },
1759 )],
1760 on_match_properties: vec![],
1761 output_schema: vec![LogicalType::Node, LogicalType::Node, LogicalType::Edge],
1762 edge_output_column: 2,
1763 },
1764 )
1765 .with_search_store(Arc::clone(&search))
1766 .with_validator(Arc::new(RequirePropertyValidator {
1767 required_property: "x",
1768 }));
1769
1770 merge_rel.next().expect(
1771 "MERGE relationship must succeed because ON CREATE supplies the required property",
1772 );
1773
1774 use crate::graph::Direction;
1776 let edges: Vec<EdgeId> = store
1777 .edges_from(src_id, Direction::Outgoing)
1778 .into_iter()
1779 .filter_map(|(target, edge_id)| (target == dst_id).then_some(edge_id))
1780 .collect();
1781 assert_eq!(edges.len(), 1, "expected exactly one outgoing edge");
1782 let edge = store.get_edge(edges[0]).unwrap();
1783 assert_eq!(edge.get_property("x"), Some(&Value::Int64(7)));
1784 }
1785
1786 #[test]
1787 fn test_merge_in_transaction_dedupes_within_unwind() {
1788 use crate::execution::chunk::DataChunkBuilder;
1795 use crate::graph::lpg::LpgStore;
1796 use grafeo_common::types::EpochId;
1797
1798 let lpg = Arc::new(LpgStore::new().unwrap());
1799 let store: Arc<dyn GraphStoreMut> = Arc::clone(&lpg) as Arc<dyn GraphStoreMut>;
1800
1801 let input_schema = vec![LogicalType::Int64];
1803 let mut builder = DataChunkBuilder::with_capacity(&input_schema, 3);
1804 for _ in 0..3 {
1805 builder.column_mut(0).unwrap().push_value(Value::Int64(1));
1806 builder.advance_row();
1807 }
1808 let chunk = builder.finish();
1809
1810 struct OneShot(Option<DataChunk>);
1811 impl Operator for OneShot {
1812 fn next(&mut self) -> OperatorResult {
1813 Ok(self.0.take())
1814 }
1815 fn reset(&mut self) {}
1816 fn name(&self) -> &'static str {
1817 "OneShot"
1818 }
1819 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1820 self
1821 }
1822 }
1823
1824 let tx = TransactionId::new(1);
1826 let mut merge = MergeOperator::new(
1827 Arc::clone(&store),
1828 Some(Box::new(OneShot(Some(chunk)))),
1829 MergeConfig {
1830 variable: "n".to_string(),
1831 labels: vec!["Item".to_string()],
1832 match_properties: vec![("val".to_string(), PropertySource::Column(0))],
1833 on_create_properties: vec![],
1834 on_match_properties: vec![],
1835 output_schema: vec![LogicalType::Int64, LogicalType::Node],
1836 output_column: 1,
1837 bound_variable_column: None,
1838 },
1839 )
1840 .with_transaction_context(EpochId::INITIAL, Some(tx));
1841
1842 while merge.next().unwrap().is_some() {}
1843
1844 let nodes = store.nodes_by_label("Item");
1847 let visible: Vec<_> = nodes
1848 .iter()
1849 .filter_map(|&id| store.get_node_versioned(id, EpochId::INITIAL, tx))
1850 .collect();
1851 assert_eq!(
1852 visible.len(),
1853 1,
1854 "MERGE inside UNWIND must dedupe nodes its own transaction created in earlier rows"
1855 );
1856 }
1857
1858 #[test]
1859 fn test_merge_into_any() {
1860 let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
1861 let op = MergeOperator::new(
1862 Arc::clone(&store),
1863 None,
1864 MergeConfig {
1865 variable: "n".to_string(),
1866 labels: vec!["Person".to_string()],
1867 match_properties: vec![],
1868 on_create_properties: vec![],
1869 on_match_properties: vec![],
1870 output_schema: vec![LogicalType::Node],
1871 output_column: 0,
1872 bound_variable_column: None,
1873 },
1874 );
1875 let any = Box::new(op).into_any();
1876 assert!(any.downcast::<MergeOperator>().is_ok());
1877 }
1878}