grafeo_core/execution/operators/
merge.rs1use super::{Operator, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use crate::graph::lpg::LpgStore;
11use grafeo_common::types::{LogicalType, NodeId, PropertyKey, Value};
12use std::sync::Arc;
13
14pub struct MergeOperator {
19 store: Arc<LpgStore>,
21 variable: String,
23 labels: Vec<String>,
25 match_properties: Vec<(String, Value)>,
27 on_create_properties: Vec<(String, Value)>,
29 on_match_properties: Vec<(String, Value)>,
31 executed: bool,
33}
34
35impl MergeOperator {
36 pub fn new(
38 store: Arc<LpgStore>,
39 variable: String,
40 labels: Vec<String>,
41 match_properties: Vec<(String, Value)>,
42 on_create_properties: Vec<(String, Value)>,
43 on_match_properties: Vec<(String, Value)>,
44 ) -> Self {
45 Self {
46 store,
47 variable,
48 labels,
49 match_properties,
50 on_create_properties,
51 on_match_properties,
52 executed: false,
53 }
54 }
55
56 #[must_use]
58 pub fn variable(&self) -> &str {
59 &self.variable
60 }
61
62 fn find_matching_node(&self) -> Option<NodeId> {
64 let candidates: Vec<NodeId> = if let Some(first_label) = self.labels.first() {
66 self.store.nodes_by_label(first_label)
67 } else {
68 self.store.node_ids()
69 };
70
71 for node_id in candidates {
73 if let Some(node) = self.store.get_node(node_id) {
74 let has_all_labels = self.labels.iter().all(|label| node.has_label(label));
76 if !has_all_labels {
77 continue;
78 }
79
80 let has_all_props = self.match_properties.iter().all(|(key, expected_value)| {
82 node.properties
83 .get(&PropertyKey::new(key.as_str()))
84 .is_some_and(|v| v == expected_value)
85 });
86
87 if has_all_props {
88 return Some(node_id);
89 }
90 }
91 }
92
93 None
94 }
95
96 fn create_node(&self) -> NodeId {
98 let mut all_props: Vec<(PropertyKey, Value)> = self
100 .match_properties
101 .iter()
102 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
103 .collect();
104
105 for (k, v) in &self.on_create_properties {
107 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
109 existing.1 = v.clone();
110 } else {
111 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
112 }
113 }
114
115 let labels: Vec<&str> = self.labels.iter().map(String::as_str).collect();
116 self.store.create_node_with_props(&labels, all_props)
117 }
118
119 fn apply_on_match(&self, node_id: NodeId) {
121 for (key, value) in &self.on_match_properties {
122 self.store
123 .set_node_property(node_id, key.as_str(), value.clone());
124 }
125 }
126}
127
128impl Operator for MergeOperator {
129 fn next(&mut self) -> OperatorResult {
130 if self.executed {
131 return Ok(None);
132 }
133 self.executed = true;
134
135 let (node_id, was_created) = if let Some(existing_id) = self.find_matching_node() {
137 self.apply_on_match(existing_id);
139 (existing_id, false)
140 } else {
141 let new_id = self.create_node();
143 (new_id, true)
144 };
145
146 let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
148 builder
149 .column_mut(0)
150 .expect("column 0 exists: builder created with single-column schema")
151 .push_node_id(node_id);
152 builder.advance_row();
153
154 let _ = was_created; Ok(Some(builder.finish()))
158 }
159
160 fn reset(&mut self) {
161 self.executed = false;
162 }
163
164 fn name(&self) -> &'static str {
165 "Merge"
166 }
167}
168
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a merge operator for variable `n` with label `Person` and a
    /// single match property `name = <name>`.
    fn person_merge(
        store: &Arc<LpgStore>,
        name: &'static str,
        on_create: Vec<(String, Value)>,
        on_match: Vec<(String, Value)>,
    ) -> MergeOperator {
        MergeOperator::new(
            Arc::clone(store),
            "n".to_string(),
            vec!["Person".to_string()],
            vec![("name".to_string(), Value::String(name.into()))],
            on_create,
            on_match,
        )
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = Arc::new(LpgStore::new());
        let mut merge = person_merge(&store, "Alice", vec![], vec![]);

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // The empty store must now contain exactly one Person node.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        let node = store.get_node(nodes[0]).unwrap();
        assert!(node.has_label("Person"));
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alice".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = Arc::new(LpgStore::new());

        // Seed the store with the node the merge is expected to find.
        store.create_node_with_props(
            &["Person"],
            vec![(PropertyKey::new("name"), Value::String("Bob".into()))],
        );

        let mut merge = person_merge(&store, "Bob", vec![], vec![]);

        let result = merge.next().unwrap();
        assert!(result.is_some());

        // No second node may have been created.
        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = Arc::new(LpgStore::new());
        let mut merge = person_merge(
            &store,
            "Charlie",
            vec![("created".to_string(), Value::Bool(true))],
            vec![],
        );

        let _ = merge.next().unwrap();

        // The new node carries both the match and the on-create properties.
        let nodes = store.nodes_by_label("Person");
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Charlie".into()))
        );
        assert_eq!(
            node.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = Arc::new(LpgStore::new());

        let node_id = store.create_node_with_props(
            &["Person"],
            vec![(PropertyKey::new("name"), Value::String("Diana".into()))],
        );

        let mut merge = person_merge(
            &store,
            "Diana",
            vec![],
            vec![("updated".to_string(), Value::Bool(true))],
        );

        let _ = merge.next().unwrap();

        // The pre-existing node must have received the on-match property.
        let node = store.get_node(node_id).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }
}