Skip to main content

grafeo_core/execution/operators/
merge.rs

1//! Merge operator for MERGE clause execution.
2//!
3//! The MERGE operator implements the Cypher MERGE semantics:
4//! 1. Try to match the pattern in the graph
5//! 2. If found, return existing element (optionally apply ON MATCH SET)
6//! 3. If not found, create the element (optionally apply ON CREATE SET)
7
8use super::{ConstraintValidator, Operator, OperatorResult, PropertySource};
9use crate::execution::chunk::{DataChunk, DataChunkBuilder};
10use crate::graph::{GraphStore, GraphStoreMut};
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14use std::sync::Arc;
15
16/// Configuration for a node merge operation.
17pub struct MergeConfig {
18    /// Variable name for the merged node.
19    pub variable: String,
20    /// Labels to match/create.
21    pub labels: Vec<String>,
22    /// Properties that must match (also used for creation).
23    pub match_properties: Vec<(String, PropertySource)>,
24    /// Properties to set on CREATE.
25    pub on_create_properties: Vec<(String, PropertySource)>,
26    /// Properties to set on MATCH.
27    pub on_match_properties: Vec<(String, PropertySource)>,
28    /// Output schema (input columns + node column).
29    pub output_schema: Vec<LogicalType>,
30    /// Column index where the merged node ID is placed.
31    pub output_column: usize,
32    /// If the merge variable was already bound in the input, this column index
33    /// is used to detect NULL references (e.g., from unmatched OPTIONAL MATCH).
34    /// `None` for standalone MERGE that introduces a new variable.
35    pub bound_variable_column: Option<usize>,
36}
37
38/// Merge operator for MERGE clause.
39///
40/// Tries to match a node with the given labels and properties.
41/// If found, returns the existing node. If not found, creates a new node.
42///
43/// When an input operator is provided (chained MERGE), input rows are
44/// passed through with the merged node ID appended as an additional column.
45pub struct MergeOperator {
46    /// The graph store.
47    store: Arc<dyn GraphStoreMut>,
48    /// Optional input operator (for chained MERGE patterns).
49    input: Option<Box<dyn Operator>>,
50    /// Merge configuration.
51    config: MergeConfig,
52    /// Whether we've already executed (standalone mode only).
53    executed: bool,
54    /// Epoch for MVCC versioning.
55    viewing_epoch: Option<EpochId>,
56    /// Transaction ID for undo log tracking.
57    transaction_id: Option<TransactionId>,
58    /// Optional constraint validator for schema enforcement.
59    validator: Option<Arc<dyn ConstraintValidator>>,
60}
61
62impl MergeOperator {
63    /// Creates a new merge operator.
64    pub fn new(
65        store: Arc<dyn GraphStoreMut>,
66        input: Option<Box<dyn Operator>>,
67        config: MergeConfig,
68    ) -> Self {
69        Self {
70            store,
71            input,
72            config,
73            executed: false,
74            viewing_epoch: None,
75            transaction_id: None,
76            validator: None,
77        }
78    }
79
80    /// Returns the variable name for the merged node.
81    #[must_use]
82    pub fn variable(&self) -> &str {
83        &self.config.variable
84    }
85
86    /// Sets the transaction context for versioned mutations.
87    pub fn with_transaction_context(
88        mut self,
89        epoch: EpochId,
90        transaction_id: Option<TransactionId>,
91    ) -> Self {
92        self.viewing_epoch = Some(epoch);
93        self.transaction_id = transaction_id;
94        self
95    }
96
97    /// Sets the constraint validator for schema enforcement.
98    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
99        self.validator = Some(validator);
100        self
101    }
102
103    /// Resolves property sources to concrete values for a given row.
104    fn resolve_properties(
105        props: &[(String, PropertySource)],
106        chunk: Option<&DataChunk>,
107        row: usize,
108        store: &dyn GraphStore,
109    ) -> Vec<(String, Value)> {
110        props
111            .iter()
112            .map(|(name, source)| {
113                let value = if let Some(chunk) = chunk {
114                    source.resolve(chunk, row, store)
115                } else {
116                    // Standalone mode: only constants are valid
117                    match source {
118                        PropertySource::Constant(v) => v.clone(),
119                        _ => Value::Null,
120                    }
121                };
122                (name.clone(), value)
123            })
124            .collect()
125    }
126
127    /// Tries to find a matching node with the given resolved properties.
128    fn find_matching_node(&self, resolved_match_props: &[(String, Value)]) -> Option<NodeId> {
129        // Use a property index when available to avoid a full label scan.
130        // Null conditions are excluded from the index query and verified in the loop.
131        let use_index = resolved_match_props
132            .iter()
133            .any(|(k, v)| !v.is_null() && self.store.has_property_index(k));
134
135        let candidates: Vec<NodeId> = if use_index {
136            let conditions: Vec<(&str, Value)> = resolved_match_props
137                .iter()
138                .filter(|(_, v)| !v.is_null())
139                .map(|(k, v)| (k.as_str(), v.clone()))
140                .collect();
141            self.store.find_nodes_by_properties(&conditions)
142        } else if let Some(first_label) = self.config.labels.first() {
143            self.store.nodes_by_label(first_label)
144        } else {
145            self.store.node_ids()
146        };
147
148        for node_id in candidates {
149            if let Some(node) = self.store.get_node(node_id) {
150                let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
151                if !has_all_labels {
152                    continue;
153                }
154
155                let has_all_props = resolved_match_props.iter().all(|(key, expected_value)| {
156                    let prop = node.properties.get(&PropertyKey::new(key.as_str()));
157                    if expected_value.is_null() {
158                        // Null in a MERGE pattern matches both absent and explicitly null properties
159                        prop.map_or(true, |v| v.is_null())
160                    } else {
161                        prop.is_some_and(|v| v == expected_value)
162                    }
163                });
164
165                if has_all_props {
166                    return Some(node_id);
167                }
168            }
169        }
170
171        None
172    }
173
174    /// Creates a new node with the specified labels and resolved properties.
175    fn create_node(
176        &self,
177        resolved_match_props: &[(String, Value)],
178        resolved_create_props: &[(String, Value)],
179    ) -> Result<NodeId, super::OperatorError> {
180        // Validate constraints before creating the node
181        if let Some(ref validator) = self.validator {
182            validator.validate_node_labels_allowed(&self.config.labels)?;
183
184            let all_props: Vec<(String, Value)> = resolved_match_props
185                .iter()
186                .chain(resolved_create_props.iter())
187                .map(|(k, v)| (k.clone(), v.clone()))
188                .collect();
189            for (name, value) in &all_props {
190                validator.validate_node_property(&self.config.labels, name, value)?;
191                validator.check_unique_node_property(&self.config.labels, name, value)?;
192            }
193            validator.validate_node_complete(&self.config.labels, &all_props)?;
194        }
195
196        let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
197            .iter()
198            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
199            .collect();
200
201        for (k, v) in resolved_create_props {
202            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
203                existing.1 = v.clone();
204            } else {
205                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
206            }
207        }
208
209        let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
210        Ok(self.store.create_node_with_props(&labels, &all_props))
211    }
212
213    /// Finds or creates a matching node for a single row, applying ON MATCH/ON CREATE.
214    fn merge_node_for_row(
215        &self,
216        chunk: Option<&DataChunk>,
217        row: usize,
218    ) -> Result<NodeId, super::OperatorError> {
219        let store_ref: &dyn GraphStore = self.store.as_ref();
220        let resolved_match =
221            Self::resolve_properties(&self.config.match_properties, chunk, row, store_ref);
222
223        if let Some(existing_id) = self.find_matching_node(&resolved_match) {
224            let resolved_on_match =
225                Self::resolve_properties(&self.config.on_match_properties, chunk, row, store_ref);
226            self.apply_on_match(existing_id, &resolved_on_match)?;
227            Ok(existing_id)
228        } else {
229            let resolved_on_create =
230                Self::resolve_properties(&self.config.on_create_properties, chunk, row, store_ref);
231            self.create_node(&resolved_match, &resolved_on_create)
232        }
233    }
234
235    /// Applies ON MATCH properties to an existing node.
236    fn apply_on_match(
237        &self,
238        node_id: NodeId,
239        resolved_on_match: &[(String, Value)],
240    ) -> Result<(), super::OperatorError> {
241        for (key, value) in resolved_on_match {
242            if let Some(ref validator) = self.validator {
243                validator.validate_node_property(&self.config.labels, key, value)?;
244            }
245            if let Some(tid) = self.transaction_id {
246                self.store
247                    .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
248            } else {
249                self.store
250                    .set_node_property(node_id, key.as_str(), value.clone());
251            }
252        }
253        Ok(())
254    }
255}
256
257impl Operator for MergeOperator {
258    fn next(&mut self) -> OperatorResult {
259        // When we have an input operator, pass through input rows with the
260        // merged node ID appended (used for chained inline MERGE patterns).
261        if let Some(ref mut input) = self.input {
262            if let Some(chunk) = input.next()? {
263                let mut builder =
264                    DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
265
266                for row in chunk.selected_indices() {
267                    // Reject NULL bound variables (e.g., from unmatched OPTIONAL MATCH)
268                    if let Some(bound_col) = self.config.bound_variable_column {
269                        let is_null = chunk.column(bound_col).map_or(true, |col| col.is_null(row));
270                        if is_null {
271                            return Err(super::OperatorError::TypeMismatch {
272                                expected: format!(
273                                    "non-null node for MERGE variable '{}'",
274                                    self.config.variable
275                                ),
276                                found: "NULL".to_string(),
277                            });
278                        }
279                    }
280
281                    // Merge the node per-row: resolve properties from this row
282                    let node_id = self.merge_node_for_row(Some(&chunk), row)?;
283
284                    // Copy input columns to output
285                    for col_idx in 0..chunk.column_count() {
286                        if let (Some(src), Some(dst)) =
287                            (chunk.column(col_idx), builder.column_mut(col_idx))
288                        {
289                            if let Some(val) = src.get_value(row) {
290                                dst.push_value(val);
291                            } else {
292                                dst.push_value(Value::Null);
293                            }
294                        }
295                    }
296
297                    // Append the merged node ID
298                    if let Some(dst) = builder.column_mut(self.config.output_column) {
299                        dst.push_node_id(node_id);
300                    }
301
302                    builder.advance_row();
303                }
304
305                return Ok(Some(builder.finish()));
306            }
307            return Ok(None);
308        }
309
310        // Standalone mode (no input operator)
311        if self.executed {
312            return Ok(None);
313        }
314        self.executed = true;
315
316        let node_id = self.merge_node_for_row(None, 0)?;
317
318        let mut builder = DataChunkBuilder::new(&self.config.output_schema);
319        if let Some(dst) = builder.column_mut(self.config.output_column) {
320            dst.push_node_id(node_id);
321        }
322        builder.advance_row();
323
324        Ok(Some(builder.finish()))
325    }
326
327    fn reset(&mut self) {
328        self.executed = false;
329        if let Some(ref mut input) = self.input {
330            input.reset();
331        }
332    }
333
334    fn name(&self) -> &'static str {
335        "Merge"
336    }
337
338    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
339        self
340    }
341}
342
343/// Configuration for a relationship merge operation.
344pub struct MergeRelationshipConfig {
345    /// Column index for the source node ID in the input.
346    pub source_column: usize,
347    /// Column index for the target node ID in the input.
348    pub target_column: usize,
349    /// Variable name for the source node (for error messages).
350    pub source_variable: String,
351    /// Variable name for the target node (for error messages).
352    pub target_variable: String,
353    /// Relationship type to match/create.
354    pub edge_type: String,
355    /// Properties that must match (also used for creation).
356    pub match_properties: Vec<(String, PropertySource)>,
357    /// Properties to set on CREATE.
358    pub on_create_properties: Vec<(String, PropertySource)>,
359    /// Properties to set on MATCH.
360    pub on_match_properties: Vec<(String, PropertySource)>,
361    /// Output schema (input columns + edge column).
362    pub output_schema: Vec<LogicalType>,
363    /// Column index for the edge variable in the output.
364    pub edge_output_column: usize,
365}
366
367/// Merge operator for relationship patterns.
368///
369/// Takes input rows containing source and target node IDs, then for each row:
370/// 1. Searches for an existing relationship matching the type and properties
371/// 2. If found, applies ON MATCH properties and returns the existing edge
372/// 3. If not found, creates a new relationship and applies ON CREATE properties
373pub struct MergeRelationshipOperator {
374    /// The graph store.
375    store: Arc<dyn GraphStoreMut>,
376    /// Input operator providing rows with source/target node columns.
377    input: Box<dyn Operator>,
378    /// Merge configuration.
379    config: MergeRelationshipConfig,
380    /// Epoch for MVCC versioning.
381    viewing_epoch: Option<EpochId>,
382    /// Transaction ID for undo log tracking.
383    transaction_id: Option<TransactionId>,
384    /// Optional constraint validator for schema enforcement.
385    validator: Option<Arc<dyn ConstraintValidator>>,
386}
387
388impl MergeRelationshipOperator {
389    /// Creates a new merge relationship operator.
390    pub fn new(
391        store: Arc<dyn GraphStoreMut>,
392        input: Box<dyn Operator>,
393        config: MergeRelationshipConfig,
394    ) -> Self {
395        Self {
396            store,
397            input,
398            config,
399            viewing_epoch: None,
400            transaction_id: None,
401            validator: None,
402        }
403    }
404
405    /// Sets the transaction context for versioned mutations.
406    pub fn with_transaction_context(
407        mut self,
408        epoch: EpochId,
409        transaction_id: Option<TransactionId>,
410    ) -> Self {
411        self.viewing_epoch = Some(epoch);
412        self.transaction_id = transaction_id;
413        self
414    }
415
416    /// Sets the constraint validator for schema enforcement.
417    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
418        self.validator = Some(validator);
419        self
420    }
421
422    /// Tries to find a matching relationship between source and target.
423    fn find_matching_edge(
424        &self,
425        src: NodeId,
426        dst: NodeId,
427        resolved_match_props: &[(String, Value)],
428    ) -> Option<EdgeId> {
429        use crate::graph::Direction;
430
431        for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
432            if target != dst {
433                continue;
434            }
435
436            if let Some(edge) = self.store.get_edge(edge_id) {
437                if edge.edge_type.as_str() != self.config.edge_type {
438                    continue;
439                }
440
441                let has_all_props = resolved_match_props
442                    .iter()
443                    .all(|(key, expected)| edge.get_property(key).is_some_and(|v| v == expected));
444
445                if has_all_props {
446                    return Some(edge_id);
447                }
448            }
449        }
450
451        None
452    }
453
454    /// Creates a new edge with resolved match and on_create properties.
455    fn create_edge(
456        &self,
457        src: NodeId,
458        dst: NodeId,
459        resolved_match_props: &[(String, Value)],
460        resolved_create_props: &[(String, Value)],
461    ) -> Result<EdgeId, super::OperatorError> {
462        // Validate constraints before creating the edge
463        if let Some(ref validator) = self.validator {
464            validator.validate_edge_type_allowed(&self.config.edge_type)?;
465
466            let all_props: Vec<(String, Value)> = resolved_match_props
467                .iter()
468                .chain(resolved_create_props.iter())
469                .map(|(k, v)| (k.clone(), v.clone()))
470                .collect();
471            for (name, value) in &all_props {
472                validator.validate_edge_property(&self.config.edge_type, name, value)?;
473            }
474            validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
475        }
476
477        let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
478            .iter()
479            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
480            .collect();
481
482        for (k, v) in resolved_create_props {
483            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
484                existing.1 = v.clone();
485            } else {
486                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
487            }
488        }
489
490        Ok(self
491            .store
492            .create_edge_with_props(src, dst, &self.config.edge_type, &all_props))
493    }
494
495    /// Applies ON MATCH properties to an existing edge.
496    fn apply_on_match_edge(
497        &self,
498        edge_id: EdgeId,
499        resolved_on_match: &[(String, Value)],
500    ) -> Result<(), super::OperatorError> {
501        for (key, value) in resolved_on_match {
502            if let Some(ref validator) = self.validator {
503                validator.validate_edge_property(&self.config.edge_type, key, value)?;
504            }
505            if let Some(tid) = self.transaction_id {
506                self.store
507                    .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
508            } else {
509                self.store
510                    .set_edge_property(edge_id, key.as_str(), value.clone());
511            }
512        }
513        Ok(())
514    }
515}
516
517impl Operator for MergeRelationshipOperator {
518    fn next(&mut self) -> OperatorResult {
519        use super::OperatorError;
520
521        if let Some(chunk) = self.input.next()? {
522            let mut builder =
523                DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
524
525            for row in chunk.selected_indices() {
526                let src_val = chunk
527                    .column(self.config.source_column)
528                    .and_then(|c| c.get_node_id(row))
529                    .ok_or_else(|| OperatorError::TypeMismatch {
530                        expected: format!(
531                            "non-null node for MERGE variable '{}'",
532                            self.config.source_variable
533                        ),
534                        found: "NULL".to_string(),
535                    })?;
536
537                let dst_val = chunk
538                    .column(self.config.target_column)
539                    .and_then(|c| c.get_node_id(row))
540                    .ok_or_else(|| OperatorError::TypeMismatch {
541                        expected: format!(
542                            "non-null node for MERGE variable '{}'",
543                            self.config.target_variable
544                        ),
545                        found: "None".to_string(),
546                    })?;
547
548                let store_ref: &dyn GraphStore = self.store.as_ref();
549                let resolved_match = MergeOperator::resolve_properties(
550                    &self.config.match_properties,
551                    Some(&chunk),
552                    row,
553                    store_ref,
554                );
555
556                let edge_id = if let Some(existing) =
557                    self.find_matching_edge(src_val, dst_val, &resolved_match)
558                {
559                    let resolved_on_match = MergeOperator::resolve_properties(
560                        &self.config.on_match_properties,
561                        Some(&chunk),
562                        row,
563                        store_ref,
564                    );
565                    self.apply_on_match_edge(existing, &resolved_on_match)?;
566                    existing
567                } else {
568                    let resolved_on_create = MergeOperator::resolve_properties(
569                        &self.config.on_create_properties,
570                        Some(&chunk),
571                        row,
572                        store_ref,
573                    );
574                    self.create_edge(src_val, dst_val, &resolved_match, &resolved_on_create)?
575                };
576
577                // Copy input columns to output, then add the edge column
578                for col_idx in 0..self.config.output_schema.len() {
579                    if col_idx == self.config.edge_output_column {
580                        if let Some(dst_col) = builder.column_mut(col_idx) {
581                            dst_col.push_edge_id(edge_id);
582                        }
583                    } else if let (Some(src_col), Some(dst_col)) =
584                        (chunk.column(col_idx), builder.column_mut(col_idx))
585                        && let Some(val) = src_col.get_value(row)
586                    {
587                        dst_col.push_value(val);
588                    }
589                }
590
591                builder.advance_row();
592            }
593
594            return Ok(Some(builder.finish()));
595        }
596
597        Ok(None)
598    }
599
600    fn reset(&mut self) {
601        self.input.reset();
602    }
603
604    fn name(&self) -> &'static str {
605        "MergeRelationship"
606    }
607
608    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
609        self
610    }
611}
612
613#[cfg(all(test, feature = "lpg"))]
614mod tests {
615    use super::*;
616    use crate::graph::lpg::LpgStore;
617
618    fn const_props(props: Vec<(&str, Value)>) -> Vec<(String, PropertySource)> {
619        props
620            .into_iter()
621            .map(|(k, v)| (k.to_string(), PropertySource::Constant(v)))
622            .collect()
623    }
624
625    #[test]
626    fn test_merge_creates_new_node() {
627        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
628
629        // MERGE should create a new node since none exists
630        let mut merge = MergeOperator::new(
631            Arc::clone(&store),
632            None,
633            MergeConfig {
634                variable: "n".to_string(),
635                labels: vec!["Person".to_string()],
636                match_properties: const_props(vec![("name", Value::String("Alix".into()))]),
637                on_create_properties: vec![],
638                on_match_properties: vec![],
639                output_schema: vec![LogicalType::Node],
640                output_column: 0,
641                bound_variable_column: None,
642            },
643        );
644
645        let result = merge.next().unwrap();
646        assert!(result.is_some());
647
648        // Verify node was created
649        let nodes = store.nodes_by_label("Person");
650        assert_eq!(nodes.len(), 1);
651
652        let node = store.get_node(nodes[0]).unwrap();
653        assert!(node.has_label("Person"));
654        assert_eq!(
655            node.properties.get(&PropertyKey::new("name")),
656            Some(&Value::String("Alix".into()))
657        );
658    }
659
660    #[test]
661    fn test_merge_matches_existing_node() {
662        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
663
664        // Create an existing node
665        store.create_node_with_props(
666            &["Person"],
667            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
668        );
669
670        // MERGE should find the existing node
671        let mut merge = MergeOperator::new(
672            Arc::clone(&store),
673            None,
674            MergeConfig {
675                variable: "n".to_string(),
676                labels: vec!["Person".to_string()],
677                match_properties: const_props(vec![("name", Value::String("Gus".into()))]),
678                on_create_properties: vec![],
679                on_match_properties: vec![],
680                output_schema: vec![LogicalType::Node],
681                output_column: 0,
682                bound_variable_column: None,
683            },
684        );
685
686        let result = merge.next().unwrap();
687        assert!(result.is_some());
688
689        // Verify only one node exists (no new node created)
690        let nodes = store.nodes_by_label("Person");
691        assert_eq!(nodes.len(), 1);
692    }
693
694    #[test]
695    fn test_merge_with_on_create() {
696        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
697
698        // MERGE with ON CREATE SET
699        let mut merge = MergeOperator::new(
700            Arc::clone(&store),
701            None,
702            MergeConfig {
703                variable: "n".to_string(),
704                labels: vec!["Person".to_string()],
705                match_properties: const_props(vec![("name", Value::String("Vincent".into()))]),
706                on_create_properties: const_props(vec![("created", Value::Bool(true))]),
707                on_match_properties: vec![],
708                output_schema: vec![LogicalType::Node],
709                output_column: 0,
710                bound_variable_column: None,
711            },
712        );
713
714        let _ = merge.next().unwrap();
715
716        // Verify node has both match properties and on_create properties
717        let nodes = store.nodes_by_label("Person");
718        let node = store.get_node(nodes[0]).unwrap();
719        assert_eq!(
720            node.properties.get(&PropertyKey::new("name")),
721            Some(&Value::String("Vincent".into()))
722        );
723        assert_eq!(
724            node.properties.get(&PropertyKey::new("created")),
725            Some(&Value::Bool(true))
726        );
727    }
728
729    #[test]
730    fn test_merge_with_on_match() {
731        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
732
733        // Create an existing node
734        let node_id = store.create_node_with_props(
735            &["Person"],
736            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
737        );
738
739        // MERGE with ON MATCH SET
740        let mut merge = MergeOperator::new(
741            Arc::clone(&store),
742            None,
743            MergeConfig {
744                variable: "n".to_string(),
745                labels: vec!["Person".to_string()],
746                match_properties: const_props(vec![("name", Value::String("Jules".into()))]),
747                on_create_properties: vec![],
748                on_match_properties: const_props(vec![("updated", Value::Bool(true))]),
749                output_schema: vec![LogicalType::Node],
750                output_column: 0,
751                bound_variable_column: None,
752            },
753        );
754
755        let _ = merge.next().unwrap();
756
757        // Verify node has the on_match property added
758        let node = store.get_node(node_id).unwrap();
759        assert_eq!(
760            node.properties.get(&PropertyKey::new("updated")),
761            Some(&Value::Bool(true))
762        );
763    }
764
765    #[test]
766    fn test_merge_uses_property_index() {
767        let lpg_store = Arc::new(LpgStore::new().unwrap());
768        lpg_store.create_property_index("name");
769        assert!(lpg_store.has_property_index("name"));
770
771        // Use the trait object for node creation so the &[(PropertyKey, Value)] signature applies.
772        let store: Arc<dyn GraphStoreMut> = lpg_store;
773
774        for i in 0..50u32 {
775            store.create_node_with_props(
776                &["Person"],
777                &[(
778                    PropertyKey::new("name"),
779                    Value::String(format!("person_{i}").into()),
780                )],
781            );
782        }
783
784        let target_id = store.create_node_with_props(
785            &["Person"],
786            &[(PropertyKey::new("name"), Value::String("Beatrix".into()))],
787        );
788
789        // MERGE should find the existing node via index lookup
790        let mut merge = MergeOperator::new(
791            Arc::clone(&store),
792            None,
793            MergeConfig {
794                variable: "n".to_string(),
795                labels: vec!["Person".to_string()],
796                match_properties: const_props(vec![("name", Value::String("Beatrix".into()))]),
797                on_create_properties: vec![],
798                on_match_properties: const_props(vec![("found", Value::Bool(true))]),
799                output_schema: vec![LogicalType::Node],
800                output_column: 0,
801                bound_variable_column: None,
802            },
803        );
804
805        let result = merge.next().unwrap();
806        assert!(result.is_some());
807
808        // ON MATCH should have fired on the correct node
809        let node = store.get_node(target_id).unwrap();
810        assert_eq!(
811            node.properties.get(&PropertyKey::new("found")),
812            Some(&Value::Bool(true))
813        );
814
815        // No new node should have been created
816        let persons = store.nodes_by_label("Person");
817        assert_eq!(persons.len(), 51);
818    }
819
820    #[test]
821    fn test_merge_creates_via_index_miss() {
822        let lpg_store = Arc::new(LpgStore::new().unwrap());
823        lpg_store.create_property_index("name");
824
825        let store: Arc<dyn GraphStoreMut> = lpg_store;
826
827        store.create_node_with_props(
828            &["Person"],
829            &[(PropertyKey::new("name"), Value::String("Django".into()))],
830        );
831
832        // MERGE for a name not in the index — should create
833        let mut merge = MergeOperator::new(
834            Arc::clone(&store),
835            None,
836            MergeConfig {
837                variable: "n".to_string(),
838                labels: vec!["Person".to_string()],
839                match_properties: const_props(vec![("name", Value::String("Shosanna".into()))]),
840                on_create_properties: const_props(vec![("created", Value::Bool(true))]),
841                on_match_properties: vec![],
842                output_schema: vec![LogicalType::Node],
843                output_column: 0,
844                bound_variable_column: None,
845            },
846        );
847
848        let result = merge.next().unwrap();
849        assert!(result.is_some());
850
851        let persons = store.nodes_by_label("Person");
852        assert_eq!(persons.len(), 2);
853
854        let new_nodes: Vec<_> = persons
855            .iter()
856            .filter_map(|&id| store.get_node(id))
857            .filter(|n| {
858                n.properties.get(&PropertyKey::new("name"))
859                    == Some(&Value::String("Shosanna".into()))
860            })
861            .collect();
862        assert_eq!(new_nodes.len(), 1);
863        assert_eq!(
864            new_nodes[0].properties.get(&PropertyKey::new("created")),
865            Some(&Value::Bool(true))
866        );
867    }
868
869    #[test]
870    fn test_merge_into_any() {
871        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
872        let op = MergeOperator::new(
873            Arc::clone(&store),
874            None,
875            MergeConfig {
876                variable: "n".to_string(),
877                labels: vec!["Person".to_string()],
878                match_properties: vec![],
879                on_create_properties: vec![],
880                on_match_properties: vec![],
881                output_schema: vec![LogicalType::Node],
882                output_column: 0,
883                bound_variable_column: None,
884            },
885        );
886        let any = Box::new(op).into_any();
887        assert!(any.downcast::<MergeOperator>().is_ok());
888    }
889}