1use super::{ConstraintValidator, Operator, OperatorResult, PropertySource};
9use crate::execution::chunk::{DataChunk, DataChunkBuilder};
10use crate::graph::{GraphStore, GraphStoreMut};
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14use std::sync::Arc;
15
16pub struct MergeConfig {
18 pub variable: String,
20 pub labels: Vec<String>,
22 pub match_properties: Vec<(String, PropertySource)>,
24 pub on_create_properties: Vec<(String, PropertySource)>,
26 pub on_match_properties: Vec<(String, PropertySource)>,
28 pub output_schema: Vec<LogicalType>,
30 pub output_column: usize,
32 pub bound_variable_column: Option<usize>,
36}
37
38pub struct MergeOperator {
46 store: Arc<dyn GraphStoreMut>,
48 input: Option<Box<dyn Operator>>,
50 config: MergeConfig,
52 executed: bool,
54 viewing_epoch: Option<EpochId>,
56 transaction_id: Option<TransactionId>,
58 validator: Option<Arc<dyn ConstraintValidator>>,
60}
61
62impl MergeOperator {
63 pub fn new(
65 store: Arc<dyn GraphStoreMut>,
66 input: Option<Box<dyn Operator>>,
67 config: MergeConfig,
68 ) -> Self {
69 Self {
70 store,
71 input,
72 config,
73 executed: false,
74 viewing_epoch: None,
75 transaction_id: None,
76 validator: None,
77 }
78 }
79
80 #[must_use]
82 pub fn variable(&self) -> &str {
83 &self.config.variable
84 }
85
86 pub fn with_transaction_context(
88 mut self,
89 epoch: EpochId,
90 transaction_id: Option<TransactionId>,
91 ) -> Self {
92 self.viewing_epoch = Some(epoch);
93 self.transaction_id = transaction_id;
94 self
95 }
96
97 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
99 self.validator = Some(validator);
100 self
101 }
102
103 fn resolve_properties(
105 props: &[(String, PropertySource)],
106 chunk: Option<&DataChunk>,
107 row: usize,
108 store: &dyn GraphStore,
109 ) -> Vec<(String, Value)> {
110 props
111 .iter()
112 .map(|(name, source)| {
113 let value = if let Some(chunk) = chunk {
114 source.resolve(chunk, row, store)
115 } else {
116 match source {
118 PropertySource::Constant(v) => v.clone(),
119 _ => Value::Null,
120 }
121 };
122 (name.clone(), value)
123 })
124 .collect()
125 }
126
127 fn find_matching_node(&self, resolved_match_props: &[(String, Value)]) -> Option<NodeId> {
129 let candidates: Vec<NodeId> = if let Some(first_label) = self.config.labels.first() {
130 self.store.nodes_by_label(first_label)
131 } else {
132 self.store.node_ids()
133 };
134
135 for node_id in candidates {
136 if let Some(node) = self.store.get_node(node_id) {
137 let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
138 if !has_all_labels {
139 continue;
140 }
141
142 let has_all_props = resolved_match_props.iter().all(|(key, expected_value)| {
143 let prop = node.properties.get(&PropertyKey::new(key.as_str()));
144 if expected_value.is_null() {
145 prop.map_or(true, |v| v.is_null())
147 } else {
148 prop.is_some_and(|v| v == expected_value)
149 }
150 });
151
152 if has_all_props {
153 return Some(node_id);
154 }
155 }
156 }
157
158 None
159 }
160
161 fn create_node(
163 &self,
164 resolved_match_props: &[(String, Value)],
165 resolved_create_props: &[(String, Value)],
166 ) -> Result<NodeId, super::OperatorError> {
167 if let Some(ref validator) = self.validator {
169 validator.validate_node_labels_allowed(&self.config.labels)?;
170
171 let all_props: Vec<(String, Value)> = resolved_match_props
172 .iter()
173 .chain(resolved_create_props.iter())
174 .map(|(k, v)| (k.clone(), v.clone()))
175 .collect();
176 for (name, value) in &all_props {
177 validator.validate_node_property(&self.config.labels, name, value)?;
178 validator.check_unique_node_property(&self.config.labels, name, value)?;
179 }
180 validator.validate_node_complete(&self.config.labels, &all_props)?;
181 }
182
183 let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
184 .iter()
185 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
186 .collect();
187
188 for (k, v) in resolved_create_props {
189 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
190 existing.1 = v.clone();
191 } else {
192 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
193 }
194 }
195
196 let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
197 Ok(self.store.create_node_with_props(&labels, &all_props))
198 }
199
200 fn merge_node_for_row(
202 &self,
203 chunk: Option<&DataChunk>,
204 row: usize,
205 ) -> Result<NodeId, super::OperatorError> {
206 let store_ref: &dyn GraphStore = self.store.as_ref();
207 let resolved_match =
208 Self::resolve_properties(&self.config.match_properties, chunk, row, store_ref);
209
210 if let Some(existing_id) = self.find_matching_node(&resolved_match) {
211 let resolved_on_match =
212 Self::resolve_properties(&self.config.on_match_properties, chunk, row, store_ref);
213 self.apply_on_match(existing_id, &resolved_on_match);
214 Ok(existing_id)
215 } else {
216 let resolved_on_create =
217 Self::resolve_properties(&self.config.on_create_properties, chunk, row, store_ref);
218 self.create_node(&resolved_match, &resolved_on_create)
219 }
220 }
221
222 fn apply_on_match(&self, node_id: NodeId, resolved_on_match: &[(String, Value)]) {
224 for (key, value) in resolved_on_match {
225 if let Some(tid) = self.transaction_id {
226 self.store
227 .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
228 } else {
229 self.store
230 .set_node_property(node_id, key.as_str(), value.clone());
231 }
232 }
233 }
234}
235
236impl Operator for MergeOperator {
237 fn next(&mut self) -> OperatorResult {
238 if let Some(ref mut input) = self.input {
241 if let Some(chunk) = input.next()? {
242 let mut builder =
243 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
244
245 for row in chunk.selected_indices() {
246 if let Some(bound_col) = self.config.bound_variable_column {
248 let is_null = chunk.column(bound_col).map_or(true, |col| col.is_null(row));
249 if is_null {
250 return Err(super::OperatorError::TypeMismatch {
251 expected: format!(
252 "non-null node for MERGE variable '{}'",
253 self.config.variable
254 ),
255 found: "NULL".to_string(),
256 });
257 }
258 }
259
260 let node_id = self.merge_node_for_row(Some(&chunk), row)?;
262
263 for col_idx in 0..chunk.column_count() {
265 if let (Some(src), Some(dst)) =
266 (chunk.column(col_idx), builder.column_mut(col_idx))
267 {
268 if let Some(val) = src.get_value(row) {
269 dst.push_value(val);
270 } else {
271 dst.push_value(Value::Null);
272 }
273 }
274 }
275
276 if let Some(dst) = builder.column_mut(self.config.output_column) {
278 dst.push_node_id(node_id);
279 }
280
281 builder.advance_row();
282 }
283
284 return Ok(Some(builder.finish()));
285 }
286 return Ok(None);
287 }
288
289 if self.executed {
291 return Ok(None);
292 }
293 self.executed = true;
294
295 let node_id = self.merge_node_for_row(None, 0)?;
296
297 let mut builder = DataChunkBuilder::new(&self.config.output_schema);
298 if let Some(dst) = builder.column_mut(self.config.output_column) {
299 dst.push_node_id(node_id);
300 }
301 builder.advance_row();
302
303 Ok(Some(builder.finish()))
304 }
305
306 fn reset(&mut self) {
307 self.executed = false;
308 if let Some(ref mut input) = self.input {
309 input.reset();
310 }
311 }
312
313 fn name(&self) -> &'static str {
314 "Merge"
315 }
316}
317
318pub struct MergeRelationshipConfig {
320 pub source_column: usize,
322 pub target_column: usize,
324 pub source_variable: String,
326 pub target_variable: String,
328 pub edge_type: String,
330 pub match_properties: Vec<(String, PropertySource)>,
332 pub on_create_properties: Vec<(String, PropertySource)>,
334 pub on_match_properties: Vec<(String, PropertySource)>,
336 pub output_schema: Vec<LogicalType>,
338 pub edge_output_column: usize,
340}
341
342pub struct MergeRelationshipOperator {
349 store: Arc<dyn GraphStoreMut>,
351 input: Box<dyn Operator>,
353 config: MergeRelationshipConfig,
355 viewing_epoch: Option<EpochId>,
357 transaction_id: Option<TransactionId>,
359 validator: Option<Arc<dyn ConstraintValidator>>,
361}
362
363impl MergeRelationshipOperator {
364 pub fn new(
366 store: Arc<dyn GraphStoreMut>,
367 input: Box<dyn Operator>,
368 config: MergeRelationshipConfig,
369 ) -> Self {
370 Self {
371 store,
372 input,
373 config,
374 viewing_epoch: None,
375 transaction_id: None,
376 validator: None,
377 }
378 }
379
380 pub fn with_transaction_context(
382 mut self,
383 epoch: EpochId,
384 transaction_id: Option<TransactionId>,
385 ) -> Self {
386 self.viewing_epoch = Some(epoch);
387 self.transaction_id = transaction_id;
388 self
389 }
390
391 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
393 self.validator = Some(validator);
394 self
395 }
396
397 fn find_matching_edge(
399 &self,
400 src: NodeId,
401 dst: NodeId,
402 resolved_match_props: &[(String, Value)],
403 ) -> Option<EdgeId> {
404 use crate::graph::Direction;
405
406 for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
407 if target != dst {
408 continue;
409 }
410
411 if let Some(edge) = self.store.get_edge(edge_id) {
412 if edge.edge_type.as_str() != self.config.edge_type {
413 continue;
414 }
415
416 let has_all_props = resolved_match_props
417 .iter()
418 .all(|(key, expected)| edge.get_property(key).is_some_and(|v| v == expected));
419
420 if has_all_props {
421 return Some(edge_id);
422 }
423 }
424 }
425
426 None
427 }
428
429 fn create_edge(
431 &self,
432 src: NodeId,
433 dst: NodeId,
434 resolved_match_props: &[(String, Value)],
435 resolved_create_props: &[(String, Value)],
436 ) -> Result<EdgeId, super::OperatorError> {
437 if let Some(ref validator) = self.validator {
439 validator.validate_edge_type_allowed(&self.config.edge_type)?;
440
441 let all_props: Vec<(String, Value)> = resolved_match_props
442 .iter()
443 .chain(resolved_create_props.iter())
444 .map(|(k, v)| (k.clone(), v.clone()))
445 .collect();
446 for (name, value) in &all_props {
447 validator.validate_edge_property(&self.config.edge_type, name, value)?;
448 }
449 validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
450 }
451
452 let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
453 .iter()
454 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
455 .collect();
456
457 for (k, v) in resolved_create_props {
458 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
459 existing.1 = v.clone();
460 } else {
461 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
462 }
463 }
464
465 Ok(self
466 .store
467 .create_edge_with_props(src, dst, &self.config.edge_type, &all_props))
468 }
469
470 fn apply_on_match_edge(&self, edge_id: EdgeId, resolved_on_match: &[(String, Value)]) {
472 for (key, value) in resolved_on_match {
473 if let Some(tid) = self.transaction_id {
474 self.store
475 .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
476 } else {
477 self.store
478 .set_edge_property(edge_id, key.as_str(), value.clone());
479 }
480 }
481 }
482}
483
484impl Operator for MergeRelationshipOperator {
485 fn next(&mut self) -> OperatorResult {
486 use super::OperatorError;
487
488 if let Some(chunk) = self.input.next()? {
489 let mut builder =
490 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
491
492 for row in chunk.selected_indices() {
493 let src_val = chunk
494 .column(self.config.source_column)
495 .and_then(|c| c.get_node_id(row))
496 .ok_or_else(|| OperatorError::TypeMismatch {
497 expected: format!(
498 "non-null node for MERGE variable '{}'",
499 self.config.source_variable
500 ),
501 found: "NULL".to_string(),
502 })?;
503
504 let dst_val = chunk
505 .column(self.config.target_column)
506 .and_then(|c| c.get_node_id(row))
507 .ok_or_else(|| OperatorError::TypeMismatch {
508 expected: format!(
509 "non-null node for MERGE variable '{}'",
510 self.config.target_variable
511 ),
512 found: "None".to_string(),
513 })?;
514
515 let store_ref: &dyn GraphStore = self.store.as_ref();
516 let resolved_match = MergeOperator::resolve_properties(
517 &self.config.match_properties,
518 Some(&chunk),
519 row,
520 store_ref,
521 );
522
523 let edge_id = if let Some(existing) =
524 self.find_matching_edge(src_val, dst_val, &resolved_match)
525 {
526 let resolved_on_match = MergeOperator::resolve_properties(
527 &self.config.on_match_properties,
528 Some(&chunk),
529 row,
530 store_ref,
531 );
532 self.apply_on_match_edge(existing, &resolved_on_match);
533 existing
534 } else {
535 let resolved_on_create = MergeOperator::resolve_properties(
536 &self.config.on_create_properties,
537 Some(&chunk),
538 row,
539 store_ref,
540 );
541 self.create_edge(src_val, dst_val, &resolved_match, &resolved_on_create)?
542 };
543
544 for col_idx in 0..self.config.output_schema.len() {
546 if col_idx == self.config.edge_output_column {
547 if let Some(dst_col) = builder.column_mut(col_idx) {
548 dst_col.push_edge_id(edge_id);
549 }
550 } else if let (Some(src_col), Some(dst_col)) =
551 (chunk.column(col_idx), builder.column_mut(col_idx))
552 && let Some(val) = src_col.get_value(row)
553 {
554 dst_col.push_value(val);
555 }
556 }
557
558 builder.advance_row();
559 }
560
561 return Ok(Some(builder.finish()));
562 }
563
564 Ok(None)
565 }
566
567 fn reset(&mut self) {
568 self.input.reset();
569 }
570
571 fn name(&self) -> &'static str {
572 "MergeRelationship"
573 }
574}
575
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    /// Wraps literal `(name, value)` pairs as constant property sources.
    fn const_props(props: Vec<(&str, Value)>) -> Vec<(String, PropertySource)> {
        props
            .into_iter()
            .map(|(name, value)| (name.to_owned(), PropertySource::Constant(value)))
            .collect()
    }

    /// A fresh, empty LPG store behind the mutable store trait object.
    fn fresh_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// An input-less `MERGE (n:Person {..})` with the given property lists.
    fn merge_person(
        store: &Arc<dyn GraphStoreMut>,
        match_properties: Vec<(String, PropertySource)>,
        on_create_properties: Vec<(String, PropertySource)>,
        on_match_properties: Vec<(String, PropertySource)>,
    ) -> MergeOperator {
        MergeOperator::new(
            Arc::clone(store),
            None,
            MergeConfig {
                variable: "n".to_string(),
                labels: vec!["Person".to_string()],
                match_properties,
                on_create_properties,
                on_match_properties,
                output_schema: vec![LogicalType::Node],
                output_column: 0,
                bound_variable_column: None,
            },
        )
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = fresh_store();
        let mut merge = merge_person(
            &store,
            const_props(vec![("name", Value::String("Alix".into()))]),
            vec![],
            vec![],
        );

        assert!(merge.next().unwrap().is_some());

        let people = store.nodes_by_label("Person");
        assert_eq!(people.len(), 1);

        let person = store.get_node(people[0]).unwrap();
        assert!(person.has_label("Person"));
        assert_eq!(
            person.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = fresh_store();
        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
        );

        let mut merge = merge_person(
            &store,
            const_props(vec![("name", Value::String("Gus".into()))]),
            vec![],
            vec![],
        );

        assert!(merge.next().unwrap().is_some());

        // The pre-existing node was matched, so no second node appeared.
        assert_eq!(store.nodes_by_label("Person").len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = fresh_store();
        let mut merge = merge_person(
            &store,
            const_props(vec![("name", Value::String("Vincent".into()))]),
            const_props(vec![("created", Value::Bool(true))]),
            vec![],
        );

        let _ = merge.next().unwrap();

        let people = store.nodes_by_label("Person");
        let person = store.get_node(people[0]).unwrap();
        assert_eq!(
            person.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            person.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = fresh_store();
        let existing = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
        );

        let mut merge = merge_person(
            &store,
            const_props(vec![("name", Value::String("Jules".into()))]),
            vec![],
            const_props(vec![("updated", Value::Bool(true))]),
        );

        let _ = merge.next().unwrap();

        let person = store.get_node(existing).unwrap();
        assert_eq!(
            person.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }
}