1use super::{ConstraintValidator, Operator, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use crate::graph::GraphStoreMut;
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14use std::sync::Arc;
15
16pub struct MergeConfig {
18 pub variable: String,
20 pub labels: Vec<String>,
22 pub match_properties: Vec<(String, Value)>,
24 pub on_create_properties: Vec<(String, Value)>,
26 pub on_match_properties: Vec<(String, Value)>,
28 pub output_schema: Vec<LogicalType>,
30 pub output_column: usize,
32}
33
34pub struct MergeOperator {
42 store: Arc<dyn GraphStoreMut>,
44 input: Option<Box<dyn Operator>>,
46 config: MergeConfig,
48 executed: bool,
50 viewing_epoch: Option<EpochId>,
52 transaction_id: Option<TransactionId>,
54 validator: Option<Arc<dyn ConstraintValidator>>,
56}
57
58impl MergeOperator {
59 pub fn new(
61 store: Arc<dyn GraphStoreMut>,
62 input: Option<Box<dyn Operator>>,
63 config: MergeConfig,
64 ) -> Self {
65 Self {
66 store,
67 input,
68 config,
69 executed: false,
70 viewing_epoch: None,
71 transaction_id: None,
72 validator: None,
73 }
74 }
75
76 #[must_use]
78 pub fn variable(&self) -> &str {
79 &self.config.variable
80 }
81
82 pub fn with_transaction_context(
84 mut self,
85 epoch: EpochId,
86 transaction_id: Option<TransactionId>,
87 ) -> Self {
88 self.viewing_epoch = Some(epoch);
89 self.transaction_id = transaction_id;
90 self
91 }
92
93 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
95 self.validator = Some(validator);
96 self
97 }
98
99 fn find_matching_node(&self) -> Option<NodeId> {
101 let candidates: Vec<NodeId> = if let Some(first_label) = self.config.labels.first() {
102 self.store.nodes_by_label(first_label)
103 } else {
104 self.store.node_ids()
105 };
106
107 for node_id in candidates {
108 if let Some(node) = self.store.get_node(node_id) {
109 let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
110 if !has_all_labels {
111 continue;
112 }
113
114 let has_all_props =
115 self.config
116 .match_properties
117 .iter()
118 .all(|(key, expected_value)| {
119 node.properties
120 .get(&PropertyKey::new(key.as_str()))
121 .is_some_and(|v| v == expected_value)
122 });
123
124 if has_all_props {
125 return Some(node_id);
126 }
127 }
128 }
129
130 None
131 }
132
133 fn create_node(&self) -> Result<NodeId, super::OperatorError> {
135 if let Some(ref validator) = self.validator {
137 validator.validate_node_labels_allowed(&self.config.labels)?;
138
139 let all_props: Vec<(String, Value)> = self
140 .config
141 .match_properties
142 .iter()
143 .chain(self.config.on_create_properties.iter())
144 .map(|(k, v)| (k.clone(), v.clone()))
145 .collect();
146 for (name, value) in &all_props {
147 validator.validate_node_property(&self.config.labels, name, value)?;
148 validator.check_unique_node_property(&self.config.labels, name, value)?;
149 }
150 validator.validate_node_complete(&self.config.labels, &all_props)?;
151 }
152
153 let mut all_props: Vec<(PropertyKey, Value)> = self
154 .config
155 .match_properties
156 .iter()
157 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
158 .collect();
159
160 for (k, v) in &self.config.on_create_properties {
161 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
162 existing.1 = v.clone();
163 } else {
164 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
165 }
166 }
167
168 let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
169 Ok(self.store.create_node_with_props(&labels, &all_props))
170 }
171
172 fn merge_node(&self) -> Result<NodeId, super::OperatorError> {
174 if let Some(existing_id) = self.find_matching_node() {
175 self.apply_on_match(existing_id);
176 Ok(existing_id)
177 } else {
178 self.create_node()
179 }
180 }
181
182 fn apply_on_match(&self, node_id: NodeId) {
184 for (key, value) in &self.config.on_match_properties {
185 if let Some(tid) = self.transaction_id {
186 self.store
187 .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
188 } else {
189 self.store
190 .set_node_property(node_id, key.as_str(), value.clone());
191 }
192 }
193 }
194}
195
196impl Operator for MergeOperator {
197 fn next(&mut self) -> OperatorResult {
198 if let Some(ref mut input) = self.input {
201 if let Some(chunk) = input.next()? {
202 let node_id = self.merge_node()?;
204
205 let mut builder =
206 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
207
208 for row in chunk.selected_indices() {
209 for col_idx in 0..chunk.column_count() {
211 if let (Some(src), Some(dst)) =
212 (chunk.column(col_idx), builder.column_mut(col_idx))
213 {
214 if let Some(val) = src.get_value(row) {
215 dst.push_value(val);
216 } else {
217 dst.push_value(Value::Null);
218 }
219 }
220 }
221
222 if let Some(dst) = builder.column_mut(self.config.output_column) {
224 dst.push_node_id(node_id);
225 }
226
227 builder.advance_row();
228 }
229
230 return Ok(Some(builder.finish()));
231 }
232 return Ok(None);
233 }
234
235 if self.executed {
237 return Ok(None);
238 }
239 self.executed = true;
240
241 let node_id = self.merge_node()?;
242
243 let mut builder = DataChunkBuilder::new(&self.config.output_schema);
244 if let Some(dst) = builder.column_mut(self.config.output_column) {
245 dst.push_node_id(node_id);
246 }
247 builder.advance_row();
248
249 Ok(Some(builder.finish()))
250 }
251
252 fn reset(&mut self) {
253 self.executed = false;
254 if let Some(ref mut input) = self.input {
255 input.reset();
256 }
257 }
258
259 fn name(&self) -> &'static str {
260 "Merge"
261 }
262}
263
264pub struct MergeRelationshipConfig {
266 pub source_column: usize,
268 pub target_column: usize,
270 pub edge_type: String,
272 pub match_properties: Vec<(String, Value)>,
274 pub on_create_properties: Vec<(String, Value)>,
276 pub on_match_properties: Vec<(String, Value)>,
278 pub output_schema: Vec<LogicalType>,
280 pub edge_output_column: usize,
282}
283
284pub struct MergeRelationshipOperator {
291 store: Arc<dyn GraphStoreMut>,
293 input: Box<dyn Operator>,
295 config: MergeRelationshipConfig,
297 viewing_epoch: Option<EpochId>,
299 transaction_id: Option<TransactionId>,
301 validator: Option<Arc<dyn ConstraintValidator>>,
303}
304
305impl MergeRelationshipOperator {
306 pub fn new(
308 store: Arc<dyn GraphStoreMut>,
309 input: Box<dyn Operator>,
310 config: MergeRelationshipConfig,
311 ) -> Self {
312 Self {
313 store,
314 input,
315 config,
316 viewing_epoch: None,
317 transaction_id: None,
318 validator: None,
319 }
320 }
321
322 pub fn with_transaction_context(
324 mut self,
325 epoch: EpochId,
326 transaction_id: Option<TransactionId>,
327 ) -> Self {
328 self.viewing_epoch = Some(epoch);
329 self.transaction_id = transaction_id;
330 self
331 }
332
333 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
335 self.validator = Some(validator);
336 self
337 }
338
339 fn find_matching_edge(&self, src: NodeId, dst: NodeId) -> Option<EdgeId> {
341 use crate::graph::Direction;
342
343 for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
344 if target != dst {
345 continue;
346 }
347
348 if let Some(edge) = self.store.get_edge(edge_id) {
349 if edge.edge_type.as_str() != self.config.edge_type {
350 continue;
351 }
352
353 let has_all_props =
354 self.config.match_properties.iter().all(|(key, expected)| {
355 edge.get_property(key).is_some_and(|v| v == expected)
356 });
357
358 if has_all_props {
359 return Some(edge_id);
360 }
361 }
362 }
363
364 None
365 }
366
367 fn create_edge(&self, src: NodeId, dst: NodeId) -> Result<EdgeId, super::OperatorError> {
369 if let Some(ref validator) = self.validator {
371 validator.validate_edge_type_allowed(&self.config.edge_type)?;
372
373 let all_props: Vec<(String, Value)> = self
374 .config
375 .match_properties
376 .iter()
377 .chain(self.config.on_create_properties.iter())
378 .map(|(k, v)| (k.clone(), v.clone()))
379 .collect();
380 for (name, value) in &all_props {
381 validator.validate_edge_property(&self.config.edge_type, name, value)?;
382 }
383 validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
384 }
385
386 let mut all_props: Vec<(PropertyKey, Value)> = self
387 .config
388 .match_properties
389 .iter()
390 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
391 .collect();
392
393 for (k, v) in &self.config.on_create_properties {
394 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
395 existing.1 = v.clone();
396 } else {
397 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
398 }
399 }
400
401 Ok(self
402 .store
403 .create_edge_with_props(src, dst, &self.config.edge_type, &all_props))
404 }
405
406 fn apply_on_match(&self, edge_id: EdgeId) {
408 for (key, value) in &self.config.on_match_properties {
409 if let Some(tid) = self.transaction_id {
410 self.store
411 .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
412 } else {
413 self.store
414 .set_edge_property(edge_id, key.as_str(), value.clone());
415 }
416 }
417 }
418}
419
420impl Operator for MergeRelationshipOperator {
421 fn next(&mut self) -> OperatorResult {
422 use super::OperatorError;
423
424 if let Some(chunk) = self.input.next()? {
425 let mut builder =
426 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
427
428 for row in chunk.selected_indices() {
429 let src_val = chunk
430 .column(self.config.source_column)
431 .and_then(|c| c.get_node_id(row))
432 .ok_or_else(|| OperatorError::TypeMismatch {
433 expected: "NodeId (source)".to_string(),
434 found: "None".to_string(),
435 })?;
436
437 let dst_val = chunk
438 .column(self.config.target_column)
439 .and_then(|c| c.get_node_id(row))
440 .ok_or_else(|| OperatorError::TypeMismatch {
441 expected: "NodeId (target)".to_string(),
442 found: "None".to_string(),
443 })?;
444
445 let edge_id = if let Some(existing) = self.find_matching_edge(src_val, dst_val) {
446 self.apply_on_match(existing);
447 existing
448 } else {
449 self.create_edge(src_val, dst_val)?
450 };
451
452 for col_idx in 0..self.config.output_schema.len() {
454 if col_idx == self.config.edge_output_column {
455 if let Some(dst_col) = builder.column_mut(col_idx) {
456 dst_col.push_edge_id(edge_id);
457 }
458 } else if let (Some(src_col), Some(dst_col)) =
459 (chunk.column(col_idx), builder.column_mut(col_idx))
460 && let Some(val) = src_col.get_value(row)
461 {
462 dst_col.push_value(val);
463 }
464 }
465
466 builder.advance_row();
467 }
468
469 return Ok(Some(builder.finish()));
470 }
471
472 Ok(None)
473 }
474
475 fn reset(&mut self) {
476 self.input.reset();
477 }
478
479 fn name(&self) -> &'static str {
480 "MergeRelationship"
481 }
482}
483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    /// Fresh, empty store behind the trait object the operators expect.
    fn new_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Standalone merge of `(n:Person {name: <name>})` with the given
    /// ON CREATE / ON MATCH property lists.
    fn person_merge(
        store: &Arc<dyn GraphStoreMut>,
        name: &'static str,
        on_create_properties: Vec<(String, Value)>,
        on_match_properties: Vec<(String, Value)>,
    ) -> MergeOperator {
        MergeOperator::new(
            Arc::clone(store),
            None,
            MergeConfig {
                variable: "n".to_string(),
                labels: vec!["Person".to_string()],
                match_properties: vec![("name".to_string(), Value::String(name.into()))],
                on_create_properties,
                on_match_properties,
                output_schema: vec![LogicalType::Node],
                output_column: 0,
            },
        )
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = new_store();
        let mut merge = person_merge(&store, "Alix", vec![], vec![]);

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // Exactly one Person node must now exist, carrying the match property.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        let node = store.get_node(nodes[0]).unwrap();
        assert!(node.has_label("Person"));
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = new_store();

        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
        );

        let mut merge = person_merge(&store, "Gus", vec![], vec![]);

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // The pre-existing node is reused, so no second node appears.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = new_store();
        let mut merge = person_merge(
            &store,
            "Vincent",
            vec![("created".to_string(), Value::Bool(true))],
            vec![],
        );

        let _ = merge.next().unwrap();

        // Both the match property and the ON CREATE property are stored.
        let nodes = store.nodes_by_label("Person");
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            node.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = new_store();

        let node_id = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
        );

        let mut merge = person_merge(
            &store,
            "Jules",
            vec![],
            vec![("updated".to_string(), Value::Bool(true))],
        );

        let _ = merge.next().unwrap();

        // The ON MATCH property lands on the node that already existed.
        let node = store.get_node(node_id).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }
}