Skip to main content

grafeo_core/execution/operators/
merge.rs

1//! Merge operator for MERGE clause execution.
2//!
3//! The MERGE operator implements the Cypher MERGE semantics:
4//! 1. Try to match the pattern in the graph
5//! 2. If found, return existing element (optionally apply ON MATCH SET)
6//! 3. If not found, create the element (optionally apply ON CREATE SET)
7
8use super::{ConstraintValidator, Operator, OperatorResult, PropertySource};
9use crate::execution::chunk::{DataChunk, DataChunkBuilder};
10use crate::graph::{GraphStore, GraphStoreMut};
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14use std::sync::Arc;
15
16/// Configuration for a node merge operation.
17pub struct MergeConfig {
18    /// Variable name for the merged node.
19    pub variable: String,
20    /// Labels to match/create.
21    pub labels: Vec<String>,
22    /// Properties that must match (also used for creation).
23    pub match_properties: Vec<(String, PropertySource)>,
24    /// Properties to set on CREATE.
25    pub on_create_properties: Vec<(String, PropertySource)>,
26    /// Properties to set on MATCH.
27    pub on_match_properties: Vec<(String, PropertySource)>,
28    /// Output schema (input columns + node column).
29    pub output_schema: Vec<LogicalType>,
30    /// Column index where the merged node ID is placed.
31    pub output_column: usize,
32    /// If the merge variable was already bound in the input, this column index
33    /// is used to detect NULL references (e.g., from unmatched OPTIONAL MATCH).
34    /// `None` for standalone MERGE that introduces a new variable.
35    pub bound_variable_column: Option<usize>,
36}
37
38/// Merge operator for MERGE clause.
39///
40/// Tries to match a node with the given labels and properties.
41/// If found, returns the existing node. If not found, creates a new node.
42///
43/// When an input operator is provided (chained MERGE), input rows are
44/// passed through with the merged node ID appended as an additional column.
45pub struct MergeOperator {
46    /// The graph store.
47    store: Arc<dyn GraphStoreMut>,
48    /// Optional input operator (for chained MERGE patterns).
49    input: Option<Box<dyn Operator>>,
50    /// Merge configuration.
51    config: MergeConfig,
52    /// Whether we've already executed (standalone mode only).
53    executed: bool,
54    /// Epoch for MVCC versioning.
55    viewing_epoch: Option<EpochId>,
56    /// Transaction ID for undo log tracking.
57    transaction_id: Option<TransactionId>,
58    /// Optional constraint validator for schema enforcement.
59    validator: Option<Arc<dyn ConstraintValidator>>,
60}
61
62impl MergeOperator {
63    /// Creates a new merge operator.
64    pub fn new(
65        store: Arc<dyn GraphStoreMut>,
66        input: Option<Box<dyn Operator>>,
67        config: MergeConfig,
68    ) -> Self {
69        Self {
70            store,
71            input,
72            config,
73            executed: false,
74            viewing_epoch: None,
75            transaction_id: None,
76            validator: None,
77        }
78    }
79
80    /// Returns the variable name for the merged node.
81    #[must_use]
82    pub fn variable(&self) -> &str {
83        &self.config.variable
84    }
85
86    /// Sets the transaction context for versioned mutations.
87    pub fn with_transaction_context(
88        mut self,
89        epoch: EpochId,
90        transaction_id: Option<TransactionId>,
91    ) -> Self {
92        self.viewing_epoch = Some(epoch);
93        self.transaction_id = transaction_id;
94        self
95    }
96
97    /// Sets the constraint validator for schema enforcement.
98    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
99        self.validator = Some(validator);
100        self
101    }
102
103    /// Resolves property sources to concrete values for a given row.
104    fn resolve_properties(
105        props: &[(String, PropertySource)],
106        chunk: Option<&DataChunk>,
107        row: usize,
108        store: &dyn GraphStore,
109    ) -> Vec<(String, Value)> {
110        props
111            .iter()
112            .map(|(name, source)| {
113                let value = if let Some(chunk) = chunk {
114                    source.resolve(chunk, row, store)
115                } else {
116                    // Standalone mode: only constants are valid
117                    match source {
118                        PropertySource::Constant(v) => v.clone(),
119                        _ => Value::Null,
120                    }
121                };
122                (name.clone(), value)
123            })
124            .collect()
125    }
126
127    /// Tries to find a matching node with the given resolved properties.
128    fn find_matching_node(&self, resolved_match_props: &[(String, Value)]) -> Option<NodeId> {
129        let candidates: Vec<NodeId> = if let Some(first_label) = self.config.labels.first() {
130            self.store.nodes_by_label(first_label)
131        } else {
132            self.store.node_ids()
133        };
134
135        for node_id in candidates {
136            if let Some(node) = self.store.get_node(node_id) {
137                let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
138                if !has_all_labels {
139                    continue;
140                }
141
142                let has_all_props = resolved_match_props.iter().all(|(key, expected_value)| {
143                    let prop = node.properties.get(&PropertyKey::new(key.as_str()));
144                    if expected_value.is_null() {
145                        // Null in a MERGE pattern matches both absent and explicitly null properties
146                        prop.map_or(true, |v| v.is_null())
147                    } else {
148                        prop.is_some_and(|v| v == expected_value)
149                    }
150                });
151
152                if has_all_props {
153                    return Some(node_id);
154                }
155            }
156        }
157
158        None
159    }
160
161    /// Creates a new node with the specified labels and resolved properties.
162    fn create_node(
163        &self,
164        resolved_match_props: &[(String, Value)],
165        resolved_create_props: &[(String, Value)],
166    ) -> Result<NodeId, super::OperatorError> {
167        // Validate constraints before creating the node
168        if let Some(ref validator) = self.validator {
169            validator.validate_node_labels_allowed(&self.config.labels)?;
170
171            let all_props: Vec<(String, Value)> = resolved_match_props
172                .iter()
173                .chain(resolved_create_props.iter())
174                .map(|(k, v)| (k.clone(), v.clone()))
175                .collect();
176            for (name, value) in &all_props {
177                validator.validate_node_property(&self.config.labels, name, value)?;
178                validator.check_unique_node_property(&self.config.labels, name, value)?;
179            }
180            validator.validate_node_complete(&self.config.labels, &all_props)?;
181        }
182
183        let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
184            .iter()
185            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
186            .collect();
187
188        for (k, v) in resolved_create_props {
189            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
190                existing.1 = v.clone();
191            } else {
192                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
193            }
194        }
195
196        let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
197        Ok(self.store.create_node_with_props(&labels, &all_props))
198    }
199
200    /// Finds or creates a matching node for a single row, applying ON MATCH/ON CREATE.
201    fn merge_node_for_row(
202        &self,
203        chunk: Option<&DataChunk>,
204        row: usize,
205    ) -> Result<NodeId, super::OperatorError> {
206        let store_ref: &dyn GraphStore = self.store.as_ref();
207        let resolved_match =
208            Self::resolve_properties(&self.config.match_properties, chunk, row, store_ref);
209
210        if let Some(existing_id) = self.find_matching_node(&resolved_match) {
211            let resolved_on_match =
212                Self::resolve_properties(&self.config.on_match_properties, chunk, row, store_ref);
213            self.apply_on_match(existing_id, &resolved_on_match)?;
214            Ok(existing_id)
215        } else {
216            let resolved_on_create =
217                Self::resolve_properties(&self.config.on_create_properties, chunk, row, store_ref);
218            self.create_node(&resolved_match, &resolved_on_create)
219        }
220    }
221
222    /// Applies ON MATCH properties to an existing node.
223    fn apply_on_match(
224        &self,
225        node_id: NodeId,
226        resolved_on_match: &[(String, Value)],
227    ) -> Result<(), super::OperatorError> {
228        for (key, value) in resolved_on_match {
229            if let Some(ref validator) = self.validator {
230                validator.validate_node_property(&self.config.labels, key, value)?;
231            }
232            if let Some(tid) = self.transaction_id {
233                self.store
234                    .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
235            } else {
236                self.store
237                    .set_node_property(node_id, key.as_str(), value.clone());
238            }
239        }
240        Ok(())
241    }
242}
243
244impl Operator for MergeOperator {
245    fn next(&mut self) -> OperatorResult {
246        // When we have an input operator, pass through input rows with the
247        // merged node ID appended (used for chained inline MERGE patterns).
248        if let Some(ref mut input) = self.input {
249            if let Some(chunk) = input.next()? {
250                let mut builder =
251                    DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
252
253                for row in chunk.selected_indices() {
254                    // Reject NULL bound variables (e.g., from unmatched OPTIONAL MATCH)
255                    if let Some(bound_col) = self.config.bound_variable_column {
256                        let is_null = chunk.column(bound_col).map_or(true, |col| col.is_null(row));
257                        if is_null {
258                            return Err(super::OperatorError::TypeMismatch {
259                                expected: format!(
260                                    "non-null node for MERGE variable '{}'",
261                                    self.config.variable
262                                ),
263                                found: "NULL".to_string(),
264                            });
265                        }
266                    }
267
268                    // Merge the node per-row: resolve properties from this row
269                    let node_id = self.merge_node_for_row(Some(&chunk), row)?;
270
271                    // Copy input columns to output
272                    for col_idx in 0..chunk.column_count() {
273                        if let (Some(src), Some(dst)) =
274                            (chunk.column(col_idx), builder.column_mut(col_idx))
275                        {
276                            if let Some(val) = src.get_value(row) {
277                                dst.push_value(val);
278                            } else {
279                                dst.push_value(Value::Null);
280                            }
281                        }
282                    }
283
284                    // Append the merged node ID
285                    if let Some(dst) = builder.column_mut(self.config.output_column) {
286                        dst.push_node_id(node_id);
287                    }
288
289                    builder.advance_row();
290                }
291
292                return Ok(Some(builder.finish()));
293            }
294            return Ok(None);
295        }
296
297        // Standalone mode (no input operator)
298        if self.executed {
299            return Ok(None);
300        }
301        self.executed = true;
302
303        let node_id = self.merge_node_for_row(None, 0)?;
304
305        let mut builder = DataChunkBuilder::new(&self.config.output_schema);
306        if let Some(dst) = builder.column_mut(self.config.output_column) {
307            dst.push_node_id(node_id);
308        }
309        builder.advance_row();
310
311        Ok(Some(builder.finish()))
312    }
313
314    fn reset(&mut self) {
315        self.executed = false;
316        if let Some(ref mut input) = self.input {
317            input.reset();
318        }
319    }
320
321    fn name(&self) -> &'static str {
322        "Merge"
323    }
324
325    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
326        self
327    }
328}
329
330/// Configuration for a relationship merge operation.
331pub struct MergeRelationshipConfig {
332    /// Column index for the source node ID in the input.
333    pub source_column: usize,
334    /// Column index for the target node ID in the input.
335    pub target_column: usize,
336    /// Variable name for the source node (for error messages).
337    pub source_variable: String,
338    /// Variable name for the target node (for error messages).
339    pub target_variable: String,
340    /// Relationship type to match/create.
341    pub edge_type: String,
342    /// Properties that must match (also used for creation).
343    pub match_properties: Vec<(String, PropertySource)>,
344    /// Properties to set on CREATE.
345    pub on_create_properties: Vec<(String, PropertySource)>,
346    /// Properties to set on MATCH.
347    pub on_match_properties: Vec<(String, PropertySource)>,
348    /// Output schema (input columns + edge column).
349    pub output_schema: Vec<LogicalType>,
350    /// Column index for the edge variable in the output.
351    pub edge_output_column: usize,
352}
353
354/// Merge operator for relationship patterns.
355///
356/// Takes input rows containing source and target node IDs, then for each row:
357/// 1. Searches for an existing relationship matching the type and properties
358/// 2. If found, applies ON MATCH properties and returns the existing edge
359/// 3. If not found, creates a new relationship and applies ON CREATE properties
360pub struct MergeRelationshipOperator {
361    /// The graph store.
362    store: Arc<dyn GraphStoreMut>,
363    /// Input operator providing rows with source/target node columns.
364    input: Box<dyn Operator>,
365    /// Merge configuration.
366    config: MergeRelationshipConfig,
367    /// Epoch for MVCC versioning.
368    viewing_epoch: Option<EpochId>,
369    /// Transaction ID for undo log tracking.
370    transaction_id: Option<TransactionId>,
371    /// Optional constraint validator for schema enforcement.
372    validator: Option<Arc<dyn ConstraintValidator>>,
373}
374
375impl MergeRelationshipOperator {
376    /// Creates a new merge relationship operator.
377    pub fn new(
378        store: Arc<dyn GraphStoreMut>,
379        input: Box<dyn Operator>,
380        config: MergeRelationshipConfig,
381    ) -> Self {
382        Self {
383            store,
384            input,
385            config,
386            viewing_epoch: None,
387            transaction_id: None,
388            validator: None,
389        }
390    }
391
392    /// Sets the transaction context for versioned mutations.
393    pub fn with_transaction_context(
394        mut self,
395        epoch: EpochId,
396        transaction_id: Option<TransactionId>,
397    ) -> Self {
398        self.viewing_epoch = Some(epoch);
399        self.transaction_id = transaction_id;
400        self
401    }
402
403    /// Sets the constraint validator for schema enforcement.
404    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
405        self.validator = Some(validator);
406        self
407    }
408
409    /// Tries to find a matching relationship between source and target.
410    fn find_matching_edge(
411        &self,
412        src: NodeId,
413        dst: NodeId,
414        resolved_match_props: &[(String, Value)],
415    ) -> Option<EdgeId> {
416        use crate::graph::Direction;
417
418        for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
419            if target != dst {
420                continue;
421            }
422
423            if let Some(edge) = self.store.get_edge(edge_id) {
424                if edge.edge_type.as_str() != self.config.edge_type {
425                    continue;
426                }
427
428                let has_all_props = resolved_match_props
429                    .iter()
430                    .all(|(key, expected)| edge.get_property(key).is_some_and(|v| v == expected));
431
432                if has_all_props {
433                    return Some(edge_id);
434                }
435            }
436        }
437
438        None
439    }
440
441    /// Creates a new edge with resolved match and on_create properties.
442    fn create_edge(
443        &self,
444        src: NodeId,
445        dst: NodeId,
446        resolved_match_props: &[(String, Value)],
447        resolved_create_props: &[(String, Value)],
448    ) -> Result<EdgeId, super::OperatorError> {
449        // Validate constraints before creating the edge
450        if let Some(ref validator) = self.validator {
451            validator.validate_edge_type_allowed(&self.config.edge_type)?;
452
453            let all_props: Vec<(String, Value)> = resolved_match_props
454                .iter()
455                .chain(resolved_create_props.iter())
456                .map(|(k, v)| (k.clone(), v.clone()))
457                .collect();
458            for (name, value) in &all_props {
459                validator.validate_edge_property(&self.config.edge_type, name, value)?;
460            }
461            validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
462        }
463
464        let mut all_props: Vec<(PropertyKey, Value)> = resolved_match_props
465            .iter()
466            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
467            .collect();
468
469        for (k, v) in resolved_create_props {
470            if let Some(existing) = all_props.iter_mut().find(|(key, _)| key.as_str() == k) {
471                existing.1 = v.clone();
472            } else {
473                all_props.push((PropertyKey::new(k.as_str()), v.clone()));
474            }
475        }
476
477        Ok(self
478            .store
479            .create_edge_with_props(src, dst, &self.config.edge_type, &all_props))
480    }
481
482    /// Applies ON MATCH properties to an existing edge.
483    fn apply_on_match_edge(
484        &self,
485        edge_id: EdgeId,
486        resolved_on_match: &[(String, Value)],
487    ) -> Result<(), super::OperatorError> {
488        for (key, value) in resolved_on_match {
489            if let Some(ref validator) = self.validator {
490                validator.validate_edge_property(&self.config.edge_type, key, value)?;
491            }
492            if let Some(tid) = self.transaction_id {
493                self.store
494                    .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
495            } else {
496                self.store
497                    .set_edge_property(edge_id, key.as_str(), value.clone());
498            }
499        }
500        Ok(())
501    }
502}
503
504impl Operator for MergeRelationshipOperator {
505    fn next(&mut self) -> OperatorResult {
506        use super::OperatorError;
507
508        if let Some(chunk) = self.input.next()? {
509            let mut builder =
510                DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
511
512            for row in chunk.selected_indices() {
513                let src_val = chunk
514                    .column(self.config.source_column)
515                    .and_then(|c| c.get_node_id(row))
516                    .ok_or_else(|| OperatorError::TypeMismatch {
517                        expected: format!(
518                            "non-null node for MERGE variable '{}'",
519                            self.config.source_variable
520                        ),
521                        found: "NULL".to_string(),
522                    })?;
523
524                let dst_val = chunk
525                    .column(self.config.target_column)
526                    .and_then(|c| c.get_node_id(row))
527                    .ok_or_else(|| OperatorError::TypeMismatch {
528                        expected: format!(
529                            "non-null node for MERGE variable '{}'",
530                            self.config.target_variable
531                        ),
532                        found: "None".to_string(),
533                    })?;
534
535                let store_ref: &dyn GraphStore = self.store.as_ref();
536                let resolved_match = MergeOperator::resolve_properties(
537                    &self.config.match_properties,
538                    Some(&chunk),
539                    row,
540                    store_ref,
541                );
542
543                let edge_id = if let Some(existing) =
544                    self.find_matching_edge(src_val, dst_val, &resolved_match)
545                {
546                    let resolved_on_match = MergeOperator::resolve_properties(
547                        &self.config.on_match_properties,
548                        Some(&chunk),
549                        row,
550                        store_ref,
551                    );
552                    self.apply_on_match_edge(existing, &resolved_on_match)?;
553                    existing
554                } else {
555                    let resolved_on_create = MergeOperator::resolve_properties(
556                        &self.config.on_create_properties,
557                        Some(&chunk),
558                        row,
559                        store_ref,
560                    );
561                    self.create_edge(src_val, dst_val, &resolved_match, &resolved_on_create)?
562                };
563
564                // Copy input columns to output, then add the edge column
565                for col_idx in 0..self.config.output_schema.len() {
566                    if col_idx == self.config.edge_output_column {
567                        if let Some(dst_col) = builder.column_mut(col_idx) {
568                            dst_col.push_edge_id(edge_id);
569                        }
570                    } else if let (Some(src_col), Some(dst_col)) =
571                        (chunk.column(col_idx), builder.column_mut(col_idx))
572                        && let Some(val) = src_col.get_value(row)
573                    {
574                        dst_col.push_value(val);
575                    }
576                }
577
578                builder.advance_row();
579            }
580
581            return Ok(Some(builder.finish()));
582        }
583
584        Ok(None)
585    }
586
587    fn reset(&mut self) {
588        self.input.reset();
589    }
590
591    fn name(&self) -> &'static str {
592        "MergeRelationship"
593    }
594
595    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
596        self
597    }
598}
599
600#[cfg(all(test, feature = "lpg"))]
601mod tests {
602    use super::*;
603    use crate::graph::lpg::LpgStore;
604
605    fn const_props(props: Vec<(&str, Value)>) -> Vec<(String, PropertySource)> {
606        props
607            .into_iter()
608            .map(|(k, v)| (k.to_string(), PropertySource::Constant(v)))
609            .collect()
610    }
611
612    #[test]
613    fn test_merge_creates_new_node() {
614        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
615
616        // MERGE should create a new node since none exists
617        let mut merge = MergeOperator::new(
618            Arc::clone(&store),
619            None,
620            MergeConfig {
621                variable: "n".to_string(),
622                labels: vec!["Person".to_string()],
623                match_properties: const_props(vec![("name", Value::String("Alix".into()))]),
624                on_create_properties: vec![],
625                on_match_properties: vec![],
626                output_schema: vec![LogicalType::Node],
627                output_column: 0,
628                bound_variable_column: None,
629            },
630        );
631
632        let result = merge.next().unwrap();
633        assert!(result.is_some());
634
635        // Verify node was created
636        let nodes = store.nodes_by_label("Person");
637        assert_eq!(nodes.len(), 1);
638
639        let node = store.get_node(nodes[0]).unwrap();
640        assert!(node.has_label("Person"));
641        assert_eq!(
642            node.properties.get(&PropertyKey::new("name")),
643            Some(&Value::String("Alix".into()))
644        );
645    }
646
647    #[test]
648    fn test_merge_matches_existing_node() {
649        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
650
651        // Create an existing node
652        store.create_node_with_props(
653            &["Person"],
654            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
655        );
656
657        // MERGE should find the existing node
658        let mut merge = MergeOperator::new(
659            Arc::clone(&store),
660            None,
661            MergeConfig {
662                variable: "n".to_string(),
663                labels: vec!["Person".to_string()],
664                match_properties: const_props(vec![("name", Value::String("Gus".into()))]),
665                on_create_properties: vec![],
666                on_match_properties: vec![],
667                output_schema: vec![LogicalType::Node],
668                output_column: 0,
669                bound_variable_column: None,
670            },
671        );
672
673        let result = merge.next().unwrap();
674        assert!(result.is_some());
675
676        // Verify only one node exists (no new node created)
677        let nodes = store.nodes_by_label("Person");
678        assert_eq!(nodes.len(), 1);
679    }
680
681    #[test]
682    fn test_merge_with_on_create() {
683        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
684
685        // MERGE with ON CREATE SET
686        let mut merge = MergeOperator::new(
687            Arc::clone(&store),
688            None,
689            MergeConfig {
690                variable: "n".to_string(),
691                labels: vec!["Person".to_string()],
692                match_properties: const_props(vec![("name", Value::String("Vincent".into()))]),
693                on_create_properties: const_props(vec![("created", Value::Bool(true))]),
694                on_match_properties: vec![],
695                output_schema: vec![LogicalType::Node],
696                output_column: 0,
697                bound_variable_column: None,
698            },
699        );
700
701        let _ = merge.next().unwrap();
702
703        // Verify node has both match properties and on_create properties
704        let nodes = store.nodes_by_label("Person");
705        let node = store.get_node(nodes[0]).unwrap();
706        assert_eq!(
707            node.properties.get(&PropertyKey::new("name")),
708            Some(&Value::String("Vincent".into()))
709        );
710        assert_eq!(
711            node.properties.get(&PropertyKey::new("created")),
712            Some(&Value::Bool(true))
713        );
714    }
715
716    #[test]
717    fn test_merge_with_on_match() {
718        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
719
720        // Create an existing node
721        let node_id = store.create_node_with_props(
722            &["Person"],
723            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
724        );
725
726        // MERGE with ON MATCH SET
727        let mut merge = MergeOperator::new(
728            Arc::clone(&store),
729            None,
730            MergeConfig {
731                variable: "n".to_string(),
732                labels: vec!["Person".to_string()],
733                match_properties: const_props(vec![("name", Value::String("Jules".into()))]),
734                on_create_properties: vec![],
735                on_match_properties: const_props(vec![("updated", Value::Bool(true))]),
736                output_schema: vec![LogicalType::Node],
737                output_column: 0,
738                bound_variable_column: None,
739            },
740        );
741
742        let _ = merge.next().unwrap();
743
744        // Verify node has the on_match property added
745        let node = store.get_node(node_id).unwrap();
746        assert_eq!(
747            node.properties.get(&PropertyKey::new("updated")),
748            Some(&Value::Bool(true))
749        );
750    }
751
752    #[test]
753    fn test_merge_into_any() {
754        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
755        let op = MergeOperator::new(
756            Arc::clone(&store),
757            None,
758            MergeConfig {
759                variable: "n".to_string(),
760                labels: vec!["Person".to_string()],
761                match_properties: vec![],
762                on_create_properties: vec![],
763                on_match_properties: vec![],
764                output_schema: vec![LogicalType::Node],
765                output_column: 0,
766                bound_variable_column: None,
767            },
768        );
769        let any = Box::new(op).into_any();
770        assert!(any.downcast::<MergeOperator>().is_ok());
771    }
772}