Skip to main content

grafeo_core/execution/operators/
merge.rs

1//! Merge operator for MERGE clause execution.
2//!
3//! The MERGE operator implements the Cypher MERGE semantics:
4//! 1. Try to match the pattern in the graph
5//! 2. If found, return existing element (optionally apply ON MATCH SET)
6//! 3. If not found, create the element (optionally apply ON CREATE SET)
7
8use super::{Operator, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use crate::graph::lpg::LpgStore;
11use grafeo_common::types::{LogicalType, NodeId, PropertyKey, Value};
12use std::sync::Arc;
13
14/// Merge operator for MERGE clause.
15///
16/// Tries to match a node with the given labels and properties.
17/// If found, returns the existing node. If not found, creates a new node.
18pub struct MergeOperator {
19    /// The graph store.
20    store: Arc<LpgStore>,
21    /// Variable name for the merged node.
22    variable: String,
23    /// Labels to match/create.
24    labels: Vec<String>,
25    /// Properties that must match (also used for creation).
26    match_properties: Vec<(String, Value)>,
27    /// Properties to set on CREATE.
28    on_create_properties: Vec<(String, Value)>,
29    /// Properties to set on MATCH.
30    on_match_properties: Vec<(String, Value)>,
31    /// Whether we've already executed.
32    executed: bool,
33}
34
35impl MergeOperator {
36    /// Creates a new merge operator.
37    pub fn new(
38        store: Arc<LpgStore>,
39        variable: String,
40        labels: Vec<String>,
41        match_properties: Vec<(String, Value)>,
42        on_create_properties: Vec<(String, Value)>,
43        on_match_properties: Vec<(String, Value)>,
44    ) -> Self {
45        Self {
46            store,
47            variable,
48            labels,
49            match_properties,
50            on_create_properties,
51            on_match_properties,
52            executed: false,
53        }
54    }
55
56    /// Returns the variable name for the merged node.
57    #[must_use]
58    pub fn variable(&self) -> &str {
59        &self.variable
60    }
61
62    /// Tries to find a matching node.
63    fn find_matching_node(&self) -> Option<NodeId> {
64        // Get all nodes with the first label (or all nodes if no labels)
65        let candidates: Vec<NodeId> = if let Some(first_label) = self.labels.first() {
66            self.store.nodes_by_label(first_label)
67        } else {
68            self.store.node_ids()
69        };
70
71        // Filter by all labels and properties
72        for node_id in candidates {
73            if let Some(node) = self.store.get_node(node_id) {
74                // Check all labels
75                let has_all_labels = self.labels.iter().all(|label| node.has_label(label));
76                if !has_all_labels {
77                    continue;
78                }
79
80                // Check all match properties
81                let has_all_props = self.match_properties.iter().all(|(key, expected_value)| {
82                    node.properties
83                        .get(&PropertyKey::new(key.as_str()))
84                        .is_some_and(|v| v == expected_value)
85                });
86
87                if has_all_props {
88                    return Some(node_id);
89                }
90            }
91        }
92
93        None
94    }
95
96    /// Creates a new node with the specified labels and properties.
97    fn create_node(&self) -> NodeId {
98        // Combine match properties with on_create properties
99        let mut all_props: Vec<(PropertyKey, Value)> = self
100            .match_properties
101            .iter()
102            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
103            .collect();
104
105        // Add on_create properties (may override match properties)
106        for (k, v) in &self.on_create_properties {
107            // Check if property already exists, if so update it
108            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
109                existing.1 = v.clone();
110            } else {
111                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
112            }
113        }
114
115        let labels: Vec<&str> = self.labels.iter().map(String::as_str).collect();
116        self.store.create_node_with_props(&labels, all_props)
117    }
118
119    /// Applies ON MATCH properties to an existing node.
120    fn apply_on_match(&self, node_id: NodeId) {
121        for (key, value) in &self.on_match_properties {
122            self.store
123                .set_node_property(node_id, key.as_str(), value.clone());
124        }
125    }
126}
127
128impl Operator for MergeOperator {
129    fn next(&mut self) -> OperatorResult {
130        if self.executed {
131            return Ok(None);
132        }
133        self.executed = true;
134
135        // Try to find matching node
136        let (node_id, was_created) = if let Some(existing_id) = self.find_matching_node() {
137            // Node exists - apply ON MATCH properties
138            self.apply_on_match(existing_id);
139            (existing_id, false)
140        } else {
141            // Node doesn't exist - create it
142            let new_id = self.create_node();
143            (new_id, true)
144        };
145
146        // Build output chunk with the node ID
147        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
148        builder
149            .column_mut(0)
150            .expect("column 0 exists: builder created with single-column schema")
151            .push_node_id(node_id);
152        builder.advance_row();
153
154        // Log for debugging (in real code, this would be removed)
155        let _ = was_created; // Suppress unused variable warning
156
157        Ok(Some(builder.finish()))
158    }
159
160    fn reset(&mut self) {
161        self.executed = false;
162    }
163
164    fn name(&self) -> &'static str {
165        "Merge"
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use super::*;
172
173    #[test]
174    fn test_merge_creates_new_node() {
175        let store = Arc::new(LpgStore::new());
176
177        // MERGE should create a new node since none exists
178        let mut merge = MergeOperator::new(
179            Arc::clone(&store),
180            "n".to_string(),
181            vec!["Person".to_string()],
182            vec![("name".to_string(), Value::String("Alice".into()))],
183            vec![], // no on_create
184            vec![], // no on_match
185        );
186
187        let result = merge.next().unwrap();
188        assert!(result.is_some());
189
190        // Verify node was created
191        let nodes = store.nodes_by_label("Person");
192        assert_eq!(nodes.len(), 1);
193
194        let node = store.get_node(nodes[0]).unwrap();
195        assert!(node.has_label("Person"));
196        assert_eq!(
197            node.properties.get(&PropertyKey::new("name")),
198            Some(&Value::String("Alice".into()))
199        );
200    }
201
202    #[test]
203    fn test_merge_matches_existing_node() {
204        let store = Arc::new(LpgStore::new());
205
206        // Create an existing node
207        store.create_node_with_props(
208            &["Person"],
209            vec![(PropertyKey::new("name"), Value::String("Bob".into()))],
210        );
211
212        // MERGE should find the existing node
213        let mut merge = MergeOperator::new(
214            Arc::clone(&store),
215            "n".to_string(),
216            vec!["Person".to_string()],
217            vec![("name".to_string(), Value::String("Bob".into()))],
218            vec![], // no on_create
219            vec![], // no on_match
220        );
221
222        let result = merge.next().unwrap();
223        assert!(result.is_some());
224
225        // Verify only one node exists (no new node created)
226        let nodes = store.nodes_by_label("Person");
227        assert_eq!(nodes.len(), 1);
228    }
229
230    #[test]
231    fn test_merge_with_on_create() {
232        let store = Arc::new(LpgStore::new());
233
234        // MERGE with ON CREATE SET
235        let mut merge = MergeOperator::new(
236            Arc::clone(&store),
237            "n".to_string(),
238            vec!["Person".to_string()],
239            vec![("name".to_string(), Value::String("Charlie".into()))],
240            vec![("created".to_string(), Value::Bool(true))], // on_create
241            vec![],                                           // no on_match
242        );
243
244        let _ = merge.next().unwrap();
245
246        // Verify node has both match properties and on_create properties
247        let nodes = store.nodes_by_label("Person");
248        let node = store.get_node(nodes[0]).unwrap();
249        assert_eq!(
250            node.properties.get(&PropertyKey::new("name")),
251            Some(&Value::String("Charlie".into()))
252        );
253        assert_eq!(
254            node.properties.get(&PropertyKey::new("created")),
255            Some(&Value::Bool(true))
256        );
257    }
258
259    #[test]
260    fn test_merge_with_on_match() {
261        let store = Arc::new(LpgStore::new());
262
263        // Create an existing node
264        let node_id = store.create_node_with_props(
265            &["Person"],
266            vec![(PropertyKey::new("name"), Value::String("Diana".into()))],
267        );
268
269        // MERGE with ON MATCH SET
270        let mut merge = MergeOperator::new(
271            Arc::clone(&store),
272            "n".to_string(),
273            vec!["Person".to_string()],
274            vec![("name".to_string(), Value::String("Diana".into()))],
275            vec![],                                           // no on_create
276            vec![("updated".to_string(), Value::Bool(true))], // on_match
277        );
278
279        let _ = merge.next().unwrap();
280
281        // Verify node has the on_match property added
282        let node = store.get_node(node_id).unwrap();
283        assert_eq!(
284            node.properties.get(&PropertyKey::new("updated")),
285            Some(&Value::Bool(true))
286        );
287    }
288}