Skip to main content

grafeo_core/execution/operators/
merge.rs

1//! Merge operator for MERGE clause execution.
2//!
3//! The MERGE operator implements the Cypher MERGE semantics:
4//! 1. Try to match the pattern in the graph
5//! 2. If found, return existing element (optionally apply ON MATCH SET)
6//! 3. If not found, create the element (optionally apply ON CREATE SET)
7
8use super::{Operator, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use crate::graph::GraphStoreMut;
11use grafeo_common::types::{EdgeId, LogicalType, NodeId, PropertyKey, Value};
12use std::sync::Arc;
13
14/// Merge operator for MERGE clause.
15///
16/// Tries to match a node with the given labels and properties.
17/// If found, returns the existing node. If not found, creates a new node.
18pub struct MergeOperator {
19    /// The graph store.
20    store: Arc<dyn GraphStoreMut>,
21    /// Variable name for the merged node.
22    variable: String,
23    /// Labels to match/create.
24    labels: Vec<String>,
25    /// Properties that must match (also used for creation).
26    match_properties: Vec<(String, Value)>,
27    /// Properties to set on CREATE.
28    on_create_properties: Vec<(String, Value)>,
29    /// Properties to set on MATCH.
30    on_match_properties: Vec<(String, Value)>,
31    /// Whether we've already executed.
32    executed: bool,
33}
34
35impl MergeOperator {
36    /// Creates a new merge operator.
37    pub fn new(
38        store: Arc<dyn GraphStoreMut>,
39        variable: String,
40        labels: Vec<String>,
41        match_properties: Vec<(String, Value)>,
42        on_create_properties: Vec<(String, Value)>,
43        on_match_properties: Vec<(String, Value)>,
44    ) -> Self {
45        Self {
46            store,
47            variable,
48            labels,
49            match_properties,
50            on_create_properties,
51            on_match_properties,
52            executed: false,
53        }
54    }
55
56    /// Returns the variable name for the merged node.
57    #[must_use]
58    pub fn variable(&self) -> &str {
59        &self.variable
60    }
61
62    /// Tries to find a matching node.
63    fn find_matching_node(&self) -> Option<NodeId> {
64        // Get all nodes with the first label (or all nodes if no labels)
65        let candidates: Vec<NodeId> = if let Some(first_label) = self.labels.first() {
66            self.store.nodes_by_label(first_label)
67        } else {
68            self.store.node_ids()
69        };
70
71        // Filter by all labels and properties
72        for node_id in candidates {
73            if let Some(node) = self.store.get_node(node_id) {
74                // Check all labels
75                let has_all_labels = self.labels.iter().all(|label| node.has_label(label));
76                if !has_all_labels {
77                    continue;
78                }
79
80                // Check all match properties
81                let has_all_props = self.match_properties.iter().all(|(key, expected_value)| {
82                    node.properties
83                        .get(&PropertyKey::new(key.as_str()))
84                        .is_some_and(|v| v == expected_value)
85                });
86
87                if has_all_props {
88                    return Some(node_id);
89                }
90            }
91        }
92
93        None
94    }
95
96    /// Creates a new node with the specified labels and properties.
97    fn create_node(&self) -> NodeId {
98        // Combine match properties with on_create properties
99        let mut all_props: Vec<(PropertyKey, Value)> = self
100            .match_properties
101            .iter()
102            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
103            .collect();
104
105        // Add on_create properties (may override match properties)
106        for (k, v) in &self.on_create_properties {
107            // Check if property already exists, if so update it
108            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
109                existing.1 = v.clone();
110            } else {
111                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
112            }
113        }
114
115        let labels: Vec<&str> = self.labels.iter().map(String::as_str).collect();
116        self.store.create_node_with_props(&labels, &all_props)
117    }
118
119    /// Applies ON MATCH properties to an existing node.
120    fn apply_on_match(&self, node_id: NodeId) {
121        for (key, value) in &self.on_match_properties {
122            self.store
123                .set_node_property(node_id, key.as_str(), value.clone());
124        }
125    }
126}
127
128impl Operator for MergeOperator {
129    fn next(&mut self) -> OperatorResult {
130        if self.executed {
131            return Ok(None);
132        }
133        self.executed = true;
134
135        // Try to find matching node
136        let (node_id, was_created) = if let Some(existing_id) = self.find_matching_node() {
137            // Node exists - apply ON MATCH properties
138            self.apply_on_match(existing_id);
139            (existing_id, false)
140        } else {
141            // Node doesn't exist - create it
142            let new_id = self.create_node();
143            (new_id, true)
144        };
145
146        // Build output chunk with the node ID
147        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
148        builder
149            .column_mut(0)
150            .expect("column 0 exists: builder created with single-column schema")
151            .push_node_id(node_id);
152        builder.advance_row();
153
154        // Log for debugging (in real code, this would be removed)
155        let _ = was_created; // Suppress unused variable warning
156
157        Ok(Some(builder.finish()))
158    }
159
160    fn reset(&mut self) {
161        self.executed = false;
162    }
163
164    fn name(&self) -> &'static str {
165        "Merge"
166    }
167}
168
169/// Configuration for a relationship merge operation.
170pub struct MergeRelationshipConfig {
171    /// Column index for the source node ID in the input.
172    pub source_column: usize,
173    /// Column index for the target node ID in the input.
174    pub target_column: usize,
175    /// Relationship type to match/create.
176    pub edge_type: String,
177    /// Properties that must match (also used for creation).
178    pub match_properties: Vec<(String, Value)>,
179    /// Properties to set on CREATE.
180    pub on_create_properties: Vec<(String, Value)>,
181    /// Properties to set on MATCH.
182    pub on_match_properties: Vec<(String, Value)>,
183    /// Output schema (input columns + edge column).
184    pub output_schema: Vec<LogicalType>,
185    /// Column index for the edge variable in the output.
186    pub edge_output_column: usize,
187}
188
189/// Merge operator for relationship patterns.
190///
191/// Takes input rows containing source and target node IDs, then for each row:
192/// 1. Searches for an existing relationship matching the type and properties
193/// 2. If found, applies ON MATCH properties and returns the existing edge
194/// 3. If not found, creates a new relationship and applies ON CREATE properties
195pub struct MergeRelationshipOperator {
196    /// The graph store.
197    store: Arc<dyn GraphStoreMut>,
198    /// Input operator providing rows with source/target node columns.
199    input: Box<dyn Operator>,
200    /// Merge configuration.
201    config: MergeRelationshipConfig,
202}
203
204impl MergeRelationshipOperator {
205    /// Creates a new merge relationship operator.
206    pub fn new(
207        store: Arc<dyn GraphStoreMut>,
208        input: Box<dyn Operator>,
209        config: MergeRelationshipConfig,
210    ) -> Self {
211        Self {
212            store,
213            input,
214            config,
215        }
216    }
217
218    /// Tries to find a matching relationship between source and target.
219    fn find_matching_edge(&self, src: NodeId, dst: NodeId) -> Option<EdgeId> {
220        use crate::graph::Direction;
221
222        for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
223            if target != dst {
224                continue;
225            }
226
227            if let Some(edge) = self.store.get_edge(edge_id) {
228                if edge.edge_type.as_str() != self.config.edge_type {
229                    continue;
230                }
231
232                let has_all_props =
233                    self.config.match_properties.iter().all(|(key, expected)| {
234                        edge.get_property(key).is_some_and(|v| v == expected)
235                    });
236
237                if has_all_props {
238                    return Some(edge_id);
239                }
240            }
241        }
242
243        None
244    }
245
246    /// Creates a new edge with the match properties and on_create properties.
247    fn create_edge(&self, src: NodeId, dst: NodeId) -> EdgeId {
248        let mut all_props: Vec<(PropertyKey, Value)> = self
249            .config
250            .match_properties
251            .iter()
252            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
253            .collect();
254
255        for (k, v) in &self.config.on_create_properties {
256            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
257                existing.1 = v.clone();
258            } else {
259                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
260            }
261        }
262
263        self.store
264            .create_edge_with_props(src, dst, &self.config.edge_type, &all_props)
265    }
266
267    /// Applies ON MATCH properties to an existing edge.
268    fn apply_on_match(&self, edge_id: EdgeId) {
269        for (key, value) in &self.config.on_match_properties {
270            self.store
271                .set_edge_property(edge_id, key.as_str(), value.clone());
272        }
273    }
274}
275
276impl Operator for MergeRelationshipOperator {
277    fn next(&mut self) -> OperatorResult {
278        use super::OperatorError;
279
280        if let Some(chunk) = self.input.next()? {
281            let mut builder =
282                DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
283
284            for row in chunk.selected_indices() {
285                let src_val = chunk
286                    .column(self.config.source_column)
287                    .and_then(|c| c.get_node_id(row))
288                    .ok_or_else(|| OperatorError::TypeMismatch {
289                        expected: "NodeId (source)".to_string(),
290                        found: "None".to_string(),
291                    })?;
292
293                let dst_val = chunk
294                    .column(self.config.target_column)
295                    .and_then(|c| c.get_node_id(row))
296                    .ok_or_else(|| OperatorError::TypeMismatch {
297                        expected: "NodeId (target)".to_string(),
298                        found: "None".to_string(),
299                    })?;
300
301                let edge_id = if let Some(existing) = self.find_matching_edge(src_val, dst_val) {
302                    self.apply_on_match(existing);
303                    existing
304                } else {
305                    self.create_edge(src_val, dst_val)
306                };
307
308                // Copy input columns to output, then add the edge column
309                for col_idx in 0..self.config.output_schema.len() {
310                    if col_idx == self.config.edge_output_column {
311                        if let Some(dst_col) = builder.column_mut(col_idx) {
312                            dst_col.push_edge_id(edge_id);
313                        }
314                    } else if let (Some(src_col), Some(dst_col)) =
315                        (chunk.column(col_idx), builder.column_mut(col_idx))
316                        && let Some(val) = src_col.get_value(row)
317                    {
318                        dst_col.push_value(val);
319                    }
320                }
321
322                builder.advance_row();
323            }
324
325            return Ok(Some(builder.finish()));
326        }
327
328        Ok(None)
329    }
330
331    fn reset(&mut self) {
332        self.input.reset();
333    }
334
335    fn name(&self) -> &'static str {
336        "MergeRelationship"
337    }
338}
339
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    /// Fresh, empty in-memory store for a single test.
    fn empty_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Builds `MERGE (n:Person {name: <name>})` with the given ON CREATE / ON MATCH sets.
    fn merge_person(
        store: &Arc<dyn GraphStoreMut>,
        name: &str,
        on_create: Vec<(String, Value)>,
        on_match: Vec<(String, Value)>,
    ) -> MergeOperator {
        MergeOperator::new(
            Arc::clone(store),
            "n".to_string(),
            vec!["Person".to_string()],
            vec![("name".to_string(), Value::String(name.into()))],
            on_create,
            on_match,
        )
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = empty_store();

        // Nothing exists yet, so MERGE must create the node.
        let mut op = merge_person(&store, "Alix", vec![], vec![]);
        assert!(op.next().unwrap().is_some());

        let people = store.nodes_by_label("Person");
        assert_eq!(people.len(), 1);

        let created = store.get_node(people[0]).unwrap();
        assert!(created.has_label("Person"));
        assert_eq!(
            created.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = empty_store();
        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
        );

        // The pre-existing node must be reused.
        let mut op = merge_person(&store, "Gus", vec![], vec![]);
        assert!(op.next().unwrap().is_some());

        assert_eq!(store.nodes_by_label("Person").len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = empty_store();

        let mut op = merge_person(
            &store,
            "Vincent",
            vec![("created".to_string(), Value::Bool(true))],
            vec![],
        );
        let _ = op.next().unwrap();

        // The created node carries both the match and the ON CREATE properties.
        let people = store.nodes_by_label("Person");
        let created = store.get_node(people[0]).unwrap();
        assert_eq!(
            created.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            created.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = empty_store();
        let existing = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
        );

        let mut op = merge_person(
            &store,
            "Jules",
            vec![],
            vec![("updated".to_string(), Value::Bool(true))],
        );
        let _ = op.next().unwrap();

        // The matched node gains the ON MATCH property.
        let matched = store.get_node(existing).unwrap();
        assert_eq!(
            matched.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }
}