Skip to main content

grafeo_core/execution/operators/
merge.rs

//! Merge operator for MERGE clause execution.
//!
//! The MERGE operator implements the Cypher MERGE semantics:
//! 1. Try to match the pattern in the graph.
//! 2. If found, return the existing element (optionally applying ON MATCH SET).
//! 3. If not found, create the element (optionally applying ON CREATE SET).
7
8use super::{
9    ConstraintValidator, ExpressionPredicate, Operator, OperatorResult, PropertySource,
10    SessionContext,
11};
12use crate::execution::chunk::{DataChunk, DataChunkBuilder};
13use crate::graph::{GraphStore, GraphStoreMut, GraphStoreSearch};
14use grafeo_common::types::{
15    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
16};
17use std::sync::Arc;
18
19/// Configuration for a node merge operation.
20pub struct MergeConfig {
21    /// Variable name for the merged node.
22    pub variable: String,
23    /// Labels to match/create.
24    pub labels: Vec<String>,
25    /// Properties that must match (also used for creation).
26    pub match_properties: Vec<(String, PropertySource)>,
27    /// Properties to set on CREATE.
28    pub on_create_properties: Vec<(String, PropertySource)>,
29    /// Properties to set on MATCH.
30    pub on_match_properties: Vec<(String, PropertySource)>,
31    /// Output schema (input columns + node column).
32    pub output_schema: Vec<LogicalType>,
33    /// Column index where the merged node ID is placed.
34    pub output_column: usize,
35    /// If the merge variable was already bound in the input, this column index
36    /// is used to detect NULL references (e.g., from unmatched OPTIONAL MATCH).
37    /// `None` for standalone MERGE that introduces a new variable.
38    pub bound_variable_column: Option<usize>,
39}
40
41/// Merge operator for MERGE clause.
42///
43/// Tries to match a node with the given labels and properties.
44/// If found, returns the existing node. If not found, creates a new node.
45///
46/// When an input operator is provided (chained MERGE), input rows are
47/// passed through with the merged node ID appended as an additional column.
48pub struct MergeOperator {
49    /// The graph store.
50    store: Arc<dyn GraphStoreMut>,
51    /// Optional input operator (for chained MERGE patterns).
52    input: Option<Box<dyn Operator>>,
53    /// Merge configuration.
54    config: MergeConfig,
55    /// Whether we've already executed (standalone mode only).
56    executed: bool,
57    /// Epoch for MVCC versioning.
58    viewing_epoch: Option<EpochId>,
59    /// Transaction ID for undo log tracking.
60    transaction_id: Option<TransactionId>,
61    /// Optional constraint validator for schema enforcement.
62    validator: Option<Arc<dyn ConstraintValidator>>,
63    /// Search-store handle used to evaluate `PropertySource::Expression`
64    /// runtime expressions in `ON CREATE` / `ON MATCH SET`. None when no
65    /// expression sources are present (the planner skips threading it).
66    search_store: Option<Arc<dyn GraphStoreSearch>>,
67    /// Session context for expression evaluation (info, schema, etc.).
68    session_context: SessionContext,
69}
70
71impl MergeOperator {
72    /// Creates a new merge operator.
73    pub fn new(
74        store: Arc<dyn GraphStoreMut>,
75        input: Option<Box<dyn Operator>>,
76        config: MergeConfig,
77    ) -> Self {
78        Self {
79            store,
80            input,
81            config,
82            executed: false,
83            viewing_epoch: None,
84            transaction_id: None,
85            validator: None,
86            search_store: None,
87            session_context: SessionContext::default(),
88        }
89    }
90
91    /// Returns the variable name for the merged node.
92    #[must_use]
93    pub fn variable(&self) -> &str {
94        &self.config.variable
95    }
96
97    /// Sets the transaction context for versioned mutations.
98    pub fn with_transaction_context(
99        mut self,
100        epoch: EpochId,
101        transaction_id: Option<TransactionId>,
102    ) -> Self {
103        self.viewing_epoch = Some(epoch);
104        self.transaction_id = transaction_id;
105        self
106    }
107
108    /// Sets the constraint validator for schema enforcement.
109    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
110        self.validator = Some(validator);
111        self
112    }
113
114    /// Provides a search-store handle so `PropertySource::Expression`
115    /// sources in `ON CREATE` / `ON MATCH SET` can be evaluated.
116    #[must_use]
117    pub fn with_search_store(mut self, search_store: Arc<dyn GraphStoreSearch>) -> Self {
118        self.search_store = Some(search_store);
119        self
120    }
121
122    /// Sets the session context used during expression evaluation.
123    #[must_use]
124    pub fn with_session_context(mut self, context: SessionContext) -> Self {
125        self.session_context = context;
126        self
127    }
128
129    /// Resolves property sources to concrete values for a given row.
130    ///
131    /// Skips [`PropertySource::Expression`] sources: those need an augmented
132    /// row containing the merged node/edge and are evaluated separately by
133    /// [`Self::resolve_action_properties`].
134    fn resolve_properties(
135        props: &[(String, PropertySource)],
136        chunk: Option<&DataChunk>,
137        row: usize,
138        store: &dyn GraphStore,
139    ) -> Vec<(String, Value)> {
140        props
141            .iter()
142            .map(|(name, source)| {
143                let value = if let Some(chunk) = chunk {
144                    source.resolve(chunk, row, store)
145                } else {
146                    // Standalone mode: only constants are valid
147                    match source {
148                        PropertySource::Constant(v) => v.clone(),
149                        _ => Value::Null,
150                    }
151                };
152                (name.clone(), value)
153            })
154            .collect()
155    }
156
157    /// True when at least one property source in the slice requires the
158    /// augmented-row evaluation path.
159    fn has_expression_source(props: &[(String, PropertySource)]) -> bool {
160        props
161            .iter()
162            .any(|(_, src)| matches!(src, PropertySource::Expression { .. }))
163    }
164
165    /// Builds a one-row chunk containing the input row plus the merged node
166    /// in the column reserved for the MERGE variable.
167    ///
168    /// Used to evaluate `PropertySource::Expression` sources for ON CREATE /
169    /// ON MATCH SET. The augmented chunk's schema matches `output_schema`.
170    fn build_augmented_node_chunk(
171        &self,
172        chunk: Option<&DataChunk>,
173        row: usize,
174        merged_node: NodeId,
175    ) -> DataChunk {
176        let mut builder = DataChunkBuilder::with_capacity(&self.config.output_schema, 1);
177        if let Some(input) = chunk {
178            for col_idx in 0..input.column_count() {
179                let val = input
180                    .column(col_idx)
181                    .and_then(|c| c.get_value(row))
182                    .unwrap_or(Value::Null);
183                if let Some(dst) = builder.column_mut(col_idx) {
184                    dst.push_value(val);
185                }
186            }
187        }
188        if let Some(dst) = builder.column_mut(self.config.output_column) {
189            dst.push_node_id(merged_node);
190        }
191        builder.advance_row();
192        builder.finish()
193    }
194
195    /// Resolves an action-property source list (ON CREATE or ON MATCH) given
196    /// the merged node id. Lazily builds the augmented chunk only if at least
197    /// one source needs it.
198    ///
199    /// Returns an error only when an expression source is present but no
200    /// search store was attached, which would be a planner/wiring bug.
201    fn resolve_action_properties(
202        &self,
203        props: &[(String, PropertySource)],
204        chunk: Option<&DataChunk>,
205        row: usize,
206        merged_node: NodeId,
207    ) -> Result<Vec<(String, Value)>, super::OperatorError> {
208        if !Self::has_expression_source(props) {
209            // Fast path: no runtime expressions, fall through to the existing
210            // resolver which understands Column/Constant/PropertyAccess.
211            return Ok(Self::resolve_properties(
212                props,
213                chunk,
214                row,
215                self.store.as_ref(),
216            ));
217        }
218
219        let augmented = self.build_augmented_node_chunk(chunk, row, merged_node);
220        let mut out = Vec::with_capacity(props.len());
221        for (name, source) in props {
222            let value = match source {
223                PropertySource::Expression {
224                    expr,
225                    variable_columns,
226                } => {
227                    let search_store = self.search_store.as_ref().ok_or_else(|| {
228                        super::OperatorError::Execution(
229                            "MERGE expression source requires search store; planner did not attach one"
230                                .to_string(),
231                        )
232                    })?;
233                    let mut predicate = ExpressionPredicate::new(
234                        (**expr).clone(),
235                        variable_columns.clone(),
236                        Arc::clone(search_store),
237                    )
238                    .with_session_context(self.session_context.clone());
239                    if let Some(epoch) = self.viewing_epoch {
240                        predicate = predicate.with_transaction_context(epoch, self.transaction_id);
241                    }
242                    predicate.eval_at(&augmented, 0).unwrap_or(Value::Null)
243                }
244                _ => source.resolve(&augmented, 0, self.store.as_ref()),
245            };
246            out.push((name.clone(), value));
247        }
248        Ok(out)
249    }
250
251    /// Tries to find a matching node with the given resolved properties.
252    fn find_matching_node(&self, resolved_match_props: &[(String, Value)]) -> Option<NodeId> {
253        // Use a property index when available to avoid a full label scan.
254        // Null conditions are excluded from the index query and verified in the loop.
255        let use_index = resolved_match_props
256            .iter()
257            .any(|(k, v)| !v.is_null() && self.store.has_property_index(k));
258
259        let candidates: Vec<NodeId> = if use_index {
260            let conditions: Vec<(&str, Value)> = resolved_match_props
261                .iter()
262                .filter(|(_, v)| !v.is_null())
263                .map(|(k, v)| (k.as_str(), v.clone()))
264                .collect();
265            self.store.find_nodes_by_properties(&conditions)
266        } else if let Some(first_label) = self.config.labels.first() {
267            self.store.nodes_by_label(first_label)
268        } else {
269            self.store.node_ids()
270        };
271
272        for node_id in candidates {
273            // Transactional creates write their version at `EpochId::PENDING`,
274            // so the unversioned `get_node` (which checks visibility against
275            // the current real epoch) hides nodes this same transaction has
276            // just created. UNWIND-driven MERGE relies on seeing those rows
277            // to dedupe, so route through the versioned read when we have a
278            // transaction context attached.
279            let node_opt = match (self.viewing_epoch, self.transaction_id) {
280                (Some(epoch), Some(tid)) => self.store.get_node_versioned(node_id, epoch, tid),
281                _ => self.store.get_node(node_id),
282            };
283            let Some(node) = node_opt else { continue };
284
285            let has_all_labels = self.config.labels.iter().all(|label| node.has_label(label));
286            if !has_all_labels {
287                continue;
288            }
289
290            let has_all_props = resolved_match_props.iter().all(|(key, expected_value)| {
291                let prop = node.properties.get(&PropertyKey::new(key.as_str()));
292                if expected_value.is_null() {
293                    // Null in a MERGE pattern matches both absent and explicitly null properties
294                    prop.map_or(true, |v| v.is_null())
295                } else {
296                    prop.is_some_and(|v| v == expected_value)
297                }
298            });
299
300            if has_all_props {
301                return Some(node_id);
302            }
303        }
304
305        None
306    }
307
308    /// Merges match and ON CREATE property lists, with ON CREATE values
309    /// overriding match values for the same key.
310    fn merge_node_props(
311        resolved_match_props: &[(String, Value)],
312        resolved_create_props: &[(String, Value)],
313    ) -> Vec<(String, Value)> {
314        let mut merged: Vec<(String, Value)> = resolved_match_props.to_vec();
315        for (k, v) in resolved_create_props {
316            if let Some(existing) = merged.iter_mut().find(|(key, _)| key == k) {
317                existing.1 = v.clone();
318            } else {
319                merged.push((k.clone(), v.clone()));
320            }
321        }
322        merged
323    }
324
325    /// Writes a freshly-created node's properties through the versioned
326    /// API when the operator is participating in a transaction, so that
327    /// rollback can undo them via the MVCC undo log. Falls back to the
328    /// non-versioned setter only when no transaction context is attached
329    /// (test paths and standalone operator construction).
330    fn write_node_props(&self, id: NodeId, props: &[(PropertyKey, Value)]) {
331        if let Some(tid) = self.transaction_id {
332            for (key, value) in props {
333                self.store
334                    .set_node_property_versioned(id, key.as_str(), value.clone(), tid);
335            }
336        } else {
337            for (key, value) in props {
338                self.store
339                    .set_node_property(id, key.as_str(), value.clone());
340            }
341        }
342    }
343
344    /// Creates a node through the versioned API so the create itself is
345    /// tagged with the operator's transaction (when one is attached) and
346    /// can be undone by transaction rollback. The non-versioned
347    /// `create_node_with_props` would tag the create with
348    /// [`TransactionId::SYSTEM`], leaving the node visible after the
349    /// surrounding session transaction rolls back.
350    fn store_create_node(&self, label_refs: &[&str]) -> NodeId {
351        let epoch = self
352            .viewing_epoch
353            .unwrap_or_else(|| self.store.current_epoch());
354        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
355        self.store.create_node_versioned(label_refs, epoch, tx)
356    }
357
358    /// Creates a new node with the specified labels and resolved properties.
359    fn create_node(
360        &self,
361        resolved_match_props: &[(String, Value)],
362        resolved_create_props: &[(String, Value)],
363    ) -> Result<NodeId, super::OperatorError> {
364        let all_props = Self::merge_node_props(resolved_match_props, resolved_create_props);
365
366        // Validate constraints before creating the node
367        if let Some(ref validator) = self.validator {
368            validator.validate_node_labels_allowed(&self.config.labels)?;
369            for (name, value) in &all_props {
370                validator.validate_node_property(&self.config.labels, name, value)?;
371                validator.check_unique_node_property(&self.config.labels, name, value)?;
372            }
373            validator.validate_node_complete(&self.config.labels, &all_props)?;
374        }
375
376        let prop_pairs: Vec<(PropertyKey, Value)> = all_props
377            .into_iter()
378            .map(|(k, v)| (PropertyKey::new(k.as_str()), v))
379            .collect();
380
381        let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
382        let id = self.store_create_node(&labels);
383        self.write_node_props(id, &prop_pairs);
384        Ok(id)
385    }
386
387    /// Phase one of the two-phase create path: creates the node from match
388    /// properties only, deferring the completeness check until ON CREATE
389    /// expression properties are resolved (since those properties may
390    /// satisfy NOT NULL / PRIMARY KEY requirements that match props alone
391    /// would fail). Per-property type checks and uniqueness checks for the
392    /// match properties still run here.
393    ///
394    /// Both the create and the property writes go through the versioned
395    /// API, so a failure in phase two (or in `apply_on_match`) is undone
396    /// when the surrounding session transaction rolls back. Without that,
397    /// the node would persist as an orphan visible to later queries.
398    fn create_node_phase_one(
399        &self,
400        resolved_match_props: &[(String, Value)],
401    ) -> Result<NodeId, super::OperatorError> {
402        if let Some(ref validator) = self.validator {
403            validator.validate_node_labels_allowed(&self.config.labels)?;
404            for (name, value) in resolved_match_props {
405                validator.validate_node_property(&self.config.labels, name, value)?;
406                validator.check_unique_node_property(&self.config.labels, name, value)?;
407            }
408        }
409
410        let prop_pairs: Vec<(PropertyKey, Value)> = resolved_match_props
411            .iter()
412            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
413            .collect();
414
415        let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
416        let id = self.store_create_node(&labels);
417        self.write_node_props(id, &prop_pairs);
418        Ok(id)
419    }
420
421    /// Phase two of the two-phase create path: validates ON CREATE
422    /// properties (type, uniqueness) and the full property set
423    /// (completeness) after expressions have been evaluated against the
424    /// freshly created node, but before the values are written. The just
425    /// created node holds only match properties at this point, so a
426    /// uniqueness check on an ON CREATE property cannot conflict with the
427    /// node itself.
428    fn validate_on_create_phase_two(
429        &self,
430        resolved_match_props: &[(String, Value)],
431        resolved_create_props: &[(String, Value)],
432    ) -> Result<(), super::OperatorError> {
433        let Some(ref validator) = self.validator else {
434            return Ok(());
435        };
436        for (name, value) in resolved_create_props {
437            validator.validate_node_property(&self.config.labels, name, value)?;
438            validator.check_unique_node_property(&self.config.labels, name, value)?;
439        }
440        let all_props = Self::merge_node_props(resolved_match_props, resolved_create_props);
441        validator.validate_node_complete(&self.config.labels, &all_props)?;
442        Ok(())
443    }
444
445    /// Finds or creates a matching node for a single row, applying ON MATCH/ON CREATE.
446    fn merge_node_for_row(
447        &self,
448        chunk: Option<&DataChunk>,
449        row: usize,
450    ) -> Result<NodeId, super::OperatorError> {
451        let store_ref: &dyn GraphStore = self.store.as_ref();
452        // Match properties cannot reference the MERGE variable (ISO §15.5),
453        // so they resolve against the input chunk directly.
454        let resolved_match =
455            Self::resolve_properties(&self.config.match_properties, chunk, row, store_ref);
456
457        if let Some(existing_id) = self.find_matching_node(&resolved_match) {
458            // Resolve ON MATCH SET against an augmented row containing the
459            // matched node id, so `coalesce(n.x, 0)` can read the live value.
460            let resolved_on_match = self.resolve_action_properties(
461                &self.config.on_match_properties,
462                chunk,
463                row,
464                existing_id,
465            )?;
466            self.apply_on_match(existing_id, &resolved_on_match)?;
467            Ok(existing_id)
468        } else if Self::has_expression_source(&self.config.on_create_properties) {
469            // Two-phase create: build the node from match properties first so
470            // the new id exists, then evaluate ON CREATE against an augmented
471            // row referencing it, then write those properties via the same
472            // path used for ON MATCH SET. Completeness and uniqueness on the
473            // ON CREATE properties are validated between phases via
474            // `validate_on_create_phase_two` so neither premature rejection
475            // (when ON CREATE supplies a NOT NULL / PRIMARY KEY property) nor
476            // silent constraint bypass (UNIQUE on an ON CREATE property)
477            // occurs.
478            let new_id = self.create_node_phase_one(&resolved_match)?;
479            let resolved_on_create = self.resolve_action_properties(
480                &self.config.on_create_properties,
481                chunk,
482                row,
483                new_id,
484            )?;
485            self.validate_on_create_phase_two(&resolved_match, &resolved_on_create)?;
486            self.apply_on_match(new_id, &resolved_on_create)?;
487            Ok(new_id)
488        } else {
489            // Fast path: no runtime expressions; create with all properties at once.
490            let resolved_on_create =
491                Self::resolve_properties(&self.config.on_create_properties, chunk, row, store_ref);
492            self.create_node(&resolved_match, &resolved_on_create)
493        }
494    }
495
496    /// Applies ON MATCH properties to an existing node.
497    fn apply_on_match(
498        &self,
499        node_id: NodeId,
500        resolved_on_match: &[(String, Value)],
501    ) -> Result<(), super::OperatorError> {
502        for (key, value) in resolved_on_match {
503            if let Some(ref validator) = self.validator {
504                validator.validate_node_property(&self.config.labels, key, value)?;
505            }
506            if let Some(tid) = self.transaction_id {
507                self.store
508                    .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
509            } else {
510                self.store
511                    .set_node_property(node_id, key.as_str(), value.clone());
512            }
513        }
514        Ok(())
515    }
516}
517
518impl Operator for MergeOperator {
519    fn next(&mut self) -> OperatorResult {
520        // When we have an input operator, pass through input rows with the
521        // merged node ID appended (used for chained inline MERGE patterns).
522        if let Some(ref mut input) = self.input {
523            if let Some(chunk) = input.next()? {
524                let mut builder =
525                    DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
526
527                for row in chunk.selected_indices() {
528                    // Reject NULL bound variables (e.g., from unmatched OPTIONAL MATCH)
529                    if let Some(bound_col) = self.config.bound_variable_column {
530                        let is_null = chunk.column(bound_col).map_or(true, |col| col.is_null(row));
531                        if is_null {
532                            return Err(super::OperatorError::TypeMismatch {
533                                expected: format!(
534                                    "non-null node for MERGE variable '{}'",
535                                    self.config.variable
536                                ),
537                                found: "NULL".to_string(),
538                            });
539                        }
540                    }
541
542                    // Merge the node per-row: resolve properties from this row
543                    let node_id = self.merge_node_for_row(Some(&chunk), row)?;
544
545                    // Copy input columns to output
546                    for col_idx in 0..chunk.column_count() {
547                        if let (Some(src), Some(dst)) =
548                            (chunk.column(col_idx), builder.column_mut(col_idx))
549                        {
550                            if let Some(val) = src.get_value(row) {
551                                dst.push_value(val);
552                            } else {
553                                dst.push_value(Value::Null);
554                            }
555                        }
556                    }
557
558                    // Append the merged node ID
559                    if let Some(dst) = builder.column_mut(self.config.output_column) {
560                        dst.push_node_id(node_id);
561                    }
562
563                    builder.advance_row();
564                }
565
566                return Ok(Some(builder.finish()));
567            }
568            return Ok(None);
569        }
570
571        // Standalone mode (no input operator)
572        if self.executed {
573            return Ok(None);
574        }
575        self.executed = true;
576
577        let node_id = self.merge_node_for_row(None, 0)?;
578
579        let mut builder = DataChunkBuilder::new(&self.config.output_schema);
580        if let Some(dst) = builder.column_mut(self.config.output_column) {
581            dst.push_node_id(node_id);
582        }
583        builder.advance_row();
584
585        Ok(Some(builder.finish()))
586    }
587
588    fn reset(&mut self) {
589        self.executed = false;
590        if let Some(ref mut input) = self.input {
591            input.reset();
592        }
593    }
594
595    fn name(&self) -> &'static str {
596        "Merge"
597    }
598
599    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
600        self
601    }
602}
603
604/// Configuration for a relationship merge operation.
605pub struct MergeRelationshipConfig {
606    /// Column index for the source node ID in the input.
607    pub source_column: usize,
608    /// Column index for the target node ID in the input.
609    pub target_column: usize,
610    /// Variable name for the source node (for error messages).
611    pub source_variable: String,
612    /// Variable name for the target node (for error messages).
613    pub target_variable: String,
614    /// Relationship type to match/create.
615    pub edge_type: String,
616    /// Properties that must match (also used for creation).
617    pub match_properties: Vec<(String, PropertySource)>,
618    /// Properties to set on CREATE.
619    pub on_create_properties: Vec<(String, PropertySource)>,
620    /// Properties to set on MATCH.
621    pub on_match_properties: Vec<(String, PropertySource)>,
622    /// Output schema (input columns + edge column).
623    pub output_schema: Vec<LogicalType>,
624    /// Column index for the edge variable in the output.
625    pub edge_output_column: usize,
626}
627
628/// Merge operator for relationship patterns.
629///
630/// Takes input rows containing source and target node IDs, then for each row:
631/// 1. Searches for an existing relationship matching the type and properties
632/// 2. If found, applies ON MATCH properties and returns the existing edge
633/// 3. If not found, creates a new relationship and applies ON CREATE properties
634pub struct MergeRelationshipOperator {
635    /// The graph store.
636    store: Arc<dyn GraphStoreMut>,
637    /// Input operator providing rows with source/target node columns.
638    input: Box<dyn Operator>,
639    /// Merge configuration.
640    config: MergeRelationshipConfig,
641    /// Epoch for MVCC versioning.
642    viewing_epoch: Option<EpochId>,
643    /// Transaction ID for undo log tracking.
644    transaction_id: Option<TransactionId>,
645    /// Optional constraint validator for schema enforcement.
646    validator: Option<Arc<dyn ConstraintValidator>>,
647    /// Search-store handle for evaluating `PropertySource::Expression`.
648    search_store: Option<Arc<dyn GraphStoreSearch>>,
649    /// Session context for expression evaluation.
650    session_context: SessionContext,
651}
652
653impl MergeRelationshipOperator {
654    /// Creates a new merge relationship operator.
655    pub fn new(
656        store: Arc<dyn GraphStoreMut>,
657        input: Box<dyn Operator>,
658        config: MergeRelationshipConfig,
659    ) -> Self {
660        Self {
661            store,
662            input,
663            config,
664            viewing_epoch: None,
665            transaction_id: None,
666            validator: None,
667            search_store: None,
668            session_context: SessionContext::default(),
669        }
670    }
671
672    /// Sets the transaction context for versioned mutations.
673    pub fn with_transaction_context(
674        mut self,
675        epoch: EpochId,
676        transaction_id: Option<TransactionId>,
677    ) -> Self {
678        self.viewing_epoch = Some(epoch);
679        self.transaction_id = transaction_id;
680        self
681    }
682
683    /// Sets the constraint validator for schema enforcement.
684    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
685        self.validator = Some(validator);
686        self
687    }
688
689    /// Provides a search-store handle for runtime expression evaluation.
690    #[must_use]
691    pub fn with_search_store(mut self, search_store: Arc<dyn GraphStoreSearch>) -> Self {
692        self.search_store = Some(search_store);
693        self
694    }
695
696    /// Sets the session context used during expression evaluation.
697    #[must_use]
698    pub fn with_session_context(mut self, context: SessionContext) -> Self {
699        self.session_context = context;
700        self
701    }
702
703    /// Builds a one-row chunk containing the input row plus the merged edge
704    /// in the column reserved for the MERGE relationship variable.
705    fn build_augmented_edge_chunk(
706        &self,
707        chunk: &DataChunk,
708        row: usize,
709        merged_edge: EdgeId,
710    ) -> DataChunk {
711        let mut builder = DataChunkBuilder::with_capacity(&self.config.output_schema, 1);
712        for col_idx in 0..chunk.column_count() {
713            let val = chunk
714                .column(col_idx)
715                .and_then(|c| c.get_value(row))
716                .unwrap_or(Value::Null);
717            if let Some(dst) = builder.column_mut(col_idx) {
718                dst.push_value(val);
719            }
720        }
721        if let Some(dst) = builder.column_mut(self.config.edge_output_column) {
722            dst.push_edge_id(merged_edge);
723        }
724        builder.advance_row();
725        builder.finish()
726    }
727
728    /// Resolves an action-property list (ON CREATE / ON MATCH SET) against
729    /// an augmented row that includes the merged edge id. Falls back to the
730    /// fast path when no expression sources are present.
731    fn resolve_action_properties(
732        &self,
733        props: &[(String, PropertySource)],
734        chunk: &DataChunk,
735        row: usize,
736        merged_edge: EdgeId,
737    ) -> Result<Vec<(String, Value)>, super::OperatorError> {
738        if !MergeOperator::has_expression_source(props) {
739            return Ok(MergeOperator::resolve_properties(
740                props,
741                Some(chunk),
742                row,
743                self.store.as_ref(),
744            ));
745        }
746
747        let augmented = self.build_augmented_edge_chunk(chunk, row, merged_edge);
748        let mut out = Vec::with_capacity(props.len());
749        for (name, source) in props {
750            let value = match source {
751                PropertySource::Expression {
752                    expr,
753                    variable_columns,
754                } => {
755                    let search_store = self.search_store.as_ref().ok_or_else(|| {
756                        super::OperatorError::Execution(
757                            "MERGE expression source requires search store; planner did not attach one"
758                                .to_string(),
759                        )
760                    })?;
761                    let mut predicate = ExpressionPredicate::new(
762                        (**expr).clone(),
763                        variable_columns.clone(),
764                        Arc::clone(search_store),
765                    )
766                    .with_session_context(self.session_context.clone());
767                    if let Some(epoch) = self.viewing_epoch {
768                        predicate = predicate.with_transaction_context(epoch, self.transaction_id);
769                    }
770                    predicate.eval_at(&augmented, 0).unwrap_or(Value::Null)
771                }
772                _ => source.resolve(&augmented, 0, self.store.as_ref()),
773            };
774            out.push((name.clone(), value));
775        }
776        Ok(out)
777    }
778
779    /// Tries to find a matching relationship between source and target.
780    fn find_matching_edge(
781        &self,
782        src: NodeId,
783        dst: NodeId,
784        resolved_match_props: &[(String, Value)],
785    ) -> Option<EdgeId> {
786        use crate::graph::Direction;
787
788        for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
789            if target != dst {
790                continue;
791            }
792
793            if let Some(edge) = self.store.get_edge(edge_id) {
794                if edge.edge_type.as_str() != self.config.edge_type {
795                    continue;
796                }
797
798                let has_all_props = resolved_match_props
799                    .iter()
800                    .all(|(key, expected)| edge.get_property(key).is_some_and(|v| v == expected));
801
802                if has_all_props {
803                    return Some(edge_id);
804                }
805            }
806        }
807
808        None
809    }
810
811    /// Versioned-API edge create. See [`MergeOperator::store_create_node`]
812    /// for the rationale: the create itself must be tagged with the
813    /// operator's transaction so that rollback can undo it.
814    fn store_create_edge(&self, src: NodeId, dst: NodeId) -> EdgeId {
815        let epoch = self
816            .viewing_epoch
817            .unwrap_or_else(|| self.store.current_epoch());
818        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
819        self.store
820            .create_edge_versioned(src, dst, &self.config.edge_type, epoch, tx)
821    }
822
823    /// Writes a freshly-created edge's properties through the versioned
824    /// setter when a transaction is attached, mirroring
825    /// [`MergeOperator::write_node_props`].
826    fn write_edge_props(&self, id: EdgeId, props: &[(PropertyKey, Value)]) {
827        if let Some(tid) = self.transaction_id {
828            for (key, value) in props {
829                self.store
830                    .set_edge_property_versioned(id, key.as_str(), value.clone(), tid);
831            }
832        } else {
833            for (key, value) in props {
834                self.store
835                    .set_edge_property(id, key.as_str(), value.clone());
836            }
837        }
838    }
839
840    /// Creates a new edge with resolved match and on_create properties.
841    fn create_edge(
842        &self,
843        src: NodeId,
844        dst: NodeId,
845        resolved_match_props: &[(String, Value)],
846        resolved_create_props: &[(String, Value)],
847    ) -> Result<EdgeId, super::OperatorError> {
848        let all_props =
849            MergeOperator::merge_node_props(resolved_match_props, resolved_create_props);
850
851        // Validate constraints before creating the edge
852        if let Some(ref validator) = self.validator {
853            validator.validate_edge_type_allowed(&self.config.edge_type)?;
854            for (name, value) in &all_props {
855                validator.validate_edge_property(&self.config.edge_type, name, value)?;
856            }
857            validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
858        }
859
860        let prop_pairs: Vec<(PropertyKey, Value)> = all_props
861            .into_iter()
862            .map(|(k, v)| (PropertyKey::new(k.as_str()), v))
863            .collect();
864
865        let id = self.store_create_edge(src, dst);
866        self.write_edge_props(id, &prop_pairs);
867        Ok(id)
868    }
869
870    /// Phase one of the two-phase edge create path: validates per-property
871    /// types on match props and writes the edge, deferring the completeness
872    /// check until ON CREATE expression properties are resolved. See
873    /// [`MergeOperator::create_node_phase_one`] for the rationale.
874    ///
875    /// Both the create and the property writes go through the versioned
876    /// API, so a failure in phase two (or in `apply_on_match_edge`) is
877    /// undone when the surrounding session transaction rolls back.
878    fn create_edge_phase_one(
879        &self,
880        src: NodeId,
881        dst: NodeId,
882        resolved_match_props: &[(String, Value)],
883    ) -> Result<EdgeId, super::OperatorError> {
884        if let Some(ref validator) = self.validator {
885            validator.validate_edge_type_allowed(&self.config.edge_type)?;
886            for (name, value) in resolved_match_props {
887                validator.validate_edge_property(&self.config.edge_type, name, value)?;
888            }
889        }
890
891        let prop_pairs: Vec<(PropertyKey, Value)> = resolved_match_props
892            .iter()
893            .map(|(k, v)| (PropertyKey::new(k.as_str()), v.clone()))
894            .collect();
895
896        let id = self.store_create_edge(src, dst);
897        self.write_edge_props(id, &prop_pairs);
898        Ok(id)
899    }
900
901    /// Phase two of the two-phase edge create path: validates ON CREATE
902    /// edge properties and the full property set for completeness after
903    /// expressions have been evaluated against the freshly created edge.
904    fn validate_on_create_edge_phase_two(
905        &self,
906        resolved_match_props: &[(String, Value)],
907        resolved_create_props: &[(String, Value)],
908    ) -> Result<(), super::OperatorError> {
909        let Some(ref validator) = self.validator else {
910            return Ok(());
911        };
912        for (name, value) in resolved_create_props {
913            validator.validate_edge_property(&self.config.edge_type, name, value)?;
914        }
915        let all_props =
916            MergeOperator::merge_node_props(resolved_match_props, resolved_create_props);
917        validator.validate_edge_complete(&self.config.edge_type, &all_props)?;
918        Ok(())
919    }
920
921    /// Applies ON MATCH properties to an existing edge.
922    fn apply_on_match_edge(
923        &self,
924        edge_id: EdgeId,
925        resolved_on_match: &[(String, Value)],
926    ) -> Result<(), super::OperatorError> {
927        for (key, value) in resolved_on_match {
928            if let Some(ref validator) = self.validator {
929                validator.validate_edge_property(&self.config.edge_type, key, value)?;
930            }
931            if let Some(tid) = self.transaction_id {
932                self.store
933                    .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
934            } else {
935                self.store
936                    .set_edge_property(edge_id, key.as_str(), value.clone());
937            }
938        }
939        Ok(())
940    }
941}
942
943impl Operator for MergeRelationshipOperator {
944    fn next(&mut self) -> OperatorResult {
945        use super::OperatorError;
946
947        if let Some(chunk) = self.input.next()? {
948            let mut builder =
949                DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
950
951            for row in chunk.selected_indices() {
952                let src_val = chunk
953                    .column(self.config.source_column)
954                    .and_then(|c| c.get_node_id(row))
955                    .ok_or_else(|| OperatorError::TypeMismatch {
956                        expected: format!(
957                            "non-null node for MERGE variable '{}'",
958                            self.config.source_variable
959                        ),
960                        found: "NULL".to_string(),
961                    })?;
962
963                let dst_val = chunk
964                    .column(self.config.target_column)
965                    .and_then(|c| c.get_node_id(row))
966                    .ok_or_else(|| OperatorError::TypeMismatch {
967                        expected: format!(
968                            "non-null node for MERGE variable '{}'",
969                            self.config.target_variable
970                        ),
971                        found: "None".to_string(),
972                    })?;
973
974                let store_ref: &dyn GraphStore = self.store.as_ref();
975                let resolved_match = MergeOperator::resolve_properties(
976                    &self.config.match_properties,
977                    Some(&chunk),
978                    row,
979                    store_ref,
980                );
981
982                let edge_id = if let Some(existing) =
983                    self.find_matching_edge(src_val, dst_val, &resolved_match)
984                {
985                    let resolved_on_match = self.resolve_action_properties(
986                        &self.config.on_match_properties,
987                        &chunk,
988                        row,
989                        existing,
990                    )?;
991                    self.apply_on_match_edge(existing, &resolved_on_match)?;
992                    existing
993                } else if MergeOperator::has_expression_source(&self.config.on_create_properties) {
994                    // Two-phase create so ON CREATE expressions can reference
995                    // the new edge. Completeness validation is deferred to
996                    // `validate_on_create_edge_phase_two` so an ON CREATE
997                    // property is allowed to satisfy a NOT NULL constraint
998                    // that match properties alone would fail.
999                    let new_id = self.create_edge_phase_one(src_val, dst_val, &resolved_match)?;
1000                    let resolved_on_create = self.resolve_action_properties(
1001                        &self.config.on_create_properties,
1002                        &chunk,
1003                        row,
1004                        new_id,
1005                    )?;
1006                    self.validate_on_create_edge_phase_two(&resolved_match, &resolved_on_create)?;
1007                    self.apply_on_match_edge(new_id, &resolved_on_create)?;
1008                    new_id
1009                } else {
1010                    let resolved_on_create = MergeOperator::resolve_properties(
1011                        &self.config.on_create_properties,
1012                        Some(&chunk),
1013                        row,
1014                        store_ref,
1015                    );
1016                    self.create_edge(src_val, dst_val, &resolved_match, &resolved_on_create)?
1017                };
1018
1019                // Copy input columns to output, then add the edge column
1020                for col_idx in 0..self.config.output_schema.len() {
1021                    if col_idx == self.config.edge_output_column {
1022                        if let Some(dst_col) = builder.column_mut(col_idx) {
1023                            dst_col.push_edge_id(edge_id);
1024                        }
1025                    } else if let (Some(src_col), Some(dst_col)) =
1026                        (chunk.column(col_idx), builder.column_mut(col_idx))
1027                        && let Some(val) = src_col.get_value(row)
1028                    {
1029                        dst_col.push_value(val);
1030                    }
1031                }
1032
1033                builder.advance_row();
1034            }
1035
1036            return Ok(Some(builder.finish()));
1037        }
1038
1039        Ok(None)
1040    }
1041
1042    fn reset(&mut self) {
1043        self.input.reset();
1044    }
1045
1046    fn name(&self) -> &'static str {
1047        "MergeRelationship"
1048    }
1049
1050    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1051        self
1052    }
1053}
1054
1055#[cfg(all(test, feature = "lpg"))]
1056mod tests {
1057    use super::*;
1058    use crate::graph::lpg::LpgStore;
1059
1060    fn const_props(props: Vec<(&str, Value)>) -> Vec<(String, PropertySource)> {
1061        props
1062            .into_iter()
1063            .map(|(k, v)| (k.to_string(), PropertySource::Constant(v)))
1064            .collect()
1065    }
1066
1067    #[test]
1068    fn test_merge_creates_new_node() {
1069        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
1070
1071        // MERGE should create a new node since none exists
1072        let mut merge = MergeOperator::new(
1073            Arc::clone(&store),
1074            None,
1075            MergeConfig {
1076                variable: "n".to_string(),
1077                labels: vec!["Person".to_string()],
1078                match_properties: const_props(vec![("name", Value::String("Alix".into()))]),
1079                on_create_properties: vec![],
1080                on_match_properties: vec![],
1081                output_schema: vec![LogicalType::Node],
1082                output_column: 0,
1083                bound_variable_column: None,
1084            },
1085        );
1086
1087        let result = merge.next().unwrap();
1088        assert!(result.is_some());
1089
1090        // Verify node was created
1091        let nodes = store.nodes_by_label("Person");
1092        assert_eq!(nodes.len(), 1);
1093
1094        let node = store.get_node(nodes[0]).unwrap();
1095        assert!(node.has_label("Person"));
1096        assert_eq!(
1097            node.properties.get(&PropertyKey::new("name")),
1098            Some(&Value::String("Alix".into()))
1099        );
1100    }
1101
1102    #[test]
1103    fn test_merge_matches_existing_node() {
1104        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
1105
1106        // Create an existing node
1107        store.create_node_with_props(
1108            &["Person"],
1109            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
1110        );
1111
1112        // MERGE should find the existing node
1113        let mut merge = MergeOperator::new(
1114            Arc::clone(&store),
1115            None,
1116            MergeConfig {
1117                variable: "n".to_string(),
1118                labels: vec!["Person".to_string()],
1119                match_properties: const_props(vec![("name", Value::String("Gus".into()))]),
1120                on_create_properties: vec![],
1121                on_match_properties: vec![],
1122                output_schema: vec![LogicalType::Node],
1123                output_column: 0,
1124                bound_variable_column: None,
1125            },
1126        );
1127
1128        let result = merge.next().unwrap();
1129        assert!(result.is_some());
1130
1131        // Verify only one node exists (no new node created)
1132        let nodes = store.nodes_by_label("Person");
1133        assert_eq!(nodes.len(), 1);
1134    }
1135
1136    #[test]
1137    fn test_merge_with_on_create() {
1138        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
1139
1140        // MERGE with ON CREATE SET
1141        let mut merge = MergeOperator::new(
1142            Arc::clone(&store),
1143            None,
1144            MergeConfig {
1145                variable: "n".to_string(),
1146                labels: vec!["Person".to_string()],
1147                match_properties: const_props(vec![("name", Value::String("Vincent".into()))]),
1148                on_create_properties: const_props(vec![("created", Value::Bool(true))]),
1149                on_match_properties: vec![],
1150                output_schema: vec![LogicalType::Node],
1151                output_column: 0,
1152                bound_variable_column: None,
1153            },
1154        );
1155
1156        let _ = merge.next().unwrap();
1157
1158        // Verify node has both match properties and on_create properties
1159        let nodes = store.nodes_by_label("Person");
1160        let node = store.get_node(nodes[0]).unwrap();
1161        assert_eq!(
1162            node.properties.get(&PropertyKey::new("name")),
1163            Some(&Value::String("Vincent".into()))
1164        );
1165        assert_eq!(
1166            node.properties.get(&PropertyKey::new("created")),
1167            Some(&Value::Bool(true))
1168        );
1169    }
1170
1171    #[test]
1172    fn test_merge_with_on_match() {
1173        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
1174
1175        // Create an existing node
1176        let node_id = store.create_node_with_props(
1177            &["Person"],
1178            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
1179        );
1180
1181        // MERGE with ON MATCH SET
1182        let mut merge = MergeOperator::new(
1183            Arc::clone(&store),
1184            None,
1185            MergeConfig {
1186                variable: "n".to_string(),
1187                labels: vec!["Person".to_string()],
1188                match_properties: const_props(vec![("name", Value::String("Jules".into()))]),
1189                on_create_properties: vec![],
1190                on_match_properties: const_props(vec![("updated", Value::Bool(true))]),
1191                output_schema: vec![LogicalType::Node],
1192                output_column: 0,
1193                bound_variable_column: None,
1194            },
1195        );
1196
1197        let _ = merge.next().unwrap();
1198
1199        // Verify node has the on_match property added
1200        let node = store.get_node(node_id).unwrap();
1201        assert_eq!(
1202            node.properties.get(&PropertyKey::new("updated")),
1203            Some(&Value::Bool(true))
1204        );
1205    }
1206
1207    #[test]
1208    fn test_merge_uses_property_index() {
1209        let lpg_store = Arc::new(LpgStore::new().unwrap());
1210        lpg_store.create_property_index("name");
1211        assert!(lpg_store.has_property_index("name"));
1212
1213        // Use the trait object for node creation so the &[(PropertyKey, Value)] signature applies.
1214        let store: Arc<dyn GraphStoreMut> = lpg_store;
1215
1216        for i in 0..50u32 {
1217            store.create_node_with_props(
1218                &["Person"],
1219                &[(
1220                    PropertyKey::new("name"),
1221                    Value::String(format!("person_{i}").into()),
1222                )],
1223            );
1224        }
1225
1226        let target_id = store.create_node_with_props(
1227            &["Person"],
1228            &[(PropertyKey::new("name"), Value::String("Beatrix".into()))],
1229        );
1230
1231        // MERGE should find the existing node via index lookup
1232        let mut merge = MergeOperator::new(
1233            Arc::clone(&store),
1234            None,
1235            MergeConfig {
1236                variable: "n".to_string(),
1237                labels: vec!["Person".to_string()],
1238                match_properties: const_props(vec![("name", Value::String("Beatrix".into()))]),
1239                on_create_properties: vec![],
1240                on_match_properties: const_props(vec![("found", Value::Bool(true))]),
1241                output_schema: vec![LogicalType::Node],
1242                output_column: 0,
1243                bound_variable_column: None,
1244            },
1245        );
1246
1247        let result = merge.next().unwrap();
1248        assert!(result.is_some());
1249
1250        // ON MATCH should have fired on the correct node
1251        let node = store.get_node(target_id).unwrap();
1252        assert_eq!(
1253            node.properties.get(&PropertyKey::new("found")),
1254            Some(&Value::Bool(true))
1255        );
1256
1257        // No new node should have been created
1258        let persons = store.nodes_by_label("Person");
1259        assert_eq!(persons.len(), 51);
1260    }
1261
1262    #[test]
1263    fn test_merge_creates_via_index_miss() {
1264        let lpg_store = Arc::new(LpgStore::new().unwrap());
1265        lpg_store.create_property_index("name");
1266
1267        let store: Arc<dyn GraphStoreMut> = lpg_store;
1268
1269        store.create_node_with_props(
1270            &["Person"],
1271            &[(PropertyKey::new("name"), Value::String("Django".into()))],
1272        );
1273
1274        // MERGE for a name not in the index — should create
1275        let mut merge = MergeOperator::new(
1276            Arc::clone(&store),
1277            None,
1278            MergeConfig {
1279                variable: "n".to_string(),
1280                labels: vec!["Person".to_string()],
1281                match_properties: const_props(vec![("name", Value::String("Shosanna".into()))]),
1282                on_create_properties: const_props(vec![("created", Value::Bool(true))]),
1283                on_match_properties: vec![],
1284                output_schema: vec![LogicalType::Node],
1285                output_column: 0,
1286                bound_variable_column: None,
1287            },
1288        );
1289
1290        let result = merge.next().unwrap();
1291        assert!(result.is_some());
1292
1293        let persons = store.nodes_by_label("Person");
1294        assert_eq!(persons.len(), 2);
1295
1296        let new_nodes: Vec<_> = persons
1297            .iter()
1298            .filter_map(|&id| store.get_node(id))
1299            .filter(|n| {
1300                n.properties.get(&PropertyKey::new("name"))
1301                    == Some(&Value::String("Shosanna".into()))
1302            })
1303            .collect();
1304        assert_eq!(new_nodes.len(), 1);
1305        assert_eq!(
1306            new_nodes[0].properties.get(&PropertyKey::new("created")),
1307            Some(&Value::Bool(true))
1308        );
1309    }
1310
1311    // GrafeoDB/grafeo#317. Operator-level test: a `PropertySource::Expression`
1312    // for ON CREATE / ON MATCH SET must evaluate against an augmented row that
1313    // contains the merged node, not against the (potentially absent) input row.
1314
1315    #[test]
1316    fn test_merge_on_match_resolves_expression_against_merged_node() {
1317        use super::super::filter::FilterExpression;
1318        use crate::graph::lpg::LpgStore;
1319        use std::collections::HashMap;
1320
1321        let lpg = Arc::new(LpgStore::new().unwrap());
1322        let store: Arc<dyn GraphStoreMut> = Arc::clone(&lpg) as Arc<dyn GraphStoreMut>;
1323        let search: Arc<dyn GraphStoreSearch> = Arc::clone(&lpg) as Arc<dyn GraphStoreSearch>;
1324
1325        // Pre-create the matching node so the MERGE goes into the ON MATCH branch.
1326        let id = store.create_node_with_props(
1327            &["Item"],
1328            &[
1329                (PropertyKey::new("val"), Value::Int64(1)),
1330                (PropertyKey::new("x"), Value::Int64(7)),
1331            ],
1332        );
1333
1334        // ON MATCH SET n.x = n.x + 5
1335        let expr = FilterExpression::Binary {
1336            left: Box::new(FilterExpression::Property {
1337                variable: "n".to_string(),
1338                property: "x".to_string(),
1339            }),
1340            op: super::super::filter::BinaryFilterOp::Add,
1341            right: Box::new(FilterExpression::Literal(Value::Int64(5))),
1342        };
1343        let mut variable_columns = HashMap::new();
1344        // Standalone MERGE: input is None, so the augmented row only has the
1345        // MERGE variable column at index 0.
1346        variable_columns.insert("n".to_string(), 0_usize);
1347
1348        let mut merge = MergeOperator::new(
1349            Arc::clone(&store),
1350            None,
1351            MergeConfig {
1352                variable: "n".to_string(),
1353                labels: vec!["Item".to_string()],
1354                match_properties: const_props(vec![("val", Value::Int64(1))]),
1355                on_create_properties: vec![],
1356                on_match_properties: vec![(
1357                    "x".to_string(),
1358                    PropertySource::Expression {
1359                        expr: Box::new(expr),
1360                        variable_columns,
1361                    },
1362                )],
1363                output_schema: vec![LogicalType::Node],
1364                output_column: 0,
1365                bound_variable_column: None,
1366            },
1367        )
1368        .with_search_store(Arc::clone(&search));
1369
1370        merge.next().unwrap();
1371
1372        let node = store.get_node(id).unwrap();
1373        assert_eq!(
1374            node.properties.get(&PropertyKey::new("x")),
1375            Some(&Value::Int64(12)),
1376            "ON MATCH expression must read the merged node, not NULL"
1377        );
1378    }
1379
1380    #[test]
1381    fn test_merge_on_create_resolves_expression_against_new_node() {
1382        // ON CREATE coalesce(n.x, 99) must see the freshly-created node and
1383        // fall back to 99 because `x` is not yet set on it.
1384        use super::super::filter::FilterExpression;
1385        use crate::graph::lpg::LpgStore;
1386        use std::collections::HashMap;
1387
1388        let lpg = Arc::new(LpgStore::new().unwrap());
1389        let store: Arc<dyn GraphStoreMut> = Arc::clone(&lpg) as Arc<dyn GraphStoreMut>;
1390        let search: Arc<dyn GraphStoreSearch> = Arc::clone(&lpg) as Arc<dyn GraphStoreSearch>;
1391
1392        let coalesce = FilterExpression::FunctionCall {
1393            name: "coalesce".to_string(),
1394            args: vec![
1395                FilterExpression::Property {
1396                    variable: "n".to_string(),
1397                    property: "x".to_string(),
1398                },
1399                FilterExpression::Literal(Value::Int64(99)),
1400            ],
1401        };
1402        let mut variable_columns = HashMap::new();
1403        variable_columns.insert("n".to_string(), 0_usize);
1404
1405        let mut merge = MergeOperator::new(
1406            Arc::clone(&store),
1407            None,
1408            MergeConfig {
1409                variable: "n".to_string(),
1410                labels: vec!["Item".to_string()],
1411                match_properties: const_props(vec![("val", Value::Int64(1))]),
1412                on_create_properties: vec![(
1413                    "x".to_string(),
1414                    PropertySource::Expression {
1415                        expr: Box::new(coalesce),
1416                        variable_columns,
1417                    },
1418                )],
1419                on_match_properties: vec![],
1420                output_schema: vec![LogicalType::Node],
1421                output_column: 0,
1422                bound_variable_column: None,
1423            },
1424        )
1425        .with_search_store(Arc::clone(&search));
1426
1427        merge.next().unwrap();
1428
1429        let nodes = store.nodes_by_label("Item");
1430        assert_eq!(nodes.len(), 1);
1431        let node = store.get_node(nodes[0]).unwrap();
1432        assert_eq!(
1433            node.properties.get(&PropertyKey::new("x")),
1434            Some(&Value::Int64(99))
1435        );
1436    }
1437
1438    // ── Two-phase constraint validation regression tests ──────────────
1439    //
1440    // The two-phase create path (ON CREATE expression sources) used to call
1441    // `create_node` / `create_edge` with an empty on_create list, which made
1442    // `validate_node_complete` and `check_unique_node_property` only see the
1443    // match properties. The fix routes the two phases through dedicated
1444    // helpers that validate the full property set at the right time.
1445
1446    use super::ConstraintValidator;
1447
1448    /// Minimal validator that enforces NOT NULL on a single named property.
1449    struct RequirePropertyValidator {
1450        required_property: &'static str,
1451    }
1452
1453    impl ConstraintValidator for RequirePropertyValidator {
1454        fn validate_node_property(
1455            &self,
1456            _labels: &[String],
1457            _key: &str,
1458            _value: &Value,
1459        ) -> Result<(), super::super::OperatorError> {
1460            Ok(())
1461        }
1462        fn validate_node_complete(
1463            &self,
1464            _labels: &[String],
1465            properties: &[(String, Value)],
1466        ) -> Result<(), super::super::OperatorError> {
1467            if !properties.iter().any(|(k, _)| k == self.required_property) {
1468                return Err(super::super::OperatorError::ConstraintViolation(format!(
1469                    "missing required property '{}'",
1470                    self.required_property
1471                )));
1472            }
1473            Ok(())
1474        }
1475        fn check_unique_node_property(
1476            &self,
1477            _labels: &[String],
1478            _key: &str,
1479            _value: &Value,
1480        ) -> Result<(), super::super::OperatorError> {
1481            Ok(())
1482        }
1483        fn validate_edge_property(
1484            &self,
1485            _edge_type: &str,
1486            _key: &str,
1487            _value: &Value,
1488        ) -> Result<(), super::super::OperatorError> {
1489            Ok(())
1490        }
1491        fn validate_edge_complete(
1492            &self,
1493            _edge_type: &str,
1494            properties: &[(String, Value)],
1495        ) -> Result<(), super::super::OperatorError> {
1496            if !properties.iter().any(|(k, _)| k == self.required_property) {
1497                return Err(super::super::OperatorError::ConstraintViolation(format!(
1498                    "missing required edge property '{}'",
1499                    self.required_property
1500                )));
1501            }
1502            Ok(())
1503        }
1504    }
1505
1506    /// Validator that records every uniqueness check it sees, so the test
1507    /// can assert ON CREATE properties were not silently bypassed.
1508    struct RecordingUniqueValidator {
1509        seen: std::sync::Mutex<Vec<(String, Value)>>,
1510    }
1511
1512    impl RecordingUniqueValidator {
1513        fn new() -> Self {
1514            Self {
1515                seen: std::sync::Mutex::new(Vec::new()),
1516            }
1517        }
1518    }
1519
1520    impl ConstraintValidator for RecordingUniqueValidator {
1521        fn validate_node_property(
1522            &self,
1523            _labels: &[String],
1524            _key: &str,
1525            _value: &Value,
1526        ) -> Result<(), super::super::OperatorError> {
1527            Ok(())
1528        }
1529        fn validate_node_complete(
1530            &self,
1531            _labels: &[String],
1532            _properties: &[(String, Value)],
1533        ) -> Result<(), super::super::OperatorError> {
1534            Ok(())
1535        }
1536        fn check_unique_node_property(
1537            &self,
1538            _labels: &[String],
1539            key: &str,
1540            value: &Value,
1541        ) -> Result<(), super::super::OperatorError> {
1542            self.seen
1543                .lock()
1544                .unwrap()
1545                .push((key.to_string(), value.clone()));
1546            Ok(())
1547        }
1548        fn validate_edge_property(
1549            &self,
1550            _edge_type: &str,
1551            _key: &str,
1552            _value: &Value,
1553        ) -> Result<(), super::super::OperatorError> {
1554            Ok(())
1555        }
1556        fn validate_edge_complete(
1557            &self,
1558            _edge_type: &str,
1559            _properties: &[(String, Value)],
1560        ) -> Result<(), super::super::OperatorError> {
1561            Ok(())
1562        }
1563    }
1564
1565    fn coalesce_n_x_else(default: i64) -> super::super::filter::FilterExpression {
1566        use super::super::filter::FilterExpression;
1567        FilterExpression::FunctionCall {
1568            name: "coalesce".to_string(),
1569            args: vec![
1570                FilterExpression::Property {
1571                    variable: "n".to_string(),
1572                    property: "x".to_string(),
1573                },
1574                FilterExpression::Literal(Value::Int64(default)),
1575            ],
1576        }
1577    }
1578
#[test]
fn test_merge_two_phase_completeness_uses_full_property_set() {
    // Regression guard. Completeness used to be evaluated in phase one
    // against the match properties alone, so a NOT NULL requirement that
    // only ON CREATE satisfies was rejected. The check must now see the
    // properties produced by both phases.
    use crate::graph::lpg::LpgStore;
    use std::collections::HashMap;

    let backing = Arc::new(LpgStore::new().unwrap());
    let store: Arc<dyn GraphStoreMut> = Arc::clone(&backing) as Arc<dyn GraphStoreMut>;
    let search: Arc<dyn GraphStoreSearch> = Arc::clone(&backing) as Arc<dyn GraphStoreSearch>;

    // The ON CREATE expression refers to `n`, which lives in column 0.
    let variable_columns = HashMap::from([("n".to_string(), 0_usize)]);

    // `x` is the required property and is supplied only by ON CREATE.
    let x_on_create = PropertySource::Expression {
        expr: Box::new(coalesce_n_x_else(99)),
        variable_columns,
    };

    let config = MergeConfig {
        variable: "n".to_string(),
        labels: vec!["Item".to_string()],
        match_properties: const_props(vec![("val", Value::Int64(1))]),
        on_create_properties: vec![("x".to_string(), x_on_create)],
        on_match_properties: vec![],
        output_schema: vec![LogicalType::Node],
        output_column: 0,
        bound_variable_column: None,
    };

    let mut merge = MergeOperator::new(Arc::clone(&store), None, config)
        .with_search_store(Arc::clone(&search))
        .with_validator(Arc::new(RequirePropertyValidator {
            required_property: "x",
        }));

    merge
        .next()
        .expect("MERGE must succeed because ON CREATE supplies the required property");

    // Exactly one `Item` node exists and it carries the expression value.
    let item_ids = store.nodes_by_label("Item");
    assert_eq!(item_ids.len(), 1);

    let created = store.get_node(item_ids[0]).unwrap();
    let x_key = PropertyKey::new("x");
    assert_eq!(
        created.properties.get(&x_key),
        Some(&Value::Int64(99)),
        "ON CREATE expression value must be persisted"
    );
}
1634
#[test]
fn test_merge_two_phase_unique_check_runs_on_on_create_props() {
    // Regression guard. Phase one passed an empty property list to
    // `create_node`, which hid ON CREATE properties from the uniqueness
    // check. `check_unique_node_property` must now be invoked for ON CREATE
    // values during phase two.
    use crate::graph::lpg::LpgStore;
    use std::collections::HashMap;

    let backing = Arc::new(LpgStore::new().unwrap());
    let store: Arc<dyn GraphStoreMut> = Arc::clone(&backing) as Arc<dyn GraphStoreMut>;
    let search: Arc<dyn GraphStoreSearch> = Arc::clone(&backing) as Arc<dyn GraphStoreSearch>;

    // The ON CREATE expression refers to `n`, which lives in column 0.
    let variable_columns = HashMap::from([("n".to_string(), 0_usize)]);
    let x_on_create = PropertySource::Expression {
        expr: Box::new(coalesce_n_x_else(42)),
        variable_columns,
    };

    let config = MergeConfig {
        variable: "n".to_string(),
        labels: vec!["Item".to_string()],
        match_properties: const_props(vec![("val", Value::Int64(1))]),
        on_create_properties: vec![("x".to_string(), x_on_create)],
        on_match_properties: vec![],
        output_schema: vec![LogicalType::Node],
        output_column: 0,
        bound_variable_column: None,
    };

    // Keep a handle on the recorder so its log can be inspected afterwards.
    let recorder = Arc::new(RecordingUniqueValidator::new());

    let mut merge = MergeOperator::new(Arc::clone(&store), None, config)
        .with_search_store(Arc::clone(&search))
        .with_validator(Arc::clone(&recorder) as Arc<dyn ConstraintValidator>);

    merge.next().unwrap();

    let seen = recorder.seen.lock().unwrap().clone();
    let expected = Value::Int64(42);
    let fired = seen
        .iter()
        .any(|(key, value)| key == "x" && *value == expected);
    assert!(
        fired,
        "uniqueness check must fire for ON CREATE expression property `x`, observed: {seen:?}"
    );
}
1684
#[test]
fn test_merge_relationship_two_phase_completeness_uses_full_property_set() {
    // Relationship counterpart of the node completeness regression: the
    // required edge property is supplied only by ON CREATE.
    use super::super::filter::FilterExpression;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::Direction;
    use crate::graph::lpg::LpgStore;
    use std::collections::HashMap;

    /// Input operator that yields its chunk once and then reports exhaustion.
    struct OneShot {
        pending: Option<DataChunk>,
    }

    impl Operator for OneShot {
        fn next(&mut self) -> OperatorResult {
            let chunk = self.pending.take();
            Ok(chunk)
        }
        fn reset(&mut self) {}
        fn name(&self) -> &'static str {
            "OneShot"
        }
        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    let backing = Arc::new(LpgStore::new().unwrap());
    let store: Arc<dyn GraphStoreMut> = Arc::clone(&backing) as Arc<dyn GraphStoreMut>;
    let search: Arc<dyn GraphStoreSearch> = Arc::clone(&backing) as Arc<dyn GraphStoreSearch>;

    // Endpoints of the relationship to be merged.
    let src_id = store.create_node_with_props(
        &["Node"],
        &[(PropertyKey::new("name"), Value::String("Vincent".into()))],
    );
    let dst_id = store.create_node_with_props(
        &["Node"],
        &[(PropertyKey::new("name"), Value::String("Mia".into()))],
    );

    // Single input row [src_id, dst_id]; the operator places the edge in
    // column 2 of its output.
    let input_schema = vec![LogicalType::Node, LogicalType::Node];
    let mut rows = DataChunkBuilder::with_capacity(&input_schema, 1);
    for (column, node_id) in [(0, src_id), (1, dst_id)] {
        rows.column_mut(column).unwrap().push_node_id(node_id);
    }
    rows.advance_row();
    let input_chunk = rows.finish();

    // ON CREATE sets `x` to `coalesce(r.x, 7)`; `r` is resolved through
    // column 2 of the augmented [src, dst, r] row.
    let x_expr = FilterExpression::FunctionCall {
        name: "coalesce".to_string(),
        args: vec![
            FilterExpression::Property {
                variable: "r".to_string(),
                property: "x".to_string(),
            },
            FilterExpression::Literal(Value::Int64(7)),
        ],
    };
    let variable_columns = HashMap::from([("r".to_string(), 2_usize)]);
    let x_on_create = PropertySource::Expression {
        expr: Box::new(x_expr),
        variable_columns,
    };

    let config = MergeRelationshipConfig {
        source_column: 0,
        target_column: 1,
        source_variable: "a".to_string(),
        target_variable: "b".to_string(),
        edge_type: "KNOWS".to_string(),
        match_properties: vec![],
        on_create_properties: vec![("x".to_string(), x_on_create)],
        on_match_properties: vec![],
        output_schema: vec![LogicalType::Node, LogicalType::Node, LogicalType::Edge],
        edge_output_column: 2,
    };

    let input = Box::new(OneShot {
        pending: Some(input_chunk),
    });
    let mut merge_rel = MergeRelationshipOperator::new(Arc::clone(&store), input, config)
        .with_search_store(Arc::clone(&search))
        .with_validator(Arc::new(RequirePropertyValidator {
            required_property: "x",
        }));

    merge_rel.next().expect(
        "MERGE relationship must succeed because ON CREATE supplies the required property",
    );

    // The store must now hold exactly one src -> dst edge carrying `x = 7`.
    let edges: Vec<EdgeId> = store
        .edges_from(src_id, Direction::Outgoing)
        .into_iter()
        .filter(|(target, _)| *target == dst_id)
        .map(|(_, edge_id)| edge_id)
        .collect();
    assert_eq!(edges.len(), 1, "expected exactly one outgoing edge");

    let edge = store.get_edge(edges[0]).unwrap();
    assert_eq!(edge.get_property("x"), Some(&Value::Int64(7)));
}
1785
#[test]
fn test_merge_in_transaction_dedupes_within_unwind() {
    // Regression guard. A MERGE under UNWIND that runs inside a transaction
    // (auto-commit or explicit) tags the nodes it creates with
    // `EpochId::PENDING`. The read path of `find_matching_node` used the
    // unversioned `get_node`, which rejects PENDING records, so later rows
    // of the same UNWIND could not see the node created for an earlier row
    // and each row produced a duplicate.
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::EpochId;

    /// Input operator that yields its chunk once and then reports exhaustion.
    struct OneShot {
        pending: Option<DataChunk>,
    }

    impl Operator for OneShot {
        fn next(&mut self) -> OperatorResult {
            let chunk = self.pending.take();
            Ok(chunk)
        }
        fn reset(&mut self) {}
        fn name(&self) -> &'static str {
            "OneShot"
        }
        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    let backing = Arc::new(LpgStore::new().unwrap());
    let store: Arc<dyn GraphStoreMut> = Arc::clone(&backing) as Arc<dyn GraphStoreMut>;

    // Input rows emulating `UNWIND [1, 1, 1] AS i`.
    let input_schema = vec![LogicalType::Int64];
    let mut rows = DataChunkBuilder::with_capacity(&input_schema, 3);
    for unwound in [1_i64; 3] {
        rows.column_mut(0)
            .unwrap()
            .push_value(Value::Int64(unwound));
        rows.advance_row();
    }
    let input_chunk = rows.finish();

    let config = MergeConfig {
        variable: "n".to_string(),
        labels: vec!["Item".to_string()],
        match_properties: vec![("val".to_string(), PropertySource::Column(0))],
        on_create_properties: vec![],
        on_match_properties: vec![],
        output_schema: vec![LogicalType::Int64, LogicalType::Node],
        output_column: 1,
        bound_variable_column: None,
    };

    // A non-SYSTEM transaction, so versioned creates land at PENDING.
    let tx = TransactionId::new(1);
    let input = Box::new(OneShot {
        pending: Some(input_chunk),
    });
    let mut merge = MergeOperator::new(Arc::clone(&store), Some(input), config)
        .with_transaction_context(EpochId::INITIAL, Some(tx));

    // Drain the operator until it reports exhaustion.
    loop {
        if merge.next().unwrap().is_none() {
            break;
        }
    }

    // Every row carried val = 1, so rows 2 and 3 must match the node that
    // row 1 created instead of creating another one.
    let nodes = store.nodes_by_label("Item");
    let visible_count = nodes
        .iter()
        .filter(|&&id| store.get_node_versioned(id, EpochId::INITIAL, tx).is_some())
        .count();
    assert_eq!(
        visible_count,
        1,
        "MERGE inside UNWIND must dedupe nodes its own transaction created in earlier rows"
    );
}
1857
#[test]
fn test_merge_into_any() {
    // `into_any` must hand back a box that still identifies as the concrete
    // `MergeOperator` type.
    let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

    let config = MergeConfig {
        variable: "n".to_string(),
        labels: vec!["Person".to_string()],
        match_properties: vec![],
        on_create_properties: vec![],
        on_match_properties: vec![],
        output_schema: vec![LogicalType::Node],
        output_column: 0,
        bound_variable_column: None,
    };
    let boxed_op = Box::new(MergeOperator::new(Arc::clone(&store), None, config));

    let erased = boxed_op.into_any();
    assert!(erased.is::<MergeOperator>());
}
1878}