Skip to main content

grafeo_core/execution/operators/
mutation.rs

1//! Mutation operators for creating and deleting graph elements.
2//!
3//! These operators modify the graph structure:
4//! - `CreateNodeOperator`: Creates new nodes
5//! - `CreateEdgeOperator`: Creates new edges
6//! - `DeleteNodeOperator`: Deletes nodes
7//! - `DeleteEdgeOperator`: Deletes edges
8
9use std::sync::Arc;
10
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
/// Trait for validating schema constraints during mutation operations.
///
/// Implementors check type definitions, NOT NULL, and UNIQUE constraints
/// before data is written to the store.
///
/// The per-property and completeness checks are required methods. The
/// graph-type checks (`validate_node_labels_allowed`,
/// `validate_edge_type_allowed`, `validate_edge_endpoints`) and
/// `inject_defaults` come with permissive default implementations, so an
/// implementor only overrides the ones it actually enforces.
pub trait ConstraintValidator: Send + Sync {
    /// Validates a single property value for a node with the given labels.
    ///
    /// Checks type compatibility and NOT NULL constraints.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the value type is incompatible or a NOT NULL constraint is violated.
    fn validate_node_property(
        &self,
        labels: &[String],
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Validates that all required properties are present after creating a node.
    ///
    /// Checks NOT NULL constraints for properties that were not explicitly set.
    /// `properties` is the full list of `(name, value)` pairs that will be
    /// written for the node.
    ///
    /// # Errors
    ///
    /// Returns `Err` if a required (NOT NULL) property is missing.
    fn validate_node_complete(
        &self,
        labels: &[String],
        properties: &[(String, Value)],
    ) -> Result<(), OperatorError>;

    /// Checks UNIQUE constraint for a node property value.
    ///
    /// # Errors
    ///
    /// Returns `Err` if a node with the same label already has this value.
    fn check_unique_node_property(
        &self,
        labels: &[String],
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Validates a single property value for an edge of the given type.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the value type is incompatible with the edge schema.
    fn validate_edge_property(
        &self,
        edge_type: &str,
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Validates that all required properties are present after creating an edge.
    ///
    /// `properties` is the full list of `(name, value)` pairs that will be
    /// written for the edge.
    ///
    /// # Errors
    ///
    /// Returns `Err` if a required property is missing from the edge.
    fn validate_edge_complete(
        &self,
        edge_type: &str,
        properties: &[(String, Value)],
    ) -> Result<(), OperatorError>;

    /// Validates that the node labels are allowed by the bound graph type.
    ///
    /// The default implementation accepts every label set.
    ///
    /// # Errors
    ///
    /// Returns `Err` if any label is not defined in the graph type.
    fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
        // Permissive default: the argument is deliberately unused.
        let _ = labels;
        Ok(())
    }

    /// Validates that the edge type is allowed by the bound graph type.
    ///
    /// The default implementation accepts every edge type.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the edge type is not defined in the graph type.
    fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
        // Permissive default: the argument is deliberately unused.
        let _ = edge_type;
        Ok(())
    }

    /// Validates that edge endpoints have the correct node type labels.
    ///
    /// The default implementation accepts every endpoint combination.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the source or target node labels do not match the edge type definition.
    fn validate_edge_endpoints(
        &self,
        edge_type: &str,
        source_labels: &[String],
        target_labels: &[String],
    ) -> Result<(), OperatorError> {
        // Permissive default: the arguments are deliberately unused.
        let _ = (edge_type, source_labels, target_labels);
        Ok(())
    }

    /// Injects default values for properties that are defined in a type but
    /// not explicitly provided.
    ///
    /// Implementations are expected to extend `properties` in place. The
    /// default implementation leaves the list untouched.
    fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
        // No-op default: the arguments are deliberately unused.
        let _ = (labels, properties);
    }
}
127
/// Operator that creates new nodes.
///
/// For each input row, creates a new node with the specified labels
/// and properties, then outputs the row with the new node.
///
/// Without an input operator it creates exactly one node and then reports
/// exhaustion until it is reset.
pub struct CreateNodeOperator {
    /// The graph store to modify.
    store: Arc<dyn GraphStoreMut>,
    /// Input operator. `None` means a standalone CREATE that yields a single node.
    input: Option<Box<dyn Operator>>,
    /// Labels for the new nodes.
    labels: Vec<String>,
    /// Properties to set (name -> column index or constant value).
    ///
    /// In the no-input case only `PropertySource::Constant` entries are used,
    /// because there is no row to resolve the other sources against.
    properties: Vec<(String, PropertySource)>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Column index for the created node variable.
    ///
    /// The node ID is written here as `Value::Int64`; only input columns with
    /// a smaller index are copied through to the output.
    output_column: usize,
    /// Whether this operator has been executed (for no-input case).
    /// Cleared again by `reset`.
    executed: bool,
    /// Epoch for MVCC versioning. When `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning.
    ///
    /// When `None`, nodes are created under `TransactionId::SYSTEM` and
    /// properties are written with the non-versioned setter.
    transaction_id: Option<TransactionId>,
    /// Optional constraint validator for schema enforcement.
    validator: Option<Arc<dyn ConstraintValidator>>,
    /// Optional write tracker for conflict detection.
    /// Only consulted when `transaction_id` is also set.
    write_tracker: Option<SharedWriteTracker>,
}
156
/// Source for a property value.
///
/// Describes where a mutation operator obtains the value it writes; see
/// `PropertySource::resolve` for the lookup rules of each variant.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum PropertySource {
    /// Get value from an input column.
    /// Resolves to `Value::Null` when the column or the cell is absent.
    Column(usize),
    /// Use a constant value (cloned on every resolution).
    Constant(Value),
    /// Access a named property from a map/node/edge in an input column.
    ///
    /// The cell is tried as a node ID first, then as an edge ID, then as a
    /// map value; anything else resolves to `Value::Null`.
    PropertyAccess {
        /// The column containing the map, node ID, or edge ID.
        column: usize,
        /// The property name to extract.
        property: String,
    },
}
173
174impl PropertySource {
175    /// Resolves a property value from a data chunk row.
176    pub fn resolve(
177        &self,
178        chunk: &crate::execution::chunk::DataChunk,
179        row: usize,
180        store: &dyn GraphStore,
181    ) -> Value {
182        match self {
183            PropertySource::Column(col_idx) => chunk
184                .column(*col_idx)
185                .and_then(|c| c.get_value(row))
186                .unwrap_or(Value::Null),
187            PropertySource::Constant(v) => v.clone(),
188            PropertySource::PropertyAccess { column, property } => {
189                let Some(col) = chunk.column(*column) else {
190                    return Value::Null;
191                };
192                // Try node ID first, then edge ID, then map value
193                if let Some(node_id) = col.get_node_id(row) {
194                    store
195                        .get_node(node_id)
196                        .and_then(|node| node.get_property(property).cloned())
197                        .unwrap_or(Value::Null)
198                } else if let Some(edge_id) = col.get_edge_id(row) {
199                    store
200                        .get_edge(edge_id)
201                        .and_then(|edge| edge.get_property(property).cloned())
202                        .unwrap_or(Value::Null)
203                } else if let Some(Value::Map(map)) = col.get_value(row) {
204                    let key = PropertyKey::new(property);
205                    map.get(&key).cloned().unwrap_or(Value::Null)
206                } else {
207                    Value::Null
208                }
209            }
210        }
211    }
212}
213
214impl CreateNodeOperator {
215    /// Creates a new node creation operator.
216    ///
217    /// # Arguments
218    /// * `store` - The graph store to modify.
219    /// * `input` - Optional input operator (None for standalone CREATE).
220    /// * `labels` - Labels to assign to created nodes.
221    /// * `properties` - Properties to set on created nodes.
222    /// * `output_schema` - Schema of the output.
223    /// * `output_column` - Column index where the created node ID goes.
224    pub fn new(
225        store: Arc<dyn GraphStoreMut>,
226        input: Option<Box<dyn Operator>>,
227        labels: Vec<String>,
228        properties: Vec<(String, PropertySource)>,
229        output_schema: Vec<LogicalType>,
230        output_column: usize,
231    ) -> Self {
232        Self {
233            store,
234            input,
235            labels,
236            properties,
237            output_schema,
238            output_column,
239            executed: false,
240            viewing_epoch: None,
241            transaction_id: None,
242            validator: None,
243            write_tracker: None,
244        }
245    }
246
247    /// Sets the transaction context for MVCC versioning.
248    pub fn with_transaction_context(
249        mut self,
250        epoch: EpochId,
251        transaction_id: Option<TransactionId>,
252    ) -> Self {
253        self.viewing_epoch = Some(epoch);
254        self.transaction_id = transaction_id;
255        self
256    }
257
258    /// Sets the constraint validator for schema enforcement.
259    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
260        self.validator = Some(validator);
261        self
262    }
263
264    /// Sets the write tracker for conflict detection.
265    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
266        self.write_tracker = Some(tracker);
267        self
268    }
269}
270
271impl CreateNodeOperator {
272    /// Validates and sets properties on a newly created node.
273    fn validate_and_set_properties(
274        &self,
275        node_id: NodeId,
276        resolved_props: &mut Vec<(String, Value)>,
277    ) -> Result<(), OperatorError> {
278        // Phase 0: Validate that node labels are allowed by the bound graph type
279        if let Some(ref validator) = self.validator {
280            validator.validate_node_labels_allowed(&self.labels)?;
281        }
282
283        // Phase 0.5: Inject defaults for properties not explicitly provided
284        if let Some(ref validator) = self.validator {
285            validator.inject_defaults(&self.labels, resolved_props);
286        }
287
288        // Phase 1: Validate each property value
289        if let Some(ref validator) = self.validator {
290            for (name, value) in resolved_props.iter() {
291                validator.validate_node_property(&self.labels, name, value)?;
292                validator.check_unique_node_property(&self.labels, name, value)?;
293            }
294            // Phase 2: Validate completeness (NOT NULL checks for missing required properties)
295            validator.validate_node_complete(&self.labels, resolved_props)?;
296        }
297
298        // Phase 3: Write properties to the store
299        if let Some(tid) = self.transaction_id {
300            for (name, value) in resolved_props.iter() {
301                self.store
302                    .set_node_property_versioned(node_id, name, value.clone(), tid);
303            }
304        } else {
305            for (name, value) in resolved_props.iter() {
306                self.store.set_node_property(node_id, name, value.clone());
307            }
308        }
309        Ok(())
310    }
311}
312
313impl Operator for CreateNodeOperator {
314    fn next(&mut self) -> OperatorResult {
315        // Get transaction context for versioned creation
316        let epoch = self
317            .viewing_epoch
318            .unwrap_or_else(|| self.store.current_epoch());
319        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
320
321        if let Some(ref mut input) = self.input {
322            // For each input row, create a node
323            if let Some(chunk) = input.next()? {
324                let mut builder =
325                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
326
327                for row in chunk.selected_indices() {
328                    // Resolve all property values first (before creating node)
329                    let mut resolved_props: Vec<(String, Value)> = self
330                        .properties
331                        .iter()
332                        .map(|(name, source)| {
333                            let value =
334                                source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
335                            (name.clone(), value)
336                        })
337                        .collect();
338
339                    // Create the node with MVCC versioning
340                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
341                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
342
343                    // Record write for conflict detection
344                    if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
345                        tracker.record_node_write(tid, node_id)?;
346                    }
347
348                    // Validate and set properties
349                    self.validate_and_set_properties(node_id, &mut resolved_props)?;
350
351                    // Copy input columns to output
352                    for col_idx in 0..chunk.column_count() {
353                        if col_idx < self.output_column
354                            && let (Some(src), Some(dst)) =
355                                (chunk.column(col_idx), builder.column_mut(col_idx))
356                        {
357                            if let Some(val) = src.get_value(row) {
358                                dst.push_value(val);
359                            } else {
360                                dst.push_value(Value::Null);
361                            }
362                        }
363                    }
364
365                    // Add the new node ID
366                    if let Some(dst) = builder.column_mut(self.output_column) {
367                        dst.push_value(Value::Int64(node_id.0 as i64));
368                    }
369
370                    builder.advance_row();
371                }
372
373                return Ok(Some(builder.finish()));
374            }
375            Ok(None)
376        } else {
377            // No input - create a single node
378            if self.executed {
379                return Ok(None);
380            }
381            self.executed = true;
382
383            // Resolve constant properties
384            let mut resolved_props: Vec<(String, Value)> = self
385                .properties
386                .iter()
387                .filter_map(|(name, source)| {
388                    if let PropertySource::Constant(value) = source {
389                        Some((name.clone(), value.clone()))
390                    } else {
391                        None
392                    }
393                })
394                .collect();
395
396            // Create the node with MVCC versioning
397            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
398            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
399
400            // Record write for conflict detection
401            if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
402                tracker.record_node_write(tid, node_id)?;
403            }
404
405            // Validate and set properties
406            self.validate_and_set_properties(node_id, &mut resolved_props)?;
407
408            // Build output chunk with just the node ID
409            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
410            if let Some(dst) = builder.column_mut(self.output_column) {
411                dst.push_value(Value::Int64(node_id.0 as i64));
412            }
413            builder.advance_row();
414
415            Ok(Some(builder.finish()))
416        }
417    }
418
419    fn reset(&mut self) {
420        if let Some(ref mut input) = self.input {
421            input.reset();
422        }
423        self.executed = false;
424    }
425
426    fn name(&self) -> &'static str {
427        "CreateNode"
428    }
429}
430
/// Operator that creates new edges.
///
/// For each selected input row it reads the source and target node IDs from
/// the configured columns, creates an edge of `edge_type` between them, and
/// passes the input columns through to the output.
pub struct CreateEdgeOperator {
    /// The graph store to modify.
    store: Arc<dyn GraphStoreMut>,
    /// Input operator.
    input: Box<dyn Operator>,
    /// Column index for the source node. The cell must be a `Value::Int64` node ID.
    from_column: usize,
    /// Column index for the target node. The cell must be a `Value::Int64` node ID.
    to_column: usize,
    /// Edge type.
    edge_type: String,
    /// Properties to set. Empty unless supplied via `with_properties`.
    properties: Vec<(String, PropertySource)>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Column index for the created edge variable (if any).
    /// When `None`, the edge ID is not emitted.
    output_column: Option<usize>,
    /// Epoch for MVCC versioning. When `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning.
    ///
    /// When `None`, edges are created under `TransactionId::SYSTEM` and
    /// properties are written with the non-versioned setter.
    transaction_id: Option<TransactionId>,
    /// Optional constraint validator for schema enforcement.
    validator: Option<Arc<dyn ConstraintValidator>>,
    /// Optional write tracker for conflict detection.
    /// Only consulted when `transaction_id` is also set.
    write_tracker: Option<SharedWriteTracker>,
}
458
459impl CreateEdgeOperator {
460    /// Creates a new edge creation operator.
461    ///
462    /// Use builder methods to set additional options:
463    /// - [`with_properties`](Self::with_properties) - set edge properties
464    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
465    /// - [`with_transaction_context`](Self::with_transaction_context) - set transaction context
466    pub fn new(
467        store: Arc<dyn GraphStoreMut>,
468        input: Box<dyn Operator>,
469        from_column: usize,
470        to_column: usize,
471        edge_type: String,
472        output_schema: Vec<LogicalType>,
473    ) -> Self {
474        Self {
475            store,
476            input,
477            from_column,
478            to_column,
479            edge_type,
480            properties: Vec::new(),
481            output_schema,
482            output_column: None,
483            viewing_epoch: None,
484            transaction_id: None,
485            validator: None,
486            write_tracker: None,
487        }
488    }
489
490    /// Sets the properties to assign to created edges.
491    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
492        self.properties = properties;
493        self
494    }
495
496    /// Sets the output column for the created edge ID.
497    pub fn with_output_column(mut self, column: usize) -> Self {
498        self.output_column = Some(column);
499        self
500    }
501
502    /// Sets the transaction context for MVCC versioning.
503    pub fn with_transaction_context(
504        mut self,
505        epoch: EpochId,
506        transaction_id: Option<TransactionId>,
507    ) -> Self {
508        self.viewing_epoch = Some(epoch);
509        self.transaction_id = transaction_id;
510        self
511    }
512
513    /// Sets the constraint validator for schema enforcement.
514    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
515        self.validator = Some(validator);
516        self
517    }
518
519    /// Sets the write tracker for conflict detection.
520    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
521        self.write_tracker = Some(tracker);
522        self
523    }
524}
525
526impl Operator for CreateEdgeOperator {
527    fn next(&mut self) -> OperatorResult {
528        // Get transaction context for versioned creation
529        let epoch = self
530            .viewing_epoch
531            .unwrap_or_else(|| self.store.current_epoch());
532        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
533
534        if let Some(chunk) = self.input.next()? {
535            let mut builder =
536                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
537
538            for row in chunk.selected_indices() {
539                // Get source and target node IDs
540                let from_id = chunk
541                    .column(self.from_column)
542                    .and_then(|c| c.get_value(row))
543                    .ok_or_else(|| {
544                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
545                    })?;
546
547                let to_id = chunk
548                    .column(self.to_column)
549                    .and_then(|c| c.get_value(row))
550                    .ok_or_else(|| {
551                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
552                    })?;
553
554                // Extract node IDs
555                let from_node_id = match from_id {
556                    Value::Int64(id) => NodeId(id as u64),
557                    _ => {
558                        return Err(OperatorError::TypeMismatch {
559                            expected: "Int64 (node ID)".to_string(),
560                            found: format!("{from_id:?}"),
561                        });
562                    }
563                };
564
565                let to_node_id = match to_id {
566                    Value::Int64(id) => NodeId(id as u64),
567                    _ => {
568                        return Err(OperatorError::TypeMismatch {
569                            expected: "Int64 (node ID)".to_string(),
570                            found: format!("{to_id:?}"),
571                        });
572                    }
573                };
574
575                // Validate graph type and edge endpoint constraints
576                if let Some(ref validator) = self.validator {
577                    validator.validate_edge_type_allowed(&self.edge_type)?;
578
579                    // Look up source and target node labels for endpoint validation
580                    let source_labels: Vec<String> = self
581                        .store
582                        .get_node(from_node_id)
583                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
584                        .unwrap_or_default();
585                    let target_labels: Vec<String> = self
586                        .store
587                        .get_node(to_node_id)
588                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
589                        .unwrap_or_default();
590                    validator.validate_edge_endpoints(
591                        &self.edge_type,
592                        &source_labels,
593                        &target_labels,
594                    )?;
595                }
596
597                // Resolve property values
598                let resolved_props: Vec<(String, Value)> = self
599                    .properties
600                    .iter()
601                    .map(|(name, source)| {
602                        let value =
603                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
604                        (name.clone(), value)
605                    })
606                    .collect();
607
608                // Validate constraints before writing
609                if let Some(ref validator) = self.validator {
610                    for (name, value) in &resolved_props {
611                        validator.validate_edge_property(&self.edge_type, name, value)?;
612                    }
613                    validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
614                }
615
616                // Create the edge with MVCC versioning
617                let edge_id = self.store.create_edge_versioned(
618                    from_node_id,
619                    to_node_id,
620                    &self.edge_type,
621                    epoch,
622                    tx,
623                );
624
625                // Record write for conflict detection
626                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
627                    tracker.record_edge_write(tid, edge_id)?;
628                }
629
630                // Set properties
631                if let Some(tid) = self.transaction_id {
632                    for (name, value) in resolved_props {
633                        self.store
634                            .set_edge_property_versioned(edge_id, &name, value, tid);
635                    }
636                } else {
637                    for (name, value) in resolved_props {
638                        self.store.set_edge_property(edge_id, &name, value);
639                    }
640                }
641
642                // Copy input columns
643                for col_idx in 0..chunk.column_count() {
644                    if let (Some(src), Some(dst)) =
645                        (chunk.column(col_idx), builder.column_mut(col_idx))
646                    {
647                        if let Some(val) = src.get_value(row) {
648                            dst.push_value(val);
649                        } else {
650                            dst.push_value(Value::Null);
651                        }
652                    }
653                }
654
655                // Add edge ID if requested
656                if let Some(out_col) = self.output_column
657                    && let Some(dst) = builder.column_mut(out_col)
658                {
659                    dst.push_value(Value::Int64(edge_id.0 as i64));
660                }
661
662                builder.advance_row();
663            }
664
665            return Ok(Some(builder.finish()));
666        }
667        Ok(None)
668    }
669
670    fn reset(&mut self) {
671        self.input.reset();
672    }
673
674    fn name(&self) -> &'static str {
675        "CreateEdge"
676    }
677}
678
/// Operator that deletes nodes.
///
/// For each selected input row it deletes the node whose ID is found in
/// `node_column` and passes the input columns through to the output.
pub struct DeleteNodeOperator {
    /// The graph store to modify.
    store: Arc<dyn GraphStoreMut>,
    /// Input operator.
    input: Box<dyn Operator>,
    /// Column index for the node to delete. The cell must be a `Value::Int64` node ID.
    node_column: usize,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Whether to detach (delete connected edges) before deleting.
    ///
    /// When `false`, deleting a node that still has connected edges fails
    /// with a `ConstraintViolation` error.
    detach: bool,
    /// Epoch for MVCC versioning. When `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning.
    /// When `None`, deletions run under `TransactionId::SYSTEM`.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection.
    /// Only consulted when `transaction_id` is also set.
    write_tracker: Option<SharedWriteTracker>,
}
698
699impl DeleteNodeOperator {
700    /// Creates a new node deletion operator.
701    pub fn new(
702        store: Arc<dyn GraphStoreMut>,
703        input: Box<dyn Operator>,
704        node_column: usize,
705        output_schema: Vec<LogicalType>,
706        detach: bool,
707    ) -> Self {
708        Self {
709            store,
710            input,
711            node_column,
712            output_schema,
713            detach,
714            viewing_epoch: None,
715            transaction_id: None,
716            write_tracker: None,
717        }
718    }
719
720    /// Sets the transaction context for MVCC versioning.
721    pub fn with_transaction_context(
722        mut self,
723        epoch: EpochId,
724        transaction_id: Option<TransactionId>,
725    ) -> Self {
726        self.viewing_epoch = Some(epoch);
727        self.transaction_id = transaction_id;
728        self
729    }
730
731    /// Sets the write tracker for conflict detection.
732    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
733        self.write_tracker = Some(tracker);
734        self
735    }
736}
737
738impl Operator for DeleteNodeOperator {
739    fn next(&mut self) -> OperatorResult {
740        // Get transaction context for versioned deletion
741        let epoch = self
742            .viewing_epoch
743            .unwrap_or_else(|| self.store.current_epoch());
744        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
745
746        if let Some(chunk) = self.input.next()? {
747            let mut builder =
748                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
749
750            for row in chunk.selected_indices() {
751                let node_val = chunk
752                    .column(self.node_column)
753                    .and_then(|c| c.get_value(row))
754                    .ok_or_else(|| {
755                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
756                    })?;
757
758                let node_id = match node_val {
759                    Value::Int64(id) => NodeId(id as u64),
760                    _ => {
761                        return Err(OperatorError::TypeMismatch {
762                            expected: "Int64 (node ID)".to_string(),
763                            found: format!("{node_val:?}"),
764                        });
765                    }
766                };
767
768                if self.detach {
769                    // Delete all connected edges first, using versioned deletion
770                    // so rollback can restore them
771                    let outgoing = self
772                        .store
773                        .edges_from(node_id, crate::graph::Direction::Outgoing);
774                    let incoming = self
775                        .store
776                        .edges_from(node_id, crate::graph::Direction::Incoming);
777                    for (_, edge_id) in outgoing.into_iter().chain(incoming) {
778                        self.store.delete_edge_versioned(edge_id, epoch, tx);
779                        if let (Some(tracker), Some(tid)) =
780                            (&self.write_tracker, self.transaction_id)
781                        {
782                            tracker.record_edge_write(tid, edge_id)?;
783                        }
784                    }
785                } else {
786                    // NODETACH: check that node has no connected edges
787                    let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
788                    if degree > 0 {
789                        return Err(OperatorError::ConstraintViolation(format!(
790                            "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
791                            degree
792                        )));
793                    }
794                }
795
796                // Delete the node with MVCC versioning
797                self.store.delete_node_versioned(node_id, epoch, tx);
798
799                // Record write for conflict detection
800                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
801                    tracker.record_node_write(tid, node_id)?;
802                }
803
804                // Pass through all input columns so downstream RETURN can
805                // reference the variable (e.g., count(n) after DELETE n).
806                for col_idx in 0..chunk.column_count() {
807                    if let (Some(src), Some(dst)) =
808                        (chunk.column(col_idx), builder.column_mut(col_idx))
809                    {
810                        if let Some(val) = src.get_value(row) {
811                            dst.push_value(val);
812                        } else {
813                            dst.push_value(Value::Null);
814                        }
815                    }
816                }
817                builder.advance_row();
818            }
819
820            return Ok(Some(builder.finish()));
821        }
822        Ok(None)
823    }
824
    /// Resets this operator by delegating to the input operator's `reset`.
    ///
    /// No other state is touched here.
    fn reset(&mut self) {
        self.input.reset();
    }
828
    /// Returns the static display name of this operator, `"DeleteNode"`.
    fn name(&self) -> &'static str {
        "DeleteNode"
    }
832}
833
/// Operator that deletes edges.
///
/// For every selected row of an input chunk, the operator reads an edge ID
/// from `edge_column`, deletes that edge from the store, and passes the
/// input columns through to the output chunk.
pub struct DeleteEdgeOperator {
    /// The graph store to modify.
    store: Arc<dyn GraphStoreMut>,
    /// Input operator that supplies the chunks of edges to delete.
    input: Box<dyn Operator>,
    /// Column index for the edge to delete (expected to hold `Value::Int64` IDs).
    edge_column: usize,
    /// Output schema used to build the output chunk.
    output_schema: Vec<LogicalType>,
    /// Epoch for MVCC versioning; when `None`, the store's current epoch is
    /// used at execution time.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning; when `None`, deletions are
    /// attributed to `TransactionId::SYSTEM`.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection. Writes are recorded
    /// only when both a tracker and a transaction ID are present.
    write_tracker: Option<SharedWriteTracker>,
}
851
852impl DeleteEdgeOperator {
853    /// Creates a new edge deletion operator.
854    pub fn new(
855        store: Arc<dyn GraphStoreMut>,
856        input: Box<dyn Operator>,
857        edge_column: usize,
858        output_schema: Vec<LogicalType>,
859    ) -> Self {
860        Self {
861            store,
862            input,
863            edge_column,
864            output_schema,
865            viewing_epoch: None,
866            transaction_id: None,
867            write_tracker: None,
868        }
869    }
870
871    /// Sets the transaction context for MVCC versioning.
872    pub fn with_transaction_context(
873        mut self,
874        epoch: EpochId,
875        transaction_id: Option<TransactionId>,
876    ) -> Self {
877        self.viewing_epoch = Some(epoch);
878        self.transaction_id = transaction_id;
879        self
880    }
881
882    /// Sets the write tracker for conflict detection.
883    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
884        self.write_tracker = Some(tracker);
885        self
886    }
887}
888
889impl Operator for DeleteEdgeOperator {
890    fn next(&mut self) -> OperatorResult {
891        // Get transaction context for versioned deletion
892        let epoch = self
893            .viewing_epoch
894            .unwrap_or_else(|| self.store.current_epoch());
895        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
896
897        if let Some(chunk) = self.input.next()? {
898            let mut builder =
899                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
900
901            for row in chunk.selected_indices() {
902                let edge_val = chunk
903                    .column(self.edge_column)
904                    .and_then(|c| c.get_value(row))
905                    .ok_or_else(|| {
906                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
907                    })?;
908
909                let edge_id = match edge_val {
910                    Value::Int64(id) => EdgeId(id as u64),
911                    _ => {
912                        return Err(OperatorError::TypeMismatch {
913                            expected: "Int64 (edge ID)".to_string(),
914                            found: format!("{edge_val:?}"),
915                        });
916                    }
917                };
918
919                // Delete the edge with MVCC versioning
920                self.store.delete_edge_versioned(edge_id, epoch, tx);
921
922                // Record write for conflict detection
923                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
924                    tracker.record_edge_write(tid, edge_id)?;
925                }
926
927                // Pass through all input columns
928                for col_idx in 0..chunk.column_count() {
929                    if let (Some(src), Some(dst)) =
930                        (chunk.column(col_idx), builder.column_mut(col_idx))
931                    {
932                        if let Some(val) = src.get_value(row) {
933                            dst.push_value(val);
934                        } else {
935                            dst.push_value(Value::Null);
936                        }
937                    }
938                }
939                builder.advance_row();
940            }
941
942            return Ok(Some(builder.finish()));
943        }
944        Ok(None)
945    }
946
947    fn reset(&mut self) {
948        self.input.reset();
949    }
950
951    fn name(&self) -> &'static str {
952        "DeleteEdge"
953    }
954}
955
/// Operator that adds labels to nodes.
///
/// For every selected input row, the operator reads a node ID from
/// `node_column`, adds each entry of `labels` to that node, passes the input
/// columns through, and writes the number of labels actually added into
/// `count_column`.
pub struct AddLabelOperator {
    /// The graph store.
    store: Arc<dyn GraphStoreMut>,
    /// Child operator providing nodes.
    input: Box<dyn Operator>,
    /// Column index containing node IDs (expected to hold `Value::Int64`).
    node_column: usize,
    /// Labels to add.
    labels: Vec<String>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Column index for the update count (last column).
    count_column: usize,
    /// Epoch for MVCC versioning.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for undo log tracking. When present, the versioned
    /// store method is used to add labels; otherwise the unversioned one.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection. Writes are recorded
    /// only when both a tracker and a transaction ID are present.
    write_tracker: Option<SharedWriteTracker>,
}
977
978impl AddLabelOperator {
979    /// Creates a new add label operator.
980    pub fn new(
981        store: Arc<dyn GraphStoreMut>,
982        input: Box<dyn Operator>,
983        node_column: usize,
984        labels: Vec<String>,
985        output_schema: Vec<LogicalType>,
986    ) -> Self {
987        let count_column = output_schema.len() - 1;
988        Self {
989            store,
990            input,
991            node_column,
992            labels,
993            count_column,
994            output_schema,
995            viewing_epoch: None,
996            transaction_id: None,
997            write_tracker: None,
998        }
999    }
1000
1001    /// Sets the transaction context for versioned label mutations.
1002    pub fn with_transaction_context(
1003        mut self,
1004        epoch: EpochId,
1005        transaction_id: Option<TransactionId>,
1006    ) -> Self {
1007        self.viewing_epoch = Some(epoch);
1008        self.transaction_id = transaction_id;
1009        self
1010    }
1011
1012    /// Sets the write tracker for conflict detection.
1013    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1014        self.write_tracker = Some(tracker);
1015        self
1016    }
1017}
1018
1019impl Operator for AddLabelOperator {
1020    fn next(&mut self) -> OperatorResult {
1021        if let Some(chunk) = self.input.next()? {
1022            let mut builder =
1023                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1024
1025            for row in chunk.selected_indices() {
1026                let node_val = chunk
1027                    .column(self.node_column)
1028                    .and_then(|c| c.get_value(row))
1029                    .ok_or_else(|| {
1030                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1031                    })?;
1032
1033                let node_id = match node_val {
1034                    Value::Int64(id) => NodeId(id as u64),
1035                    _ => {
1036                        return Err(OperatorError::TypeMismatch {
1037                            expected: "Int64 (node ID)".to_string(),
1038                            found: format!("{node_val:?}"),
1039                        });
1040                    }
1041                };
1042
1043                // Record write for conflict detection
1044                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1045                    tracker.record_node_write(tid, node_id)?;
1046                }
1047
1048                // Add all labels
1049                let mut row_count: i64 = 0;
1050                for label in &self.labels {
1051                    let added = if let Some(tid) = self.transaction_id {
1052                        self.store.add_label_versioned(node_id, label, tid)
1053                    } else {
1054                        self.store.add_label(node_id, label)
1055                    };
1056                    if added {
1057                        row_count += 1;
1058                    }
1059                }
1060
1061                // Copy input columns to output (pass-through)
1062                for col_idx in 0..chunk.column_count() {
1063                    if let (Some(src), Some(dst)) =
1064                        (chunk.column(col_idx), builder.column_mut(col_idx))
1065                    {
1066                        if let Some(val) = src.get_value(row) {
1067                            dst.push_value(val);
1068                        } else {
1069                            dst.push_value(Value::Null);
1070                        }
1071                    }
1072                }
1073                // Append the update count column
1074                if let Some(dst) = builder.column_mut(self.count_column) {
1075                    dst.push_value(Value::Int64(row_count));
1076                }
1077
1078                builder.advance_row();
1079            }
1080
1081            return Ok(Some(builder.finish()));
1082        }
1083        Ok(None)
1084    }
1085
1086    fn reset(&mut self) {
1087        self.input.reset();
1088    }
1089
1090    fn name(&self) -> &'static str {
1091        "AddLabel"
1092    }
1093}
1094
/// Operator that removes labels from nodes.
///
/// For every selected input row, the operator reads a node ID from
/// `node_column`, removes each entry of `labels` from that node, passes the
/// input columns through, and writes the number of labels actually removed
/// into `count_column`.
pub struct RemoveLabelOperator {
    /// The graph store.
    store: Arc<dyn GraphStoreMut>,
    /// Child operator providing nodes.
    input: Box<dyn Operator>,
    /// Column index containing node IDs (expected to hold `Value::Int64`).
    node_column: usize,
    /// Labels to remove.
    labels: Vec<String>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Column index for the update count (last column).
    count_column: usize,
    /// Epoch for MVCC versioning.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for undo log tracking. When present, the versioned
    /// store method is used to remove labels; otherwise the unversioned one.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection. Writes are recorded
    /// only when both a tracker and a transaction ID are present.
    write_tracker: Option<SharedWriteTracker>,
}
1116
1117impl RemoveLabelOperator {
1118    /// Creates a new remove label operator.
1119    pub fn new(
1120        store: Arc<dyn GraphStoreMut>,
1121        input: Box<dyn Operator>,
1122        node_column: usize,
1123        labels: Vec<String>,
1124        output_schema: Vec<LogicalType>,
1125    ) -> Self {
1126        let count_column = output_schema.len() - 1;
1127        Self {
1128            store,
1129            input,
1130            node_column,
1131            labels,
1132            count_column,
1133            output_schema,
1134            viewing_epoch: None,
1135            transaction_id: None,
1136            write_tracker: None,
1137        }
1138    }
1139
1140    /// Sets the transaction context for versioned label mutations.
1141    pub fn with_transaction_context(
1142        mut self,
1143        epoch: EpochId,
1144        transaction_id: Option<TransactionId>,
1145    ) -> Self {
1146        self.viewing_epoch = Some(epoch);
1147        self.transaction_id = transaction_id;
1148        self
1149    }
1150
1151    /// Sets the write tracker for conflict detection.
1152    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1153        self.write_tracker = Some(tracker);
1154        self
1155    }
1156}
1157
1158impl Operator for RemoveLabelOperator {
1159    fn next(&mut self) -> OperatorResult {
1160        if let Some(chunk) = self.input.next()? {
1161            let mut builder =
1162                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1163
1164            for row in chunk.selected_indices() {
1165                let node_val = chunk
1166                    .column(self.node_column)
1167                    .and_then(|c| c.get_value(row))
1168                    .ok_or_else(|| {
1169                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1170                    })?;
1171
1172                let node_id = match node_val {
1173                    Value::Int64(id) => NodeId(id as u64),
1174                    _ => {
1175                        return Err(OperatorError::TypeMismatch {
1176                            expected: "Int64 (node ID)".to_string(),
1177                            found: format!("{node_val:?}"),
1178                        });
1179                    }
1180                };
1181
1182                // Record write for conflict detection
1183                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1184                    tracker.record_node_write(tid, node_id)?;
1185                }
1186
1187                // Remove all labels
1188                let mut row_count: i64 = 0;
1189                for label in &self.labels {
1190                    let removed = if let Some(tid) = self.transaction_id {
1191                        self.store.remove_label_versioned(node_id, label, tid)
1192                    } else {
1193                        self.store.remove_label(node_id, label)
1194                    };
1195                    if removed {
1196                        row_count += 1;
1197                    }
1198                }
1199
1200                // Copy input columns to output (pass-through)
1201                for col_idx in 0..chunk.column_count() {
1202                    if let (Some(src), Some(dst)) =
1203                        (chunk.column(col_idx), builder.column_mut(col_idx))
1204                    {
1205                        if let Some(val) = src.get_value(row) {
1206                            dst.push_value(val);
1207                        } else {
1208                            dst.push_value(Value::Null);
1209                        }
1210                    }
1211                }
1212                // Append the update count column
1213                if let Some(dst) = builder.column_mut(self.count_column) {
1214                    dst.push_value(Value::Int64(row_count));
1215                }
1216
1217                builder.advance_row();
1218            }
1219
1220            return Ok(Some(builder.finish()));
1221        }
1222        Ok(None)
1223    }
1224
1225    fn reset(&mut self) {
1226        self.input.reset();
1227    }
1228
1229    fn name(&self) -> &'static str {
1230        "RemoveLabel"
1231    }
1232}
1233
/// Operator that sets properties on nodes or edges.
///
/// This operator reads node/edge IDs from a column and sets the
/// specified properties on each entity. The input columns are passed
/// through to the output chunk.
pub struct SetPropertyOperator {
    /// The graph store.
    store: Arc<dyn GraphStoreMut>,
    /// Child operator providing entities.
    input: Box<dyn Operator>,
    /// Column index containing entity IDs (node or edge).
    entity_column: usize,
    /// Whether the entity is an edge (false = node).
    is_edge: bool,
    /// Properties to set (name -> source). The name `"*"` marks a map
    /// assignment whose resolved value is expected to be a `Value::Map`.
    properties: Vec<(String, PropertySource)>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Whether to replace all properties (true) or merge (false) for map assignments.
    replace: bool,
    /// Optional constraint validator for schema enforcement.
    validator: Option<Arc<dyn ConstraintValidator>>,
    /// Entity labels (for node constraint validation).
    labels: Vec<String>,
    /// Edge type (for edge constraint validation). Edge properties are only
    /// validated when this is set.
    edge_type_name: Option<String>,
    /// Epoch for MVCC versioning.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for undo log tracking. When present, the versioned
    /// store methods are used for every property write and removal.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection. Writes are recorded
    /// only when both a tracker and a transaction ID are present.
    write_tracker: Option<SharedWriteTracker>,
}
1266
1267impl SetPropertyOperator {
1268    /// Creates a new set property operator for nodes.
1269    pub fn new_for_node(
1270        store: Arc<dyn GraphStoreMut>,
1271        input: Box<dyn Operator>,
1272        node_column: usize,
1273        properties: Vec<(String, PropertySource)>,
1274        output_schema: Vec<LogicalType>,
1275    ) -> Self {
1276        Self {
1277            store,
1278            input,
1279            entity_column: node_column,
1280            is_edge: false,
1281            properties,
1282            output_schema,
1283            replace: false,
1284            validator: None,
1285            labels: Vec::new(),
1286            edge_type_name: None,
1287            viewing_epoch: None,
1288            transaction_id: None,
1289            write_tracker: None,
1290        }
1291    }
1292
1293    /// Creates a new set property operator for edges.
1294    pub fn new_for_edge(
1295        store: Arc<dyn GraphStoreMut>,
1296        input: Box<dyn Operator>,
1297        edge_column: usize,
1298        properties: Vec<(String, PropertySource)>,
1299        output_schema: Vec<LogicalType>,
1300    ) -> Self {
1301        Self {
1302            store,
1303            input,
1304            entity_column: edge_column,
1305            is_edge: true,
1306            properties,
1307            output_schema,
1308            replace: false,
1309            validator: None,
1310            labels: Vec::new(),
1311            edge_type_name: None,
1312            viewing_epoch: None,
1313            transaction_id: None,
1314            write_tracker: None,
1315        }
1316    }
1317
1318    /// Sets whether this operator replaces all properties (for map assignment).
1319    pub fn with_replace(mut self, replace: bool) -> Self {
1320        self.replace = replace;
1321        self
1322    }
1323
1324    /// Sets the constraint validator for schema enforcement.
1325    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1326        self.validator = Some(validator);
1327        self
1328    }
1329
1330    /// Sets the entity labels (for node constraint validation).
1331    pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1332        self.labels = labels;
1333        self
1334    }
1335
1336    /// Sets the edge type name (for edge constraint validation).
1337    pub fn with_edge_type(mut self, edge_type: String) -> Self {
1338        self.edge_type_name = Some(edge_type);
1339        self
1340    }
1341
1342    /// Sets the transaction context for versioned property mutations.
1343    ///
1344    /// When a transaction ID is provided, property changes are recorded in
1345    /// an undo log so they can be restored on rollback.
1346    pub fn with_transaction_context(
1347        mut self,
1348        epoch: EpochId,
1349        transaction_id: Option<TransactionId>,
1350    ) -> Self {
1351        self.viewing_epoch = Some(epoch);
1352        self.transaction_id = transaction_id;
1353        self
1354    }
1355
1356    /// Sets the write tracker for conflict detection.
1357    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1358        self.write_tracker = Some(tracker);
1359        self
1360    }
1361}
1362
1363impl Operator for SetPropertyOperator {
1364    fn next(&mut self) -> OperatorResult {
1365        if let Some(chunk) = self.input.next()? {
1366            let mut builder =
1367                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1368
1369            for row in chunk.selected_indices() {
1370                let entity_val = chunk
1371                    .column(self.entity_column)
1372                    .and_then(|c| c.get_value(row))
1373                    .ok_or_else(|| {
1374                        OperatorError::ColumnNotFound(format!(
1375                            "entity column {}",
1376                            self.entity_column
1377                        ))
1378                    })?;
1379
1380                let entity_id = match entity_val {
1381                    Value::Int64(id) => id as u64,
1382                    _ => {
1383                        return Err(OperatorError::TypeMismatch {
1384                            expected: "Int64 (entity ID)".to_string(),
1385                            found: format!("{entity_val:?}"),
1386                        });
1387                    }
1388                };
1389
1390                // Record write for conflict detection
1391                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1392                    if self.is_edge {
1393                        tracker.record_edge_write(tid, EdgeId(entity_id))?;
1394                    } else {
1395                        tracker.record_node_write(tid, NodeId(entity_id))?;
1396                    }
1397                }
1398
1399                // Resolve all property values
1400                let resolved_props: Vec<(String, Value)> = self
1401                    .properties
1402                    .iter()
1403                    .map(|(name, source)| {
1404                        let value =
1405                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1406                        (name.clone(), value)
1407                    })
1408                    .collect();
1409
1410                // Validate constraints before writing
1411                if let Some(ref validator) = self.validator {
1412                    if self.is_edge {
1413                        if let Some(ref et) = self.edge_type_name {
1414                            for (name, value) in &resolved_props {
1415                                validator.validate_edge_property(et, name, value)?;
1416                            }
1417                        }
1418                    } else {
1419                        for (name, value) in &resolved_props {
1420                            validator.validate_node_property(&self.labels, name, value)?;
1421                            validator.check_unique_node_property(&self.labels, name, value)?;
1422                        }
1423                    }
1424                }
1425
1426                // Write all properties (use versioned methods when inside a transaction)
1427                let tx_id = self.transaction_id;
1428                for (prop_name, value) in resolved_props {
1429                    if prop_name == "*" {
1430                        // Map assignment: value should be a Map
1431                        if let Value::Map(map) = value {
1432                            if self.replace {
1433                                // Replace: remove all existing properties first
1434                                if self.is_edge {
1435                                    if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1436                                        let keys: Vec<String> = edge
1437                                            .properties
1438                                            .iter()
1439                                            .map(|(k, _)| k.as_str().to_string())
1440                                            .collect();
1441                                        for key in keys {
1442                                            if let Some(tid) = tx_id {
1443                                                self.store.remove_edge_property_versioned(
1444                                                    EdgeId(entity_id),
1445                                                    &key,
1446                                                    tid,
1447                                                );
1448                                            } else {
1449                                                self.store
1450                                                    .remove_edge_property(EdgeId(entity_id), &key);
1451                                            }
1452                                        }
1453                                    }
1454                                } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1455                                    let keys: Vec<String> = node
1456                                        .properties
1457                                        .iter()
1458                                        .map(|(k, _)| k.as_str().to_string())
1459                                        .collect();
1460                                    for key in keys {
1461                                        if let Some(tid) = tx_id {
1462                                            self.store.remove_node_property_versioned(
1463                                                NodeId(entity_id),
1464                                                &key,
1465                                                tid,
1466                                            );
1467                                        } else {
1468                                            self.store
1469                                                .remove_node_property(NodeId(entity_id), &key);
1470                                        }
1471                                    }
1472                                }
1473                            }
1474                            // Set each map entry (null values remove the property)
1475                            for (key, val) in map.iter() {
1476                                if val.is_null() {
1477                                    // Null in SET += removes the property (Cypher/GQL semantics)
1478                                    if self.is_edge {
1479                                        if let Some(tid) = tx_id {
1480                                            self.store.remove_edge_property_versioned(
1481                                                EdgeId(entity_id),
1482                                                key.as_str(),
1483                                                tid,
1484                                            );
1485                                        } else {
1486                                            self.store.remove_edge_property(
1487                                                EdgeId(entity_id),
1488                                                key.as_str(),
1489                                            );
1490                                        }
1491                                    } else if let Some(tid) = tx_id {
1492                                        self.store.remove_node_property_versioned(
1493                                            NodeId(entity_id),
1494                                            key.as_str(),
1495                                            tid,
1496                                        );
1497                                    } else {
1498                                        self.store
1499                                            .remove_node_property(NodeId(entity_id), key.as_str());
1500                                    }
1501                                } else if self.is_edge {
1502                                    if let Some(tid) = tx_id {
1503                                        self.store.set_edge_property_versioned(
1504                                            EdgeId(entity_id),
1505                                            key.as_str(),
1506                                            val.clone(),
1507                                            tid,
1508                                        );
1509                                    } else {
1510                                        self.store.set_edge_property(
1511                                            EdgeId(entity_id),
1512                                            key.as_str(),
1513                                            val.clone(),
1514                                        );
1515                                    }
1516                                } else if let Some(tid) = tx_id {
1517                                    self.store.set_node_property_versioned(
1518                                        NodeId(entity_id),
1519                                        key.as_str(),
1520                                        val.clone(),
1521                                        tid,
1522                                    );
1523                                } else {
1524                                    self.store.set_node_property(
1525                                        NodeId(entity_id),
1526                                        key.as_str(),
1527                                        val.clone(),
1528                                    );
1529                                }
1530                            }
1531                        }
1532                    } else if self.is_edge {
1533                        if let Some(tid) = tx_id {
1534                            self.store.set_edge_property_versioned(
1535                                EdgeId(entity_id),
1536                                &prop_name,
1537                                value,
1538                                tid,
1539                            );
1540                        } else {
1541                            self.store
1542                                .set_edge_property(EdgeId(entity_id), &prop_name, value);
1543                        }
1544                    } else if let Some(tid) = tx_id {
1545                        self.store.set_node_property_versioned(
1546                            NodeId(entity_id),
1547                            &prop_name,
1548                            value,
1549                            tid,
1550                        );
1551                    } else {
1552                        self.store
1553                            .set_node_property(NodeId(entity_id), &prop_name, value);
1554                    }
1555                }
1556
1557                // Copy input columns to output
1558                for col_idx in 0..chunk.column_count() {
1559                    if let (Some(src), Some(dst)) =
1560                        (chunk.column(col_idx), builder.column_mut(col_idx))
1561                    {
1562                        if let Some(val) = src.get_value(row) {
1563                            dst.push_value(val);
1564                        } else {
1565                            dst.push_value(Value::Null);
1566                        }
1567                    }
1568                }
1569
1570                builder.advance_row();
1571            }
1572
1573            return Ok(Some(builder.finish()));
1574        }
1575        Ok(None)
1576    }
1577
1578    fn reset(&mut self) {
1579        self.input.reset();
1580    }
1581
1582    fn name(&self) -> &'static str {
1583        "SetProperty"
1584    }
1585}
1586
1587#[cfg(all(test, feature = "lpg"))]
1588mod tests {
1589    use super::*;
1590    use crate::execution::DataChunk;
1591    use crate::execution::chunk::DataChunkBuilder;
1592    use crate::graph::lpg::LpgStore;
1593
1594    // ── Helpers ────────────────────────────────────────────────────
1595
    /// Builds a fresh `LpgStore` and returns it as a shared mutable graph
    /// store handle.
    ///
    /// Panics (via `unwrap`) if `LpgStore::new()` does not succeed.
    fn create_test_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }
1599
    /// Test input operator that yields a single pre-built chunk exactly once.
    struct MockInput {
        /// The chunk to hand out; becomes `None` after the first `next` call.
        chunk: Option<DataChunk>,
    }

    impl MockInput {
        /// Wraps `chunk` in a boxed `MockInput`.
        fn boxed(chunk: DataChunk) -> Box<Self> {
            Box::new(Self { chunk: Some(chunk) })
        }
    }

    impl Operator for MockInput {
        /// Returns the stored chunk on the first call and `None` afterwards.
        fn next(&mut self) -> OperatorResult {
            Ok(self.chunk.take())
        }
        /// No-op: the chunk is not restored after it has been taken.
        fn reset(&mut self) {}
        /// Static display name of this mock operator.
        fn name(&self) -> &'static str {
            "MockInput"
        }
    }
1619
1620    struct EmptyInput;
1621    impl Operator for EmptyInput {
1622        fn next(&mut self) -> OperatorResult {
1623            Ok(None)
1624        }
1625        fn reset(&mut self) {}
1626        fn name(&self) -> &'static str {
1627            "EmptyInput"
1628        }
1629    }
1630
1631    fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1632        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1633        for id in ids {
1634            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1635            builder.advance_row();
1636        }
1637        builder.finish()
1638    }
1639
1640    fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1641        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1642        for id in ids {
1643            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1644            builder.advance_row();
1645        }
1646        builder.finish()
1647    }
1648
1649    // ── CreateNodeOperator ──────────────────────────────────────
1650
1651    #[test]
1652    fn test_create_node_standalone() {
1653        let store = create_test_store();
1654
1655        let mut op = CreateNodeOperator::new(
1656            Arc::clone(&store),
1657            None,
1658            vec!["Person".to_string()],
1659            vec![(
1660                "name".to_string(),
1661                PropertySource::Constant(Value::String("Alix".into())),
1662            )],
1663            vec![LogicalType::Int64],
1664            0,
1665        );
1666
1667        let chunk = op.next().unwrap().unwrap();
1668        assert_eq!(chunk.row_count(), 1);
1669
1670        // Second call should return None (standalone executes once)
1671        assert!(op.next().unwrap().is_none());
1672
1673        assert_eq!(store.node_count(), 1);
1674    }
1675
1676    #[test]
1677    fn test_create_edge() {
1678        let store = create_test_store();
1679
1680        let node1 = store.create_node(&["Person"]);
1681        let node2 = store.create_node(&["Person"]);
1682
1683        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1684        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1685        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1686        builder.advance_row();
1687
1688        let mut op = CreateEdgeOperator::new(
1689            Arc::clone(&store),
1690            MockInput::boxed(builder.finish()),
1691            0,
1692            1,
1693            "KNOWS".to_string(),
1694            vec![LogicalType::Int64, LogicalType::Int64],
1695        );
1696
1697        let _chunk = op.next().unwrap().unwrap();
1698        assert_eq!(store.edge_count(), 1);
1699    }
1700
1701    #[test]
1702    fn test_delete_node() {
1703        let store = create_test_store();
1704
1705        let node_id = store.create_node(&["Person"]);
1706        assert_eq!(store.node_count(), 1);
1707
1708        let mut op = DeleteNodeOperator::new(
1709            Arc::clone(&store),
1710            MockInput::boxed(node_id_chunk(&[node_id])),
1711            0,
1712            vec![LogicalType::Node],
1713            false,
1714        );
1715
1716        let chunk = op.next().unwrap().unwrap();
1717        // Pass-through: output row contains the original node ID
1718        assert_eq!(chunk.row_count(), 1);
1719        assert_eq!(store.node_count(), 0);
1720    }
1721
1722    // ── DeleteEdgeOperator ───────────────────────────────────────
1723
1724    #[test]
1725    fn test_delete_edge() {
1726        let store = create_test_store();
1727
1728        let n1 = store.create_node(&["Person"]);
1729        let n2 = store.create_node(&["Person"]);
1730        let eid = store.create_edge(n1, n2, "KNOWS");
1731        assert_eq!(store.edge_count(), 1);
1732
1733        let mut op = DeleteEdgeOperator::new(
1734            Arc::clone(&store),
1735            MockInput::boxed(edge_id_chunk(&[eid])),
1736            0,
1737            vec![LogicalType::Node],
1738        );
1739
1740        let chunk = op.next().unwrap().unwrap();
1741        assert_eq!(chunk.row_count(), 1);
1742        assert_eq!(store.edge_count(), 0);
1743    }
1744
1745    #[test]
1746    fn test_delete_edge_no_input_returns_none() {
1747        let store = create_test_store();
1748
1749        let mut op = DeleteEdgeOperator::new(
1750            Arc::clone(&store),
1751            Box::new(EmptyInput),
1752            0,
1753            vec![LogicalType::Int64],
1754        );
1755
1756        assert!(op.next().unwrap().is_none());
1757    }
1758
1759    #[test]
1760    fn test_delete_multiple_edges() {
1761        let store = create_test_store();
1762
1763        let n1 = store.create_node(&["N"]);
1764        let n2 = store.create_node(&["N"]);
1765        let e1 = store.create_edge(n1, n2, "R");
1766        let e2 = store.create_edge(n2, n1, "S");
1767        assert_eq!(store.edge_count(), 2);
1768
1769        let mut op = DeleteEdgeOperator::new(
1770            Arc::clone(&store),
1771            MockInput::boxed(edge_id_chunk(&[e1, e2])),
1772            0,
1773            vec![LogicalType::Node],
1774        );
1775
1776        let chunk = op.next().unwrap().unwrap();
1777        assert_eq!(chunk.row_count(), 2);
1778        assert_eq!(store.edge_count(), 0);
1779    }
1780
1781    // ── DeleteNodeOperator with DETACH ───────────────────────────
1782
1783    #[test]
1784    fn test_delete_node_detach() {
1785        let store = create_test_store();
1786
1787        let n1 = store.create_node(&["Person"]);
1788        let n2 = store.create_node(&["Person"]);
1789        store.create_edge(n1, n2, "KNOWS");
1790        store.create_edge(n2, n1, "FOLLOWS");
1791        assert_eq!(store.edge_count(), 2);
1792
1793        let mut op = DeleteNodeOperator::new(
1794            Arc::clone(&store),
1795            MockInput::boxed(node_id_chunk(&[n1])),
1796            0,
1797            vec![LogicalType::Node],
1798            true, // detach = true
1799        );
1800
1801        let chunk = op.next().unwrap().unwrap();
1802        assert_eq!(chunk.row_count(), 1);
1803        assert_eq!(store.node_count(), 1);
1804        assert_eq!(store.edge_count(), 0); // edges detached
1805    }
1806
1807    // ── AddLabelOperator ─────────────────────────────────────────
1808
1809    #[test]
1810    fn test_add_label() {
1811        let store = create_test_store();
1812
1813        let node = store.create_node(&["Person"]);
1814
1815        let mut op = AddLabelOperator::new(
1816            Arc::clone(&store),
1817            MockInput::boxed(node_id_chunk(&[node])),
1818            0,
1819            vec!["Employee".to_string()],
1820            vec![LogicalType::Int64, LogicalType::Int64],
1821        );
1822
1823        let chunk = op.next().unwrap().unwrap();
1824        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1825        assert_eq!(updated, 1);
1826
1827        // Verify label was added
1828        let node_data = store.get_node(node).unwrap();
1829        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1830        assert!(labels.contains(&"Person"));
1831        assert!(labels.contains(&"Employee"));
1832    }
1833
1834    #[test]
1835    fn test_add_multiple_labels() {
1836        let store = create_test_store();
1837
1838        let node = store.create_node(&["Base"]);
1839
1840        let mut op = AddLabelOperator::new(
1841            Arc::clone(&store),
1842            MockInput::boxed(node_id_chunk(&[node])),
1843            0,
1844            vec!["LabelA".to_string(), "LabelB".to_string()],
1845            vec![LogicalType::Int64, LogicalType::Int64],
1846        );
1847
1848        let chunk = op.next().unwrap().unwrap();
1849        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1850        assert_eq!(updated, 2); // 2 labels added
1851
1852        let node_data = store.get_node(node).unwrap();
1853        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1854        assert!(labels.contains(&"LabelA"));
1855        assert!(labels.contains(&"LabelB"));
1856    }
1857
1858    #[test]
1859    fn test_add_label_no_input_returns_none() {
1860        let store = create_test_store();
1861
1862        let mut op = AddLabelOperator::new(
1863            Arc::clone(&store),
1864            Box::new(EmptyInput),
1865            0,
1866            vec!["Foo".to_string()],
1867            vec![LogicalType::Int64, LogicalType::Int64],
1868        );
1869
1870        assert!(op.next().unwrap().is_none());
1871    }
1872
1873    // ── RemoveLabelOperator ──────────────────────────────────────
1874
1875    #[test]
1876    fn test_remove_label() {
1877        let store = create_test_store();
1878
1879        let node = store.create_node(&["Person", "Employee"]);
1880
1881        let mut op = RemoveLabelOperator::new(
1882            Arc::clone(&store),
1883            MockInput::boxed(node_id_chunk(&[node])),
1884            0,
1885            vec!["Employee".to_string()],
1886            vec![LogicalType::Int64, LogicalType::Int64],
1887        );
1888
1889        let chunk = op.next().unwrap().unwrap();
1890        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1891        assert_eq!(updated, 1);
1892
1893        // Verify label was removed
1894        let node_data = store.get_node(node).unwrap();
1895        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1896        assert!(labels.contains(&"Person"));
1897        assert!(!labels.contains(&"Employee"));
1898    }
1899
1900    #[test]
1901    fn test_remove_nonexistent_label() {
1902        let store = create_test_store();
1903
1904        let node = store.create_node(&["Person"]);
1905
1906        let mut op = RemoveLabelOperator::new(
1907            Arc::clone(&store),
1908            MockInput::boxed(node_id_chunk(&[node])),
1909            0,
1910            vec!["NonExistent".to_string()],
1911            vec![LogicalType::Int64, LogicalType::Int64],
1912        );
1913
1914        let chunk = op.next().unwrap().unwrap();
1915        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1916        assert_eq!(updated, 0); // nothing removed
1917    }
1918
1919    // ── SetPropertyOperator ──────────────────────────────────────
1920
1921    #[test]
1922    fn test_set_node_property_constant() {
1923        let store = create_test_store();
1924
1925        let node = store.create_node(&["Person"]);
1926
1927        let mut op = SetPropertyOperator::new_for_node(
1928            Arc::clone(&store),
1929            MockInput::boxed(node_id_chunk(&[node])),
1930            0,
1931            vec![(
1932                "name".to_string(),
1933                PropertySource::Constant(Value::String("Alix".into())),
1934            )],
1935            vec![LogicalType::Int64],
1936        );
1937
1938        let chunk = op.next().unwrap().unwrap();
1939        assert_eq!(chunk.row_count(), 1);
1940
1941        // Verify property was set
1942        let node_data = store.get_node(node).unwrap();
1943        assert_eq!(
1944            node_data
1945                .properties
1946                .get(&grafeo_common::types::PropertyKey::new("name")),
1947            Some(&Value::String("Alix".into()))
1948        );
1949    }
1950
1951    #[test]
1952    fn test_set_node_property_from_column() {
1953        let store = create_test_store();
1954
1955        let node = store.create_node(&["Person"]);
1956
1957        // Input: column 0 = node ID, column 1 = property value
1958        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1959        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1960        builder
1961            .column_mut(1)
1962            .unwrap()
1963            .push_value(Value::String("Gus".into()));
1964        builder.advance_row();
1965
1966        let mut op = SetPropertyOperator::new_for_node(
1967            Arc::clone(&store),
1968            MockInput::boxed(builder.finish()),
1969            0,
1970            vec![("name".to_string(), PropertySource::Column(1))],
1971            vec![LogicalType::Int64, LogicalType::String],
1972        );
1973
1974        let chunk = op.next().unwrap().unwrap();
1975        assert_eq!(chunk.row_count(), 1);
1976
1977        let node_data = store.get_node(node).unwrap();
1978        assert_eq!(
1979            node_data
1980                .properties
1981                .get(&grafeo_common::types::PropertyKey::new("name")),
1982            Some(&Value::String("Gus".into()))
1983        );
1984    }
1985
1986    #[test]
1987    fn test_set_edge_property() {
1988        let store = create_test_store();
1989
1990        let n1 = store.create_node(&["N"]);
1991        let n2 = store.create_node(&["N"]);
1992        let eid = store.create_edge(n1, n2, "KNOWS");
1993
1994        let mut op = SetPropertyOperator::new_for_edge(
1995            Arc::clone(&store),
1996            MockInput::boxed(edge_id_chunk(&[eid])),
1997            0,
1998            vec![(
1999                "weight".to_string(),
2000                PropertySource::Constant(Value::Float64(0.75)),
2001            )],
2002            vec![LogicalType::Int64],
2003        );
2004
2005        let chunk = op.next().unwrap().unwrap();
2006        assert_eq!(chunk.row_count(), 1);
2007
2008        let edge_data = store.get_edge(eid).unwrap();
2009        assert_eq!(
2010            edge_data
2011                .properties
2012                .get(&grafeo_common::types::PropertyKey::new("weight")),
2013            Some(&Value::Float64(0.75))
2014        );
2015    }
2016
2017    #[test]
2018    fn test_set_multiple_properties() {
2019        let store = create_test_store();
2020
2021        let node = store.create_node(&["Person"]);
2022
2023        let mut op = SetPropertyOperator::new_for_node(
2024            Arc::clone(&store),
2025            MockInput::boxed(node_id_chunk(&[node])),
2026            0,
2027            vec![
2028                (
2029                    "name".to_string(),
2030                    PropertySource::Constant(Value::String("Alix".into())),
2031                ),
2032                (
2033                    "age".to_string(),
2034                    PropertySource::Constant(Value::Int64(30)),
2035                ),
2036            ],
2037            vec![LogicalType::Int64],
2038        );
2039
2040        op.next().unwrap().unwrap();
2041
2042        let node_data = store.get_node(node).unwrap();
2043        assert_eq!(
2044            node_data
2045                .properties
2046                .get(&grafeo_common::types::PropertyKey::new("name")),
2047            Some(&Value::String("Alix".into()))
2048        );
2049        assert_eq!(
2050            node_data
2051                .properties
2052                .get(&grafeo_common::types::PropertyKey::new("age")),
2053            Some(&Value::Int64(30))
2054        );
2055    }
2056
2057    #[test]
2058    fn test_set_property_no_input_returns_none() {
2059        let store = create_test_store();
2060
2061        let mut op = SetPropertyOperator::new_for_node(
2062            Arc::clone(&store),
2063            Box::new(EmptyInput),
2064            0,
2065            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
2066            vec![LogicalType::Int64],
2067        );
2068
2069        assert!(op.next().unwrap().is_none());
2070    }
2071
2072    // ── Error paths ──────────────────────────────────────────────
2073
2074    #[test]
2075    fn test_delete_node_without_detach_errors_when_edges_exist() {
2076        let store = create_test_store();
2077
2078        let n1 = store.create_node(&["Person"]);
2079        let n2 = store.create_node(&["Person"]);
2080        store.create_edge(n1, n2, "KNOWS");
2081
2082        let mut op = DeleteNodeOperator::new(
2083            Arc::clone(&store),
2084            MockInput::boxed(node_id_chunk(&[n1])),
2085            0,
2086            vec![LogicalType::Int64],
2087            false, // no detach
2088        );
2089
2090        let err = op.next().unwrap_err();
2091        match err {
2092            OperatorError::ConstraintViolation(msg) => {
2093                assert!(msg.contains("connected edge"), "unexpected message: {msg}");
2094            }
2095            other => panic!("expected ConstraintViolation, got {other:?}"),
2096        }
2097        // Node should still exist
2098        assert_eq!(store.node_count(), 2);
2099    }
2100
2101    // ── CreateNodeOperator with input ───────────────────────────
2102
2103    #[test]
2104    fn test_create_node_with_input_operator() {
2105        let store = create_test_store();
2106
2107        // Seed node to provide input rows
2108        let existing = store.create_node(&["Seed"]);
2109
2110        let mut op = CreateNodeOperator::new(
2111            Arc::clone(&store),
2112            Some(MockInput::boxed(node_id_chunk(&[existing]))),
2113            vec!["Created".to_string()],
2114            vec![(
2115                "source".to_string(),
2116                PropertySource::Constant(Value::String("from_input".into())),
2117            )],
2118            vec![LogicalType::Int64, LogicalType::Int64], // input col + output col
2119            1,                                            // output column for new node ID
2120        );
2121
2122        let chunk = op.next().unwrap().unwrap();
2123        assert_eq!(chunk.row_count(), 1);
2124
2125        // Should have created one new node (2 total: Seed + Created)
2126        assert_eq!(store.node_count(), 2);
2127
2128        // Exhausted
2129        assert!(op.next().unwrap().is_none());
2130    }
2131
2132    // ── CreateEdgeOperator with properties and output column ────
2133
2134    #[test]
2135    fn test_create_edge_with_properties_and_output_column() {
2136        let store = create_test_store();
2137
2138        let n1 = store.create_node(&["Person"]);
2139        let n2 = store.create_node(&["Person"]);
2140
2141        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
2142        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
2143        builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
2144        builder.advance_row();
2145
2146        let mut op = CreateEdgeOperator::new(
2147            Arc::clone(&store),
2148            MockInput::boxed(builder.finish()),
2149            0,
2150            1,
2151            "KNOWS".to_string(),
2152            vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
2153        )
2154        .with_properties(vec![(
2155            "since".to_string(),
2156            PropertySource::Constant(Value::Int64(2024)),
2157        )])
2158        .with_output_column(2);
2159
2160        let chunk = op.next().unwrap().unwrap();
2161        assert_eq!(chunk.row_count(), 1);
2162        assert_eq!(store.edge_count(), 1);
2163
2164        // Verify the output chunk contains the edge ID in column 2
2165        let edge_id_raw = chunk
2166            .column(2)
2167            .and_then(|c| c.get_int64(0))
2168            .expect("edge ID should be in output column 2");
2169        let edge_id = EdgeId(edge_id_raw as u64);
2170
2171        // Verify the edge has the property
2172        let edge = store.get_edge(edge_id).expect("edge should exist");
2173        assert_eq!(
2174            edge.properties
2175                .get(&grafeo_common::types::PropertyKey::new("since")),
2176            Some(&Value::Int64(2024))
2177        );
2178    }
2179
2180    // ── SetPropertyOperator with map replacement ────────────────
2181
2182    #[test]
2183    fn test_set_property_map_replace() {
2184        use std::collections::BTreeMap;
2185
2186        let store = create_test_store();
2187
2188        let node = store.create_node(&["Person"]);
2189        store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
2190
2191        let mut map = BTreeMap::new();
2192        map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
2193
2194        let mut op = SetPropertyOperator::new_for_node(
2195            Arc::clone(&store),
2196            MockInput::boxed(node_id_chunk(&[node])),
2197            0,
2198            vec![(
2199                "*".to_string(),
2200                PropertySource::Constant(Value::Map(Arc::new(map))),
2201            )],
2202            vec![LogicalType::Int64],
2203        )
2204        .with_replace(true);
2205
2206        op.next().unwrap().unwrap();
2207
2208        let node_data = store.get_node(node).unwrap();
2209        // Old property should be gone
2210        assert!(
2211            node_data
2212                .properties
2213                .get(&PropertyKey::new("old_prop"))
2214                .is_none()
2215        );
2216        // New property should exist
2217        assert_eq!(
2218            node_data.properties.get(&PropertyKey::new("new_key")),
2219            Some(&Value::String("new_val".into()))
2220        );
2221    }
2222
2223    // ── SetPropertyOperator with map merge (no replace) ─────────
2224
2225    #[test]
2226    fn test_set_property_map_merge() {
2227        use std::collections::BTreeMap;
2228
2229        let store = create_test_store();
2230
2231        let node = store.create_node(&["Person"]);
2232        store.set_node_property(node, "existing", Value::Int64(42));
2233
2234        let mut map = BTreeMap::new();
2235        map.insert(PropertyKey::new("added"), Value::String("hello".into()));
2236
2237        let mut op = SetPropertyOperator::new_for_node(
2238            Arc::clone(&store),
2239            MockInput::boxed(node_id_chunk(&[node])),
2240            0,
2241            vec![(
2242                "*".to_string(),
2243                PropertySource::Constant(Value::Map(Arc::new(map))),
2244            )],
2245            vec![LogicalType::Int64],
2246        ); // replace defaults to false
2247
2248        op.next().unwrap().unwrap();
2249
2250        let node_data = store.get_node(node).unwrap();
2251        // Existing property should still be there
2252        assert_eq!(
2253            node_data.properties.get(&PropertyKey::new("existing")),
2254            Some(&Value::Int64(42))
2255        );
2256        // New property should also exist
2257        assert_eq!(
2258            node_data.properties.get(&PropertyKey::new("added")),
2259            Some(&Value::String("hello".into()))
2260        );
2261    }
2262
2263    // ── PropertySource::PropertyAccess ──────────────────────────
2264
2265    #[test]
2266    fn test_property_source_property_access() {
2267        let store = create_test_store();
2268
2269        let source_node = store.create_node(&["Source"]);
2270        store.set_node_property(source_node, "name", Value::String("Alix".into()));
2271
2272        let target_node = store.create_node(&["Target"]);
2273
2274        // Build chunk: col 0 = source node ID (Node type for PropertyAccess), col 1 = target node ID
2275        let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
2276        builder.column_mut(0).unwrap().push_node_id(source_node);
2277        builder
2278            .column_mut(1)
2279            .unwrap()
2280            .push_int64(target_node.0 as i64);
2281        builder.advance_row();
2282
2283        let mut op = SetPropertyOperator::new_for_node(
2284            Arc::clone(&store),
2285            MockInput::boxed(builder.finish()),
2286            1, // entity column = target node
2287            vec![(
2288                "copied_name".to_string(),
2289                PropertySource::PropertyAccess {
2290                    column: 0,
2291                    property: "name".to_string(),
2292                },
2293            )],
2294            vec![LogicalType::Node, LogicalType::Int64],
2295        );
2296
2297        op.next().unwrap().unwrap();
2298
2299        let target_data = store.get_node(target_node).unwrap();
2300        assert_eq!(
2301            target_data.properties.get(&PropertyKey::new("copied_name")),
2302            Some(&Value::String("Alix".into()))
2303        );
2304    }
2305
2306    // ── ConstraintValidator integration ─────────────────────────
2307
2308    #[test]
2309    fn test_create_node_with_constraint_validator() {
2310        let store = create_test_store();
2311
2312        struct RejectAgeValidator;
2313        impl ConstraintValidator for RejectAgeValidator {
2314            fn validate_node_property(
2315                &self,
2316                _labels: &[String],
2317                key: &str,
2318                _value: &Value,
2319            ) -> Result<(), OperatorError> {
2320                if key == "forbidden" {
2321                    return Err(OperatorError::ConstraintViolation(
2322                        "property 'forbidden' is not allowed".to_string(),
2323                    ));
2324                }
2325                Ok(())
2326            }
2327            fn validate_node_complete(
2328                &self,
2329                _labels: &[String],
2330                _properties: &[(String, Value)],
2331            ) -> Result<(), OperatorError> {
2332                Ok(())
2333            }
2334            fn check_unique_node_property(
2335                &self,
2336                _labels: &[String],
2337                _key: &str,
2338                _value: &Value,
2339            ) -> Result<(), OperatorError> {
2340                Ok(())
2341            }
2342            fn validate_edge_property(
2343                &self,
2344                _edge_type: &str,
2345                _key: &str,
2346                _value: &Value,
2347            ) -> Result<(), OperatorError> {
2348                Ok(())
2349            }
2350            fn validate_edge_complete(
2351                &self,
2352                _edge_type: &str,
2353                _properties: &[(String, Value)],
2354            ) -> Result<(), OperatorError> {
2355                Ok(())
2356            }
2357        }
2358
2359        // Valid property should succeed
2360        let mut op = CreateNodeOperator::new(
2361            Arc::clone(&store),
2362            None,
2363            vec!["Thing".to_string()],
2364            vec![(
2365                "name".to_string(),
2366                PropertySource::Constant(Value::String("ok".into())),
2367            )],
2368            vec![LogicalType::Int64],
2369            0,
2370        )
2371        .with_validator(Arc::new(RejectAgeValidator));
2372
2373        assert!(op.next().is_ok());
2374        assert_eq!(store.node_count(), 1);
2375
2376        // Forbidden property should fail
2377        let mut op = CreateNodeOperator::new(
2378            Arc::clone(&store),
2379            None,
2380            vec!["Thing".to_string()],
2381            vec![(
2382                "forbidden".to_string(),
2383                PropertySource::Constant(Value::Int64(1)),
2384            )],
2385            vec![LogicalType::Int64],
2386            0,
2387        )
2388        .with_validator(Arc::new(RejectAgeValidator));
2389
2390        let err = op.next().unwrap_err();
2391        assert!(matches!(err, OperatorError::ConstraintViolation(_)));
2392        // Node count should still be 2 (the node is created before validation, but the error
2393        // propagates - this tests the validation logic fires)
2394    }
2395
2396    // ── Reset behavior ──────────────────────────────────────────
2397
2398    #[test]
2399    fn test_create_node_reset_allows_re_execution() {
2400        let store = create_test_store();
2401
2402        let mut op = CreateNodeOperator::new(
2403            Arc::clone(&store),
2404            None,
2405            vec!["Person".to_string()],
2406            vec![],
2407            vec![LogicalType::Int64],
2408            0,
2409        );
2410
2411        // First execution
2412        assert!(op.next().unwrap().is_some());
2413        assert!(op.next().unwrap().is_none());
2414
2415        // Reset and re-execute
2416        op.reset();
2417        assert!(op.next().unwrap().is_some());
2418
2419        assert_eq!(store.node_count(), 2);
2420    }
2421
2422    // ── Operator name() ──────────────────────────────────────────
2423
2424    #[test]
2425    fn test_operator_names() {
2426        let store = create_test_store();
2427
2428        let op = CreateNodeOperator::new(
2429            Arc::clone(&store),
2430            None,
2431            vec![],
2432            vec![],
2433            vec![LogicalType::Int64],
2434            0,
2435        );
2436        assert_eq!(op.name(), "CreateNode");
2437
2438        let op = CreateEdgeOperator::new(
2439            Arc::clone(&store),
2440            Box::new(EmptyInput),
2441            0,
2442            1,
2443            "R".to_string(),
2444            vec![LogicalType::Int64],
2445        );
2446        assert_eq!(op.name(), "CreateEdge");
2447
2448        let op = DeleteNodeOperator::new(
2449            Arc::clone(&store),
2450            Box::new(EmptyInput),
2451            0,
2452            vec![LogicalType::Int64],
2453            false,
2454        );
2455        assert_eq!(op.name(), "DeleteNode");
2456
2457        let op = DeleteEdgeOperator::new(
2458            Arc::clone(&store),
2459            Box::new(EmptyInput),
2460            0,
2461            vec![LogicalType::Int64],
2462        );
2463        assert_eq!(op.name(), "DeleteEdge");
2464
2465        let op = AddLabelOperator::new(
2466            Arc::clone(&store),
2467            Box::new(EmptyInput),
2468            0,
2469            vec!["L".to_string()],
2470            vec![LogicalType::Int64],
2471        );
2472        assert_eq!(op.name(), "AddLabel");
2473
2474        let op = RemoveLabelOperator::new(
2475            Arc::clone(&store),
2476            Box::new(EmptyInput),
2477            0,
2478            vec!["L".to_string()],
2479            vec![LogicalType::Int64],
2480        );
2481        assert_eq!(op.name(), "RemoveLabel");
2482
2483        let op = SetPropertyOperator::new_for_node(
2484            Arc::clone(&store),
2485            Box::new(EmptyInput),
2486            0,
2487            vec![],
2488            vec![LogicalType::Int64],
2489        );
2490        assert_eq!(op.name(), "SetProperty");
2491    }
2492}