Skip to main content

grafeo_core/execution/operators/
merge.rs

1//! Merge operator for MERGE clause execution.
2//!
3//! The MERGE operator implements the Cypher MERGE semantics:
4//! 1. Try to match the pattern in the graph
5//! 2. If found, return existing element (optionally apply ON MATCH SET)
6//! 3. If not found, create the element (optionally apply ON CREATE SET)
7
8use super::{Operator, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use crate::graph::GraphStoreMut;
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14use std::sync::Arc;
15
16/// Configuration for a node merge operation.
17pub struct MergeConfig {
18    /// Variable name for the merged node.
19    pub variable: String,
20    /// Labels to match/create.
21    pub labels: Vec<String>,
22    /// Properties that must match (also used for creation).
23    pub match_properties: Vec<(String, Value)>,
24    /// Properties to set on CREATE.
25    pub on_create_properties: Vec<(String, Value)>,
26    /// Properties to set on MATCH.
27    pub on_match_properties: Vec<(String, Value)>,
28    /// Output schema (input columns + node column).
29    pub output_schema: Vec<LogicalType>,
30    /// Column index where the merged node ID is placed.
31    pub output_column: usize,
32}
33
34/// Merge operator for MERGE clause.
35///
36/// Tries to match a node with the given labels and properties.
37/// If found, returns the existing node. If not found, creates a new node.
38///
39/// When an input operator is provided (chained MERGE), input rows are
40/// passed through with the merged node ID appended as an additional column.
41pub struct MergeOperator {
42    /// The graph store.
43    store: Arc<dyn GraphStoreMut>,
44    /// Optional input operator (for chained MERGE patterns).
45    input: Option<Box<dyn Operator>>,
46    /// Merge configuration.
47    config: MergeConfig,
48    /// Whether we've already executed (standalone mode only).
49    executed: bool,
50    /// Epoch for MVCC versioning.
51    viewing_epoch: Option<EpochId>,
52    /// Transaction ID for undo log tracking.
53    transaction_id: Option<TransactionId>,
54}
55
56impl MergeOperator {
57    /// Creates a new merge operator.
58    pub fn new(
59        store: Arc<dyn GraphStoreMut>,
60        input: Option<Box<dyn Operator>>,
61        config: MergeConfig,
62    ) -> Self {
63        Self {
64            store,
65            input,
66            config,
67            executed: false,
68            viewing_epoch: None,
69            transaction_id: None,
70        }
71    }
72
73    /// Returns the variable name for the merged node.
74    #[must_use]
75    pub fn variable(&self) -> &str {
76        &self.config.variable
77    }
78
79    /// Sets the transaction context for versioned mutations.
80    pub fn with_transaction_context(
81        mut self,
82        epoch: EpochId,
83        transaction_id: Option<TransactionId>,
84    ) -> Self {
85        self.viewing_epoch = Some(epoch);
86        self.transaction_id = transaction_id;
87        self
88    }
89
90    /// Tries to find a matching node.
91    fn find_matching_node(&self) -> Option<NodeId> {
92        let candidates: Vec<NodeId> = if let Some(first_label) = self.config.labels.first() {
93            self.store.nodes_by_label(first_label)
94        } else {
95            self.store.node_ids()
96        };
97
98        for node_id in candidates {
99            if let Some(node) = self.store.get_node(node_id) {
100                let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
101                if !has_all_labels {
102                    continue;
103                }
104
105                let has_all_props =
106                    self.config
107                        .match_properties
108                        .iter()
109                        .all(|(key, expected_value)| {
110                            node.properties
111                                .get(&PropertyKey::new(key.as_str()))
112                                .is_some_and(|v| v == expected_value)
113                        });
114
115                if has_all_props {
116                    return Some(node_id);
117                }
118            }
119        }
120
121        None
122    }
123
124    /// Creates a new node with the specified labels and properties.
125    fn create_node(&self) -> NodeId {
126        let mut all_props: Vec<(PropertyKey, Value)> = self
127            .config
128            .match_properties
129            .iter()
130            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
131            .collect();
132
133        for (k, v) in &self.config.on_create_properties {
134            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
135                existing.1 = v.clone();
136            } else {
137                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
138            }
139        }
140
141        let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
142        self.store.create_node_with_props(&labels, &all_props)
143    }
144
145    /// Finds or creates a matching node, applying ON MATCH/ON CREATE as appropriate.
146    fn merge_node(&self) -> NodeId {
147        if let Some(existing_id) = self.find_matching_node() {
148            self.apply_on_match(existing_id);
149            existing_id
150        } else {
151            self.create_node()
152        }
153    }
154
155    /// Applies ON MATCH properties to an existing node.
156    fn apply_on_match(&self, node_id: NodeId) {
157        for (key, value) in &self.config.on_match_properties {
158            if let Some(tid) = self.transaction_id {
159                self.store
160                    .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
161            } else {
162                self.store
163                    .set_node_property(node_id, key.as_str(), value.clone());
164            }
165        }
166    }
167}
168
169impl Operator for MergeOperator {
170    fn next(&mut self) -> OperatorResult {
171        // When we have an input operator, pass through input rows with the
172        // merged node ID appended (used for chained inline MERGE patterns).
173        if let Some(ref mut input) = self.input {
174            if let Some(chunk) = input.next()? {
175                // Merge the node (once, same node for all input rows)
176                let node_id = self.merge_node();
177
178                let mut builder =
179                    DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
180
181                for row in chunk.selected_indices() {
182                    // Copy input columns to output
183                    for col_idx in 0..chunk.column_count() {
184                        if let (Some(src), Some(dst)) =
185                            (chunk.column(col_idx), builder.column_mut(col_idx))
186                        {
187                            if let Some(val) = src.get_value(row) {
188                                dst.push_value(val);
189                            } else {
190                                dst.push_value(Value::Null);
191                            }
192                        }
193                    }
194
195                    // Append the merged node ID
196                    if let Some(dst) = builder.column_mut(self.config.output_column) {
197                        dst.push_node_id(node_id);
198                    }
199
200                    builder.advance_row();
201                }
202
203                return Ok(Some(builder.finish()));
204            }
205            return Ok(None);
206        }
207
208        // Standalone mode (no input operator)
209        if self.executed {
210            return Ok(None);
211        }
212        self.executed = true;
213
214        let node_id = self.merge_node();
215
216        let mut builder = DataChunkBuilder::new(&self.config.output_schema);
217        if let Some(dst) = builder.column_mut(self.config.output_column) {
218            dst.push_node_id(node_id);
219        }
220        builder.advance_row();
221
222        Ok(Some(builder.finish()))
223    }
224
225    fn reset(&mut self) {
226        self.executed = false;
227        if let Some(ref mut input) = self.input {
228            input.reset();
229        }
230    }
231
232    fn name(&self) -> &'static str {
233        "Merge"
234    }
235}
236
237/// Configuration for a relationship merge operation.
238pub struct MergeRelationshipConfig {
239    /// Column index for the source node ID in the input.
240    pub source_column: usize,
241    /// Column index for the target node ID in the input.
242    pub target_column: usize,
243    /// Relationship type to match/create.
244    pub edge_type: String,
245    /// Properties that must match (also used for creation).
246    pub match_properties: Vec<(String, Value)>,
247    /// Properties to set on CREATE.
248    pub on_create_properties: Vec<(String, Value)>,
249    /// Properties to set on MATCH.
250    pub on_match_properties: Vec<(String, Value)>,
251    /// Output schema (input columns + edge column).
252    pub output_schema: Vec<LogicalType>,
253    /// Column index for the edge variable in the output.
254    pub edge_output_column: usize,
255}
256
257/// Merge operator for relationship patterns.
258///
259/// Takes input rows containing source and target node IDs, then for each row:
260/// 1. Searches for an existing relationship matching the type and properties
261/// 2. If found, applies ON MATCH properties and returns the existing edge
262/// 3. If not found, creates a new relationship and applies ON CREATE properties
263pub struct MergeRelationshipOperator {
264    /// The graph store.
265    store: Arc<dyn GraphStoreMut>,
266    /// Input operator providing rows with source/target node columns.
267    input: Box<dyn Operator>,
268    /// Merge configuration.
269    config: MergeRelationshipConfig,
270    /// Epoch for MVCC versioning.
271    viewing_epoch: Option<EpochId>,
272    /// Transaction ID for undo log tracking.
273    transaction_id: Option<TransactionId>,
274}
275
276impl MergeRelationshipOperator {
277    /// Creates a new merge relationship operator.
278    pub fn new(
279        store: Arc<dyn GraphStoreMut>,
280        input: Box<dyn Operator>,
281        config: MergeRelationshipConfig,
282    ) -> Self {
283        Self {
284            store,
285            input,
286            config,
287            viewing_epoch: None,
288            transaction_id: None,
289        }
290    }
291
292    /// Sets the transaction context for versioned mutations.
293    pub fn with_transaction_context(
294        mut self,
295        epoch: EpochId,
296        transaction_id: Option<TransactionId>,
297    ) -> Self {
298        self.viewing_epoch = Some(epoch);
299        self.transaction_id = transaction_id;
300        self
301    }
302
303    /// Tries to find a matching relationship between source and target.
304    fn find_matching_edge(&self, src: NodeId, dst: NodeId) -> Option<EdgeId> {
305        use crate::graph::Direction;
306
307        for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
308            if target != dst {
309                continue;
310            }
311
312            if let Some(edge) = self.store.get_edge(edge_id) {
313                if edge.edge_type.as_str() != self.config.edge_type {
314                    continue;
315                }
316
317                let has_all_props =
318                    self.config.match_properties.iter().all(|(key, expected)| {
319                        edge.get_property(key).is_some_and(|v| v == expected)
320                    });
321
322                if has_all_props {
323                    return Some(edge_id);
324                }
325            }
326        }
327
328        None
329    }
330
331    /// Creates a new edge with the match properties and on_create properties.
332    fn create_edge(&self, src: NodeId, dst: NodeId) -> EdgeId {
333        let mut all_props: Vec<(PropertyKey, Value)> = self
334            .config
335            .match_properties
336            .iter()
337            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
338            .collect();
339
340        for (k, v) in &self.config.on_create_properties {
341            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
342                existing.1 = v.clone();
343            } else {
344                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
345            }
346        }
347
348        self.store
349            .create_edge_with_props(src, dst, &self.config.edge_type, &all_props)
350    }
351
352    /// Applies ON MATCH properties to an existing edge.
353    fn apply_on_match(&self, edge_id: EdgeId) {
354        for (key, value) in &self.config.on_match_properties {
355            if let Some(tid) = self.transaction_id {
356                self.store
357                    .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
358            } else {
359                self.store
360                    .set_edge_property(edge_id, key.as_str(), value.clone());
361            }
362        }
363    }
364}
365
366impl Operator for MergeRelationshipOperator {
367    fn next(&mut self) -> OperatorResult {
368        use super::OperatorError;
369
370        if let Some(chunk) = self.input.next()? {
371            let mut builder =
372                DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
373
374            for row in chunk.selected_indices() {
375                let src_val = chunk
376                    .column(self.config.source_column)
377                    .and_then(|c| c.get_node_id(row))
378                    .ok_or_else(|| OperatorError::TypeMismatch {
379                        expected: "NodeId (source)".to_string(),
380                        found: "None".to_string(),
381                    })?;
382
383                let dst_val = chunk
384                    .column(self.config.target_column)
385                    .and_then(|c| c.get_node_id(row))
386                    .ok_or_else(|| OperatorError::TypeMismatch {
387                        expected: "NodeId (target)".to_string(),
388                        found: "None".to_string(),
389                    })?;
390
391                let edge_id = if let Some(existing) = self.find_matching_edge(src_val, dst_val) {
392                    self.apply_on_match(existing);
393                    existing
394                } else {
395                    self.create_edge(src_val, dst_val)
396                };
397
398                // Copy input columns to output, then add the edge column
399                for col_idx in 0..self.config.output_schema.len() {
400                    if col_idx == self.config.edge_output_column {
401                        if let Some(dst_col) = builder.column_mut(col_idx) {
402                            dst_col.push_edge_id(edge_id);
403                        }
404                    } else if let (Some(src_col), Some(dst_col)) =
405                        (chunk.column(col_idx), builder.column_mut(col_idx))
406                        && let Some(val) = src_col.get_value(row)
407                    {
408                        dst_col.push_value(val);
409                    }
410                }
411
412                builder.advance_row();
413            }
414
415            return Ok(Some(builder.finish()));
416        }
417
418        Ok(None)
419    }
420
421    fn reset(&mut self) {
422        self.input.reset();
423    }
424
425    fn name(&self) -> &'static str {
426        "MergeRelationship"
427    }
428}
429
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    /// Fresh, empty in-memory store.
    fn new_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Standalone `MERGE (n:Person {name: <name>})` with the given ON CREATE /
    /// ON MATCH property sets.
    fn merge_person(
        store: &Arc<dyn GraphStoreMut>,
        name: &str,
        on_create_properties: Vec<(String, Value)>,
        on_match_properties: Vec<(String, Value)>,
    ) -> MergeOperator {
        MergeOperator::new(
            Arc::clone(store),
            None,
            MergeConfig {
                variable: "n".to_string(),
                labels: vec!["Person".to_string()],
                match_properties: vec![("name".to_string(), Value::String(name.into()))],
                on_create_properties,
                on_match_properties,
                output_schema: vec![LogicalType::Node],
                output_column: 0,
            },
        )
    }

    /// Inserts a `:Person` node with the given name and returns its ID.
    fn insert_person(store: &Arc<dyn GraphStoreMut>, name: &str) -> NodeId {
        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String(name.into()))],
        )
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = new_store();

        // Nothing matches yet, so MERGE has to create the node.
        let mut merge = merge_person(&store, "Alix", vec![], vec![]);
        assert!(merge.next().unwrap().is_some());

        let people = store.nodes_by_label("Person");
        assert_eq!(people.len(), 1);

        let created = store.get_node(people[0]).unwrap();
        assert!(created.has_label("Person"));
        assert_eq!(
            created.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = new_store();
        insert_person(&store, "Gus");

        // The pre-existing node satisfies the pattern and must be reused.
        let mut merge = merge_person(&store, "Gus", vec![], vec![]);
        assert!(merge.next().unwrap().is_some());

        assert_eq!(store.nodes_by_label("Person").len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = new_store();

        let mut merge = merge_person(
            &store,
            "Vincent",
            vec![("created".to_string(), Value::Bool(true))],
            vec![],
        );
        let _ = merge.next().unwrap();

        // The new node carries both the match and the ON CREATE properties.
        let people = store.nodes_by_label("Person");
        let created = store.get_node(people[0]).unwrap();
        assert_eq!(
            created.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            created.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = new_store();
        let node_id = insert_person(&store, "Jules");

        let mut merge = merge_person(
            &store,
            "Jules",
            vec![],
            vec![("updated".to_string(), Value::Bool(true))],
        );
        let _ = merge.next().unwrap();

        // The existing node gains the ON MATCH property.
        let matched = store.get_node(node_id).unwrap();
        assert_eq!(
            matched.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }
}