grafeo_core/execution/operators/
merge.rs1use super::{Operator, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use crate::graph::GraphStoreMut;
11use grafeo_common::types::{EdgeId, LogicalType, NodeId, PropertyKey, Value};
12use std::sync::Arc;
13
14pub struct MergeOperator {
19 store: Arc<dyn GraphStoreMut>,
21 variable: String,
23 labels: Vec<String>,
25 match_properties: Vec<(String, Value)>,
27 on_create_properties: Vec<(String, Value)>,
29 on_match_properties: Vec<(String, Value)>,
31 executed: bool,
33}
34
35impl MergeOperator {
36 pub fn new(
38 store: Arc<dyn GraphStoreMut>,
39 variable: String,
40 labels: Vec<String>,
41 match_properties: Vec<(String, Value)>,
42 on_create_properties: Vec<(String, Value)>,
43 on_match_properties: Vec<(String, Value)>,
44 ) -> Self {
45 Self {
46 store,
47 variable,
48 labels,
49 match_properties,
50 on_create_properties,
51 on_match_properties,
52 executed: false,
53 }
54 }
55
56 #[must_use]
58 pub fn variable(&self) -> &str {
59 &self.variable
60 }
61
62 fn find_matching_node(&self) -> Option<NodeId> {
64 let candidates: Vec<NodeId> = if let Some(first_label) = self.labels.first() {
66 self.store.nodes_by_label(first_label)
67 } else {
68 self.store.node_ids()
69 };
70
71 for node_id in candidates {
73 if let Some(node) = self.store.get_node(node_id) {
74 let has_all_labels = self.labels.iter().all(|label| node.has_label(label));
76 if !has_all_labels {
77 continue;
78 }
79
80 let has_all_props = self.match_properties.iter().all(|(key, expected_value)| {
82 node.properties
83 .get(&PropertyKey::new(key.as_str()))
84 .is_some_and(|v| v == expected_value)
85 });
86
87 if has_all_props {
88 return Some(node_id);
89 }
90 }
91 }
92
93 None
94 }
95
96 fn create_node(&self) -> NodeId {
98 let mut all_props: Vec<(PropertyKey, Value)> = self
100 .match_properties
101 .iter()
102 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
103 .collect();
104
105 for (k, v) in &self.on_create_properties {
107 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
109 existing.1 = v.clone();
110 } else {
111 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
112 }
113 }
114
115 let labels: Vec<&str> = self.labels.iter().map(String::as_str).collect();
116 self.store.create_node_with_props(&labels, &all_props)
117 }
118
119 fn apply_on_match(&self, node_id: NodeId) {
121 for (key, value) in &self.on_match_properties {
122 self.store
123 .set_node_property(node_id, key.as_str(), value.clone());
124 }
125 }
126}
127
128impl Operator for MergeOperator {
129 fn next(&mut self) -> OperatorResult {
130 if self.executed {
131 return Ok(None);
132 }
133 self.executed = true;
134
135 let (node_id, was_created) = if let Some(existing_id) = self.find_matching_node() {
137 self.apply_on_match(existing_id);
139 (existing_id, false)
140 } else {
141 let new_id = self.create_node();
143 (new_id, true)
144 };
145
146 let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
148 builder
149 .column_mut(0)
150 .expect("column 0 exists: builder created with single-column schema")
151 .push_node_id(node_id);
152 builder.advance_row();
153
154 let _ = was_created; Ok(Some(builder.finish()))
158 }
159
160 fn reset(&mut self) {
161 self.executed = false;
162 }
163
164 fn name(&self) -> &'static str {
165 "Merge"
166 }
167}
168
169pub struct MergeRelationshipConfig {
171 pub source_column: usize,
173 pub target_column: usize,
175 pub edge_type: String,
177 pub match_properties: Vec<(String, Value)>,
179 pub on_create_properties: Vec<(String, Value)>,
181 pub on_match_properties: Vec<(String, Value)>,
183 pub output_schema: Vec<LogicalType>,
185 pub edge_output_column: usize,
187}
188
189pub struct MergeRelationshipOperator {
196 store: Arc<dyn GraphStoreMut>,
198 input: Box<dyn Operator>,
200 config: MergeRelationshipConfig,
202}
203
204impl MergeRelationshipOperator {
205 pub fn new(
207 store: Arc<dyn GraphStoreMut>,
208 input: Box<dyn Operator>,
209 config: MergeRelationshipConfig,
210 ) -> Self {
211 Self {
212 store,
213 input,
214 config,
215 }
216 }
217
218 fn find_matching_edge(&self, src: NodeId, dst: NodeId) -> Option<EdgeId> {
220 use crate::graph::Direction;
221
222 for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
223 if target != dst {
224 continue;
225 }
226
227 if let Some(edge) = self.store.get_edge(edge_id) {
228 if edge.edge_type.as_str() != self.config.edge_type {
229 continue;
230 }
231
232 let has_all_props =
233 self.config.match_properties.iter().all(|(key, expected)| {
234 edge.get_property(key).is_some_and(|v| v == expected)
235 });
236
237 if has_all_props {
238 return Some(edge_id);
239 }
240 }
241 }
242
243 None
244 }
245
246 fn create_edge(&self, src: NodeId, dst: NodeId) -> EdgeId {
248 let mut all_props: Vec<(PropertyKey, Value)> = self
249 .config
250 .match_properties
251 .iter()
252 .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
253 .collect();
254
255 for (k, v) in &self.config.on_create_properties {
256 if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
257 existing.1 = v.clone();
258 } else {
259 all_props.push((PropertyKey::new(k.as_str()), v.clone()));
260 }
261 }
262
263 self.store
264 .create_edge_with_props(src, dst, &self.config.edge_type, &all_props)
265 }
266
267 fn apply_on_match(&self, edge_id: EdgeId) {
269 for (key, value) in &self.config.on_match_properties {
270 self.store
271 .set_edge_property(edge_id, key.as_str(), value.clone());
272 }
273 }
274}
275
276impl Operator for MergeRelationshipOperator {
277 fn next(&mut self) -> OperatorResult {
278 use super::OperatorError;
279
280 if let Some(chunk) = self.input.next()? {
281 let mut builder =
282 DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
283
284 for row in chunk.selected_indices() {
285 let src_val = chunk
286 .column(self.config.source_column)
287 .and_then(|c| c.get_node_id(row))
288 .ok_or_else(|| OperatorError::TypeMismatch {
289 expected: "NodeId (source)".to_string(),
290 found: "None".to_string(),
291 })?;
292
293 let dst_val = chunk
294 .column(self.config.target_column)
295 .and_then(|c| c.get_node_id(row))
296 .ok_or_else(|| OperatorError::TypeMismatch {
297 expected: "NodeId (target)".to_string(),
298 found: "None".to_string(),
299 })?;
300
301 let edge_id = if let Some(existing) = self.find_matching_edge(src_val, dst_val) {
302 self.apply_on_match(existing);
303 existing
304 } else {
305 self.create_edge(src_val, dst_val)
306 };
307
308 for col_idx in 0..self.config.output_schema.len() {
310 if col_idx == self.config.edge_output_column {
311 if let Some(dst_col) = builder.column_mut(col_idx) {
312 dst_col.push_edge_id(edge_id);
313 }
314 } else if let (Some(src_col), Some(dst_col)) =
315 (chunk.column(col_idx), builder.column_mut(col_idx))
316 && let Some(val) = src_col.get_value(row)
317 {
318 dst_col.push_value(val);
319 }
320 }
321
322 builder.advance_row();
323 }
324
325 return Ok(Some(builder.finish()));
326 }
327
328 Ok(None)
329 }
330
331 fn reset(&mut self) {
332 self.input.reset();
333 }
334
335 fn name(&self) -> &'static str {
336 "MergeRelationship"
337 }
338}
339
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    #[test]
    fn test_merge_creates_new_node() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        let mut merge = MergeOperator::new(
            Arc::clone(&store),
            "n".to_string(),
            vec!["Person".to_string()],
            vec![("name".to_string(), Value::String("Alix".into()))],
            vec![],
            vec![],
        );

        let result = merge.next().unwrap();
        assert!(result.is_some());

        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);

        let node = store.get_node(nodes[0]).unwrap();
        assert!(node.has_label("Person"));
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        let existing_id = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
        );

        let mut merge = MergeOperator::new(
            Arc::clone(&store),
            "n".to_string(),
            vec!["Person".to_string()],
            vec![("name".to_string(), Value::String("Gus".into()))],
            vec![],
            vec![],
        );

        let chunk = merge.next().unwrap().expect("merge emits one row");
        assert_eq!(chunk.row_count(), 1);
        // The emitted id must be the pre-existing node, not a new one.
        assert_eq!(
            chunk.column(0).and_then(|c| c.get_node_id(0)),
            Some(existing_id)
        );

        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        let mut merge = MergeOperator::new(
            Arc::clone(&store),
            "n".to_string(),
            vec!["Person".to_string()],
            vec![("name".to_string(), Value::String("Vincent".into()))],
            vec![("created".to_string(), Value::Bool(true))],
            vec![],
        );

        let _ = merge.next().unwrap();

        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            node.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        let node_id = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
        );

        let mut merge = MergeOperator::new(
            Arc::clone(&store),
            "n".to_string(),
            vec!["Person".to_string()],
            vec![("name".to_string(), Value::String("Jules".into()))],
            vec![],
            vec![("updated".to_string(), Value::Bool(true))],
        );

        let _ = merge.next().unwrap();

        // Matching must not create a second node.
        assert_eq!(store.nodes_by_label("Person").len(), 1);

        let node = store.get_node(node_id).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
        // Match properties are left untouched by ON MATCH.
        assert_eq!(
            node.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Jules".into()))
        );
    }

    #[test]
    fn test_merge_on_create_not_applied_when_matched() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        let node_id = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Butch".into()))],
        );

        let mut merge = MergeOperator::new(
            Arc::clone(&store),
            "n".to_string(),
            vec!["Person".to_string()],
            vec![("name".to_string(), Value::String("Butch".into()))],
            vec![("created".to_string(), Value::Bool(true))],
            vec![],
        );

        let _ = merge.next().unwrap();

        let node = store.get_node(node_id).unwrap();
        assert!(node.properties.get(&PropertyKey::new("created")).is_none());
    }

    #[test]
    fn test_merge_runs_once_until_reset() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        let mut merge = MergeOperator::new(
            Arc::clone(&store),
            "n".to_string(),
            vec!["Person".to_string()],
            vec![("name".to_string(), Value::String("Mia".into()))],
            vec![],
            vec![("seen".to_string(), Value::Bool(true))],
        );

        assert!(merge.next().unwrap().is_some());
        // Single-shot: a second call yields no further rows.
        assert!(merge.next().unwrap().is_none());
        assert_eq!(store.nodes_by_label("Person").len(), 1);

        // After reset the merge runs again and now matches the node it created.
        merge.reset();
        assert!(merge.next().unwrap().is_some());

        let nodes = store.nodes_by_label("Person");
        assert_eq!(nodes.len(), 1);
        let node = store.get_node(nodes[0]).unwrap();
        assert_eq!(
            node.properties.get(&PropertyKey::new("seen")),
            Some(&Value::Bool(true))
        );
    }
}