1use super::{ConstraintValidator, Operator, OperatorResult, PropertySource};
9use crate::execution::chunk::{DataChunk, DataChunkBuilder};
10use crate::graph::{GraphStore, GraphStoreMut};
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14use std::sync::Arc;
15
16pub struct MergeConfig {
18 pub variable: String,
20 pub labels: Vec<String>,
22 pub match_properties: Vec<(String, PropertySource)>,
24 pub on_create_properties: Vec<(String, PropertySource)>,
26 pub on_match_properties: Vec<(String, PropertySource)>,
28 pub output_schema: Vec<LogicalType>,
30 pub output_column: usize,
32 pub bound_variable_column: Option<usize>,
36}
37
38pub struct MergeOperator {
46 store: Arc<dyn GraphStoreMut>,
48 input: Option<Box<dyn Operator>>,
50 config: MergeConfig,
52 executed: bool,
54 viewing_epoch: Option<EpochId>,
56 transaction_id: Option<TransactionId>,
58 validator: Option<Arc<dyn ConstraintValidator>>,
60}
61
62impl MergeOperator {
63 pub fn new(
65 store: Arc<dyn GraphStoreMut>,
66 input: Option<Box<dyn Operator>>,
67 config: MergeConfig,
68 ) -> Self {
69 Self {
70 store,
71 input,
72 config,
73 executed: false,
74 viewing_epoch: None,
75 transaction_id: None,
76 validator: None,
77 }
78 }
79
80 #[must_use]
82 pub fn variable(&self) -> &str {
83 &self.config.variable
84 }
85
86 pub fn with_transaction_context(
88 mut self,
89 epoch: EpochId,
90 transaction_id: Option<TransactionId>,
91 ) -> Self {
92 self.viewing_epoch = Some(epoch);
93 self.transaction_id = transaction_id;
94 self
95 }
96
97 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
99 self.validator = Some(validator);
100 self
101 }
102
103 fn resolve_properties(
105 props: &[(String, PropertySource)],
106 chunk: Option<&DataChunk>,
107 row: usize,
108 store: &dyn GraphStore,
109 ) -> Vec<(String, Value)> {
110 props
111 .iter()
112 .map(|(name, source)| {
113 let value = if let Some(chunk) = chunk {
114 source.resolve(chunk, row, store)
115 } else {
116 match source {
118 PropertySource::Constant(v) => v.clone(),
119 _ => Value::Null,
120 }
121 };
122 (name.clone(), value)
123 })
124 .collect()
125 }
126
127 fn find_matching_node(&self, resolved_match_props: &[(String, Value)]) -> Option<NodeId> {
129 let candidates: Vec<NodeId> = if let Some(first_label) = self.config.labels.first() {
130 self.store.nodes_by_label(first_label)
131 } else {
132 self.store.node_ids()
133 };
134
135 for node_id in candidates {
136 if let Some(node) = self.store.get_node(node_id) {
137 let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
138 if !has_all_labels {
139 continue;
140 }
141
142 let has_all_props = resolved_match_props.iter().all(|(key, expected_value)| {
143 let prop = node.properties.get(&PropertyKey::new(key.as_str()));
144 if expected_value.is_null() {
145 prop.map_or(true, |v| v.is_null())
147 } else {
148 prop.is_some_and(|v| v == expected_value)
149 }
150 });
151
152 if has_all_props {
153 return Some(node_id);
154 }
155 }
156 }
157
158 None
159 }
160
161 fn create_node(
163 &self,
164 resolved_match_props: &[(String, Value)],
165 resolved_create_props: &[(String, Value)],
166 ) -> Result<NodeId, super::OperatorError> {
167 if let Some(ref validator) = self.validator {
169 validator.validate_node_labels_allowed(&self.config.labels)?;
170
171 let all_props: Vec<(String, Value)> = resolved_match_props
172 .iter()
173 .chain(resolved_create_props.iter())
174 .map(|(k, v)| (k.clone(), v.clone()))
175 .collect();
176 for (name, value) in &all_props {
177 validator.validate_node_property(&self.config.labels, name, value)?;
178 validator.check_unique_node_property(&self.config.labels, name, value)?;
179 }
180 validator.validate_node_complete(&self.config.labels, &all_props)?;
181 }
182
183 let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
184 .iter()
185 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
186 .collect();
187
188 for (k, v) in resolved_create_props {
189 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
190 existing.1 = v.clone();
191 } else {
192 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
193 }
194 }
195
196 let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
197 Ok(self.store.create_node_with_props(&labels, &all_props))
198 }
199
200 fn merge_node_for_row(
202 &self,
203 chunk: Option<&DataChunk>,
204 row: usize,
205 ) -> Result<NodeId, super::OperatorError> {
206 let store_ref: &dyn GraphStore = self.store.as_ref();
207 let resolved_match =
208 Self::resolve_properties(&self.config.match_properties, chunk, row, store_ref);
209
210 if let Some(existing_id) = self.find_matching_node(&resolved_match) {
211 let resolved_on_match =
212 Self::resolve_properties(&self.config.on_match_properties, chunk, row, store_ref);
213 self.apply_on_match(existing_id, &resolved_on_match)?;
214 Ok(existing_id)
215 } else {
216 let resolved_on_create =
217 Self::resolve_properties(&self.config.on_create_properties, chunk, row, store_ref);
218 self.create_node(&resolved_match, &resolved_on_create)
219 }
220 }
221
222 fn apply_on_match(
224 &self,
225 node_id: NodeId,
226 resolved_on_match: &[(String, Value)],
227 ) -> Result<(), super::OperatorError> {
228 for (key, value) in resolved_on_match {
229 if let Some(ref validator) = self.validator {
230 validator.validate_node_property(&self.config.labels, key, value)?;
231 }
232 if let Some(tid) = self.transaction_id {
233 self.store
234 .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
235 } else {
236 self.store
237 .set_node_property(node_id, key.as_str(), value.clone());
238 }
239 }
240 Ok(())
241 }
242}
243
244impl Operator for MergeOperator {
245 fn next(&mut self) -> OperatorResult {
246 if let Some(ref mut input) = self.input {
249 if let Some(chunk) = input.next()? {
250 let mut builder =
251 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
252
253 for row in chunk.selected_indices() {
254 if let Some(bound_col) = self.config.bound_variable_column {
256 let is_null = chunk.column(bound_col).map_or(true, |col| col.is_null(row));
257 if is_null {
258 return Err(super::OperatorError::TypeMismatch {
259 expected: format!(
260 "non-null node for MERGE variable '{}'",
261 self.config.variable
262 ),
263 found: "NULL".to_string(),
264 });
265 }
266 }
267
268 let node_id = self.merge_node_for_row(Some(&chunk), row)?;
270
271 for col_idx in 0..chunk.column_count() {
273 if let (Some(src), Some(dst)) =
274 (chunk.column(col_idx), builder.column_mut(col_idx))
275 {
276 if let Some(val) = src.get_value(row) {
277 dst.push_value(val);
278 } else {
279 dst.push_value(Value::Null);
280 }
281 }
282 }
283
284 if let Some(dst) = builder.column_mut(self.config.output_column) {
286 dst.push_node_id(node_id);
287 }
288
289 builder.advance_row();
290 }
291
292 return Ok(Some(builder.finish()));
293 }
294 return Ok(None);
295 }
296
297 if self.executed {
299 return Ok(None);
300 }
301 self.executed = true;
302
303 let node_id = self.merge_node_for_row(None, 0)?;
304
305 let mut builder = DataChunkBuilder::new(&self.config.output_schema);
306 if let Some(dst) = builder.column_mut(self.config.output_column) {
307 dst.push_node_id(node_id);
308 }
309 builder.advance_row();
310
311 Ok(Some(builder.finish()))
312 }
313
314 fn reset(&mut self) {
315 self.executed = false;
316 if let Some(ref mut input) = self.input {
317 input.reset();
318 }
319 }
320
321 fn name(&self) -> &'static str {
322 "Merge"
323 }
324
325 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
326 self
327 }
328}
329
330pub struct MergeRelationshipConfig {
332 pub source_column: usize,
334 pub target_column: usize,
336 pub source_variable: String,
338 pub target_variable: String,
340 pub edge_type: String,
342 pub match_properties: Vec<(String, PropertySource)>,
344 pub on_create_properties: Vec<(String, PropertySource)>,
346 pub on_match_properties: Vec<(String, PropertySource)>,
348 pub output_schema: Vec<LogicalType>,
350 pub edge_output_column: usize,
352}
353
354pub struct MergeRelationshipOperator {
361 store: Arc<dyn GraphStoreMut>,
363 input: Box<dyn Operator>,
365 config: MergeRelationshipConfig,
367 viewing_epoch: Option<EpochId>,
369 transaction_id: Option<TransactionId>,
371 validator: Option<Arc<dyn ConstraintValidator>>,
373}
374
375impl MergeRelationshipOperator {
376 pub fn new(
378 store: Arc<dyn GraphStoreMut>,
379 input: Box<dyn Operator>,
380 config: MergeRelationshipConfig,
381 ) -> Self {
382 Self {
383 store,
384 input,
385 config,
386 viewing_epoch: None,
387 transaction_id: None,
388 validator: None,
389 }
390 }
391
392 pub fn with_transaction_context(
394 mut self,
395 epoch: EpochId,
396 transaction_id: Option<TransactionId>,
397 ) -> Self {
398 self.viewing_epoch = Some(epoch);
399 self.transaction_id = transaction_id;
400 self
401 }
402
403 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
405 self.validator = Some(validator);
406 self
407 }
408
409 fn find_matching_edge(
411 &self,
412 src: NodeId,
413 dst: NodeId,
414 resolved_match_props: &[(String, Value)],
415 ) -> Option<EdgeId> {
416 use crate::graph::Direction;
417
418 for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
419 if target != dst {
420 continue;
421 }
422
423 if let Some(edge) = self.store.get_edge(edge_id) {
424 if edge.edge_type.as_str() != self.config.edge_type {
425 continue;
426 }
427
428 let has_all_props = resolved_match_props
429 .iter()
430 .all(|(key, expected)| edge.get_property(key).is_some_and(|v| v == expected));
431
432 if has_all_props {
433 return Some(edge_id);
434 }
435 }
436 }
437
438 None
439 }
440
441 fn create_edge(
443 &self,
444 src: NodeId,
445 dst: NodeId,
446 resolved_match_props: &[(String, Value)],
447 resolved_create_props: &[(String, Value)],
448 ) -> Result<EdgeId, super::OperatorError> {
449 if let Some(ref validator) = self.validator {
451 validator.validate_edge_type_allowed(&self.config.edge_type)?;
452
453 let all_props: Vec<(String, Value)> = resolved_match_props
454 .iter()
455 .chain(resolved_create_props.iter())
456 .map(|(k, v)| (k.clone(), v.clone()))
457 .collect();
458 for (name, value) in &all_props {
459 validator.validate_edge_property(&self.config.edge_type, name, value)?;
460 }
461 validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
462 }
463
464 let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
465 .iter()
466 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
467 .collect();
468
469 for (k, v) in resolved_create_props {
470 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
471 existing.1 = v.clone();
472 } else {
473 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
474 }
475 }
476
477 Ok(self
478 .store
479 .create_edge_with_props(src, dst, &self.config.edge_type, &all_props))
480 }
481
482 fn apply_on_match_edge(
484 &self,
485 edge_id: EdgeId,
486 resolved_on_match: &[(String, Value)],
487 ) -> Result<(), super::OperatorError> {
488 for (key, value) in resolved_on_match {
489 if let Some(ref validator) = self.validator {
490 validator.validate_edge_property(&self.config.edge_type, key, value)?;
491 }
492 if let Some(tid) = self.transaction_id {
493 self.store
494 .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
495 } else {
496 self.store
497 .set_edge_property(edge_id, key.as_str(), value.clone());
498 }
499 }
500 Ok(())
501 }
502}
503
504impl Operator for MergeRelationshipOperator {
505 fn next(&mut self) -> OperatorResult {
506 use super::OperatorError;
507
508 if let Some(chunk) = self.input.next()? {
509 let mut builder =
510 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
511
512 for row in chunk.selected_indices() {
513 let src_val = chunk
514 .column(self.config.source_column)
515 .and_then(|c| c.get_node_id(row))
516 .ok_or_else(|| OperatorError::TypeMismatch {
517 expected: format!(
518 "non-null node for MERGE variable '{}'",
519 self.config.source_variable
520 ),
521 found: "NULL".to_string(),
522 })?;
523
524 let dst_val = chunk
525 .column(self.config.target_column)
526 .and_then(|c| c.get_node_id(row))
527 .ok_or_else(|| OperatorError::TypeMismatch {
528 expected: format!(
529 "non-null node for MERGE variable '{}'",
530 self.config.target_variable
531 ),
532 found: "None".to_string(),
533 })?;
534
535 let store_ref: &dyn GraphStore = self.store.as_ref();
536 let resolved_match = MergeOperator::resolve_properties(
537 &self.config.match_properties,
538 Some(&chunk),
539 row,
540 store_ref,
541 );
542
543 let edge_id = if let Some(existing) =
544 self.find_matching_edge(src_val, dst_val, &resolved_match)
545 {
546 let resolved_on_match = MergeOperator::resolve_properties(
547 &self.config.on_match_properties,
548 Some(&chunk),
549 row,
550 store_ref,
551 );
552 self.apply_on_match_edge(existing, &resolved_on_match)?;
553 existing
554 } else {
555 let resolved_on_create = MergeOperator::resolve_properties(
556 &self.config.on_create_properties,
557 Some(&chunk),
558 row,
559 store_ref,
560 );
561 self.create_edge(src_val, dst_val, &resolved_match, &resolved_on_create)?
562 };
563
564 for col_idx in 0..self.config.output_schema.len() {
566 if col_idx == self.config.edge_output_column {
567 if let Some(dst_col) = builder.column_mut(col_idx) {
568 dst_col.push_edge_id(edge_id);
569 }
570 } else if let (Some(src_col), Some(dst_col)) =
571 (chunk.column(col_idx), builder.column_mut(col_idx))
572 && let Some(val) = src_col.get_value(row)
573 {
574 dst_col.push_value(val);
575 }
576 }
577
578 builder.advance_row();
579 }
580
581 return Ok(Some(builder.finish()));
582 }
583
584 Ok(None)
585 }
586
587 fn reset(&mut self) {
588 self.input.reset();
589 }
590
591 fn name(&self) -> &'static str {
592 "MergeRelationship"
593 }
594
595 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
596 self
597 }
598}
599
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    /// Wraps literal values as constant property sources.
    fn const_props(props: Vec<(&str, Value)>) -> Vec<(String, PropertySource)> {
        let mut sources = Vec::with_capacity(props.len());
        for (key, value) in props {
            sources.push((key.to_string(), PropertySource::Constant(value)));
        }
        sources
    }

    /// Creates an empty LPG-backed store behind the mutable store trait.
    fn new_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Builds an input-less merge of `(n:Person {..})` emitting one node column.
    fn person_merge(
        store: &Arc<dyn GraphStoreMut>,
        match_properties: Vec<(String, PropertySource)>,
        on_create_properties: Vec<(String, PropertySource)>,
        on_match_properties: Vec<(String, PropertySource)>,
    ) -> MergeOperator {
        MergeOperator::new(
            Arc::clone(store),
            None,
            MergeConfig {
                variable: "n".to_string(),
                labels: vec!["Person".to_string()],
                match_properties,
                on_create_properties,
                on_match_properties,
                output_schema: vec![LogicalType::Node],
                output_column: 0,
                bound_variable_column: None,
            },
        )
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = new_store();
        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Alix".into()))]),
            vec![],
            vec![],
        );

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // Exactly one Person must now exist, carrying the match property.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        let node = store.get_node(nodes[0]).unwrap();
        assert!(node.has_label("Person"));
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = new_store();

        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
        );

        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Gus".into()))]),
            vec![],
            vec![],
        );

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // The existing node is reused, so no second Person appears.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = new_store();
        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Vincent".into()))]),
            const_props(vec![("created", Value::Bool(true))]),
            vec![],
        );

        let _ = merge.next().unwrap();

        // The created node carries both the match and the ON CREATE property.
        let nodes = store.nodes_by_label("Person");
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            node.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = new_store();

        let node_id = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
        );

        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Jules".into()))]),
            vec![],
            const_props(vec![("updated", Value::Bool(true))]),
        );

        let _ = merge.next().unwrap();

        // The pre-existing node received the ON MATCH property.
        let node = store.get_node(node_id).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_into_any() {
        let store = new_store();
        let op = person_merge(&store, vec![], vec![], vec![]);
        let any = Box::new(op).into_any();
        assert!(any.downcast::<MergeOperator>().is_ok());
    }
}
772}