Skip to main content

grafeo_core/execution/operators/
merge.rs

1//! Merge operator for MERGE clause execution.
2//!
3//! The MERGE operator implements the Cypher MERGE semantics:
4//! 1. Try to match the pattern in the graph
5//! 2. If found, return existing element (optionally apply ON MATCH SET)
6//! 3. If not found, create the element (optionally apply ON CREATE SET)
7
8use super::{ConstraintValidator, Operator, OperatorResult, PropertySource};
9use crate::execution::chunk::{DataChunk, DataChunkBuilder};
10use crate::graph::{GraphStore, GraphStoreMut};
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14use std::sync::Arc;
15
/// Configuration for a node merge operation.
///
/// Consumed by [`MergeOperator`]; all property lists are `(name, source)`
/// pairs whose sources are resolved to concrete values once per merged row.
pub struct MergeConfig {
    /// Variable name for the merged node.
    ///
    /// Returned by `MergeOperator::variable` and embedded in the error raised
    /// when a bound variable turns out to be NULL.
    pub variable: String,
    /// Labels to match/create.
    ///
    /// The first label (if any) selects the candidate set for matching; a
    /// candidate must carry every listed label to match. With no labels, all
    /// nodes are candidates.
    pub labels: Vec<String>,
    /// Properties that must match (also used for creation).
    ///
    /// A resolved NULL value matches nodes where the property is either
    /// absent or explicitly null.
    pub match_properties: Vec<(String, PropertySource)>,
    /// Properties to set on CREATE.
    ///
    /// On creation these are combined with the match properties; a key that
    /// appears in both takes the ON CREATE value.
    pub on_create_properties: Vec<(String, PropertySource)>,
    /// Properties to set on MATCH (written to the existing node).
    pub on_match_properties: Vec<(String, PropertySource)>,
    /// Output schema (input columns + node column).
    ///
    /// Used to construct the output chunk builder.
    pub output_schema: Vec<LogicalType>,
    /// Column index where the merged node ID is placed.
    pub output_column: usize,
    /// If the merge variable was already bound in the input, this column index
    /// is used to detect NULL references (e.g., from unmatched OPTIONAL MATCH).
    /// `None` for standalone MERGE that introduces a new variable.
    ///
    /// When set, a NULL (or missing column) for any selected row aborts the
    /// operator with a `TypeMismatch` error.
    pub bound_variable_column: Option<usize>,
}
37
/// Merge operator for MERGE clause.
///
/// Tries to match a node with the given labels and properties.
/// If found, returns the existing node. If not found, creates a new node.
///
/// When an input operator is provided (chained MERGE), input rows are
/// passed through with the merged node ID appended as an additional column.
/// Without an input operator the merge runs exactly once and yields a single
/// row; subsequent calls yield nothing until the operator is reset.
pub struct MergeOperator {
    /// The graph store.
    store: Arc<dyn GraphStoreMut>,
    /// Optional input operator (for chained MERGE patterns).
    input: Option<Box<dyn Operator>>,
    /// Merge configuration.
    config: MergeConfig,
    /// Whether we've already executed (standalone mode only).
    /// Cleared again by `reset`.
    executed: bool,
    /// Epoch for MVCC versioning.
    /// NOTE(review): set by `with_transaction_context` but not read anywhere
    /// in this file — confirm whether lookups should be epoch-aware.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for undo log tracking.
    /// When present, ON MATCH writes go through the versioned property setter.
    transaction_id: Option<TransactionId>,
    /// Optional constraint validator for schema enforcement.
    /// Consulted only when a new node is about to be created.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
61
62impl MergeOperator {
63    /// Creates a new merge operator.
64    pub fn new(
65        store: Arc<dyn GraphStoreMut>,
66        input: Option<Box<dyn Operator>>,
67        config: MergeConfig,
68    ) -> Self {
69        Self {
70            store,
71            input,
72            config,
73            executed: false,
74            viewing_epoch: None,
75            transaction_id: None,
76            validator: None,
77        }
78    }
79
80    /// Returns the variable name for the merged node.
81    #[must_use]
82    pub fn variable(&self) -> &str {
83        &self.config.variable
84    }
85
86    /// Sets the transaction context for versioned mutations.
87    pub fn with_transaction_context(
88        mut self,
89        epoch: EpochId,
90        transaction_id: Option<TransactionId>,
91    ) -> Self {
92        self.viewing_epoch = Some(epoch);
93        self.transaction_id = transaction_id;
94        self
95    }
96
97    /// Sets the constraint validator for schema enforcement.
98    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
99        self.validator = Some(validator);
100        self
101    }
102
103    /// Resolves property sources to concrete values for a given row.
104    fn resolve_properties(
105        props: &[(String, PropertySource)],
106        chunk: Option<&DataChunk>,
107        row: usize,
108        store: &dyn GraphStore,
109    ) -> Vec<(String, Value)> {
110        props
111            .iter()
112            .map(|(name, source)| {
113                let value = if let Some(chunk) = chunk {
114                    source.resolve(chunk, row, store)
115                } else {
116                    // Standalone mode: only constants are valid
117                    match source {
118                        PropertySource::Constant(v) => v.clone(),
119                        _ => Value::Null,
120                    }
121                };
122                (name.clone(), value)
123            })
124            .collect()
125    }
126
127    /// Tries to find a matching node with the given resolved properties.
128    fn find_matching_node(&self, resolved_match_props: &[(String, Value)]) -> Option<NodeId> {
129        let candidates: Vec<NodeId> = if let Some(first_label) = self.config.labels.first() {
130            self.store.nodes_by_label(first_label)
131        } else {
132            self.store.node_ids()
133        };
134
135        for node_id in candidates {
136            if let Some(node) = self.store.get_node(node_id) {
137                let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
138                if !has_all_labels {
139                    continue;
140                }
141
142                let has_all_props = resolved_match_props.iter().all(|(key, expected_value)| {
143                    let prop = node.properties.get(&PropertyKey::new(key.as_str()));
144                    if expected_value.is_null() {
145                        // Null in a MERGE pattern matches both absent and explicitly null properties
146                        prop.map_or(true, |v| v.is_null())
147                    } else {
148                        prop.is_some_and(|v| v == expected_value)
149                    }
150                });
151
152                if has_all_props {
153                    return Some(node_id);
154                }
155            }
156        }
157
158        None
159    }
160
161    /// Creates a new node with the specified labels and resolved properties.
162    fn create_node(
163        &self,
164        resolved_match_props: &[(String, Value)],
165        resolved_create_props: &[(String, Value)],
166    ) -> Result<NodeId, super::OperatorError> {
167        // Validate constraints before creating the node
168        if let Some(ref validator) = self.validator {
169            validator.validate_node_labels_allowed(&self.config.labels)?;
170
171            let all_props: Vec<(String, Value)> = resolved_match_props
172                .iter()
173                .chain(resolved_create_props.iter())
174                .map(|(k, v)| (k.clone(), v.clone()))
175                .collect();
176            for (name, value) in &all_props {
177                validator.validate_node_property(&self.config.labels, name, value)?;
178                validator.check_unique_node_property(&self.config.labels, name, value)?;
179            }
180            validator.validate_node_complete(&self.config.labels, &all_props)?;
181        }
182
183        let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
184            .iter()
185            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
186            .collect();
187
188        for (k, v) in resolved_create_props {
189            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
190                existing.1 = v.clone();
191            } else {
192                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
193            }
194        }
195
196        let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
197        Ok(self.store.create_node_with_props(&labels, &all_props))
198    }
199
200    /// Finds or creates a matching node for a single row, applying ON MATCH/ON CREATE.
201    fn merge_node_for_row(
202        &self,
203        chunk: Option<&DataChunk>,
204        row: usize,
205    ) -> Result<NodeId, super::OperatorError> {
206        let store_ref: &dyn GraphStore = self.store.as_ref();
207        let resolved_match =
208            Self::resolve_properties(&self.config.match_properties, chunk, row, store_ref);
209
210        if let Some(existing_id) = self.find_matching_node(&resolved_match) {
211            let resolved_on_match =
212                Self::resolve_properties(&self.config.on_match_properties, chunk, row, store_ref);
213            self.apply_on_match(existing_id, &resolved_on_match);
214            Ok(existing_id)
215        } else {
216            let resolved_on_create =
217                Self::resolve_properties(&self.config.on_create_properties, chunk, row, store_ref);
218            self.create_node(&resolved_match, &resolved_on_create)
219        }
220    }
221
222    /// Applies ON MATCH properties to an existing node.
223    fn apply_on_match(&self, node_id: NodeId, resolved_on_match: &[(String, Value)]) {
224        for (key, value) in resolved_on_match {
225            if let Some(tid) = self.transaction_id {
226                self.store
227                    .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
228            } else {
229                self.store
230                    .set_node_property(node_id, key.as_str(), value.clone());
231            }
232        }
233    }
234}
235
236impl Operator for MergeOperator {
237    fn next(&mut self) -> OperatorResult {
238        // When we have an input operator, pass through input rows with the
239        // merged node ID appended (used for chained inline MERGE patterns).
240        if let Some(ref mut input) = self.input {
241            if let Some(chunk) = input.next()? {
242                let mut builder =
243                    DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
244
245                for row in chunk.selected_indices() {
246                    // Reject NULL bound variables (e.g., from unmatched OPTIONAL MATCH)
247                    if let Some(bound_col) = self.config.bound_variable_column {
248                        let is_null = chunk.column(bound_col).map_or(true, |col| col.is_null(row));
249                        if is_null {
250                            return Err(super::OperatorError::TypeMismatch {
251                                expected: format!(
252                                    "non-null node for MERGE variable '{}'",
253                                    self.config.variable
254                                ),
255                                found: "NULL".to_string(),
256                            });
257                        }
258                    }
259
260                    // Merge the node per-row: resolve properties from this row
261                    let node_id = self.merge_node_for_row(Some(&chunk), row)?;
262
263                    // Copy input columns to output
264                    for col_idx in 0..chunk.column_count() {
265                        if let (Some(src), Some(dst)) =
266                            (chunk.column(col_idx), builder.column_mut(col_idx))
267                        {
268                            if let Some(val) = src.get_value(row) {
269                                dst.push_value(val);
270                            } else {
271                                dst.push_value(Value::Null);
272                            }
273                        }
274                    }
275
276                    // Append the merged node ID
277                    if let Some(dst) = builder.column_mut(self.config.output_column) {
278                        dst.push_node_id(node_id);
279                    }
280
281                    builder.advance_row();
282                }
283
284                return Ok(Some(builder.finish()));
285            }
286            return Ok(None);
287        }
288
289        // Standalone mode (no input operator)
290        if self.executed {
291            return Ok(None);
292        }
293        self.executed = true;
294
295        let node_id = self.merge_node_for_row(None, 0)?;
296
297        let mut builder = DataChunkBuilder::new(&self.config.output_schema);
298        if let Some(dst) = builder.column_mut(self.config.output_column) {
299            dst.push_node_id(node_id);
300        }
301        builder.advance_row();
302
303        Ok(Some(builder.finish()))
304    }
305
306    fn reset(&mut self) {
307        self.executed = false;
308        if let Some(ref mut input) = self.input {
309            input.reset();
310        }
311    }
312
313    fn name(&self) -> &'static str {
314        "Merge"
315    }
316}
317
/// Configuration for a relationship merge operation.
///
/// Consumed by [`MergeRelationshipOperator`]; all property lists are
/// `(name, source)` pairs resolved against each input row.
pub struct MergeRelationshipConfig {
    /// Column index for the source node ID in the input.
    pub source_column: usize,
    /// Column index for the target node ID in the input.
    pub target_column: usize,
    /// Variable name for the source node (for error messages).
    pub source_variable: String,
    /// Variable name for the target node (for error messages).
    pub target_variable: String,
    /// Relationship type to match/create.
    pub edge_type: String,
    /// Properties that must match (also used for creation).
    pub match_properties: Vec<(String, PropertySource)>,
    /// Properties to set on CREATE.
    ///
    /// On creation these are combined with the match properties; a key that
    /// appears in both takes the ON CREATE value.
    pub on_create_properties: Vec<(String, PropertySource)>,
    /// Properties to set on MATCH (written to the existing edge).
    pub on_match_properties: Vec<(String, PropertySource)>,
    /// Output schema (input columns + edge column).
    ///
    /// Used to construct the output chunk builder; its length bounds the
    /// columns written per row.
    pub output_schema: Vec<LogicalType>,
    /// Column index for the edge variable in the output.
    pub edge_output_column: usize,
}
341
/// Merge operator for relationship patterns.
///
/// Takes input rows containing source and target node IDs, then for each row:
/// 1. Searches for an existing relationship matching the type and properties
/// 2. If found, applies ON MATCH properties and returns the existing edge
/// 3. If not found, creates a new relationship and applies ON CREATE properties
///
/// Only outgoing edges of the source node are searched, so the match is
/// directional (source -> target).
pub struct MergeRelationshipOperator {
    /// The graph store.
    store: Arc<dyn GraphStoreMut>,
    /// Input operator providing rows with source/target node columns.
    input: Box<dyn Operator>,
    /// Merge configuration.
    config: MergeRelationshipConfig,
    /// Epoch for MVCC versioning.
    /// NOTE(review): set by `with_transaction_context` but not read anywhere
    /// in this file — confirm whether lookups should be epoch-aware.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for undo log tracking.
    /// When present, ON MATCH writes go through the versioned property setter.
    transaction_id: Option<TransactionId>,
    /// Optional constraint validator for schema enforcement.
    /// Consulted only when a new edge is about to be created.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
362
363impl MergeRelationshipOperator {
364    /// Creates a new merge relationship operator.
365    pub fn new(
366        store: Arc<dyn GraphStoreMut>,
367        input: Box<dyn Operator>,
368        config: MergeRelationshipConfig,
369    ) -> Self {
370        Self {
371            store,
372            input,
373            config,
374            viewing_epoch: None,
375            transaction_id: None,
376            validator: None,
377        }
378    }
379
380    /// Sets the transaction context for versioned mutations.
381    pub fn with_transaction_context(
382        mut self,
383        epoch: EpochId,
384        transaction_id: Option<TransactionId>,
385    ) -> Self {
386        self.viewing_epoch = Some(epoch);
387        self.transaction_id = transaction_id;
388        self
389    }
390
391    /// Sets the constraint validator for schema enforcement.
392    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
393        self.validator = Some(validator);
394        self
395    }
396
397    /// Tries to find a matching relationship between source and target.
398    fn find_matching_edge(
399        &self,
400        src: NodeId,
401        dst: NodeId,
402        resolved_match_props: &[(String, Value)],
403    ) -> Option<EdgeId> {
404        use crate::graph::Direction;
405
406        for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
407            if target != dst {
408                continue;
409            }
410
411            if let Some(edge) = self.store.get_edge(edge_id) {
412                if edge.edge_type.as_str() != self.config.edge_type {
413                    continue;
414                }
415
416                let has_all_props = resolved_match_props
417                    .iter()
418                    .all(|(key, expected)| edge.get_property(key).is_some_and(|v| v == expected));
419
420                if has_all_props {
421                    return Some(edge_id);
422                }
423            }
424        }
425
426        None
427    }
428
429    /// Creates a new edge with resolved match and on_create properties.
430    fn create_edge(
431        &self,
432        src: NodeId,
433        dst: NodeId,
434        resolved_match_props: &[(String, Value)],
435        resolved_create_props: &[(String, Value)],
436    ) -> Result<EdgeId, super::OperatorError> {
437        // Validate constraints before creating the edge
438        if let Some(ref validator) = self.validator {
439            validator.validate_edge_type_allowed(&self.config.edge_type)?;
440
441            let all_props: Vec<(String, Value)> = resolved_match_props
442                .iter()
443                .chain(resolved_create_props.iter())
444                .map(|(k, v)| (k.clone(), v.clone()))
445                .collect();
446            for (name, value) in &all_props {
447                validator.validate_edge_property(&self.config.edge_type, name, value)?;
448            }
449            validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
450        }
451
452        let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
453            .iter()
454            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
455            .collect();
456
457        for (k, v) in resolved_create_props {
458            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
459                existing.1 = v.clone();
460            } else {
461                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
462            }
463        }
464
465        Ok(self
466            .store
467            .create_edge_with_props(src, dst, &self.config.edge_type, &all_props))
468    }
469
470    /// Applies ON MATCH properties to an existing edge.
471    fn apply_on_match_edge(&self, edge_id: EdgeId, resolved_on_match: &[(String, Value)]) {
472        for (key, value) in resolved_on_match {
473            if let Some(tid) = self.transaction_id {
474                self.store
475                    .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
476            } else {
477                self.store
478                    .set_edge_property(edge_id, key.as_str(), value.clone());
479            }
480        }
481    }
482}
483
484impl Operator for MergeRelationshipOperator {
485    fn next(&mut self) -> OperatorResult {
486        use super::OperatorError;
487
488        if let Some(chunk) = self.input.next()? {
489            let mut builder =
490                DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
491
492            for row in chunk.selected_indices() {
493                let src_val = chunk
494                    .column(self.config.source_column)
495                    .and_then(|c| c.get_node_id(row))
496                    .ok_or_else(|| OperatorError::TypeMismatch {
497                        expected: format!(
498                            "non-null node for MERGE variable '{}'",
499                            self.config.source_variable
500                        ),
501                        found: "NULL".to_string(),
502                    })?;
503
504                let dst_val = chunk
505                    .column(self.config.target_column)
506                    .and_then(|c| c.get_node_id(row))
507                    .ok_or_else(|| OperatorError::TypeMismatch {
508                        expected: format!(
509                            "non-null node for MERGE variable '{}'",
510                            self.config.target_variable
511                        ),
512                        found: "None".to_string(),
513                    })?;
514
515                let store_ref: &dyn GraphStore = self.store.as_ref();
516                let resolved_match = MergeOperator::resolve_properties(
517                    &self.config.match_properties,
518                    Some(&chunk),
519                    row,
520                    store_ref,
521                );
522
523                let edge_id = if let Some(existing) =
524                    self.find_matching_edge(src_val, dst_val, &resolved_match)
525                {
526                    let resolved_on_match = MergeOperator::resolve_properties(
527                        &self.config.on_match_properties,
528                        Some(&chunk),
529                        row,
530                        store_ref,
531                    );
532                    self.apply_on_match_edge(existing, &resolved_on_match);
533                    existing
534                } else {
535                    let resolved_on_create = MergeOperator::resolve_properties(
536                        &self.config.on_create_properties,
537                        Some(&chunk),
538                        row,
539                        store_ref,
540                    );
541                    self.create_edge(src_val, dst_val, &resolved_match, &resolved_on_create)?
542                };
543
544                // Copy input columns to output, then add the edge column
545                for col_idx in 0..self.config.output_schema.len() {
546                    if col_idx == self.config.edge_output_column {
547                        if let Some(dst_col) = builder.column_mut(col_idx) {
548                            dst_col.push_edge_id(edge_id);
549                        }
550                    } else if let (Some(src_col), Some(dst_col)) =
551                        (chunk.column(col_idx), builder.column_mut(col_idx))
552                        && let Some(val) = src_col.get_value(row)
553                    {
554                        dst_col.push_value(val);
555                    }
556                }
557
558                builder.advance_row();
559            }
560
561            return Ok(Some(builder.finish()));
562        }
563
564        Ok(None)
565    }
566
567    fn reset(&mut self) {
568        self.input.reset();
569    }
570
571    fn name(&self) -> &'static str {
572        "MergeRelationship"
573    }
574}
575
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    /// Wraps literal key/value pairs as constant property sources.
    fn const_props(props: Vec<(&str, Value)>) -> Vec<(String, PropertySource)> {
        let mut wrapped = Vec::with_capacity(props.len());
        for (key, value) in props {
            wrapped.push((key.to_string(), PropertySource::Constant(value)));
        }
        wrapped
    }

    /// Builds an empty in-memory store behind the mutable store trait object.
    fn empty_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Builds a standalone `MERGE (n:Person {..})` operator over `store`.
    fn person_merge(
        store: &Arc<dyn GraphStoreMut>,
        match_properties: Vec<(String, PropertySource)>,
        on_create_properties: Vec<(String, PropertySource)>,
        on_match_properties: Vec<(String, PropertySource)>,
    ) -> MergeOperator {
        let config = MergeConfig {
            variable: "n".to_string(),
            labels: vec!["Person".to_string()],
            match_properties,
            on_create_properties,
            on_match_properties,
            output_schema: vec![LogicalType::Node],
            output_column: 0,
            bound_variable_column: None,
        };
        MergeOperator::new(Arc::clone(store), None, config)
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = empty_store();

        // Nothing exists yet, so MERGE has to take the create path.
        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Alix".into()))]),
            vec![],
            vec![],
        );

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // Exactly one Person node must now exist.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        let node = store.get_node(nodes[0]).unwrap();
        assert!(node.has_label("Person"));
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = empty_store();

        // Seed the node MERGE is expected to find.
        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
        );

        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Gus".into()))]),
            vec![],
            vec![],
        );

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // The match path must not have created a second node.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = empty_store();

        // MERGE with ON CREATE SET
        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Vincent".into()))]),
            const_props(vec![("created", Value::Bool(true))]),
            vec![],
        );

        let _ = merge.next().unwrap();

        // The new node carries the match property and the ON CREATE property.
        let nodes = store.nodes_by_label("Person");
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            node.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = empty_store();

        // Seed the node MERGE is expected to find.
        let node_id = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
        );

        // MERGE with ON MATCH SET
        let mut merge = person_merge(
            &store,
            const_props(vec![("name", Value::String("Jules".into()))]),
            vec![],
            const_props(vec![("updated", Value::Bool(true))]),
        );

        let _ = merge.next().unwrap();

        // The existing node gained the ON MATCH property.
        let node = store.get_node(node_id).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }
}