1use super::{Operator, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use crate::graph::GraphStoreMut;
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14use std::sync::Arc;
15
16pub struct MergeConfig {
18 pub variable: String,
20 pub labels: Vec<String>,
22 pub match_properties: Vec<(String, Value)>,
24 pub on_create_properties: Vec<(String, Value)>,
26 pub on_match_properties: Vec<(String, Value)>,
28 pub output_schema: Vec<LogicalType>,
30 pub output_column: usize,
32}
33
34pub struct MergeOperator {
42 store: Arc<dyn GraphStoreMut>,
44 input: Option<Box<dyn Operator>>,
46 config: MergeConfig,
48 executed: bool,
50 viewing_epoch: Option<EpochId>,
52 transaction_id: Option<TransactionId>,
54}
55
56impl MergeOperator {
57 pub fn new(
59 store: Arc<dyn GraphStoreMut>,
60 input: Option<Box<dyn Operator>>,
61 config: MergeConfig,
62 ) -> Self {
63 Self {
64 store,
65 input,
66 config,
67 executed: false,
68 viewing_epoch: None,
69 transaction_id: None,
70 }
71 }
72
73 #[must_use]
75 pub fn variable(&self) -> &str {
76 &self.config.variable
77 }
78
79 pub fn with_transaction_context(
81 mut self,
82 epoch: EpochId,
83 transaction_id: Option<TransactionId>,
84 ) -> Self {
85 self.viewing_epoch = Some(epoch);
86 self.transaction_id = transaction_id;
87 self
88 }
89
90 fn find_matching_node(&self) -> Option<NodeId> {
92 let candidates: Vec<NodeId> = if let Some(first_label) = self.config.labels.first() {
93 self.store.nodes_by_label(first_label)
94 } else {
95 self.store.node_ids()
96 };
97
98 for node_id in candidates {
99 if let Some(node) = self.store.get_node(node_id) {
100 let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
101 if !has_all_labels {
102 continue;
103 }
104
105 let has_all_props =
106 self.config
107 .match_properties
108 .iter()
109 .all(|(key, expected_value)| {
110 node.properties
111 .get(&PropertyKey::new(key.as_str()))
112 .is_some_and(|v| v == expected_value)
113 });
114
115 if has_all_props {
116 return Some(node_id);
117 }
118 }
119 }
120
121 None
122 }
123
124 fn create_node(&self) -> NodeId {
126 let mut all_props: Vec<(PropertyKey, Value)> = self
127 .config
128 .match_properties
129 .iter()
130 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
131 .collect();
132
133 for (k, v) in &self.config.on_create_properties {
134 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
135 existing.1 = v.clone();
136 } else {
137 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
138 }
139 }
140
141 let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
142 self.store.create_node_with_props(&labels, &all_props)
143 }
144
145 fn merge_node(&self) -> NodeId {
147 if let Some(existing_id) = self.find_matching_node() {
148 self.apply_on_match(existing_id);
149 existing_id
150 } else {
151 self.create_node()
152 }
153 }
154
155 fn apply_on_match(&self, node_id: NodeId) {
157 for (key, value) in &self.config.on_match_properties {
158 if let Some(tid) = self.transaction_id {
159 self.store
160 .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
161 } else {
162 self.store
163 .set_node_property(node_id, key.as_str(), value.clone());
164 }
165 }
166 }
167}
168
169impl Operator for MergeOperator {
170 fn next(&mut self) -> OperatorResult {
171 if let Some(ref mut input) = self.input {
174 if let Some(chunk) = input.next()? {
175 let node_id = self.merge_node();
177
178 let mut builder =
179 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
180
181 for row in chunk.selected_indices() {
182 for col_idx in 0..chunk.column_count() {
184 if let (Some(src), Some(dst)) =
185 (chunk.column(col_idx), builder.column_mut(col_idx))
186 {
187 if let Some(val) = src.get_value(row) {
188 dst.push_value(val);
189 } else {
190 dst.push_value(Value::Null);
191 }
192 }
193 }
194
195 if let Some(dst) = builder.column_mut(self.config.output_column) {
197 dst.push_node_id(node_id);
198 }
199
200 builder.advance_row();
201 }
202
203 return Ok(Some(builder.finish()));
204 }
205 return Ok(None);
206 }
207
208 if self.executed {
210 return Ok(None);
211 }
212 self.executed = true;
213
214 let node_id = self.merge_node();
215
216 let mut builder = DataChunkBuilder::new(&self.config.output_schema);
217 if let Some(dst) = builder.column_mut(self.config.output_column) {
218 dst.push_node_id(node_id);
219 }
220 builder.advance_row();
221
222 Ok(Some(builder.finish()))
223 }
224
225 fn reset(&mut self) {
226 self.executed = false;
227 if let Some(ref mut input) = self.input {
228 input.reset();
229 }
230 }
231
232 fn name(&self) -> &'static str {
233 "Merge"
234 }
235}
236
237pub struct MergeRelationshipConfig {
239 pub source_column: usize,
241 pub target_column: usize,
243 pub edge_type: String,
245 pub match_properties: Vec<(String, Value)>,
247 pub on_create_properties: Vec<(String, Value)>,
249 pub on_match_properties: Vec<(String, Value)>,
251 pub output_schema: Vec<LogicalType>,
253 pub edge_output_column: usize,
255}
256
257pub struct MergeRelationshipOperator {
264 store: Arc<dyn GraphStoreMut>,
266 input: Box<dyn Operator>,
268 config: MergeRelationshipConfig,
270 viewing_epoch: Option<EpochId>,
272 transaction_id: Option<TransactionId>,
274}
275
276impl MergeRelationshipOperator {
277 pub fn new(
279 store: Arc<dyn GraphStoreMut>,
280 input: Box<dyn Operator>,
281 config: MergeRelationshipConfig,
282 ) -> Self {
283 Self {
284 store,
285 input,
286 config,
287 viewing_epoch: None,
288 transaction_id: None,
289 }
290 }
291
292 pub fn with_transaction_context(
294 mut self,
295 epoch: EpochId,
296 transaction_id: Option<TransactionId>,
297 ) -> Self {
298 self.viewing_epoch = Some(epoch);
299 self.transaction_id = transaction_id;
300 self
301 }
302
303 fn find_matching_edge(&self, src: NodeId, dst: NodeId) -> Option<EdgeId> {
305 use crate::graph::Direction;
306
307 for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
308 if target != dst {
309 continue;
310 }
311
312 if let Some(edge) = self.store.get_edge(edge_id) {
313 if edge.edge_type.as_str() != self.config.edge_type {
314 continue;
315 }
316
317 let has_all_props =
318 self.config.match_properties.iter().all(|(key, expected)| {
319 edge.get_property(key).is_some_and(|v| v == expected)
320 });
321
322 if has_all_props {
323 return Some(edge_id);
324 }
325 }
326 }
327
328 None
329 }
330
331 fn create_edge(&self, src: NodeId, dst: NodeId) -> EdgeId {
333 let mut all_props: Vec<(PropertyKey, Value)> = self
334 .config
335 .match_properties
336 .iter()
337 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
338 .collect();
339
340 for (k, v) in &self.config.on_create_properties {
341 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
342 existing.1 = v.clone();
343 } else {
344 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
345 }
346 }
347
348 self.store
349 .create_edge_with_props(src, dst, &self.config.edge_type, &all_props)
350 }
351
352 fn apply_on_match(&self, edge_id: EdgeId) {
354 for (key, value) in &self.config.on_match_properties {
355 if let Some(tid) = self.transaction_id {
356 self.store
357 .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
358 } else {
359 self.store
360 .set_edge_property(edge_id, key.as_str(), value.clone());
361 }
362 }
363 }
364}
365
366impl Operator for MergeRelationshipOperator {
367 fn next(&mut self) -> OperatorResult {
368 use super::OperatorError;
369
370 if let Some(chunk) = self.input.next()? {
371 let mut builder =
372 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
373
374 for row in chunk.selected_indices() {
375 let src_val = chunk
376 .column(self.config.source_column)
377 .and_then(|c| c.get_node_id(row))
378 .ok_or_else(|| OperatorError::TypeMismatch {
379 expected: "NodeId (source)".to_string(),
380 found: "None".to_string(),
381 })?;
382
383 let dst_val = chunk
384 .column(self.config.target_column)
385 .and_then(|c| c.get_node_id(row))
386 .ok_or_else(|| OperatorError::TypeMismatch {
387 expected: "NodeId (target)".to_string(),
388 found: "None".to_string(),
389 })?;
390
391 let edge_id = if let Some(existing) = self.find_matching_edge(src_val, dst_val) {
392 self.apply_on_match(existing);
393 existing
394 } else {
395 self.create_edge(src_val, dst_val)
396 };
397
398 for col_idx in 0..self.config.output_schema.len() {
400 if col_idx == self.config.edge_output_column {
401 if let Some(dst_col) = builder.column_mut(col_idx) {
402 dst_col.push_edge_id(edge_id);
403 }
404 } else if let (Some(src_col), Some(dst_col)) =
405 (chunk.column(col_idx), builder.column_mut(col_idx))
406 && let Some(val) = src_col.get_value(row)
407 {
408 dst_col.push_value(val);
409 }
410 }
411
412 builder.advance_row();
413 }
414
415 return Ok(Some(builder.finish()));
416 }
417
418 Ok(None)
419 }
420
421 fn reset(&mut self) {
422 self.input.reset();
423 }
424
425 fn name(&self) -> &'static str {
426 "MergeRelationship"
427 }
428}
429
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    /// A fresh, empty in-memory store behind the trait object the operator uses.
    fn new_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Input-less `MERGE (n:Person {name: <name>})` with the given ON CREATE
    /// and ON MATCH property sets.
    fn merge_person(
        store: &Arc<dyn GraphStoreMut>,
        name: &'static str,
        on_create_properties: Vec<(String, Value)>,
        on_match_properties: Vec<(String, Value)>,
    ) -> MergeOperator {
        MergeOperator::new(
            Arc::clone(store),
            None,
            MergeConfig {
                variable: "n".to_string(),
                labels: vec!["Person".to_string()],
                match_properties: vec![("name".to_string(), Value::String(name.into()))],
                on_create_properties,
                on_match_properties,
                output_schema: vec![LogicalType::Node],
                output_column: 0,
            },
        )
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = new_store();
        let mut merge = merge_person(&store, "Alix", vec![], vec![]);

        assert!(merge.next().unwrap().is_some());

        let people = store.nodes_by_label("Person");
        assert_eq!(people.len(), 1);

        let alix = store.get_node(people[0]).unwrap();
        assert!(alix.has_label("Person"));
        assert_eq!(
            alix.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = new_store();
        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
        );

        let mut merge = merge_person(&store, "Gus", vec![], vec![]);
        assert!(merge.next().unwrap().is_some());

        // The existing node was reused, not duplicated.
        assert_eq!(store.nodes_by_label("Person").len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = new_store();
        let mut merge = merge_person(
            &store,
            "Vincent",
            vec![("created".to_string(), Value::Bool(true))],
            vec![],
        );

        let _ = merge.next().unwrap();

        let people = store.nodes_by_label("Person");
        let vincent = store.get_node(people[0]).unwrap();
        assert_eq!(
            vincent.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            vincent.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = new_store();
        let jules_id = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
        );

        let mut merge = merge_person(
            &store,
            "Jules",
            vec![],
            vec![("updated".to_string(), Value::Bool(true))],
        );

        let _ = merge.next().unwrap();

        let jules = store.get_node(jules_id).unwrap();
        assert_eq!(
            jules.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }
}