Skip to main content

grafeo_core/execution/operators/
merge.rs

1//! Merge operator for MERGE clause execution.
2//!
3//! The MERGE operator implements the Cypher MERGE semantics:
4//! 1. Try to match the pattern in the graph
5//! 2. If found, return existing element (optionally apply ON MATCH SET)
6//! 3. If not found, create the element (optionally apply ON CREATE SET)
7
8use super::{Operator, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use crate::graph::lpg::LpgStore;
11use grafeo_common::types::{LogicalType, NodeId, PropertyKey, Value};
12use std::sync::Arc;
13
14/// Merge operator for MERGE clause.
15///
16/// Tries to match a node with the given labels and properties.
17/// If found, returns the existing node. If not found, creates a new node.
18pub struct MergeOperator {
19    /// The graph store.
20    store: Arc<LpgStore>,
21    /// Variable name for the merged node.
22    variable: String,
23    /// Labels to match/create.
24    labels: Vec<String>,
25    /// Properties that must match (also used for creation).
26    match_properties: Vec<(String, Value)>,
27    /// Properties to set on CREATE.
28    on_create_properties: Vec<(String, Value)>,
29    /// Properties to set on MATCH.
30    on_match_properties: Vec<(String, Value)>,
31    /// Whether we've already executed.
32    executed: bool,
33}
34
35impl MergeOperator {
36    /// Creates a new merge operator.
37    pub fn new(
38        store: Arc<LpgStore>,
39        variable: String,
40        labels: Vec<String>,
41        match_properties: Vec<(String, Value)>,
42        on_create_properties: Vec<(String, Value)>,
43        on_match_properties: Vec<(String, Value)>,
44    ) -> Self {
45        Self {
46            store,
47            variable,
48            labels,
49            match_properties,
50            on_create_properties,
51            on_match_properties,
52            executed: false,
53        }
54    }
55
56    /// Returns the variable name for the merged node.
57    #[must_use]
58    pub fn variable(&self) -> &str {
59        &self.variable
60    }
61
62    /// Tries to find a matching node.
63    fn find_matching_node(&self) -> Option<NodeId> {
64        // Get all nodes with the first label (or all nodes if no labels)
65        let candidates: Vec<NodeId> = if let Some(first_label) = self.labels.first() {
66            self.store.nodes_by_label(first_label)
67        } else {
68            self.store.node_ids()
69        };
70
71        // Filter by all labels and properties
72        for node_id in candidates {
73            if let Some(node) = self.store.get_node(node_id) {
74                // Check all labels
75                let has_all_labels = self.labels.iter().all(|label| node.has_label(label));
76                if !has_all_labels {
77                    continue;
78                }
79
80                // Check all match properties
81                let has_all_props = self.match_properties.iter().all(|(key, expected_value)| {
82                    node.properties
83                        .get(&PropertyKey::new(key.as_str()))
84                        .map(|v| v == expected_value)
85                        .unwrap_or(false)
86                });
87
88                if has_all_props {
89                    return Some(node_id);
90                }
91            }
92        }
93
94        None
95    }
96
97    /// Creates a new node with the specified labels and properties.
98    fn create_node(&self) -> NodeId {
99        // Combine match properties with on_create properties
100        let mut all_props: Vec<(PropertyKey, Value)> = self
101            .match_properties
102            .iter()
103            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
104            .collect();
105
106        // Add on_create properties (may override match properties)
107        for (k, v) in &self.on_create_properties {
108            // Check if property already exists, if so update it
109            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
110                existing.1 = v.clone();
111            } else {
112                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
113            }
114        }
115
116        let labels: Vec<&str> = self.labels.iter().map(String::as_str).collect();
117        self.store.create_node_with_props(&labels, all_props)
118    }
119
120    /// Applies ON MATCH properties to an existing node.
121    fn apply_on_match(&self, node_id: NodeId) {
122        for (key, value) in &self.on_match_properties {
123            self.store
124                .set_node_property(node_id, key.as_str(), value.clone());
125        }
126    }
127}
128
129impl Operator for MergeOperator {
130    fn next(&mut self) -> OperatorResult {
131        if self.executed {
132            return Ok(None);
133        }
134        self.executed = true;
135
136        // Try to find matching node
137        let (node_id, was_created) = if let Some(existing_id) = self.find_matching_node() {
138            // Node exists - apply ON MATCH properties
139            self.apply_on_match(existing_id);
140            (existing_id, false)
141        } else {
142            // Node doesn't exist - create it
143            let new_id = self.create_node();
144            (new_id, true)
145        };
146
147        // Build output chunk with the node ID
148        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
149        builder.column_mut(0).unwrap().push_node_id(node_id);
150        builder.advance_row();
151
152        // Log for debugging (in real code, this would be removed)
153        let _ = was_created; // Suppress unused variable warning
154
155        Ok(Some(builder.finish()))
156    }
157
158    fn reset(&mut self) {
159        self.executed = false;
160    }
161
162    fn name(&self) -> &'static str {
163        "Merge"
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `MERGE (n:Person {name: <name>})` operator over `store`
    /// with the given ON CREATE / ON MATCH property lists.
    fn person_merge(
        store: &Arc<LpgStore>,
        name: &'static str,
        on_create: Vec<(String, Value)>,
        on_match: Vec<(String, Value)>,
    ) -> MergeOperator {
        MergeOperator::new(
            Arc::clone(store),
            "n".to_string(),
            vec!["Person".to_string()],
            vec![("name".to_string(), Value::String(name.into()))],
            on_create,
            on_match,
        )
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = Arc::new(LpgStore::new());

        // MERGE should create a new node since none exists
        let mut merge = person_merge(&store, "Alice", vec![], vec![]);

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // Verify node was created
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        let node = store.get_node(nodes[0]).unwrap();
        assert!(node.has_label("Person"));
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alice".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = Arc::new(LpgStore::new());

        // Create an existing node
        store.create_node_with_props(
            &["Person"],
            vec![(PropertyKey::new("name"), Value::String("Bob".into()))],
        );

        // MERGE should find the existing node
        let mut merge = person_merge(&store, "Bob", vec![], vec![]);

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // Verify only one node exists (no new node created)
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = Arc::new(LpgStore::new());

        // MERGE with ON CREATE SET
        let mut merge = person_merge(
            &store,
            "Charlie",
            vec![("created".to_string(), Value::Bool(true))],
            vec![],
        );

        let _ = merge.next().unwrap();

        // Exactly one node must exist before it is inspected by index.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        // Verify node has both match properties and on_create properties
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Charlie".into()))
        );
        assert_eq!(
            node.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = Arc::new(LpgStore::new());

        // Create an existing node
        let node_id = store.create_node_with_props(
            &["Person"],
            vec![(PropertyKey::new("name"), Value::String("Diana".into()))],
        );

        // MERGE with ON MATCH SET
        let mut merge = person_merge(
            &store,
            "Diana",
            vec![],
            vec![("updated".to_string(), Value::Bool(true))],
        );

        let _ = merge.next().unwrap();

        // Verify node has the on_match property added
        let node = store.get_node(node_id).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_on_create_overrides_match_property() {
        let store = Arc::new(LpgStore::new());

        // ON CREATE sets the same key as the match pattern with a new value.
        let mut merge = person_merge(
            &store,
            "Eve",
            vec![("name".to_string(), Value::String("Eve Created".into()))],
            vec![],
        );

        let _ = merge.next().unwrap();

        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        // The ON CREATE value replaces the match value on the new node.
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Eve Created".into()))
        );
    }

    #[test]
    fn test_merge_create_and_match_branches_are_exclusive() {
        let store = Arc::new(LpgStore::new());

        let on_create = vec![("created".to_string(), Value::Bool(true))];
        let on_match = vec![("updated".to_string(), Value::Bool(true))];

        // First run: nothing exists, so only ON CREATE is applied.
        let mut merge = person_merge(&store, "Frank", on_create, on_match);
        let _ = merge.next().unwrap();

        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
        assert_eq!(node.properties.get(&PropertyKey::new("updated")), None);
    }

    #[test]
    fn test_merge_is_single_shot_until_reset() {
        let store = Arc::new(LpgStore::new());

        let mut merge = person_merge(&store, "Grace", vec![], vec![]);

        // First call yields the row, second call is exhausted.
        assert!(merge.next().unwrap().is_some());
        assert!(merge.next().unwrap().is_none());

        // After reset the operator runs again and now matches the node
        // created by the first run instead of creating a second one.
        merge.reset();
        assert!(merge.next().unwrap().is_some());
        assert_eq!(store.nodes_by_label("Person").len(), 1);
    }
}