1use super::{ConstraintValidator, Operator, OperatorResult, PropertySource};
9use crate::execution::chunk::{DataChunk, DataChunkBuilder};
10use crate::graph::{GraphStore, GraphStoreMut};
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14use std::sync::Arc;
15
/// Configuration for a [`MergeOperator`]: what to match on, what to write on
/// the create and match paths, and where the merged node ID is emitted.
pub struct MergeConfig {
    /// Name of the merged variable; returned by `MergeOperator::variable` and
    /// quoted in the error raised when the bound variable is null.
    pub variable: String,
    /// Labels an existing node must all carry to match, and the labels given
    /// to a newly created node. The first label selects the candidate nodes.
    pub labels: Vec<String>,
    /// Properties that must all be present and equal on an existing node for
    /// it to match; they are also written to a newly created node.
    pub match_properties: Vec<(String, PropertySource)>,
    /// Properties written only when a node is created. A key that also
    /// appears in `match_properties` replaces the match value.
    pub on_create_properties: Vec<(String, PropertySource)>,
    /// Properties set on the existing node when a match is found.
    pub on_match_properties: Vec<(String, PropertySource)>,
    /// Column types handed to the output chunk builder.
    pub output_schema: Vec<LogicalType>,
    /// Index of the output column that receives the merged node ID.
    pub output_column: usize,
    /// When set, the input column checked on every row: a null value (or a
    /// missing column) aborts the merge with a type-mismatch error.
    pub bound_variable_column: Option<usize>,
}
37
/// Operator that merges a node: it reuses an existing node that carries the
/// configured labels and match properties, or creates one when none matches.
pub struct MergeOperator {
    /// Graph store used both to look up candidates and to create or update nodes.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator. When `None`, the operator merges a single node once.
    input: Option<Box<dyn Operator>>,
    /// Match/create/output configuration.
    config: MergeConfig,
    /// Set once the input-less path has produced its row; cleared by `reset`.
    executed: bool,
    /// Epoch supplied through `with_transaction_context`.
    /// NOTE(review): no code in this view reads it — confirm whether matching
    /// is expected to honor it.
    viewing_epoch: Option<EpochId>,
    /// When set, ON MATCH property writes use the versioned store API with this ID.
    transaction_id: Option<TransactionId>,
    /// Optional constraint validator consulted before a node is created.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
61
62impl MergeOperator {
63 pub fn new(
65 store: Arc<dyn GraphStoreMut>,
66 input: Option<Box<dyn Operator>>,
67 config: MergeConfig,
68 ) -> Self {
69 Self {
70 store,
71 input,
72 config,
73 executed: false,
74 viewing_epoch: None,
75 transaction_id: None,
76 validator: None,
77 }
78 }
79
80 #[must_use]
82 pub fn variable(&self) -> &str {
83 &self.config.variable
84 }
85
86 pub fn with_transaction_context(
88 mut self,
89 epoch: EpochId,
90 transaction_id: Option<TransactionId>,
91 ) -> Self {
92 self.viewing_epoch = Some(epoch);
93 self.transaction_id = transaction_id;
94 self
95 }
96
97 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
99 self.validator = Some(validator);
100 self
101 }
102
103 fn resolve_properties(
105 props: &[(String, PropertySource)],
106 chunk: Option<&DataChunk>,
107 row: usize,
108 store: &dyn GraphStore,
109 ) -> Vec<(String, Value)> {
110 props
111 .iter()
112 .map(|(name, source)| {
113 let value = if let Some(chunk) = chunk {
114 source.resolve(chunk, row, store)
115 } else {
116 match source {
118 PropertySource::Constant(v) => v.clone(),
119 _ => Value::Null,
120 }
121 };
122 (name.clone(), value)
123 })
124 .collect()
125 }
126
127 fn find_matching_node(&self, resolved_match_props: &[(String, Value)]) -> Option<NodeId> {
129 let candidates: Vec<NodeId> = if let Some(first_label) = self.config.labels.first() {
130 self.store.nodes_by_label(first_label)
131 } else {
132 self.store.node_ids()
133 };
134
135 for node_id in candidates {
136 if let Some(node) = self.store.get_node(node_id) {
137 let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
138 if !has_all_labels {
139 continue;
140 }
141
142 let has_all_props = resolved_match_props.iter().all(|(key, expected_value)| {
143 node.properties
144 .get(&PropertyKey::new(key.as_str()))
145 .is_some_and(|v| v == expected_value)
146 });
147
148 if has_all_props {
149 return Some(node_id);
150 }
151 }
152 }
153
154 None
155 }
156
157 fn create_node(
159 &self,
160 resolved_match_props: &[(String, Value)],
161 resolved_create_props: &[(String, Value)],
162 ) -> Result<NodeId, super::OperatorError> {
163 if let Some(ref validator) = self.validator {
165 validator.validate_node_labels_allowed(&self.config.labels)?;
166
167 let all_props: Vec<(String, Value)> = resolved_match_props
168 .iter()
169 .chain(resolved_create_props.iter())
170 .map(|(k, v)| (k.clone(), v.clone()))
171 .collect();
172 for (name, value) in &all_props {
173 validator.validate_node_property(&self.config.labels, name, value)?;
174 validator.check_unique_node_property(&self.config.labels, name, value)?;
175 }
176 validator.validate_node_complete(&self.config.labels, &all_props)?;
177 }
178
179 let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
180 .iter()
181 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
182 .collect();
183
184 for (k, v) in resolved_create_props {
185 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
186 existing.1 = v.clone();
187 } else {
188 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
189 }
190 }
191
192 let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
193 Ok(self.store.create_node_with_props(&labels, &all_props))
194 }
195
196 fn merge_node_for_row(
198 &self,
199 chunk: Option<&DataChunk>,
200 row: usize,
201 ) -> Result<NodeId, super::OperatorError> {
202 let store_ref: &dyn GraphStore = self.store.as_ref();
203 let resolved_match =
204 Self::resolve_properties(&self.config.match_properties, chunk, row, store_ref);
205
206 if let Some(existing_id) = self.find_matching_node(&resolved_match) {
207 let resolved_on_match =
208 Self::resolve_properties(&self.config.on_match_properties, chunk, row, store_ref);
209 self.apply_on_match(existing_id, &resolved_on_match);
210 Ok(existing_id)
211 } else {
212 let resolved_on_create =
213 Self::resolve_properties(&self.config.on_create_properties, chunk, row, store_ref);
214 self.create_node(&resolved_match, &resolved_on_create)
215 }
216 }
217
218 fn apply_on_match(&self, node_id: NodeId, resolved_on_match: &[(String, Value)]) {
220 for (key, value) in resolved_on_match {
221 if let Some(tid) = self.transaction_id {
222 self.store
223 .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
224 } else {
225 self.store
226 .set_node_property(node_id, key.as_str(), value.clone());
227 }
228 }
229 }
230}
231
232impl Operator for MergeOperator {
233 fn next(&mut self) -> OperatorResult {
234 if let Some(ref mut input) = self.input {
237 if let Some(chunk) = input.next()? {
238 let mut builder =
239 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
240
241 for row in chunk.selected_indices() {
242 if let Some(bound_col) = self.config.bound_variable_column {
244 let is_null = chunk.column(bound_col).map_or(true, |col| col.is_null(row));
245 if is_null {
246 return Err(super::OperatorError::TypeMismatch {
247 expected: format!(
248 "non-null node for MERGE variable '{}'",
249 self.config.variable
250 ),
251 found: "NULL".to_string(),
252 });
253 }
254 }
255
256 let node_id = self.merge_node_for_row(Some(&chunk), row)?;
258
259 for col_idx in 0..chunk.column_count() {
261 if let (Some(src), Some(dst)) =
262 (chunk.column(col_idx), builder.column_mut(col_idx))
263 {
264 if let Some(val) = src.get_value(row) {
265 dst.push_value(val);
266 } else {
267 dst.push_value(Value::Null);
268 }
269 }
270 }
271
272 if let Some(dst) = builder.column_mut(self.config.output_column) {
274 dst.push_node_id(node_id);
275 }
276
277 builder.advance_row();
278 }
279
280 return Ok(Some(builder.finish()));
281 }
282 return Ok(None);
283 }
284
285 if self.executed {
287 return Ok(None);
288 }
289 self.executed = true;
290
291 let node_id = self.merge_node_for_row(None, 0)?;
292
293 let mut builder = DataChunkBuilder::new(&self.config.output_schema);
294 if let Some(dst) = builder.column_mut(self.config.output_column) {
295 dst.push_node_id(node_id);
296 }
297 builder.advance_row();
298
299 Ok(Some(builder.finish()))
300 }
301
302 fn reset(&mut self) {
303 self.executed = false;
304 if let Some(ref mut input) = self.input {
305 input.reset();
306 }
307 }
308
309 fn name(&self) -> &'static str {
310 "Merge"
311 }
312}
313
/// Configuration for a [`MergeRelationshipOperator`]: which input columns
/// hold the endpoints, what to match on, and what to write on each path.
pub struct MergeRelationshipConfig {
    /// Input column holding the source node ID.
    pub source_column: usize,
    /// Input column holding the target node ID.
    pub target_column: usize,
    /// Source variable name, quoted in the error raised for a null source.
    pub source_variable: String,
    /// Target variable name, quoted in the error raised for a null target.
    pub target_variable: String,
    /// Edge type an existing edge must have to match, and the type of a
    /// newly created edge.
    pub edge_type: String,
    /// Properties that must all be present and equal on an existing edge for
    /// it to match; they are also written to a newly created edge.
    pub match_properties: Vec<(String, PropertySource)>,
    /// Properties written only when an edge is created. A key that also
    /// appears in `match_properties` replaces the match value.
    pub on_create_properties: Vec<(String, PropertySource)>,
    /// Properties set on the existing edge when a match is found.
    pub on_match_properties: Vec<(String, PropertySource)>,
    /// Column types handed to the output chunk builder.
    pub output_schema: Vec<LogicalType>,
    /// Index of the output column that receives the merged edge ID.
    pub edge_output_column: usize,
}
337
/// Operator that merges an edge between two nodes taken from each input row:
/// it reuses a matching outgoing edge or creates one when none matches.
pub struct MergeRelationshipOperator {
    /// Graph store used both to look up edges and to create or update them.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the rows with the endpoint node IDs.
    input: Box<dyn Operator>,
    /// Match/create/output configuration.
    config: MergeRelationshipConfig,
    /// Epoch supplied through `with_transaction_context`.
    /// NOTE(review): no code in this view reads it — confirm whether matching
    /// is expected to honor it.
    viewing_epoch: Option<EpochId>,
    /// When set, ON MATCH property writes use the versioned store API with this ID.
    transaction_id: Option<TransactionId>,
    /// Optional constraint validator consulted before an edge is created.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
358
359impl MergeRelationshipOperator {
360 pub fn new(
362 store: Arc<dyn GraphStoreMut>,
363 input: Box<dyn Operator>,
364 config: MergeRelationshipConfig,
365 ) -> Self {
366 Self {
367 store,
368 input,
369 config,
370 viewing_epoch: None,
371 transaction_id: None,
372 validator: None,
373 }
374 }
375
376 pub fn with_transaction_context(
378 mut self,
379 epoch: EpochId,
380 transaction_id: Option<TransactionId>,
381 ) -> Self {
382 self.viewing_epoch = Some(epoch);
383 self.transaction_id = transaction_id;
384 self
385 }
386
387 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
389 self.validator = Some(validator);
390 self
391 }
392
393 fn find_matching_edge(
395 &self,
396 src: NodeId,
397 dst: NodeId,
398 resolved_match_props: &[(String, Value)],
399 ) -> Option<EdgeId> {
400 use crate::graph::Direction;
401
402 for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
403 if target != dst {
404 continue;
405 }
406
407 if let Some(edge) = self.store.get_edge(edge_id) {
408 if edge.edge_type.as_str() != self.config.edge_type {
409 continue;
410 }
411
412 let has_all_props = resolved_match_props
413 .iter()
414 .all(|(key, expected)| edge.get_property(key).is_some_and(|v| v == expected));
415
416 if has_all_props {
417 return Some(edge_id);
418 }
419 }
420 }
421
422 None
423 }
424
425 fn create_edge(
427 &self,
428 src: NodeId,
429 dst: NodeId,
430 resolved_match_props: &[(String, Value)],
431 resolved_create_props: &[(String, Value)],
432 ) -> Result<EdgeId, super::OperatorError> {
433 if let Some(ref validator) = self.validator {
435 validator.validate_edge_type_allowed(&self.config.edge_type)?;
436
437 let all_props: Vec<(String, Value)> = resolved_match_props
438 .iter()
439 .chain(resolved_create_props.iter())
440 .map(|(k, v)| (k.clone(), v.clone()))
441 .collect();
442 for (name, value) in &all_props {
443 validator.validate_edge_property(&self.config.edge_type, name, value)?;
444 }
445 validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
446 }
447
448 let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
449 .iter()
450 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
451 .collect();
452
453 for (k, v) in resolved_create_props {
454 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
455 existing.1 = v.clone();
456 } else {
457 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
458 }
459 }
460
461 Ok(self
462 .store
463 .create_edge_with_props(src, dst, &self.config.edge_type, &all_props))
464 }
465
466 fn apply_on_match_edge(&self, edge_id: EdgeId, resolved_on_match: &[(String, Value)]) {
468 for (key, value) in resolved_on_match {
469 if let Some(tid) = self.transaction_id {
470 self.store
471 .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
472 } else {
473 self.store
474 .set_edge_property(edge_id, key.as_str(), value.clone());
475 }
476 }
477 }
478}
479
480impl Operator for MergeRelationshipOperator {
481 fn next(&mut self) -> OperatorResult {
482 use super::OperatorError;
483
484 if let Some(chunk) = self.input.next()? {
485 let mut builder =
486 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
487
488 for row in chunk.selected_indices() {
489 let src_val = chunk
490 .column(self.config.source_column)
491 .and_then(|c| c.get_node_id(row))
492 .ok_or_else(|| OperatorError::TypeMismatch {
493 expected: format!(
494 "non-null node for MERGE variable '{}'",
495 self.config.source_variable
496 ),
497 found: "NULL".to_string(),
498 })?;
499
500 let dst_val = chunk
501 .column(self.config.target_column)
502 .and_then(|c| c.get_node_id(row))
503 .ok_or_else(|| OperatorError::TypeMismatch {
504 expected: format!(
505 "non-null node for MERGE variable '{}'",
506 self.config.target_variable
507 ),
508 found: "None".to_string(),
509 })?;
510
511 let store_ref: &dyn GraphStore = self.store.as_ref();
512 let resolved_match = MergeOperator::resolve_properties(
513 &self.config.match_properties,
514 Some(&chunk),
515 row,
516 store_ref,
517 );
518
519 let edge_id = if let Some(existing) =
520 self.find_matching_edge(src_val, dst_val, &resolved_match)
521 {
522 let resolved_on_match = MergeOperator::resolve_properties(
523 &self.config.on_match_properties,
524 Some(&chunk),
525 row,
526 store_ref,
527 );
528 self.apply_on_match_edge(existing, &resolved_on_match);
529 existing
530 } else {
531 let resolved_on_create = MergeOperator::resolve_properties(
532 &self.config.on_create_properties,
533 Some(&chunk),
534 row,
535 store_ref,
536 );
537 self.create_edge(src_val, dst_val, &resolved_match, &resolved_on_create)?
538 };
539
540 for col_idx in 0..self.config.output_schema.len() {
542 if col_idx == self.config.edge_output_column {
543 if let Some(dst_col) = builder.column_mut(col_idx) {
544 dst_col.push_edge_id(edge_id);
545 }
546 } else if let (Some(src_col), Some(dst_col)) =
547 (chunk.column(col_idx), builder.column_mut(col_idx))
548 && let Some(val) = src_col.get_value(row)
549 {
550 dst_col.push_value(val);
551 }
552 }
553
554 builder.advance_row();
555 }
556
557 return Ok(Some(builder.finish()));
558 }
559
560 Ok(None)
561 }
562
563 fn reset(&mut self) {
564 self.input.reset();
565 }
566
567 fn name(&self) -> &'static str {
568 "MergeRelationship"
569 }
570}
571
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    /// Wraps literal values as constant property sources.
    fn const_props(props: Vec<(&str, Value)>) -> Vec<(String, PropertySource)> {
        let mut wrapped = Vec::with_capacity(props.len());
        for (key, value) in props {
            wrapped.push((key.to_string(), PropertySource::Constant(value)));
        }
        wrapped
    }

    /// Fresh, empty LPG store behind the mutable graph-store trait object.
    fn empty_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Input-less merge of a `Person` node matched on `name`.
    fn person_merge(
        store: &Arc<dyn GraphStoreMut>,
        name: &'static str,
        on_create: Vec<(&str, Value)>,
        on_match: Vec<(&str, Value)>,
    ) -> MergeOperator {
        let config = MergeConfig {
            variable: "n".to_string(),
            labels: vec!["Person".to_string()],
            match_properties: const_props(vec![("name", Value::String(name.into()))]),
            on_create_properties: const_props(on_create),
            on_match_properties: const_props(on_match),
            output_schema: vec![LogicalType::Node],
            output_column: 0,
            bound_variable_column: None,
        };
        MergeOperator::new(Arc::clone(store), None, config)
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = empty_store();
        let mut merge = person_merge(&store, "Alix", vec![], vec![]);

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // Exactly one Person node must exist after the merge.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        let node = store.get_node(nodes[0]).unwrap();
        assert!(node.has_label("Person"));
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = empty_store();

        // Seed the node the merge is expected to find.
        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
        );

        let mut merge = person_merge(&store, "Gus", vec![], vec![]);

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // The merge must not have created a second node.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = empty_store();
        let mut merge = person_merge(
            &store,
            "Vincent",
            vec![("created", Value::Bool(true))],
            vec![],
        );

        let _ = merge.next().unwrap();

        // The new node carries both the match and the ON CREATE properties.
        let nodes = store.nodes_by_label("Person");
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            node.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = empty_store();

        let node_id = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
        );

        let mut merge = person_merge(
            &store,
            "Jules",
            vec![],
            vec![("updated", Value::Bool(true))],
        );

        let _ = merge.next().unwrap();

        // The existing node received the ON MATCH property.
        let node = store.get_node(node_id).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }
}