grafeo_core/execution/operators/
merge.rs1use super::{Operator, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use crate::graph::lpg::LpgStore;
11use grafeo_common::types::{LogicalType, NodeId, PropertyKey, Value};
12use std::sync::Arc;
13
14pub struct MergeOperator {
19 store: Arc<LpgStore>,
21 variable: String,
23 labels: Vec<String>,
25 match_properties: Vec<(String, Value)>,
27 on_create_properties: Vec<(String, Value)>,
29 on_match_properties: Vec<(String, Value)>,
31 executed: bool,
33}
34
35impl MergeOperator {
36 pub fn new(
38 store: Arc<LpgStore>,
39 variable: String,
40 labels: Vec<String>,
41 match_properties: Vec<(String, Value)>,
42 on_create_properties: Vec<(String, Value)>,
43 on_match_properties: Vec<(String, Value)>,
44 ) -> Self {
45 Self {
46 store,
47 variable,
48 labels,
49 match_properties,
50 on_create_properties,
51 on_match_properties,
52 executed: false,
53 }
54 }
55
56 #[must_use]
58 pub fn variable(&self) -> &str {
59 &self.variable
60 }
61
62 fn find_matching_node(&self) -> Option<NodeId> {
64 let candidates: Vec<NodeId> = if let Some(first_label) = self.labels.first() {
66 self.store.nodes_by_label(first_label)
67 } else {
68 self.store.node_ids()
69 };
70
71 for node_id in candidates {
73 if let Some(node) = self.store.get_node(node_id) {
74 let has_all_labels = self.labels.iter().all(|label| node.has_label(label));
76 if !has_all_labels {
77 continue;
78 }
79
80 let has_all_props = self.match_properties.iter().all(|(key, expected_value)| {
82 node.properties
83 .get(&PropertyKey::new(key.as_str()))
84 .is_some_and(|v| v == expected_value)
85 });
86
87 if has_all_props {
88 return Some(node_id);
89 }
90 }
91 }
92
93 None
94 }
95
96 fn create_node(&self) -> NodeId {
98 let mut all_props: Vec<(PropertyKey, Value)> = self
100 .match_properties
101 .iter()
102 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
103 .collect();
104
105 for (k, v) in &self.on_create_properties {
107 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
109 existing.1 = v.clone();
110 } else {
111 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
112 }
113 }
114
115 let labels: Vec<&str> = self.labels.iter().map(String::as_str).collect();
116 self.store.create_node_with_props(&labels, all_props)
117 }
118
119 fn apply_on_match(&self, node_id: NodeId) {
121 for (key, value) in &self.on_match_properties {
122 self.store
123 .set_node_property(node_id, key.as_str(), value.clone());
124 }
125 }
126}
127
128impl Operator for MergeOperator {
129 fn next(&mut self) -> OperatorResult {
130 if self.executed {
131 return Ok(None);
132 }
133 self.executed = true;
134
135 let (node_id, was_created) = if let Some(existing_id) = self.find_matching_node() {
137 self.apply_on_match(existing_id);
139 (existing_id, false)
140 } else {
141 let new_id = self.create_node();
143 (new_id, true)
144 };
145
146 let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
148 builder.column_mut(0).unwrap().push_node_id(node_id);
149 builder.advance_row();
150
151 let _ = was_created; Ok(Some(builder.finish()))
155 }
156
157 fn reset(&mut self) {
158 self.executed = false;
159 }
160
161 fn name(&self) -> &'static str {
162 "Merge"
163 }
164}
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a merge operator for variable `n` with label `Person` and a
    /// single `name` match property, plus the given create/match extras.
    fn person_merge(
        store: &Arc<LpgStore>,
        name: &'static str,
        on_create: Vec<(String, Value)>,
        on_match: Vec<(String, Value)>,
    ) -> MergeOperator {
        MergeOperator::new(
            Arc::clone(store),
            String::from("n"),
            vec![String::from("Person")],
            vec![(String::from("name"), Value::String(name.into()))],
            on_create,
            on_match,
        )
    }

    /// Merging into an empty store creates exactly one matching node.
    #[test]
    fn test_merge_creates_new_node() {
        let store = Arc::new(LpgStore::new());
        let mut merge = person_merge(&store, "Alice", Vec::new(), Vec::new());

        assert!(merge.next().unwrap().is_some());

        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        let node = store.get_node(nodes[0]).unwrap();
        assert!(node.has_label("Person"));
        let expected_name = Value::String("Alice".into());
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&expected_name)
        );
    }

    /// Merging a pattern that already exists does not create a second node.
    #[test]
    fn test_merge_matches_existing_node() {
        let store = Arc::new(LpgStore::new());
        store.create_node_with_props(
            &["Person"],
            vec![(PropertyKey::new("name"), Value::String("Bob".into()))],
        );

        let mut merge = person_merge(&store, "Bob", Vec::new(), Vec::new());

        assert!(merge.next().unwrap().is_some());

        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
    }

    /// A created node carries both the match and the "on create" properties.
    #[test]
    fn test_merge_with_on_create() {
        let store = Arc::new(LpgStore::new());
        let on_create = vec![(String::from("created"), Value::Bool(true))];
        let mut merge = person_merge(&store, "Charlie", on_create, Vec::new());

        let _ = merge.next().unwrap();

        let nodes = store.nodes_by_label("Person");
        let node = store.get_node(nodes[0]).unwrap();
        let expected_name = Value::String("Charlie".into());
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&expected_name)
        );
        assert_eq!(
            node.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    /// A matched node receives the "on match" properties.
    #[test]
    fn test_merge_with_on_match() {
        let store = Arc::new(LpgStore::new());
        let node_id = store.create_node_with_props(
            &["Person"],
            vec![(PropertyKey::new("name"), Value::String("Diana".into()))],
        );

        let on_match = vec![(String::from("updated"), Value::Bool(true))];
        let mut merge = person_merge(&store, "Diana", Vec::new(), on_match);

        let _ = merge.next().unwrap();

        let node = store.get_node(node_id).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }
}