Skip to main content

grafeo_core/execution/operators/
merge.rs

1//! Merge operator for MERGE clause execution.
2//!
3//! The MERGE operator implements the Cypher MERGE semantics:
4//! 1. Try to match the pattern in the graph
5//! 2. If found, return existing element (optionally apply ON MATCH SET)
6//! 3. If not found, create the element (optionally apply ON CREATE SET)
7
8use super::{Operator, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use crate::graph::lpg::LpgStore;
11use grafeo_common::types::{LogicalType, NodeId, PropertyKey, Value};
12use std::sync::Arc;
13
14/// Merge operator for MERGE clause.
15///
16/// Tries to match a node with the given labels and properties.
17/// If found, returns the existing node. If not found, creates a new node.
18pub struct MergeOperator {
19    /// The graph store.
20    store: Arc<LpgStore>,
21    /// Variable name for the merged node.
22    variable: String,
23    /// Labels to match/create.
24    labels: Vec<String>,
25    /// Properties that must match (also used for creation).
26    match_properties: Vec<(String, Value)>,
27    /// Properties to set on CREATE.
28    on_create_properties: Vec<(String, Value)>,
29    /// Properties to set on MATCH.
30    on_match_properties: Vec<(String, Value)>,
31    /// Whether we've already executed.
32    executed: bool,
33}
34
35impl MergeOperator {
36    /// Creates a new merge operator.
37    pub fn new(
38        store: Arc<LpgStore>,
39        variable: String,
40        labels: Vec<String>,
41        match_properties: Vec<(String, Value)>,
42        on_create_properties: Vec<(String, Value)>,
43        on_match_properties: Vec<(String, Value)>,
44    ) -> Self {
45        Self {
46            store,
47            variable,
48            labels,
49            match_properties,
50            on_create_properties,
51            on_match_properties,
52            executed: false,
53        }
54    }
55
56    /// Returns the variable name for the merged node.
57    #[must_use]
58    pub fn variable(&self) -> &str {
59        &self.variable
60    }
61
62    /// Tries to find a matching node.
63    fn find_matching_node(&self) -> Option<NodeId> {
64        // Get all nodes with the first label (or all nodes if no labels)
65        let candidates: Vec<NodeId> = if let Some(first_label) = self.labels.first() {
66            self.store.nodes_by_label(first_label)
67        } else {
68            self.store.node_ids()
69        };
70
71        // Filter by all labels and properties
72        for node_id in candidates {
73            if let Some(node) = self.store.get_node(node_id) {
74                // Check all labels
75                let has_all_labels = self.labels.iter().all(|label| node.has_label(label));
76                if !has_all_labels {
77                    continue;
78                }
79
80                // Check all match properties
81                let has_all_props = self.match_properties.iter().all(|(key, expected_value)| {
82                    node.properties
83                        .get(&PropertyKey::new(key.as_str()))
84                        .is_some_and(|v| v == expected_value)
85                });
86
87                if has_all_props {
88                    return Some(node_id);
89                }
90            }
91        }
92
93        None
94    }
95
96    /// Creates a new node with the specified labels and properties.
97    fn create_node(&self) -> NodeId {
98        // Combine match properties with on_create properties
99        let mut all_props: Vec<(PropertyKey, Value)> = self
100            .match_properties
101            .iter()
102            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
103            .collect();
104
105        // Add on_create properties (may override match properties)
106        for (k, v) in &self.on_create_properties {
107            // Check if property already exists, if so update it
108            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
109                existing.1 = v.clone();
110            } else {
111                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
112            }
113        }
114
115        let labels: Vec<&str> = self.labels.iter().map(String::as_str).collect();
116        self.store.create_node_with_props(&labels, all_props)
117    }
118
119    /// Applies ON MATCH properties to an existing node.
120    fn apply_on_match(&self, node_id: NodeId) {
121        for (key, value) in &self.on_match_properties {
122            self.store
123                .set_node_property(node_id, key.as_str(), value.clone());
124        }
125    }
126}
127
128impl Operator for MergeOperator {
129    fn next(&mut self) -> OperatorResult {
130        if self.executed {
131            return Ok(None);
132        }
133        self.executed = true;
134
135        // Try to find matching node
136        let (node_id, was_created) = if let Some(existing_id) = self.find_matching_node() {
137            // Node exists - apply ON MATCH properties
138            self.apply_on_match(existing_id);
139            (existing_id, false)
140        } else {
141            // Node doesn't exist - create it
142            let new_id = self.create_node();
143            (new_id, true)
144        };
145
146        // Build output chunk with the node ID
147        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
148        builder.column_mut(0).unwrap().push_node_id(node_id);
149        builder.advance_row();
150
151        // Log for debugging (in real code, this would be removed)
152        let _ = was_created; // Suppress unused variable warning
153
154        Ok(Some(builder.finish()))
155    }
156
157    fn reset(&mut self) {
158        self.executed = false;
159    }
160
161    fn name(&self) -> &'static str {
162        "Merge"
163    }
164}
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169
170    #[test]
171    fn test_merge_creates_new_node() {
172        let store = Arc::new(LpgStore::new());
173
174        // MERGE should create a new node since none exists
175        let mut merge = MergeOperator::new(
176            Arc::clone(&store),
177            "n".to_string(),
178            vec!["Person".to_string()],
179            vec![("name".to_string(), Value::String("Alice".into()))],
180            vec![], // no on_create
181            vec![], // no on_match
182        );
183
184        let result = merge.next().unwrap();
185        assert!(result.is_some());
186
187        // Verify node was created
188        let nodes = store.nodes_by_label("Person");
189        assert_eq!(nodes.len(), 1);
190
191        let node = store.get_node(nodes[0]).unwrap();
192        assert!(node.has_label("Person"));
193        assert_eq!(
194            node.properties.get(&PropertyKey::new("name")),
195            Some(&Value::String("Alice".into()))
196        );
197    }
198
199    #[test]
200    fn test_merge_matches_existing_node() {
201        let store = Arc::new(LpgStore::new());
202
203        // Create an existing node
204        store.create_node_with_props(
205            &["Person"],
206            vec![(PropertyKey::new("name"), Value::String("Bob".into()))],
207        );
208
209        // MERGE should find the existing node
210        let mut merge = MergeOperator::new(
211            Arc::clone(&store),
212            "n".to_string(),
213            vec!["Person".to_string()],
214            vec![("name".to_string(), Value::String("Bob".into()))],
215            vec![], // no on_create
216            vec![], // no on_match
217        );
218
219        let result = merge.next().unwrap();
220        assert!(result.is_some());
221
222        // Verify only one node exists (no new node created)
223        let nodes = store.nodes_by_label("Person");
224        assert_eq!(nodes.len(), 1);
225    }
226
227    #[test]
228    fn test_merge_with_on_create() {
229        let store = Arc::new(LpgStore::new());
230
231        // MERGE with ON CREATE SET
232        let mut merge = MergeOperator::new(
233            Arc::clone(&store),
234            "n".to_string(),
235            vec!["Person".to_string()],
236            vec![("name".to_string(), Value::String("Charlie".into()))],
237            vec![("created".to_string(), Value::Bool(true))], // on_create
238            vec![],                                           // no on_match
239        );
240
241        let _ = merge.next().unwrap();
242
243        // Verify node has both match properties and on_create properties
244        let nodes = store.nodes_by_label("Person");
245        let node = store.get_node(nodes[0]).unwrap();
246        assert_eq!(
247            node.properties.get(&PropertyKey::new("name")),
248            Some(&Value::String("Charlie".into()))
249        );
250        assert_eq!(
251            node.properties.get(&PropertyKey::new("created")),
252            Some(&Value::Bool(true))
253        );
254    }
255
256    #[test]
257    fn test_merge_with_on_match() {
258        let store = Arc::new(LpgStore::new());
259
260        // Create an existing node
261        let node_id = store.create_node_with_props(
262            &["Person"],
263            vec![(PropertyKey::new("name"), Value::String("Diana".into()))],
264        );
265
266        // MERGE with ON MATCH SET
267        let mut merge = MergeOperator::new(
268            Arc::clone(&store),
269            "n".to_string(),
270            vec!["Person".to_string()],
271            vec![("name".to_string(), Value::String("Diana".into()))],
272            vec![],                                           // no on_create
273            vec![("updated".to_string(), Value::Bool(true))], // on_match
274        );
275
276        let _ = merge.next().unwrap();
277
278        // Verify node has the on_match property added
279        let node = store.get_node(node_id).unwrap();
280        assert_eq!(
281            node.properties.get(&PropertyKey::new("updated")),
282            Some(&Value::Bool(true))
283        );
284    }
285}