graphos_core/execution/operators/
merge.rs1use super::{Operator, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use crate::graph::lpg::LpgStore;
11use graphos_common::types::{LogicalType, NodeId, PropertyKey, Value};
12use std::sync::Arc;
13
14pub struct MergeOperator {
19 store: Arc<LpgStore>,
21 variable: String,
23 labels: Vec<String>,
25 match_properties: Vec<(String, Value)>,
27 on_create_properties: Vec<(String, Value)>,
29 on_match_properties: Vec<(String, Value)>,
31 executed: bool,
33}
34
35impl MergeOperator {
36 pub fn new(
38 store: Arc<LpgStore>,
39 variable: String,
40 labels: Vec<String>,
41 match_properties: Vec<(String, Value)>,
42 on_create_properties: Vec<(String, Value)>,
43 on_match_properties: Vec<(String, Value)>,
44 ) -> Self {
45 Self {
46 store,
47 variable,
48 labels,
49 match_properties,
50 on_create_properties,
51 on_match_properties,
52 executed: false,
53 }
54 }
55
56 #[must_use]
58 pub fn variable(&self) -> &str {
59 &self.variable
60 }
61
62 fn find_matching_node(&self) -> Option<NodeId> {
64 let candidates: Vec<NodeId> = if let Some(first_label) = self.labels.first() {
66 self.store.nodes_by_label(first_label)
67 } else {
68 self.store.node_ids()
69 };
70
71 for node_id in candidates {
73 if let Some(node) = self.store.get_node(node_id) {
74 let has_all_labels = self.labels.iter().all(|label| node.has_label(label));
76 if !has_all_labels {
77 continue;
78 }
79
80 let has_all_props = self.match_properties.iter().all(|(key, expected_value)| {
82 node.properties
83 .get(&PropertyKey::new(key.as_str()))
84 .map(|v| v == expected_value)
85 .unwrap_or(false)
86 });
87
88 if has_all_props {
89 return Some(node_id);
90 }
91 }
92 }
93
94 None
95 }
96
97 fn create_node(&self) -> NodeId {
99 let mut all_props: Vec<(PropertyKey, Value)> = self
101 .match_properties
102 .iter()
103 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
104 .collect();
105
106 for (k, v) in &self.on_create_properties {
108 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
110 existing.1 = v.clone();
111 } else {
112 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
113 }
114 }
115
116 let labels: Vec<&str> = self.labels.iter().map(String::as_str).collect();
117 self.store.create_node_with_props(&labels, all_props)
118 }
119
120 fn apply_on_match(&self, node_id: NodeId) {
122 for (key, value) in &self.on_match_properties {
123 self.store
124 .set_node_property(node_id, key.as_str(), value.clone());
125 }
126 }
127}
128
129impl Operator for MergeOperator {
130 fn next(&mut self) -> OperatorResult {
131 if self.executed {
132 return Ok(None);
133 }
134 self.executed = true;
135
136 let (node_id, was_created) = if let Some(existing_id) = self.find_matching_node() {
138 self.apply_on_match(existing_id);
140 (existing_id, false)
141 } else {
142 let new_id = self.create_node();
144 (new_id, true)
145 };
146
147 let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
149 builder.column_mut(0).unwrap().push_node_id(node_id);
150 builder.advance_row();
151
152 let _ = was_created; Ok(Some(builder.finish()))
156 }
157
158 fn reset(&mut self) {
159 self.executed = false;
160 }
161
162 fn name(&self) -> &'static str {
163 "Merge"
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use super::*;
170
171 #[test]
172 fn test_merge_creates_new_node() {
173 let store = Arc::new(LpgStore::new());
174
175 let mut merge = MergeOperator::new(
177 Arc::clone(&store),
178 "n".to_string(),
179 vec!["Person".to_string()],
180 vec![("name".to_string(), Value::String("Alice".into()))],
181 vec![], vec![], );
184
185 let result = merge.next().unwrap();
186 assert!(result.is_some());
187
188 let nodes = store.nodes_by_label("Person");
190 assert_eq!(nodes.len(), 1);
191
192 let node = store.get_node(nodes[0]).unwrap();
193 assert!(node.has_label("Person"));
194 assert_eq!(
195 node.properties.get(&PropertyKey::new("name")),
196 Some(&Value::String("Alice".into()))
197 );
198 }
199
200 #[test]
201 fn test_merge_matches_existing_node() {
202 let store = Arc::new(LpgStore::new());
203
204 store.create_node_with_props(
206 &["Person"],
207 vec![(PropertyKey::new("name"), Value::String("Bob".into()))],
208 );
209
210 let mut merge = MergeOperator::new(
212 Arc::clone(&store),
213 "n".to_string(),
214 vec!["Person".to_string()],
215 vec![("name".to_string(), Value::String("Bob".into()))],
216 vec![], vec![], );
219
220 let result = merge.next().unwrap();
221 assert!(result.is_some());
222
223 let nodes = store.nodes_by_label("Person");
225 assert_eq!(nodes.len(), 1);
226 }
227
228 #[test]
229 fn test_merge_with_on_create() {
230 let store = Arc::new(LpgStore::new());
231
232 let mut merge = MergeOperator::new(
234 Arc::clone(&store),
235 "n".to_string(),
236 vec!["Person".to_string()],
237 vec![("name".to_string(), Value::String("Charlie".into()))],
238 vec![("created".to_string(), Value::Bool(true))], vec![], );
241
242 let _ = merge.next().unwrap();
243
244 let nodes = store.nodes_by_label("Person");
246 let node = store.get_node(nodes[0]).unwrap();
247 assert_eq!(
248 node.properties.get(&PropertyKey::new("name")),
249 Some(&Value::String("Charlie".into()))
250 );
251 assert_eq!(
252 node.properties.get(&PropertyKey::new("created")),
253 Some(&Value::Bool(true))
254 );
255 }
256
257 #[test]
258 fn test_merge_with_on_match() {
259 let store = Arc::new(LpgStore::new());
260
261 let node_id = store.create_node_with_props(
263 &["Person"],
264 vec![(PropertyKey::new("name"), Value::String("Diana".into()))],
265 );
266
267 let mut merge = MergeOperator::new(
269 Arc::clone(&store),
270 "n".to_string(),
271 vec!["Person".to_string()],
272 vec![("name".to_string(), Value::String("Diana".into()))],
273 vec![], vec![("updated".to_string(), Value::Bool(true))], );
276
277 let _ = merge.next().unwrap();
278
279 let node = store.get_node(node_id).unwrap();
281 assert_eq!(
282 node.properties.get(&PropertyKey::new("updated")),
283 Some(&Value::Bool(true))
284 );
285 }
286}