Skip to main content

grafeo_core/execution/operators/
merge.rs

1//! Merge operator for MERGE clause execution.
2//!
3//! The MERGE operator implements the Cypher MERGE semantics:
4//! 1. Try to match the pattern in the graph
5//! 2. If found, return existing element (optionally apply ON MATCH SET)
6//! 3. If not found, create the element (optionally apply ON CREATE SET)
7
8use super::{ConstraintValidator, Operator, OperatorResult, PropertySource};
9use crate::execution::chunk::{DataChunk, DataChunkBuilder};
10use crate::graph::{GraphStore, GraphStoreMut};
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14use std::sync::Arc;
15
16/// Configuration for a node merge operation.
17pub struct MergeConfig {
18    /// Variable name for the merged node.
19    pub variable: String,
20    /// Labels to match/create.
21    pub labels: Vec<String>,
22    /// Properties that must match (also used for creation).
23    pub match_properties: Vec<(String, PropertySource)>,
24    /// Properties to set on CREATE.
25    pub on_create_properties: Vec<(String, PropertySource)>,
26    /// Properties to set on MATCH.
27    pub on_match_properties: Vec<(String, PropertySource)>,
28    /// Output schema (input columns + node column).
29    pub output_schema: Vec<LogicalType>,
30    /// Column index where the merged node ID is placed.
31    pub output_column: usize,
32    /// If the merge variable was already bound in the input, this column index
33    /// is used to detect NULL references (e.g., from unmatched OPTIONAL MATCH).
34    /// `None` for standalone MERGE that introduces a new variable.
35    pub bound_variable_column: Option<usize>,
36}
37
38/// Merge operator for MERGE clause.
39///
40/// Tries to match a node with the given labels and properties.
41/// If found, returns the existing node. If not found, creates a new node.
42///
43/// When an input operator is provided (chained MERGE), input rows are
44/// passed through with the merged node ID appended as an additional column.
45pub struct MergeOperator {
46    /// The graph store.
47    store: Arc<dyn GraphStoreMut>,
48    /// Optional input operator (for chained MERGE patterns).
49    input: Option<Box<dyn Operator>>,
50    /// Merge configuration.
51    config: MergeConfig,
52    /// Whether we've already executed (standalone mode only).
53    executed: bool,
54    /// Epoch for MVCC versioning.
55    viewing_epoch: Option<EpochId>,
56    /// Transaction ID for undo log tracking.
57    transaction_id: Option<TransactionId>,
58    /// Optional constraint validator for schema enforcement.
59    validator: Option<Arc<dyn ConstraintValidator>>,
60}
61
62impl MergeOperator {
63    /// Creates a new merge operator.
64    pub fn new(
65        store: Arc<dyn GraphStoreMut>,
66        input: Option<Box<dyn Operator>>,
67        config: MergeConfig,
68    ) -> Self {
69        Self {
70            store,
71            input,
72            config,
73            executed: false,
74            viewing_epoch: None,
75            transaction_id: None,
76            validator: None,
77        }
78    }
79
80    /// Returns the variable name for the merged node.
81    #[must_use]
82    pub fn variable(&self) -> &str {
83        &self.config.variable
84    }
85
86    /// Sets the transaction context for versioned mutations.
87    pub fn with_transaction_context(
88        mut self,
89        epoch: EpochId,
90        transaction_id: Option<TransactionId>,
91    ) -> Self {
92        self.viewing_epoch = Some(epoch);
93        self.transaction_id = transaction_id;
94        self
95    }
96
97    /// Sets the constraint validator for schema enforcement.
98    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
99        self.validator = Some(validator);
100        self
101    }
102
103    /// Resolves property sources to concrete values for a given row.
104    fn resolve_properties(
105        props: &[(String, PropertySource)],
106        chunk: Option<&DataChunk>,
107        row: usize,
108        store: &dyn GraphStore,
109    ) -> Vec<(String, Value)> {
110        props
111            .iter()
112            .map(|(name, source)| {
113                let value = if let Some(chunk) = chunk {
114                    source.resolve(chunk, row, store)
115                } else {
116                    // Standalone mode: only constants are valid
117                    match source {
118                        PropertySource::Constant(v) => v.clone(),
119                        _ => Value::Null,
120                    }
121                };
122                (name.clone(), value)
123            })
124            .collect()
125    }
126
127    /// Tries to find a matching node with the given resolved properties.
128    fn find_matching_node(&self, resolved_match_props: &[(String, Value)]) -> Option<NodeId> {
129        let candidates: Vec<NodeId> = if let Some(first_label) = self.config.labels.first() {
130            self.store.nodes_by_label(first_label)
131        } else {
132            self.store.node_ids()
133        };
134
135        for node_id in candidates {
136            if let Some(node) = self.store.get_node(node_id) {
137                let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
138                if !has_all_labels {
139                    continue;
140                }
141
142                let has_all_props = resolved_match_props.iter().all(|(key, expected_value)| {
143                    node.properties
144                        .get(&PropertyKey::new(key.as_str()))
145                        .is_some_and(|v| v == expected_value)
146                });
147
148                if has_all_props {
149                    return Some(node_id);
150                }
151            }
152        }
153
154        None
155    }
156
157    /// Creates a new node with the specified labels and resolved properties.
158    fn create_node(
159        &self,
160        resolved_match_props: &[(String, Value)],
161        resolved_create_props: &[(String, Value)],
162    ) -> Result<NodeId, super::OperatorError> {
163        // Validate constraints before creating the node
164        if let Some(ref validator) = self.validator {
165            validator.validate_node_labels_allowed(&self.config.labels)?;
166
167            let all_props: Vec<(String, Value)> = resolved_match_props
168                .iter()
169                .chain(resolved_create_props.iter())
170                .map(|(k, v)| (k.clone(), v.clone()))
171                .collect();
172            for (name, value) in &all_props {
173                validator.validate_node_property(&self.config.labels, name, value)?;
174                validator.check_unique_node_property(&self.config.labels, name, value)?;
175            }
176            validator.validate_node_complete(&self.config.labels, &all_props)?;
177        }
178
179        let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
180            .iter()
181            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
182            .collect();
183
184        for (k, v) in resolved_create_props {
185            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
186                existing.1 = v.clone();
187            } else {
188                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
189            }
190        }
191
192        let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
193        Ok(self.store.create_node_with_props(&labels, &all_props))
194    }
195
196    /// Finds or creates a matching node for a single row, applying ON MATCH/ON CREATE.
197    fn merge_node_for_row(
198        &self,
199        chunk: Option<&DataChunk>,
200        row: usize,
201    ) -> Result<NodeId, super::OperatorError> {
202        let store_ref: &dyn GraphStore = self.store.as_ref();
203        let resolved_match =
204            Self::resolve_properties(&self.config.match_properties, chunk, row, store_ref);
205
206        if let Some(existing_id) = self.find_matching_node(&resolved_match) {
207            let resolved_on_match =
208                Self::resolve_properties(&self.config.on_match_properties, chunk, row, store_ref);
209            self.apply_on_match(existing_id, &resolved_on_match);
210            Ok(existing_id)
211        } else {
212            let resolved_on_create =
213                Self::resolve_properties(&self.config.on_create_properties, chunk, row, store_ref);
214            self.create_node(&resolved_match, &resolved_on_create)
215        }
216    }
217
218    /// Applies ON MATCH properties to an existing node.
219    fn apply_on_match(&self, node_id: NodeId, resolved_on_match: &[(String, Value)]) {
220        for (key, value) in resolved_on_match {
221            if let Some(tid) = self.transaction_id {
222                self.store
223                    .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
224            } else {
225                self.store
226                    .set_node_property(node_id, key.as_str(), value.clone());
227            }
228        }
229    }
230}
231
232impl Operator for MergeOperator {
233    fn next(&mut self) -> OperatorResult {
234        // When we have an input operator, pass through input rows with the
235        // merged node ID appended (used for chained inline MERGE patterns).
236        if let Some(ref mut input) = self.input {
237            if let Some(chunk) = input.next()? {
238                let mut builder =
239                    DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
240
241                for row in chunk.selected_indices() {
242                    // Reject NULL bound variables (e.g., from unmatched OPTIONAL MATCH)
243                    if let Some(bound_col) = self.config.bound_variable_column {
244                        let is_null = chunk.column(bound_col).map_or(true, |col| col.is_null(row));
245                        if is_null {
246                            return Err(super::OperatorError::TypeMismatch {
247                                expected: format!(
248                                    "non-null node for MERGE variable '{}'",
249                                    self.config.variable
250                                ),
251                                found: "NULL".to_string(),
252                            });
253                        }
254                    }
255
256                    // Merge the node per-row: resolve properties from this row
257                    let node_id = self.merge_node_for_row(Some(&chunk), row)?;
258
259                    // Copy input columns to output
260                    for col_idx in 0..chunk.column_count() {
261                        if let (Some(src), Some(dst)) =
262                            (chunk.column(col_idx), builder.column_mut(col_idx))
263                        {
264                            if let Some(val) = src.get_value(row) {
265                                dst.push_value(val);
266                            } else {
267                                dst.push_value(Value::Null);
268                            }
269                        }
270                    }
271
272                    // Append the merged node ID
273                    if let Some(dst) = builder.column_mut(self.config.output_column) {
274                        dst.push_node_id(node_id);
275                    }
276
277                    builder.advance_row();
278                }
279
280                return Ok(Some(builder.finish()));
281            }
282            return Ok(None);
283        }
284
285        // Standalone mode (no input operator)
286        if self.executed {
287            return Ok(None);
288        }
289        self.executed = true;
290
291        let node_id = self.merge_node_for_row(None, 0)?;
292
293        let mut builder = DataChunkBuilder::new(&self.config.output_schema);
294        if let Some(dst) = builder.column_mut(self.config.output_column) {
295            dst.push_node_id(node_id);
296        }
297        builder.advance_row();
298
299        Ok(Some(builder.finish()))
300    }
301
302    fn reset(&mut self) {
303        self.executed = false;
304        if let Some(ref mut input) = self.input {
305            input.reset();
306        }
307    }
308
309    fn name(&self) -> &'static str {
310        "Merge"
311    }
312}
313
314/// Configuration for a relationship merge operation.
315pub struct MergeRelationshipConfig {
316    /// Column index for the source node ID in the input.
317    pub source_column: usize,
318    /// Column index for the target node ID in the input.
319    pub target_column: usize,
320    /// Variable name for the source node (for error messages).
321    pub source_variable: String,
322    /// Variable name for the target node (for error messages).
323    pub target_variable: String,
324    /// Relationship type to match/create.
325    pub edge_type: String,
326    /// Properties that must match (also used for creation).
327    pub match_properties: Vec<(String, PropertySource)>,
328    /// Properties to set on CREATE.
329    pub on_create_properties: Vec<(String, PropertySource)>,
330    /// Properties to set on MATCH.
331    pub on_match_properties: Vec<(String, PropertySource)>,
332    /// Output schema (input columns + edge column).
333    pub output_schema: Vec<LogicalType>,
334    /// Column index for the edge variable in the output.
335    pub edge_output_column: usize,
336}
337
338/// Merge operator for relationship patterns.
339///
340/// Takes input rows containing source and target node IDs, then for each row:
341/// 1. Searches for an existing relationship matching the type and properties
342/// 2. If found, applies ON MATCH properties and returns the existing edge
343/// 3. If not found, creates a new relationship and applies ON CREATE properties
344pub struct MergeRelationshipOperator {
345    /// The graph store.
346    store: Arc<dyn GraphStoreMut>,
347    /// Input operator providing rows with source/target node columns.
348    input: Box<dyn Operator>,
349    /// Merge configuration.
350    config: MergeRelationshipConfig,
351    /// Epoch for MVCC versioning.
352    viewing_epoch: Option<EpochId>,
353    /// Transaction ID for undo log tracking.
354    transaction_id: Option<TransactionId>,
355    /// Optional constraint validator for schema enforcement.
356    validator: Option<Arc<dyn ConstraintValidator>>,
357}
358
359impl MergeRelationshipOperator {
360    /// Creates a new merge relationship operator.
361    pub fn new(
362        store: Arc<dyn GraphStoreMut>,
363        input: Box<dyn Operator>,
364        config: MergeRelationshipConfig,
365    ) -> Self {
366        Self {
367            store,
368            input,
369            config,
370            viewing_epoch: None,
371            transaction_id: None,
372            validator: None,
373        }
374    }
375
376    /// Sets the transaction context for versioned mutations.
377    pub fn with_transaction_context(
378        mut self,
379        epoch: EpochId,
380        transaction_id: Option<TransactionId>,
381    ) -> Self {
382        self.viewing_epoch = Some(epoch);
383        self.transaction_id = transaction_id;
384        self
385    }
386
387    /// Sets the constraint validator for schema enforcement.
388    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
389        self.validator = Some(validator);
390        self
391    }
392
393    /// Tries to find a matching relationship between source and target.
394    fn find_matching_edge(
395        &self,
396        src: NodeId,
397        dst: NodeId,
398        resolved_match_props: &[(String, Value)],
399    ) -> Option<EdgeId> {
400        use crate::graph::Direction;
401
402        for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
403            if target != dst {
404                continue;
405            }
406
407            if let Some(edge) = self.store.get_edge(edge_id) {
408                if edge.edge_type.as_str() != self.config.edge_type {
409                    continue;
410                }
411
412                let has_all_props = resolved_match_props
413                    .iter()
414                    .all(|(key, expected)| edge.get_property(key).is_some_and(|v| v == expected));
415
416                if has_all_props {
417                    return Some(edge_id);
418                }
419            }
420        }
421
422        None
423    }
424
425    /// Creates a new edge with resolved match and on_create properties.
426    fn create_edge(
427        &self,
428        src: NodeId,
429        dst: NodeId,
430        resolved_match_props: &[(String, Value)],
431        resolved_create_props: &[(String, Value)],
432    ) -> Result<EdgeId, super::OperatorError> {
433        // Validate constraints before creating the edge
434        if let Some(ref validator) = self.validator {
435            validator.validate_edge_type_allowed(&self.config.edge_type)?;
436
437            let all_props: Vec<(String, Value)> = resolved_match_props
438                .iter()
439                .chain(resolved_create_props.iter())
440                .map(|(k, v)| (k.clone(), v.clone()))
441                .collect();
442            for (name, value) in &all_props {
443                validator.validate_edge_property(&self.config.edge_type, name, value)?;
444            }
445            validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
446        }
447
448        let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
449            .iter()
450            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
451            .collect();
452
453        for (k, v) in resolved_create_props {
454            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
455                existing.1 = v.clone();
456            } else {
457                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
458            }
459        }
460
461        Ok(self
462            .store
463            .create_edge_with_props(src, dst, &self.config.edge_type, &all_props))
464    }
465
466    /// Applies ON MATCH properties to an existing edge.
467    fn apply_on_match_edge(&self, edge_id: EdgeId, resolved_on_match: &[(String, Value)]) {
468        for (key, value) in resolved_on_match {
469            if let Some(tid) = self.transaction_id {
470                self.store
471                    .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
472            } else {
473                self.store
474                    .set_edge_property(edge_id, key.as_str(), value.clone());
475            }
476        }
477    }
478}
479
480impl Operator for MergeRelationshipOperator {
481    fn next(&mut self) -> OperatorResult {
482        use super::OperatorError;
483
484        if let Some(chunk) = self.input.next()? {
485            let mut builder =
486                DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
487
488            for row in chunk.selected_indices() {
489                let src_val = chunk
490                    .column(self.config.source_column)
491                    .and_then(|c| c.get_node_id(row))
492                    .ok_or_else(|| OperatorError::TypeMismatch {
493                        expected: format!(
494                            "non-null node for MERGE variable '{}'",
495                            self.config.source_variable
496                        ),
497                        found: "NULL".to_string(),
498                    })?;
499
500                let dst_val = chunk
501                    .column(self.config.target_column)
502                    .and_then(|c| c.get_node_id(row))
503                    .ok_or_else(|| OperatorError::TypeMismatch {
504                        expected: format!(
505                            "non-null node for MERGE variable '{}'",
506                            self.config.target_variable
507                        ),
508                        found: "None".to_string(),
509                    })?;
510
511                let store_ref: &dyn GraphStore = self.store.as_ref();
512                let resolved_match = MergeOperator::resolve_properties(
513                    &self.config.match_properties,
514                    Some(&chunk),
515                    row,
516                    store_ref,
517                );
518
519                let edge_id = if let Some(existing) =
520                    self.find_matching_edge(src_val, dst_val, &resolved_match)
521                {
522                    let resolved_on_match = MergeOperator::resolve_properties(
523                        &self.config.on_match_properties,
524                        Some(&chunk),
525                        row,
526                        store_ref,
527                    );
528                    self.apply_on_match_edge(existing, &resolved_on_match);
529                    existing
530                } else {
531                    let resolved_on_create = MergeOperator::resolve_properties(
532                        &self.config.on_create_properties,
533                        Some(&chunk),
534                        row,
535                        store_ref,
536                    );
537                    self.create_edge(src_val, dst_val, &resolved_match, &resolved_on_create)?
538                };
539
540                // Copy input columns to output, then add the edge column
541                for col_idx in 0..self.config.output_schema.len() {
542                    if col_idx == self.config.edge_output_column {
543                        if let Some(dst_col) = builder.column_mut(col_idx) {
544                            dst_col.push_edge_id(edge_id);
545                        }
546                    } else if let (Some(src_col), Some(dst_col)) =
547                        (chunk.column(col_idx), builder.column_mut(col_idx))
548                        && let Some(val) = src_col.get_value(row)
549                    {
550                        dst_col.push_value(val);
551                    }
552                }
553
554                builder.advance_row();
555            }
556
557            return Ok(Some(builder.finish()));
558        }
559
560        Ok(None)
561    }
562
563    fn reset(&mut self) {
564        self.input.reset();
565    }
566
567    fn name(&self) -> &'static str {
568        "MergeRelationship"
569    }
570}
571
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    /// Wraps literal values as constant property sources.
    fn const_props(props: Vec<(&str, Value)>) -> Vec<(String, PropertySource)> {
        let mut sources = Vec::with_capacity(props.len());
        for (key, value) in props {
            sources.push((key.to_string(), PropertySource::Constant(value)));
        }
        sources
    }

    /// Creates an empty in-memory store.
    fn empty_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Builds a standalone `MERGE (n:Person {name: <name>})` operator with the
    /// given ON CREATE and ON MATCH properties.
    fn person_merge(
        store: &Arc<dyn GraphStoreMut>,
        name: &'static str,
        on_create: Vec<(&str, Value)>,
        on_match: Vec<(&str, Value)>,
    ) -> MergeOperator {
        let config = MergeConfig {
            variable: "n".to_string(),
            labels: vec!["Person".to_string()],
            match_properties: const_props(vec![("name", Value::String(name.into()))]),
            on_create_properties: const_props(on_create),
            on_match_properties: const_props(on_match),
            output_schema: vec![LogicalType::Node],
            output_column: 0,
            bound_variable_column: None,
        };
        MergeOperator::new(Arc::clone(store), None, config)
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = empty_store();

        // Nothing exists yet, so MERGE has to create the node.
        let mut merge = person_merge(&store, "Alix", vec![], vec![]);

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // Exactly one Person node with the match property must exist now.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        let node = store.get_node(nodes[0]).unwrap();
        assert!(node.has_label("Person"));
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = empty_store();

        // Seed the node that MERGE is expected to find.
        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
        );

        let mut merge = person_merge(&store, "Gus", vec![], vec![]);

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // Still a single node: the existing one was reused.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = empty_store();

        // MERGE with ON CREATE SET on an empty store.
        let mut merge = person_merge(
            &store,
            "Vincent",
            vec![("created", Value::Bool(true))],
            vec![],
        );

        let _ = merge.next().unwrap();

        // The created node carries the match property and the ON CREATE property.
        let nodes = store.nodes_by_label("Person");
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            node.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = empty_store();

        // Seed the node that MERGE is expected to find.
        let node_id = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
        );

        // MERGE with ON MATCH SET against the seeded node.
        let mut merge = person_merge(
            &store,
            "Jules",
            vec![],
            vec![("updated", Value::Bool(true))],
        );

        let _ = merge.next().unwrap();

        // The existing node received the ON MATCH property.
        let node = store.get_node(node_id).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }
}
723}