Skip to main content

grafeo_core/execution/operators/
merge.rs

//! Merge operators for executing the Cypher `MERGE` clause.
//!
//! `MERGE` has match-or-create semantics:
//! 1. Try to match the pattern in the graph.
//! 2. If a match is found, return the existing element (optionally applying `ON MATCH SET`).
//! 3. If no match is found, create the element (optionally applying `ON CREATE SET`).
7
8use super::{ConstraintValidator, Operator, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use crate::graph::GraphStoreMut;
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14use std::sync::Arc;
15
16/// Configuration for a node merge operation.
17pub struct MergeConfig {
18    /// Variable name for the merged node.
19    pub variable: String,
20    /// Labels to match/create.
21    pub labels: Vec<String>,
22    /// Properties that must match (also used for creation).
23    pub match_properties: Vec<(String, Value)>,
24    /// Properties to set on CREATE.
25    pub on_create_properties: Vec<(String, Value)>,
26    /// Properties to set on MATCH.
27    pub on_match_properties: Vec<(String, Value)>,
28    /// Output schema (input columns + node column).
29    pub output_schema: Vec<LogicalType>,
30    /// Column index where the merged node ID is placed.
31    pub output_column: usize,
32}
33
34/// Merge operator for MERGE clause.
35///
36/// Tries to match a node with the given labels and properties.
37/// If found, returns the existing node. If not found, creates a new node.
38///
39/// When an input operator is provided (chained MERGE), input rows are
40/// passed through with the merged node ID appended as an additional column.
41pub struct MergeOperator {
42    /// The graph store.
43    store: Arc<dyn GraphStoreMut>,
44    /// Optional input operator (for chained MERGE patterns).
45    input: Option<Box<dyn Operator>>,
46    /// Merge configuration.
47    config: MergeConfig,
48    /// Whether we've already executed (standalone mode only).
49    executed: bool,
50    /// Epoch for MVCC versioning.
51    viewing_epoch: Option<EpochId>,
52    /// Transaction ID for undo log tracking.
53    transaction_id: Option<TransactionId>,
54    /// Optional constraint validator for schema enforcement.
55    validator: Option<Arc<dyn ConstraintValidator>>,
56}
57
58impl MergeOperator {
59    /// Creates a new merge operator.
60    pub fn new(
61        store: Arc<dyn GraphStoreMut>,
62        input: Option<Box<dyn Operator>>,
63        config: MergeConfig,
64    ) -> Self {
65        Self {
66            store,
67            input,
68            config,
69            executed: false,
70            viewing_epoch: None,
71            transaction_id: None,
72            validator: None,
73        }
74    }
75
76    /// Returns the variable name for the merged node.
77    #[must_use]
78    pub fn variable(&self) -> &str {
79        &self.config.variable
80    }
81
82    /// Sets the transaction context for versioned mutations.
83    pub fn with_transaction_context(
84        mut self,
85        epoch: EpochId,
86        transaction_id: Option<TransactionId>,
87    ) -> Self {
88        self.viewing_epoch = Some(epoch);
89        self.transaction_id = transaction_id;
90        self
91    }
92
93    /// Sets the constraint validator for schema enforcement.
94    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
95        self.validator = Some(validator);
96        self
97    }
98
99    /// Tries to find a matching node.
100    fn find_matching_node(&self) -> Option<NodeId> {
101        let candidates: Vec<NodeId> = if let Some(first_label) = self.config.labels.first() {
102            self.store.nodes_by_label(first_label)
103        } else {
104            self.store.node_ids()
105        };
106
107        for node_id in candidates {
108            if let Some(node) = self.store.get_node(node_id) {
109                let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
110                if !has_all_labels {
111                    continue;
112                }
113
114                let has_all_props =
115                    self.config
116                        .match_properties
117                        .iter()
118                        .all(|(key, expected_value)| {
119                            node.properties
120                                .get(&PropertyKey::new(key.as_str()))
121                                .is_some_and(|v| v == expected_value)
122                        });
123
124                if has_all_props {
125                    return Some(node_id);
126                }
127            }
128        }
129
130        None
131    }
132
133    /// Creates a new node with the specified labels and properties.
134    fn create_node(&self) -> Result<NodeId, super::OperatorError> {
135        // Validate constraints before creating the node
136        if let Some(ref validator) = self.validator {
137            validator.validate_node_labels_allowed(&self.config.labels)?;
138
139            let all_props: Vec<(String, Value)> = self
140                .config
141                .match_properties
142                .iter()
143                .chain(self.config.on_create_properties.iter())
144                .map(|(k, v)| (k.clone(), v.clone()))
145                .collect();
146            for (name, value) in &all_props {
147                validator.validate_node_property(&self.config.labels, name, value)?;
148                validator.check_unique_node_property(&self.config.labels, name, value)?;
149            }
150            validator.validate_node_complete(&self.config.labels, &all_props)?;
151        }
152
153        let mut all_props: Vec<(PropertyKey, Value)> = self
154            .config
155            .match_properties
156            .iter()
157            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
158            .collect();
159
160        for (k, v) in &self.config.on_create_properties {
161            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
162                existing.1 = v.clone();
163            } else {
164                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
165            }
166        }
167
168        let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
169        Ok(self.store.create_node_with_props(&labels, &all_props))
170    }
171
172    /// Finds or creates a matching node, applying ON MATCH/ON CREATE as appropriate.
173    fn merge_node(&self) -> Result<NodeId, super::OperatorError> {
174        if let Some(existing_id) = self.find_matching_node() {
175            self.apply_on_match(existing_id);
176            Ok(existing_id)
177        } else {
178            self.create_node()
179        }
180    }
181
182    /// Applies ON MATCH properties to an existing node.
183    fn apply_on_match(&self, node_id: NodeId) {
184        for (key, value) in &self.config.on_match_properties {
185            if let Some(tid) = self.transaction_id {
186                self.store
187                    .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
188            } else {
189                self.store
190                    .set_node_property(node_id, key.as_str(), value.clone());
191            }
192        }
193    }
194}
195
196impl Operator for MergeOperator {
197    fn next(&mut self) -> OperatorResult {
198        // When we have an input operator, pass through input rows with the
199        // merged node ID appended (used for chained inline MERGE patterns).
200        if let Some(ref mut input) = self.input {
201            if let Some(chunk) = input.next()? {
202                // Merge the node (once, same node for all input rows)
203                let node_id = self.merge_node()?;
204
205                let mut builder =
206                    DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
207
208                for row in chunk.selected_indices() {
209                    // Copy input columns to output
210                    for col_idx in 0..chunk.column_count() {
211                        if let (Some(src), Some(dst)) =
212                            (chunk.column(col_idx), builder.column_mut(col_idx))
213                        {
214                            if let Some(val) = src.get_value(row) {
215                                dst.push_value(val);
216                            } else {
217                                dst.push_value(Value::Null);
218                            }
219                        }
220                    }
221
222                    // Append the merged node ID
223                    if let Some(dst) = builder.column_mut(self.config.output_column) {
224                        dst.push_node_id(node_id);
225                    }
226
227                    builder.advance_row();
228                }
229
230                return Ok(Some(builder.finish()));
231            }
232            return Ok(None);
233        }
234
235        // Standalone mode (no input operator)
236        if self.executed {
237            return Ok(None);
238        }
239        self.executed = true;
240
241        let node_id = self.merge_node()?;
242
243        let mut builder = DataChunkBuilder::new(&self.config.output_schema);
244        if let Some(dst) = builder.column_mut(self.config.output_column) {
245            dst.push_node_id(node_id);
246        }
247        builder.advance_row();
248
249        Ok(Some(builder.finish()))
250    }
251
252    fn reset(&mut self) {
253        self.executed = false;
254        if let Some(ref mut input) = self.input {
255            input.reset();
256        }
257    }
258
259    fn name(&self) -> &'static str {
260        "Merge"
261    }
262}
263
264/// Configuration for a relationship merge operation.
265pub struct MergeRelationshipConfig {
266    /// Column index for the source node ID in the input.
267    pub source_column: usize,
268    /// Column index for the target node ID in the input.
269    pub target_column: usize,
270    /// Relationship type to match/create.
271    pub edge_type: String,
272    /// Properties that must match (also used for creation).
273    pub match_properties: Vec<(String, Value)>,
274    /// Properties to set on CREATE.
275    pub on_create_properties: Vec<(String, Value)>,
276    /// Properties to set on MATCH.
277    pub on_match_properties: Vec<(String, Value)>,
278    /// Output schema (input columns + edge column).
279    pub output_schema: Vec<LogicalType>,
280    /// Column index for the edge variable in the output.
281    pub edge_output_column: usize,
282}
283
284/// Merge operator for relationship patterns.
285///
286/// Takes input rows containing source and target node IDs, then for each row:
287/// 1. Searches for an existing relationship matching the type and properties
288/// 2. If found, applies ON MATCH properties and returns the existing edge
289/// 3. If not found, creates a new relationship and applies ON CREATE properties
290pub struct MergeRelationshipOperator {
291    /// The graph store.
292    store: Arc<dyn GraphStoreMut>,
293    /// Input operator providing rows with source/target node columns.
294    input: Box<dyn Operator>,
295    /// Merge configuration.
296    config: MergeRelationshipConfig,
297    /// Epoch for MVCC versioning.
298    viewing_epoch: Option<EpochId>,
299    /// Transaction ID for undo log tracking.
300    transaction_id: Option<TransactionId>,
301    /// Optional constraint validator for schema enforcement.
302    validator: Option<Arc<dyn ConstraintValidator>>,
303}
304
305impl MergeRelationshipOperator {
306    /// Creates a new merge relationship operator.
307    pub fn new(
308        store: Arc<dyn GraphStoreMut>,
309        input: Box<dyn Operator>,
310        config: MergeRelationshipConfig,
311    ) -> Self {
312        Self {
313            store,
314            input,
315            config,
316            viewing_epoch: None,
317            transaction_id: None,
318            validator: None,
319        }
320    }
321
322    /// Sets the transaction context for versioned mutations.
323    pub fn with_transaction_context(
324        mut self,
325        epoch: EpochId,
326        transaction_id: Option<TransactionId>,
327    ) -> Self {
328        self.viewing_epoch = Some(epoch);
329        self.transaction_id = transaction_id;
330        self
331    }
332
333    /// Sets the constraint validator for schema enforcement.
334    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
335        self.validator = Some(validator);
336        self
337    }
338
339    /// Tries to find a matching relationship between source and target.
340    fn find_matching_edge(&self, src: NodeId, dst: NodeId) -> Option<EdgeId> {
341        use crate::graph::Direction;
342
343        for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
344            if target != dst {
345                continue;
346            }
347
348            if let Some(edge) = self.store.get_edge(edge_id) {
349                if edge.edge_type.as_str() != self.config.edge_type {
350                    continue;
351                }
352
353                let has_all_props =
354                    self.config.match_properties.iter().all(|(key, expected)| {
355                        edge.get_property(key).is_some_and(|v| v == expected)
356                    });
357
358                if has_all_props {
359                    return Some(edge_id);
360                }
361            }
362        }
363
364        None
365    }
366
367    /// Creates a new edge with the match properties and on_create properties.
368    fn create_edge(&self, src: NodeId, dst: NodeId) -> Result<EdgeId, super::OperatorError> {
369        // Validate constraints before creating the edge
370        if let Some(ref validator) = self.validator {
371            validator.validate_edge_type_allowed(&self.config.edge_type)?;
372
373            let all_props: Vec<(String, Value)> = self
374                .config
375                .match_properties
376                .iter()
377                .chain(self.config.on_create_properties.iter())
378                .map(|(k, v)| (k.clone(), v.clone()))
379                .collect();
380            for (name, value) in &all_props {
381                validator.validate_edge_property(&self.config.edge_type, name, value)?;
382            }
383            validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
384        }
385
386        let mut all_props: Vec<(PropertyKey, Value)> = self
387            .config
388            .match_properties
389            .iter()
390            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
391            .collect();
392
393        for (k, v) in &self.config.on_create_properties {
394            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
395                existing.1 = v.clone();
396            } else {
397                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
398            }
399        }
400
401        Ok(self
402            .store
403            .create_edge_with_props(src, dst, &self.config.edge_type, &all_props))
404    }
405
406    /// Applies ON MATCH properties to an existing edge.
407    fn apply_on_match(&self, edge_id: EdgeId) {
408        for (key, value) in &self.config.on_match_properties {
409            if let Some(tid) = self.transaction_id {
410                self.store
411                    .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
412            } else {
413                self.store
414                    .set_edge_property(edge_id, key.as_str(), value.clone());
415            }
416        }
417    }
418}
419
420impl Operator for MergeRelationshipOperator {
421    fn next(&mut self) -> OperatorResult {
422        use super::OperatorError;
423
424        if let Some(chunk) = self.input.next()? {
425            let mut builder =
426                DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
427
428            for row in chunk.selected_indices() {
429                let src_val = chunk
430                    .column(self.config.source_column)
431                    .and_then(|c| c.get_node_id(row))
432                    .ok_or_else(|| OperatorError::TypeMismatch {
433                        expected: "NodeId (source)".to_string(),
434                        found: "None".to_string(),
435                    })?;
436
437                let dst_val = chunk
438                    .column(self.config.target_column)
439                    .and_then(|c| c.get_node_id(row))
440                    .ok_or_else(|| OperatorError::TypeMismatch {
441                        expected: "NodeId (target)".to_string(),
442                        found: "None".to_string(),
443                    })?;
444
445                let edge_id = if let Some(existing) = self.find_matching_edge(src_val, dst_val) {
446                    self.apply_on_match(existing);
447                    existing
448                } else {
449                    self.create_edge(src_val, dst_val)?
450                };
451
452                // Copy input columns to output, then add the edge column
453                for col_idx in 0..self.config.output_schema.len() {
454                    if col_idx == self.config.edge_output_column {
455                        if let Some(dst_col) = builder.column_mut(col_idx) {
456                            dst_col.push_edge_id(edge_id);
457                        }
458                    } else if let (Some(src_col), Some(dst_col)) =
459                        (chunk.column(col_idx), builder.column_mut(col_idx))
460                        && let Some(val) = src_col.get_value(row)
461                    {
462                        dst_col.push_value(val);
463                    }
464                }
465
466                builder.advance_row();
467            }
468
469            return Ok(Some(builder.finish()));
470        }
471
472        Ok(None)
473    }
474
475    fn reset(&mut self) {
476        self.input.reset();
477    }
478
479    fn name(&self) -> &'static str {
480        "MergeRelationship"
481    }
482}
483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    /// Creates an empty in-memory store behind the mutable graph-store trait.
    fn new_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Builds a standalone `MERGE (n:Person {name: <name>})` operator with the
    /// given ON CREATE / ON MATCH property lists.
    fn merge_person(
        store: &Arc<dyn GraphStoreMut>,
        name: &'static str,
        on_create_properties: Vec<(String, Value)>,
        on_match_properties: Vec<(String, Value)>,
    ) -> MergeOperator {
        MergeOperator::new(
            Arc::clone(store),
            None,
            MergeConfig {
                variable: "n".to_string(),
                labels: vec!["Person".to_string()],
                match_properties: vec![("name".to_string(), Value::String(name.into()))],
                on_create_properties,
                on_match_properties,
                output_schema: vec![LogicalType::Node],
                output_column: 0,
            },
        )
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = new_store();

        // Nothing exists yet, so MERGE has to create the node.
        let mut merge = merge_person(&store, "Alix", vec![], vec![]);

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // Exactly one Person node exists and carries the match property.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        let node = store.get_node(nodes[0]).unwrap();
        assert!(node.has_label("Person"));
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = new_store();

        // Seed the store with the node MERGE is expected to find.
        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
        );

        let mut merge = merge_person(&store, "Gus", vec![], vec![]);

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // Still a single node: the existing one was reused.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = new_store();

        // MERGE ... ON CREATE SET n.created = true
        let mut merge = merge_person(
            &store,
            "Vincent",
            vec![("created".to_string(), Value::Bool(true))],
            vec![],
        );

        let _ = merge.next().unwrap();

        // The created node carries both the match and the ON CREATE property.
        let nodes = store.nodes_by_label("Person");
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            node.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = new_store();

        // Seed the store with the node MERGE is expected to find.
        let node_id = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
        );

        // MERGE ... ON MATCH SET n.updated = true
        let mut merge = merge_person(
            &store,
            "Jules",
            vec![],
            vec![("updated".to_string(), Value::Bool(true))],
        );

        let _ = merge.next().unwrap();

        // The existing node received the ON MATCH property.
        let node = store.get_node(node_id).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }
}