Skip to main content

grafeo_core/execution/operators/
mutation.rs

1//! Mutation operators for creating and deleting graph elements.
2//!
3//! These operators modify the graph structure:
4//! - `CreateNodeOperator`: Creates new nodes
5//! - `CreateEdgeOperator`: Creates new edges
6//! - `DeleteNodeOperator`: Deletes nodes
7//! - `DeleteEdgeOperator`: Deletes edges
8
9use std::sync::Arc;
10
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19/// Trait for validating schema constraints during mutation operations.
20///
21/// Implementors check type definitions, NOT NULL, and UNIQUE constraints
22/// before data is written to the store.
23pub trait ConstraintValidator: Send + Sync {
24    /// Validates a single property value for a node with the given labels.
25    ///
26    /// Checks type compatibility and NOT NULL constraints.
27    ///
28    /// # Errors
29    ///
30    /// Returns `Err` if the value type is incompatible or a NOT NULL constraint is violated.
31    fn validate_node_property(
32        &self,
33        labels: &[String],
34        key: &str,
35        value: &Value,
36    ) -> Result<(), OperatorError>;
37
38    /// Validates that all required properties are present after creating a node.
39    ///
40    /// Checks NOT NULL constraints for properties that were not explicitly set.
41    ///
42    /// # Errors
43    ///
44    /// Returns `Err` if a required (NOT NULL) property is missing.
45    fn validate_node_complete(
46        &self,
47        labels: &[String],
48        properties: &[(String, Value)],
49    ) -> Result<(), OperatorError>;
50
51    /// Checks UNIQUE constraint for a node property value.
52    ///
53    /// # Errors
54    ///
55    /// Returns `Err` if a node with the same label already has this value.
56    fn check_unique_node_property(
57        &self,
58        labels: &[String],
59        key: &str,
60        value: &Value,
61    ) -> Result<(), OperatorError>;
62
63    /// Validates a single property value for an edge of the given type.
64    ///
65    /// # Errors
66    ///
67    /// Returns `Err` if the value type is incompatible with the edge schema.
68    fn validate_edge_property(
69        &self,
70        edge_type: &str,
71        key: &str,
72        value: &Value,
73    ) -> Result<(), OperatorError>;
74
75    /// Validates that all required properties are present after creating an edge.
76    ///
77    /// # Errors
78    ///
79    /// Returns `Err` if a required property is missing from the edge.
80    fn validate_edge_complete(
81        &self,
82        edge_type: &str,
83        properties: &[(String, Value)],
84    ) -> Result<(), OperatorError>;
85
86    /// Validates that the node labels are allowed by the bound graph type.
87    ///
88    /// # Errors
89    ///
90    /// Returns `Err` if any label is not defined in the graph type.
91    fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
92        let _ = labels;
93        Ok(())
94    }
95
96    /// Validates that the edge type is allowed by the bound graph type.
97    ///
98    /// # Errors
99    ///
100    /// Returns `Err` if the edge type is not defined in the graph type.
101    fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
102        let _ = edge_type;
103        Ok(())
104    }
105
106    /// Validates that edge endpoints have the correct node type labels.
107    ///
108    /// # Errors
109    ///
110    /// Returns `Err` if the source or target node labels do not match the edge type definition.
111    fn validate_edge_endpoints(
112        &self,
113        edge_type: &str,
114        source_labels: &[String],
115        target_labels: &[String],
116    ) -> Result<(), OperatorError> {
117        let _ = (edge_type, source_labels, target_labels);
118        Ok(())
119    }
120
121    /// Injects default values for properties that are defined in a type but
122    /// not explicitly provided.
123    fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
124        let _ = (labels, properties);
125    }
126}
127
128/// Operator that creates new nodes.
129///
130/// For each input row, creates a new node with the specified labels
131/// and properties, then outputs the row with the new node.
132pub struct CreateNodeOperator {
133    /// The graph store to modify.
134    store: Arc<dyn GraphStoreMut>,
135    /// Input operator.
136    input: Option<Box<dyn Operator>>,
137    /// Labels for the new nodes.
138    labels: Vec<String>,
139    /// Properties to set (name -> column index or constant value).
140    properties: Vec<(String, PropertySource)>,
141    /// Output schema.
142    output_schema: Vec<LogicalType>,
143    /// Column index for the created node variable.
144    output_column: usize,
145    /// Whether this operator has been executed (for no-input case).
146    executed: bool,
147    /// Epoch for MVCC versioning.
148    viewing_epoch: Option<EpochId>,
149    /// Transaction ID for MVCC versioning.
150    transaction_id: Option<TransactionId>,
151    /// Optional constraint validator for schema enforcement.
152    validator: Option<Arc<dyn ConstraintValidator>>,
153    /// Optional write tracker for conflict detection.
154    write_tracker: Option<SharedWriteTracker>,
155}
156
157/// Source for a property value.
158#[derive(Debug, Clone)]
159pub enum PropertySource {
160    /// Get value from an input column.
161    Column(usize),
162    /// Use a constant value.
163    Constant(Value),
164    /// Access a named property from a map/node/edge in an input column.
165    PropertyAccess {
166        /// The column containing the map, node ID, or edge ID.
167        column: usize,
168        /// The property name to extract.
169        property: String,
170    },
171}
172
173impl PropertySource {
174    /// Resolves a property value from a data chunk row.
175    pub fn resolve(
176        &self,
177        chunk: &crate::execution::chunk::DataChunk,
178        row: usize,
179        store: &dyn GraphStore,
180    ) -> Value {
181        match self {
182            PropertySource::Column(col_idx) => chunk
183                .column(*col_idx)
184                .and_then(|c| c.get_value(row))
185                .unwrap_or(Value::Null),
186            PropertySource::Constant(v) => v.clone(),
187            PropertySource::PropertyAccess { column, property } => {
188                let Some(col) = chunk.column(*column) else {
189                    return Value::Null;
190                };
191                // Try node ID first, then edge ID, then map value
192                if let Some(node_id) = col.get_node_id(row) {
193                    store
194                        .get_node(node_id)
195                        .and_then(|node| node.get_property(property).cloned())
196                        .unwrap_or(Value::Null)
197                } else if let Some(edge_id) = col.get_edge_id(row) {
198                    store
199                        .get_edge(edge_id)
200                        .and_then(|edge| edge.get_property(property).cloned())
201                        .unwrap_or(Value::Null)
202                } else if let Some(Value::Map(map)) = col.get_value(row) {
203                    let key = PropertyKey::new(property);
204                    map.get(&key).cloned().unwrap_or(Value::Null)
205                } else {
206                    Value::Null
207                }
208            }
209        }
210    }
211}
212
213impl CreateNodeOperator {
214    /// Creates a new node creation operator.
215    ///
216    /// # Arguments
217    /// * `store` - The graph store to modify.
218    /// * `input` - Optional input operator (None for standalone CREATE).
219    /// * `labels` - Labels to assign to created nodes.
220    /// * `properties` - Properties to set on created nodes.
221    /// * `output_schema` - Schema of the output.
222    /// * `output_column` - Column index where the created node ID goes.
223    pub fn new(
224        store: Arc<dyn GraphStoreMut>,
225        input: Option<Box<dyn Operator>>,
226        labels: Vec<String>,
227        properties: Vec<(String, PropertySource)>,
228        output_schema: Vec<LogicalType>,
229        output_column: usize,
230    ) -> Self {
231        Self {
232            store,
233            input,
234            labels,
235            properties,
236            output_schema,
237            output_column,
238            executed: false,
239            viewing_epoch: None,
240            transaction_id: None,
241            validator: None,
242            write_tracker: None,
243        }
244    }
245
246    /// Sets the transaction context for MVCC versioning.
247    pub fn with_transaction_context(
248        mut self,
249        epoch: EpochId,
250        transaction_id: Option<TransactionId>,
251    ) -> Self {
252        self.viewing_epoch = Some(epoch);
253        self.transaction_id = transaction_id;
254        self
255    }
256
257    /// Sets the constraint validator for schema enforcement.
258    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
259        self.validator = Some(validator);
260        self
261    }
262
263    /// Sets the write tracker for conflict detection.
264    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
265        self.write_tracker = Some(tracker);
266        self
267    }
268}
269
270impl CreateNodeOperator {
271    /// Validates and sets properties on a newly created node.
272    fn validate_and_set_properties(
273        &self,
274        node_id: NodeId,
275        resolved_props: &mut Vec<(String, Value)>,
276    ) -> Result<(), OperatorError> {
277        // Phase 0: Validate that node labels are allowed by the bound graph type
278        if let Some(ref validator) = self.validator {
279            validator.validate_node_labels_allowed(&self.labels)?;
280        }
281
282        // Phase 0.5: Inject defaults for properties not explicitly provided
283        if let Some(ref validator) = self.validator {
284            validator.inject_defaults(&self.labels, resolved_props);
285        }
286
287        // Phase 1: Validate each property value
288        if let Some(ref validator) = self.validator {
289            for (name, value) in resolved_props.iter() {
290                validator.validate_node_property(&self.labels, name, value)?;
291                validator.check_unique_node_property(&self.labels, name, value)?;
292            }
293            // Phase 2: Validate completeness (NOT NULL checks for missing required properties)
294            validator.validate_node_complete(&self.labels, resolved_props)?;
295        }
296
297        // Phase 3: Write properties to the store
298        if let Some(tid) = self.transaction_id {
299            for (name, value) in resolved_props.iter() {
300                self.store
301                    .set_node_property_versioned(node_id, name, value.clone(), tid);
302            }
303        } else {
304            for (name, value) in resolved_props.iter() {
305                self.store.set_node_property(node_id, name, value.clone());
306            }
307        }
308        Ok(())
309    }
310}
311
312impl Operator for CreateNodeOperator {
313    fn next(&mut self) -> OperatorResult {
314        // Get transaction context for versioned creation
315        let epoch = self
316            .viewing_epoch
317            .unwrap_or_else(|| self.store.current_epoch());
318        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
319
320        if let Some(ref mut input) = self.input {
321            // For each input row, create a node
322            if let Some(chunk) = input.next()? {
323                let mut builder =
324                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
325
326                for row in chunk.selected_indices() {
327                    // Resolve all property values first (before creating node)
328                    let mut resolved_props: Vec<(String, Value)> = self
329                        .properties
330                        .iter()
331                        .map(|(name, source)| {
332                            let value =
333                                source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
334                            (name.clone(), value)
335                        })
336                        .collect();
337
338                    // Create the node with MVCC versioning
339                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
340                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
341
342                    // Record write for conflict detection
343                    if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
344                        tracker.record_node_write(tid, node_id)?;
345                    }
346
347                    // Validate and set properties
348                    self.validate_and_set_properties(node_id, &mut resolved_props)?;
349
350                    // Copy input columns to output
351                    for col_idx in 0..chunk.column_count() {
352                        if col_idx < self.output_column
353                            && let (Some(src), Some(dst)) =
354                                (chunk.column(col_idx), builder.column_mut(col_idx))
355                        {
356                            if let Some(val) = src.get_value(row) {
357                                dst.push_value(val);
358                            } else {
359                                dst.push_value(Value::Null);
360                            }
361                        }
362                    }
363
364                    // Add the new node ID
365                    if let Some(dst) = builder.column_mut(self.output_column) {
366                        dst.push_value(Value::Int64(node_id.0 as i64));
367                    }
368
369                    builder.advance_row();
370                }
371
372                return Ok(Some(builder.finish()));
373            }
374            Ok(None)
375        } else {
376            // No input - create a single node
377            if self.executed {
378                return Ok(None);
379            }
380            self.executed = true;
381
382            // Resolve constant properties
383            let mut resolved_props: Vec<(String, Value)> = self
384                .properties
385                .iter()
386                .filter_map(|(name, source)| {
387                    if let PropertySource::Constant(value) = source {
388                        Some((name.clone(), value.clone()))
389                    } else {
390                        None
391                    }
392                })
393                .collect();
394
395            // Create the node with MVCC versioning
396            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
397            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
398
399            // Record write for conflict detection
400            if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
401                tracker.record_node_write(tid, node_id)?;
402            }
403
404            // Validate and set properties
405            self.validate_and_set_properties(node_id, &mut resolved_props)?;
406
407            // Build output chunk with just the node ID
408            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
409            if let Some(dst) = builder.column_mut(self.output_column) {
410                dst.push_value(Value::Int64(node_id.0 as i64));
411            }
412            builder.advance_row();
413
414            Ok(Some(builder.finish()))
415        }
416    }
417
418    fn reset(&mut self) {
419        if let Some(ref mut input) = self.input {
420            input.reset();
421        }
422        self.executed = false;
423    }
424
425    fn name(&self) -> &'static str {
426        "CreateNode"
427    }
428}
429
430/// Operator that creates new edges.
431pub struct CreateEdgeOperator {
432    /// The graph store to modify.
433    store: Arc<dyn GraphStoreMut>,
434    /// Input operator.
435    input: Box<dyn Operator>,
436    /// Column index for the source node.
437    from_column: usize,
438    /// Column index for the target node.
439    to_column: usize,
440    /// Edge type.
441    edge_type: String,
442    /// Properties to set.
443    properties: Vec<(String, PropertySource)>,
444    /// Output schema.
445    output_schema: Vec<LogicalType>,
446    /// Column index for the created edge variable (if any).
447    output_column: Option<usize>,
448    /// Epoch for MVCC versioning.
449    viewing_epoch: Option<EpochId>,
450    /// Transaction ID for MVCC versioning.
451    transaction_id: Option<TransactionId>,
452    /// Optional constraint validator for schema enforcement.
453    validator: Option<Arc<dyn ConstraintValidator>>,
454    /// Optional write tracker for conflict detection.
455    write_tracker: Option<SharedWriteTracker>,
456}
457
458impl CreateEdgeOperator {
459    /// Creates a new edge creation operator.
460    ///
461    /// Use builder methods to set additional options:
462    /// - [`with_properties`](Self::with_properties) - set edge properties
463    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
464    /// - [`with_transaction_context`](Self::with_transaction_context) - set transaction context
465    pub fn new(
466        store: Arc<dyn GraphStoreMut>,
467        input: Box<dyn Operator>,
468        from_column: usize,
469        to_column: usize,
470        edge_type: String,
471        output_schema: Vec<LogicalType>,
472    ) -> Self {
473        Self {
474            store,
475            input,
476            from_column,
477            to_column,
478            edge_type,
479            properties: Vec::new(),
480            output_schema,
481            output_column: None,
482            viewing_epoch: None,
483            transaction_id: None,
484            validator: None,
485            write_tracker: None,
486        }
487    }
488
489    /// Sets the properties to assign to created edges.
490    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
491        self.properties = properties;
492        self
493    }
494
495    /// Sets the output column for the created edge ID.
496    pub fn with_output_column(mut self, column: usize) -> Self {
497        self.output_column = Some(column);
498        self
499    }
500
501    /// Sets the transaction context for MVCC versioning.
502    pub fn with_transaction_context(
503        mut self,
504        epoch: EpochId,
505        transaction_id: Option<TransactionId>,
506    ) -> Self {
507        self.viewing_epoch = Some(epoch);
508        self.transaction_id = transaction_id;
509        self
510    }
511
512    /// Sets the constraint validator for schema enforcement.
513    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
514        self.validator = Some(validator);
515        self
516    }
517
518    /// Sets the write tracker for conflict detection.
519    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
520        self.write_tracker = Some(tracker);
521        self
522    }
523}
524
525impl Operator for CreateEdgeOperator {
526    fn next(&mut self) -> OperatorResult {
527        // Get transaction context for versioned creation
528        let epoch = self
529            .viewing_epoch
530            .unwrap_or_else(|| self.store.current_epoch());
531        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
532
533        if let Some(chunk) = self.input.next()? {
534            let mut builder =
535                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
536
537            for row in chunk.selected_indices() {
538                // Get source and target node IDs
539                let from_id = chunk
540                    .column(self.from_column)
541                    .and_then(|c| c.get_value(row))
542                    .ok_or_else(|| {
543                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
544                    })?;
545
546                let to_id = chunk
547                    .column(self.to_column)
548                    .and_then(|c| c.get_value(row))
549                    .ok_or_else(|| {
550                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
551                    })?;
552
553                // Extract node IDs
554                let from_node_id = match from_id {
555                    Value::Int64(id) => NodeId(id as u64),
556                    _ => {
557                        return Err(OperatorError::TypeMismatch {
558                            expected: "Int64 (node ID)".to_string(),
559                            found: format!("{from_id:?}"),
560                        });
561                    }
562                };
563
564                let to_node_id = match to_id {
565                    Value::Int64(id) => NodeId(id as u64),
566                    _ => {
567                        return Err(OperatorError::TypeMismatch {
568                            expected: "Int64 (node ID)".to_string(),
569                            found: format!("{to_id:?}"),
570                        });
571                    }
572                };
573
574                // Validate graph type and edge endpoint constraints
575                if let Some(ref validator) = self.validator {
576                    validator.validate_edge_type_allowed(&self.edge_type)?;
577
578                    // Look up source and target node labels for endpoint validation
579                    let source_labels: Vec<String> = self
580                        .store
581                        .get_node(from_node_id)
582                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
583                        .unwrap_or_default();
584                    let target_labels: Vec<String> = self
585                        .store
586                        .get_node(to_node_id)
587                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
588                        .unwrap_or_default();
589                    validator.validate_edge_endpoints(
590                        &self.edge_type,
591                        &source_labels,
592                        &target_labels,
593                    )?;
594                }
595
596                // Resolve property values
597                let resolved_props: Vec<(String, Value)> = self
598                    .properties
599                    .iter()
600                    .map(|(name, source)| {
601                        let value =
602                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
603                        (name.clone(), value)
604                    })
605                    .collect();
606
607                // Validate constraints before writing
608                if let Some(ref validator) = self.validator {
609                    for (name, value) in &resolved_props {
610                        validator.validate_edge_property(&self.edge_type, name, value)?;
611                    }
612                    validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
613                }
614
615                // Create the edge with MVCC versioning
616                let edge_id = self.store.create_edge_versioned(
617                    from_node_id,
618                    to_node_id,
619                    &self.edge_type,
620                    epoch,
621                    tx,
622                );
623
624                // Record write for conflict detection
625                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
626                    tracker.record_edge_write(tid, edge_id)?;
627                }
628
629                // Set properties
630                if let Some(tid) = self.transaction_id {
631                    for (name, value) in resolved_props {
632                        self.store
633                            .set_edge_property_versioned(edge_id, &name, value, tid);
634                    }
635                } else {
636                    for (name, value) in resolved_props {
637                        self.store.set_edge_property(edge_id, &name, value);
638                    }
639                }
640
641                // Copy input columns
642                for col_idx in 0..chunk.column_count() {
643                    if let (Some(src), Some(dst)) =
644                        (chunk.column(col_idx), builder.column_mut(col_idx))
645                    {
646                        if let Some(val) = src.get_value(row) {
647                            dst.push_value(val);
648                        } else {
649                            dst.push_value(Value::Null);
650                        }
651                    }
652                }
653
654                // Add edge ID if requested
655                if let Some(out_col) = self.output_column
656                    && let Some(dst) = builder.column_mut(out_col)
657                {
658                    dst.push_value(Value::Int64(edge_id.0 as i64));
659                }
660
661                builder.advance_row();
662            }
663
664            return Ok(Some(builder.finish()));
665        }
666        Ok(None)
667    }
668
669    fn reset(&mut self) {
670        self.input.reset();
671    }
672
673    fn name(&self) -> &'static str {
674        "CreateEdge"
675    }
676}
677
678/// Operator that deletes nodes.
679pub struct DeleteNodeOperator {
680    /// The graph store to modify.
681    store: Arc<dyn GraphStoreMut>,
682    /// Input operator.
683    input: Box<dyn Operator>,
684    /// Column index for the node to delete.
685    node_column: usize,
686    /// Output schema.
687    output_schema: Vec<LogicalType>,
688    /// Whether to detach (delete connected edges) before deleting.
689    detach: bool,
690    /// Epoch for MVCC versioning.
691    viewing_epoch: Option<EpochId>,
692    /// Transaction ID for MVCC versioning.
693    transaction_id: Option<TransactionId>,
694    /// Optional write tracker for conflict detection.
695    write_tracker: Option<SharedWriteTracker>,
696}
697
698impl DeleteNodeOperator {
699    /// Creates a new node deletion operator.
700    pub fn new(
701        store: Arc<dyn GraphStoreMut>,
702        input: Box<dyn Operator>,
703        node_column: usize,
704        output_schema: Vec<LogicalType>,
705        detach: bool,
706    ) -> Self {
707        Self {
708            store,
709            input,
710            node_column,
711            output_schema,
712            detach,
713            viewing_epoch: None,
714            transaction_id: None,
715            write_tracker: None,
716        }
717    }
718
719    /// Sets the transaction context for MVCC versioning.
720    pub fn with_transaction_context(
721        mut self,
722        epoch: EpochId,
723        transaction_id: Option<TransactionId>,
724    ) -> Self {
725        self.viewing_epoch = Some(epoch);
726        self.transaction_id = transaction_id;
727        self
728    }
729
730    /// Sets the write tracker for conflict detection.
731    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
732        self.write_tracker = Some(tracker);
733        self
734    }
735}
736
737impl Operator for DeleteNodeOperator {
738    fn next(&mut self) -> OperatorResult {
739        // Get transaction context for versioned deletion
740        let epoch = self
741            .viewing_epoch
742            .unwrap_or_else(|| self.store.current_epoch());
743        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
744
745        if let Some(chunk) = self.input.next()? {
746            let mut builder =
747                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
748
749            for row in chunk.selected_indices() {
750                let node_val = chunk
751                    .column(self.node_column)
752                    .and_then(|c| c.get_value(row))
753                    .ok_or_else(|| {
754                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
755                    })?;
756
757                let node_id = match node_val {
758                    Value::Int64(id) => NodeId(id as u64),
759                    _ => {
760                        return Err(OperatorError::TypeMismatch {
761                            expected: "Int64 (node ID)".to_string(),
762                            found: format!("{node_val:?}"),
763                        });
764                    }
765                };
766
767                if self.detach {
768                    // Delete all connected edges first, using versioned deletion
769                    // so rollback can restore them
770                    let outgoing = self
771                        .store
772                        .edges_from(node_id, crate::graph::Direction::Outgoing);
773                    let incoming = self
774                        .store
775                        .edges_from(node_id, crate::graph::Direction::Incoming);
776                    for (_, edge_id) in outgoing.into_iter().chain(incoming) {
777                        self.store.delete_edge_versioned(edge_id, epoch, tx);
778                        if let (Some(tracker), Some(tid)) =
779                            (&self.write_tracker, self.transaction_id)
780                        {
781                            tracker.record_edge_write(tid, edge_id)?;
782                        }
783                    }
784                } else {
785                    // NODETACH: check that node has no connected edges
786                    let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
787                    if degree > 0 {
788                        return Err(OperatorError::ConstraintViolation(format!(
789                            "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
790                            degree
791                        )));
792                    }
793                }
794
795                // Delete the node with MVCC versioning
796                self.store.delete_node_versioned(node_id, epoch, tx);
797
798                // Record write for conflict detection
799                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
800                    tracker.record_node_write(tid, node_id)?;
801                }
802
803                // Pass through all input columns so downstream RETURN can
804                // reference the variable (e.g., count(n) after DELETE n).
805                for col_idx in 0..chunk.column_count() {
806                    if let (Some(src), Some(dst)) =
807                        (chunk.column(col_idx), builder.column_mut(col_idx))
808                    {
809                        if let Some(val) = src.get_value(row) {
810                            dst.push_value(val);
811                        } else {
812                            dst.push_value(Value::Null);
813                        }
814                    }
815                }
816                builder.advance_row();
817            }
818
819            return Ok(Some(builder.finish()));
820        }
821        Ok(None)
822    }
823
824    fn reset(&mut self) {
825        self.input.reset();
826    }
827
828    fn name(&self) -> &'static str {
829        "DeleteNode"
830    }
831}
832
833/// Operator that deletes edges.
834pub struct DeleteEdgeOperator {
835    /// The graph store to modify.
836    store: Arc<dyn GraphStoreMut>,
837    /// Input operator.
838    input: Box<dyn Operator>,
839    /// Column index for the edge to delete.
840    edge_column: usize,
841    /// Output schema.
842    output_schema: Vec<LogicalType>,
843    /// Epoch for MVCC versioning.
844    viewing_epoch: Option<EpochId>,
845    /// Transaction ID for MVCC versioning.
846    transaction_id: Option<TransactionId>,
847    /// Optional write tracker for conflict detection.
848    write_tracker: Option<SharedWriteTracker>,
849}
850
851impl DeleteEdgeOperator {
852    /// Creates a new edge deletion operator.
853    pub fn new(
854        store: Arc<dyn GraphStoreMut>,
855        input: Box<dyn Operator>,
856        edge_column: usize,
857        output_schema: Vec<LogicalType>,
858    ) -> Self {
859        Self {
860            store,
861            input,
862            edge_column,
863            output_schema,
864            viewing_epoch: None,
865            transaction_id: None,
866            write_tracker: None,
867        }
868    }
869
870    /// Sets the transaction context for MVCC versioning.
871    pub fn with_transaction_context(
872        mut self,
873        epoch: EpochId,
874        transaction_id: Option<TransactionId>,
875    ) -> Self {
876        self.viewing_epoch = Some(epoch);
877        self.transaction_id = transaction_id;
878        self
879    }
880
881    /// Sets the write tracker for conflict detection.
882    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
883        self.write_tracker = Some(tracker);
884        self
885    }
886}
887
888impl Operator for DeleteEdgeOperator {
889    fn next(&mut self) -> OperatorResult {
890        // Get transaction context for versioned deletion
891        let epoch = self
892            .viewing_epoch
893            .unwrap_or_else(|| self.store.current_epoch());
894        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
895
896        if let Some(chunk) = self.input.next()? {
897            let mut builder =
898                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
899
900            for row in chunk.selected_indices() {
901                let edge_val = chunk
902                    .column(self.edge_column)
903                    .and_then(|c| c.get_value(row))
904                    .ok_or_else(|| {
905                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
906                    })?;
907
908                let edge_id = match edge_val {
909                    Value::Int64(id) => EdgeId(id as u64),
910                    _ => {
911                        return Err(OperatorError::TypeMismatch {
912                            expected: "Int64 (edge ID)".to_string(),
913                            found: format!("{edge_val:?}"),
914                        });
915                    }
916                };
917
918                // Delete the edge with MVCC versioning
919                self.store.delete_edge_versioned(edge_id, epoch, tx);
920
921                // Record write for conflict detection
922                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
923                    tracker.record_edge_write(tid, edge_id)?;
924                }
925
926                // Pass through all input columns
927                for col_idx in 0..chunk.column_count() {
928                    if let (Some(src), Some(dst)) =
929                        (chunk.column(col_idx), builder.column_mut(col_idx))
930                    {
931                        if let Some(val) = src.get_value(row) {
932                            dst.push_value(val);
933                        } else {
934                            dst.push_value(Value::Null);
935                        }
936                    }
937                }
938                builder.advance_row();
939            }
940
941            return Ok(Some(builder.finish()));
942        }
943        Ok(None)
944    }
945
946    fn reset(&mut self) {
947        self.input.reset();
948    }
949
950    fn name(&self) -> &'static str {
951        "DeleteEdge"
952    }
953}
954
955/// Operator that adds labels to nodes.
956pub struct AddLabelOperator {
957    /// The graph store.
958    store: Arc<dyn GraphStoreMut>,
959    /// Child operator providing nodes.
960    input: Box<dyn Operator>,
961    /// Column index containing node IDs.
962    node_column: usize,
963    /// Labels to add.
964    labels: Vec<String>,
965    /// Output schema.
966    output_schema: Vec<LogicalType>,
967    /// Column index for the update count (last column).
968    count_column: usize,
969    /// Epoch for MVCC versioning.
970    viewing_epoch: Option<EpochId>,
971    /// Transaction ID for undo log tracking.
972    transaction_id: Option<TransactionId>,
973    /// Optional write tracker for conflict detection.
974    write_tracker: Option<SharedWriteTracker>,
975}
976
977impl AddLabelOperator {
978    /// Creates a new add label operator.
979    pub fn new(
980        store: Arc<dyn GraphStoreMut>,
981        input: Box<dyn Operator>,
982        node_column: usize,
983        labels: Vec<String>,
984        output_schema: Vec<LogicalType>,
985    ) -> Self {
986        let count_column = output_schema.len() - 1;
987        Self {
988            store,
989            input,
990            node_column,
991            labels,
992            count_column,
993            output_schema,
994            viewing_epoch: None,
995            transaction_id: None,
996            write_tracker: None,
997        }
998    }
999
1000    /// Sets the transaction context for versioned label mutations.
1001    pub fn with_transaction_context(
1002        mut self,
1003        epoch: EpochId,
1004        transaction_id: Option<TransactionId>,
1005    ) -> Self {
1006        self.viewing_epoch = Some(epoch);
1007        self.transaction_id = transaction_id;
1008        self
1009    }
1010
1011    /// Sets the write tracker for conflict detection.
1012    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1013        self.write_tracker = Some(tracker);
1014        self
1015    }
1016}
1017
1018impl Operator for AddLabelOperator {
1019    fn next(&mut self) -> OperatorResult {
1020        if let Some(chunk) = self.input.next()? {
1021            let mut builder =
1022                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1023
1024            for row in chunk.selected_indices() {
1025                let node_val = chunk
1026                    .column(self.node_column)
1027                    .and_then(|c| c.get_value(row))
1028                    .ok_or_else(|| {
1029                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1030                    })?;
1031
1032                let node_id = match node_val {
1033                    Value::Int64(id) => NodeId(id as u64),
1034                    _ => {
1035                        return Err(OperatorError::TypeMismatch {
1036                            expected: "Int64 (node ID)".to_string(),
1037                            found: format!("{node_val:?}"),
1038                        });
1039                    }
1040                };
1041
1042                // Record write for conflict detection
1043                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1044                    tracker.record_node_write(tid, node_id)?;
1045                }
1046
1047                // Add all labels
1048                let mut row_count: i64 = 0;
1049                for label in &self.labels {
1050                    let added = if let Some(tid) = self.transaction_id {
1051                        self.store.add_label_versioned(node_id, label, tid)
1052                    } else {
1053                        self.store.add_label(node_id, label)
1054                    };
1055                    if added {
1056                        row_count += 1;
1057                    }
1058                }
1059
1060                // Copy input columns to output (pass-through)
1061                for col_idx in 0..chunk.column_count() {
1062                    if let (Some(src), Some(dst)) =
1063                        (chunk.column(col_idx), builder.column_mut(col_idx))
1064                    {
1065                        if let Some(val) = src.get_value(row) {
1066                            dst.push_value(val);
1067                        } else {
1068                            dst.push_value(Value::Null);
1069                        }
1070                    }
1071                }
1072                // Append the update count column
1073                if let Some(dst) = builder.column_mut(self.count_column) {
1074                    dst.push_value(Value::Int64(row_count));
1075                }
1076
1077                builder.advance_row();
1078            }
1079
1080            return Ok(Some(builder.finish()));
1081        }
1082        Ok(None)
1083    }
1084
1085    fn reset(&mut self) {
1086        self.input.reset();
1087    }
1088
1089    fn name(&self) -> &'static str {
1090        "AddLabel"
1091    }
1092}
1093
1094/// Operator that removes labels from nodes.
1095pub struct RemoveLabelOperator {
1096    /// The graph store.
1097    store: Arc<dyn GraphStoreMut>,
1098    /// Child operator providing nodes.
1099    input: Box<dyn Operator>,
1100    /// Column index containing node IDs.
1101    node_column: usize,
1102    /// Labels to remove.
1103    labels: Vec<String>,
1104    /// Output schema.
1105    output_schema: Vec<LogicalType>,
1106    /// Column index for the update count (last column).
1107    count_column: usize,
1108    /// Epoch for MVCC versioning.
1109    viewing_epoch: Option<EpochId>,
1110    /// Transaction ID for undo log tracking.
1111    transaction_id: Option<TransactionId>,
1112    /// Optional write tracker for conflict detection.
1113    write_tracker: Option<SharedWriteTracker>,
1114}
1115
1116impl RemoveLabelOperator {
1117    /// Creates a new remove label operator.
1118    pub fn new(
1119        store: Arc<dyn GraphStoreMut>,
1120        input: Box<dyn Operator>,
1121        node_column: usize,
1122        labels: Vec<String>,
1123        output_schema: Vec<LogicalType>,
1124    ) -> Self {
1125        let count_column = output_schema.len() - 1;
1126        Self {
1127            store,
1128            input,
1129            node_column,
1130            labels,
1131            count_column,
1132            output_schema,
1133            viewing_epoch: None,
1134            transaction_id: None,
1135            write_tracker: None,
1136        }
1137    }
1138
1139    /// Sets the transaction context for versioned label mutations.
1140    pub fn with_transaction_context(
1141        mut self,
1142        epoch: EpochId,
1143        transaction_id: Option<TransactionId>,
1144    ) -> Self {
1145        self.viewing_epoch = Some(epoch);
1146        self.transaction_id = transaction_id;
1147        self
1148    }
1149
1150    /// Sets the write tracker for conflict detection.
1151    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1152        self.write_tracker = Some(tracker);
1153        self
1154    }
1155}
1156
1157impl Operator for RemoveLabelOperator {
1158    fn next(&mut self) -> OperatorResult {
1159        if let Some(chunk) = self.input.next()? {
1160            let mut builder =
1161                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1162
1163            for row in chunk.selected_indices() {
1164                let node_val = chunk
1165                    .column(self.node_column)
1166                    .and_then(|c| c.get_value(row))
1167                    .ok_or_else(|| {
1168                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1169                    })?;
1170
1171                let node_id = match node_val {
1172                    Value::Int64(id) => NodeId(id as u64),
1173                    _ => {
1174                        return Err(OperatorError::TypeMismatch {
1175                            expected: "Int64 (node ID)".to_string(),
1176                            found: format!("{node_val:?}"),
1177                        });
1178                    }
1179                };
1180
1181                // Record write for conflict detection
1182                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1183                    tracker.record_node_write(tid, node_id)?;
1184                }
1185
1186                // Remove all labels
1187                let mut row_count: i64 = 0;
1188                for label in &self.labels {
1189                    let removed = if let Some(tid) = self.transaction_id {
1190                        self.store.remove_label_versioned(node_id, label, tid)
1191                    } else {
1192                        self.store.remove_label(node_id, label)
1193                    };
1194                    if removed {
1195                        row_count += 1;
1196                    }
1197                }
1198
1199                // Copy input columns to output (pass-through)
1200                for col_idx in 0..chunk.column_count() {
1201                    if let (Some(src), Some(dst)) =
1202                        (chunk.column(col_idx), builder.column_mut(col_idx))
1203                    {
1204                        if let Some(val) = src.get_value(row) {
1205                            dst.push_value(val);
1206                        } else {
1207                            dst.push_value(Value::Null);
1208                        }
1209                    }
1210                }
1211                // Append the update count column
1212                if let Some(dst) = builder.column_mut(self.count_column) {
1213                    dst.push_value(Value::Int64(row_count));
1214                }
1215
1216                builder.advance_row();
1217            }
1218
1219            return Ok(Some(builder.finish()));
1220        }
1221        Ok(None)
1222    }
1223
1224    fn reset(&mut self) {
1225        self.input.reset();
1226    }
1227
1228    fn name(&self) -> &'static str {
1229        "RemoveLabel"
1230    }
1231}
1232
1233/// Operator that sets properties on nodes or edges.
1234///
1235/// This operator reads node/edge IDs from a column and sets the
1236/// specified properties on each entity.
1237pub struct SetPropertyOperator {
1238    /// The graph store.
1239    store: Arc<dyn GraphStoreMut>,
1240    /// Child operator providing entities.
1241    input: Box<dyn Operator>,
1242    /// Column index containing entity IDs (node or edge).
1243    entity_column: usize,
1244    /// Whether the entity is an edge (false = node).
1245    is_edge: bool,
1246    /// Properties to set (name -> source).
1247    properties: Vec<(String, PropertySource)>,
1248    /// Output schema.
1249    output_schema: Vec<LogicalType>,
1250    /// Whether to replace all properties (true) or merge (false) for map assignments.
1251    replace: bool,
1252    /// Optional constraint validator for schema enforcement.
1253    validator: Option<Arc<dyn ConstraintValidator>>,
1254    /// Entity labels (for node constraint validation).
1255    labels: Vec<String>,
1256    /// Edge type (for edge constraint validation).
1257    edge_type_name: Option<String>,
1258    /// Epoch for MVCC versioning.
1259    viewing_epoch: Option<EpochId>,
1260    /// Transaction ID for undo log tracking.
1261    transaction_id: Option<TransactionId>,
1262    /// Optional write tracker for conflict detection.
1263    write_tracker: Option<SharedWriteTracker>,
1264}
1265
1266impl SetPropertyOperator {
1267    /// Creates a new set property operator for nodes.
1268    pub fn new_for_node(
1269        store: Arc<dyn GraphStoreMut>,
1270        input: Box<dyn Operator>,
1271        node_column: usize,
1272        properties: Vec<(String, PropertySource)>,
1273        output_schema: Vec<LogicalType>,
1274    ) -> Self {
1275        Self {
1276            store,
1277            input,
1278            entity_column: node_column,
1279            is_edge: false,
1280            properties,
1281            output_schema,
1282            replace: false,
1283            validator: None,
1284            labels: Vec::new(),
1285            edge_type_name: None,
1286            viewing_epoch: None,
1287            transaction_id: None,
1288            write_tracker: None,
1289        }
1290    }
1291
1292    /// Creates a new set property operator for edges.
1293    pub fn new_for_edge(
1294        store: Arc<dyn GraphStoreMut>,
1295        input: Box<dyn Operator>,
1296        edge_column: usize,
1297        properties: Vec<(String, PropertySource)>,
1298        output_schema: Vec<LogicalType>,
1299    ) -> Self {
1300        Self {
1301            store,
1302            input,
1303            entity_column: edge_column,
1304            is_edge: true,
1305            properties,
1306            output_schema,
1307            replace: false,
1308            validator: None,
1309            labels: Vec::new(),
1310            edge_type_name: None,
1311            viewing_epoch: None,
1312            transaction_id: None,
1313            write_tracker: None,
1314        }
1315    }
1316
1317    /// Sets whether this operator replaces all properties (for map assignment).
1318    pub fn with_replace(mut self, replace: bool) -> Self {
1319        self.replace = replace;
1320        self
1321    }
1322
1323    /// Sets the constraint validator for schema enforcement.
1324    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1325        self.validator = Some(validator);
1326        self
1327    }
1328
1329    /// Sets the entity labels (for node constraint validation).
1330    pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1331        self.labels = labels;
1332        self
1333    }
1334
1335    /// Sets the edge type name (for edge constraint validation).
1336    pub fn with_edge_type(mut self, edge_type: String) -> Self {
1337        self.edge_type_name = Some(edge_type);
1338        self
1339    }
1340
1341    /// Sets the transaction context for versioned property mutations.
1342    ///
1343    /// When a transaction ID is provided, property changes are recorded in
1344    /// an undo log so they can be restored on rollback.
1345    pub fn with_transaction_context(
1346        mut self,
1347        epoch: EpochId,
1348        transaction_id: Option<TransactionId>,
1349    ) -> Self {
1350        self.viewing_epoch = Some(epoch);
1351        self.transaction_id = transaction_id;
1352        self
1353    }
1354
1355    /// Sets the write tracker for conflict detection.
1356    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1357        self.write_tracker = Some(tracker);
1358        self
1359    }
1360}
1361
1362impl Operator for SetPropertyOperator {
1363    fn next(&mut self) -> OperatorResult {
1364        if let Some(chunk) = self.input.next()? {
1365            let mut builder =
1366                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1367
1368            for row in chunk.selected_indices() {
1369                let entity_val = chunk
1370                    .column(self.entity_column)
1371                    .and_then(|c| c.get_value(row))
1372                    .ok_or_else(|| {
1373                        OperatorError::ColumnNotFound(format!(
1374                            "entity column {}",
1375                            self.entity_column
1376                        ))
1377                    })?;
1378
1379                let entity_id = match entity_val {
1380                    Value::Int64(id) => id as u64,
1381                    _ => {
1382                        return Err(OperatorError::TypeMismatch {
1383                            expected: "Int64 (entity ID)".to_string(),
1384                            found: format!("{entity_val:?}"),
1385                        });
1386                    }
1387                };
1388
1389                // Record write for conflict detection
1390                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1391                    if self.is_edge {
1392                        tracker.record_edge_write(tid, EdgeId(entity_id))?;
1393                    } else {
1394                        tracker.record_node_write(tid, NodeId(entity_id))?;
1395                    }
1396                }
1397
1398                // Resolve all property values
1399                let resolved_props: Vec<(String, Value)> = self
1400                    .properties
1401                    .iter()
1402                    .map(|(name, source)| {
1403                        let value =
1404                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1405                        (name.clone(), value)
1406                    })
1407                    .collect();
1408
1409                // Validate constraints before writing
1410                if let Some(ref validator) = self.validator {
1411                    if self.is_edge {
1412                        if let Some(ref et) = self.edge_type_name {
1413                            for (name, value) in &resolved_props {
1414                                validator.validate_edge_property(et, name, value)?;
1415                            }
1416                        }
1417                    } else {
1418                        for (name, value) in &resolved_props {
1419                            validator.validate_node_property(&self.labels, name, value)?;
1420                            validator.check_unique_node_property(&self.labels, name, value)?;
1421                        }
1422                    }
1423                }
1424
1425                // Write all properties (use versioned methods when inside a transaction)
1426                let tx_id = self.transaction_id;
1427                for (prop_name, value) in resolved_props {
1428                    if prop_name == "*" {
1429                        // Map assignment: value should be a Map
1430                        if let Value::Map(map) = value {
1431                            if self.replace {
1432                                // Replace: remove all existing properties first
1433                                if self.is_edge {
1434                                    if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1435                                        let keys: Vec<String> = edge
1436                                            .properties
1437                                            .iter()
1438                                            .map(|(k, _)| k.as_str().to_string())
1439                                            .collect();
1440                                        for key in keys {
1441                                            if let Some(tid) = tx_id {
1442                                                self.store.remove_edge_property_versioned(
1443                                                    EdgeId(entity_id),
1444                                                    &key,
1445                                                    tid,
1446                                                );
1447                                            } else {
1448                                                self.store
1449                                                    .remove_edge_property(EdgeId(entity_id), &key);
1450                                            }
1451                                        }
1452                                    }
1453                                } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1454                                    let keys: Vec<String> = node
1455                                        .properties
1456                                        .iter()
1457                                        .map(|(k, _)| k.as_str().to_string())
1458                                        .collect();
1459                                    for key in keys {
1460                                        if let Some(tid) = tx_id {
1461                                            self.store.remove_node_property_versioned(
1462                                                NodeId(entity_id),
1463                                                &key,
1464                                                tid,
1465                                            );
1466                                        } else {
1467                                            self.store
1468                                                .remove_node_property(NodeId(entity_id), &key);
1469                                        }
1470                                    }
1471                                }
1472                            }
1473                            // Set each map entry (null values remove the property)
1474                            for (key, val) in map.iter() {
1475                                if val.is_null() {
1476                                    // Null in SET += removes the property (Cypher/GQL semantics)
1477                                    if self.is_edge {
1478                                        if let Some(tid) = tx_id {
1479                                            self.store.remove_edge_property_versioned(
1480                                                EdgeId(entity_id),
1481                                                key.as_str(),
1482                                                tid,
1483                                            );
1484                                        } else {
1485                                            self.store.remove_edge_property(
1486                                                EdgeId(entity_id),
1487                                                key.as_str(),
1488                                            );
1489                                        }
1490                                    } else if let Some(tid) = tx_id {
1491                                        self.store.remove_node_property_versioned(
1492                                            NodeId(entity_id),
1493                                            key.as_str(),
1494                                            tid,
1495                                        );
1496                                    } else {
1497                                        self.store
1498                                            .remove_node_property(NodeId(entity_id), key.as_str());
1499                                    }
1500                                } else if self.is_edge {
1501                                    if let Some(tid) = tx_id {
1502                                        self.store.set_edge_property_versioned(
1503                                            EdgeId(entity_id),
1504                                            key.as_str(),
1505                                            val.clone(),
1506                                            tid,
1507                                        );
1508                                    } else {
1509                                        self.store.set_edge_property(
1510                                            EdgeId(entity_id),
1511                                            key.as_str(),
1512                                            val.clone(),
1513                                        );
1514                                    }
1515                                } else if let Some(tid) = tx_id {
1516                                    self.store.set_node_property_versioned(
1517                                        NodeId(entity_id),
1518                                        key.as_str(),
1519                                        val.clone(),
1520                                        tid,
1521                                    );
1522                                } else {
1523                                    self.store.set_node_property(
1524                                        NodeId(entity_id),
1525                                        key.as_str(),
1526                                        val.clone(),
1527                                    );
1528                                }
1529                            }
1530                        }
1531                    } else if self.is_edge {
1532                        if let Some(tid) = tx_id {
1533                            self.store.set_edge_property_versioned(
1534                                EdgeId(entity_id),
1535                                &prop_name,
1536                                value,
1537                                tid,
1538                            );
1539                        } else {
1540                            self.store
1541                                .set_edge_property(EdgeId(entity_id), &prop_name, value);
1542                        }
1543                    } else if let Some(tid) = tx_id {
1544                        self.store.set_node_property_versioned(
1545                            NodeId(entity_id),
1546                            &prop_name,
1547                            value,
1548                            tid,
1549                        );
1550                    } else {
1551                        self.store
1552                            .set_node_property(NodeId(entity_id), &prop_name, value);
1553                    }
1554                }
1555
1556                // Copy input columns to output
1557                for col_idx in 0..chunk.column_count() {
1558                    if let (Some(src), Some(dst)) =
1559                        (chunk.column(col_idx), builder.column_mut(col_idx))
1560                    {
1561                        if let Some(val) = src.get_value(row) {
1562                            dst.push_value(val);
1563                        } else {
1564                            dst.push_value(Value::Null);
1565                        }
1566                    }
1567                }
1568
1569                builder.advance_row();
1570            }
1571
1572            return Ok(Some(builder.finish()));
1573        }
1574        Ok(None)
1575    }
1576
1577    fn reset(&mut self) {
1578        self.input.reset();
1579    }
1580
1581    fn name(&self) -> &'static str {
1582        "SetProperty"
1583    }
1584}
1585
1586#[cfg(test)]
1587mod tests {
1588    use super::*;
1589    use crate::execution::DataChunk;
1590    use crate::execution::chunk::DataChunkBuilder;
1591    use crate::graph::lpg::LpgStore;
1592
1593    // ── Helpers ────────────────────────────────────────────────────
1594
1595    fn create_test_store() -> Arc<dyn GraphStoreMut> {
1596        Arc::new(LpgStore::new().unwrap())
1597    }
1598
1599    struct MockInput {
1600        chunk: Option<DataChunk>,
1601    }
1602
1603    impl MockInput {
1604        fn boxed(chunk: DataChunk) -> Box<Self> {
1605            Box::new(Self { chunk: Some(chunk) })
1606        }
1607    }
1608
1609    impl Operator for MockInput {
1610        fn next(&mut self) -> OperatorResult {
1611            Ok(self.chunk.take())
1612        }
1613        fn reset(&mut self) {}
1614        fn name(&self) -> &'static str {
1615            "MockInput"
1616        }
1617    }
1618
1619    struct EmptyInput;
1620    impl Operator for EmptyInput {
1621        fn next(&mut self) -> OperatorResult {
1622            Ok(None)
1623        }
1624        fn reset(&mut self) {}
1625        fn name(&self) -> &'static str {
1626            "EmptyInput"
1627        }
1628    }
1629
1630    fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1631        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1632        for id in ids {
1633            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1634            builder.advance_row();
1635        }
1636        builder.finish()
1637    }
1638
1639    fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1640        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1641        for id in ids {
1642            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1643            builder.advance_row();
1644        }
1645        builder.finish()
1646    }
1647
1648    // ── CreateNodeOperator ──────────────────────────────────────
1649
1650    #[test]
1651    fn test_create_node_standalone() {
1652        let store = create_test_store();
1653
1654        let mut op = CreateNodeOperator::new(
1655            Arc::clone(&store),
1656            None,
1657            vec!["Person".to_string()],
1658            vec![(
1659                "name".to_string(),
1660                PropertySource::Constant(Value::String("Alix".into())),
1661            )],
1662            vec![LogicalType::Int64],
1663            0,
1664        );
1665
1666        let chunk = op.next().unwrap().unwrap();
1667        assert_eq!(chunk.row_count(), 1);
1668
1669        // Second call should return None (standalone executes once)
1670        assert!(op.next().unwrap().is_none());
1671
1672        assert_eq!(store.node_count(), 1);
1673    }
1674
1675    #[test]
1676    fn test_create_edge() {
1677        let store = create_test_store();
1678
1679        let node1 = store.create_node(&["Person"]);
1680        let node2 = store.create_node(&["Person"]);
1681
1682        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1683        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1684        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1685        builder.advance_row();
1686
1687        let mut op = CreateEdgeOperator::new(
1688            Arc::clone(&store),
1689            MockInput::boxed(builder.finish()),
1690            0,
1691            1,
1692            "KNOWS".to_string(),
1693            vec![LogicalType::Int64, LogicalType::Int64],
1694        );
1695
1696        let _chunk = op.next().unwrap().unwrap();
1697        assert_eq!(store.edge_count(), 1);
1698    }
1699
1700    #[test]
1701    fn test_delete_node() {
1702        let store = create_test_store();
1703
1704        let node_id = store.create_node(&["Person"]);
1705        assert_eq!(store.node_count(), 1);
1706
1707        let mut op = DeleteNodeOperator::new(
1708            Arc::clone(&store),
1709            MockInput::boxed(node_id_chunk(&[node_id])),
1710            0,
1711            vec![LogicalType::Node],
1712            false,
1713        );
1714
1715        let chunk = op.next().unwrap().unwrap();
1716        // Pass-through: output row contains the original node ID
1717        assert_eq!(chunk.row_count(), 1);
1718        assert_eq!(store.node_count(), 0);
1719    }
1720
1721    // ── DeleteEdgeOperator ───────────────────────────────────────
1722
1723    #[test]
1724    fn test_delete_edge() {
1725        let store = create_test_store();
1726
1727        let n1 = store.create_node(&["Person"]);
1728        let n2 = store.create_node(&["Person"]);
1729        let eid = store.create_edge(n1, n2, "KNOWS");
1730        assert_eq!(store.edge_count(), 1);
1731
1732        let mut op = DeleteEdgeOperator::new(
1733            Arc::clone(&store),
1734            MockInput::boxed(edge_id_chunk(&[eid])),
1735            0,
1736            vec![LogicalType::Node],
1737        );
1738
1739        let chunk = op.next().unwrap().unwrap();
1740        assert_eq!(chunk.row_count(), 1);
1741        assert_eq!(store.edge_count(), 0);
1742    }
1743
1744    #[test]
1745    fn test_delete_edge_no_input_returns_none() {
1746        let store = create_test_store();
1747
1748        let mut op = DeleteEdgeOperator::new(
1749            Arc::clone(&store),
1750            Box::new(EmptyInput),
1751            0,
1752            vec![LogicalType::Int64],
1753        );
1754
1755        assert!(op.next().unwrap().is_none());
1756    }
1757
1758    #[test]
1759    fn test_delete_multiple_edges() {
1760        let store = create_test_store();
1761
1762        let n1 = store.create_node(&["N"]);
1763        let n2 = store.create_node(&["N"]);
1764        let e1 = store.create_edge(n1, n2, "R");
1765        let e2 = store.create_edge(n2, n1, "S");
1766        assert_eq!(store.edge_count(), 2);
1767
1768        let mut op = DeleteEdgeOperator::new(
1769            Arc::clone(&store),
1770            MockInput::boxed(edge_id_chunk(&[e1, e2])),
1771            0,
1772            vec![LogicalType::Node],
1773        );
1774
1775        let chunk = op.next().unwrap().unwrap();
1776        assert_eq!(chunk.row_count(), 2);
1777        assert_eq!(store.edge_count(), 0);
1778    }
1779
1780    // ── DeleteNodeOperator with DETACH ───────────────────────────
1781
1782    #[test]
1783    fn test_delete_node_detach() {
1784        let store = create_test_store();
1785
1786        let n1 = store.create_node(&["Person"]);
1787        let n2 = store.create_node(&["Person"]);
1788        store.create_edge(n1, n2, "KNOWS");
1789        store.create_edge(n2, n1, "FOLLOWS");
1790        assert_eq!(store.edge_count(), 2);
1791
1792        let mut op = DeleteNodeOperator::new(
1793            Arc::clone(&store),
1794            MockInput::boxed(node_id_chunk(&[n1])),
1795            0,
1796            vec![LogicalType::Node],
1797            true, // detach = true
1798        );
1799
1800        let chunk = op.next().unwrap().unwrap();
1801        assert_eq!(chunk.row_count(), 1);
1802        assert_eq!(store.node_count(), 1);
1803        assert_eq!(store.edge_count(), 0); // edges detached
1804    }
1805
1806    // ── AddLabelOperator ─────────────────────────────────────────
1807
1808    #[test]
1809    fn test_add_label() {
1810        let store = create_test_store();
1811
1812        let node = store.create_node(&["Person"]);
1813
1814        let mut op = AddLabelOperator::new(
1815            Arc::clone(&store),
1816            MockInput::boxed(node_id_chunk(&[node])),
1817            0,
1818            vec!["Employee".to_string()],
1819            vec![LogicalType::Int64, LogicalType::Int64],
1820        );
1821
1822        let chunk = op.next().unwrap().unwrap();
1823        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1824        assert_eq!(updated, 1);
1825
1826        // Verify label was added
1827        let node_data = store.get_node(node).unwrap();
1828        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1829        assert!(labels.contains(&"Person"));
1830        assert!(labels.contains(&"Employee"));
1831    }
1832
1833    #[test]
1834    fn test_add_multiple_labels() {
1835        let store = create_test_store();
1836
1837        let node = store.create_node(&["Base"]);
1838
1839        let mut op = AddLabelOperator::new(
1840            Arc::clone(&store),
1841            MockInput::boxed(node_id_chunk(&[node])),
1842            0,
1843            vec!["LabelA".to_string(), "LabelB".to_string()],
1844            vec![LogicalType::Int64, LogicalType::Int64],
1845        );
1846
1847        let chunk = op.next().unwrap().unwrap();
1848        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1849        assert_eq!(updated, 2); // 2 labels added
1850
1851        let node_data = store.get_node(node).unwrap();
1852        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1853        assert!(labels.contains(&"LabelA"));
1854        assert!(labels.contains(&"LabelB"));
1855    }
1856
1857    #[test]
1858    fn test_add_label_no_input_returns_none() {
1859        let store = create_test_store();
1860
1861        let mut op = AddLabelOperator::new(
1862            Arc::clone(&store),
1863            Box::new(EmptyInput),
1864            0,
1865            vec!["Foo".to_string()],
1866            vec![LogicalType::Int64, LogicalType::Int64],
1867        );
1868
1869        assert!(op.next().unwrap().is_none());
1870    }
1871
1872    // ── RemoveLabelOperator ──────────────────────────────────────
1873
1874    #[test]
1875    fn test_remove_label() {
1876        let store = create_test_store();
1877
1878        let node = store.create_node(&["Person", "Employee"]);
1879
1880        let mut op = RemoveLabelOperator::new(
1881            Arc::clone(&store),
1882            MockInput::boxed(node_id_chunk(&[node])),
1883            0,
1884            vec!["Employee".to_string()],
1885            vec![LogicalType::Int64, LogicalType::Int64],
1886        );
1887
1888        let chunk = op.next().unwrap().unwrap();
1889        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1890        assert_eq!(updated, 1);
1891
1892        // Verify label was removed
1893        let node_data = store.get_node(node).unwrap();
1894        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1895        assert!(labels.contains(&"Person"));
1896        assert!(!labels.contains(&"Employee"));
1897    }
1898
1899    #[test]
1900    fn test_remove_nonexistent_label() {
1901        let store = create_test_store();
1902
1903        let node = store.create_node(&["Person"]);
1904
1905        let mut op = RemoveLabelOperator::new(
1906            Arc::clone(&store),
1907            MockInput::boxed(node_id_chunk(&[node])),
1908            0,
1909            vec!["NonExistent".to_string()],
1910            vec![LogicalType::Int64, LogicalType::Int64],
1911        );
1912
1913        let chunk = op.next().unwrap().unwrap();
1914        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1915        assert_eq!(updated, 0); // nothing removed
1916    }
1917
1918    // ── SetPropertyOperator ──────────────────────────────────────
1919
1920    #[test]
1921    fn test_set_node_property_constant() {
1922        let store = create_test_store();
1923
1924        let node = store.create_node(&["Person"]);
1925
1926        let mut op = SetPropertyOperator::new_for_node(
1927            Arc::clone(&store),
1928            MockInput::boxed(node_id_chunk(&[node])),
1929            0,
1930            vec![(
1931                "name".to_string(),
1932                PropertySource::Constant(Value::String("Alix".into())),
1933            )],
1934            vec![LogicalType::Int64],
1935        );
1936
1937        let chunk = op.next().unwrap().unwrap();
1938        assert_eq!(chunk.row_count(), 1);
1939
1940        // Verify property was set
1941        let node_data = store.get_node(node).unwrap();
1942        assert_eq!(
1943            node_data
1944                .properties
1945                .get(&grafeo_common::types::PropertyKey::new("name")),
1946            Some(&Value::String("Alix".into()))
1947        );
1948    }
1949
1950    #[test]
1951    fn test_set_node_property_from_column() {
1952        let store = create_test_store();
1953
1954        let node = store.create_node(&["Person"]);
1955
1956        // Input: column 0 = node ID, column 1 = property value
1957        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1958        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1959        builder
1960            .column_mut(1)
1961            .unwrap()
1962            .push_value(Value::String("Gus".into()));
1963        builder.advance_row();
1964
1965        let mut op = SetPropertyOperator::new_for_node(
1966            Arc::clone(&store),
1967            MockInput::boxed(builder.finish()),
1968            0,
1969            vec![("name".to_string(), PropertySource::Column(1))],
1970            vec![LogicalType::Int64, LogicalType::String],
1971        );
1972
1973        let chunk = op.next().unwrap().unwrap();
1974        assert_eq!(chunk.row_count(), 1);
1975
1976        let node_data = store.get_node(node).unwrap();
1977        assert_eq!(
1978            node_data
1979                .properties
1980                .get(&grafeo_common::types::PropertyKey::new("name")),
1981            Some(&Value::String("Gus".into()))
1982        );
1983    }
1984
1985    #[test]
1986    fn test_set_edge_property() {
1987        let store = create_test_store();
1988
1989        let n1 = store.create_node(&["N"]);
1990        let n2 = store.create_node(&["N"]);
1991        let eid = store.create_edge(n1, n2, "KNOWS");
1992
1993        let mut op = SetPropertyOperator::new_for_edge(
1994            Arc::clone(&store),
1995            MockInput::boxed(edge_id_chunk(&[eid])),
1996            0,
1997            vec![(
1998                "weight".to_string(),
1999                PropertySource::Constant(Value::Float64(0.75)),
2000            )],
2001            vec![LogicalType::Int64],
2002        );
2003
2004        let chunk = op.next().unwrap().unwrap();
2005        assert_eq!(chunk.row_count(), 1);
2006
2007        let edge_data = store.get_edge(eid).unwrap();
2008        assert_eq!(
2009            edge_data
2010                .properties
2011                .get(&grafeo_common::types::PropertyKey::new("weight")),
2012            Some(&Value::Float64(0.75))
2013        );
2014    }
2015
2016    #[test]
2017    fn test_set_multiple_properties() {
2018        let store = create_test_store();
2019
2020        let node = store.create_node(&["Person"]);
2021
2022        let mut op = SetPropertyOperator::new_for_node(
2023            Arc::clone(&store),
2024            MockInput::boxed(node_id_chunk(&[node])),
2025            0,
2026            vec![
2027                (
2028                    "name".to_string(),
2029                    PropertySource::Constant(Value::String("Alix".into())),
2030                ),
2031                (
2032                    "age".to_string(),
2033                    PropertySource::Constant(Value::Int64(30)),
2034                ),
2035            ],
2036            vec![LogicalType::Int64],
2037        );
2038
2039        op.next().unwrap().unwrap();
2040
2041        let node_data = store.get_node(node).unwrap();
2042        assert_eq!(
2043            node_data
2044                .properties
2045                .get(&grafeo_common::types::PropertyKey::new("name")),
2046            Some(&Value::String("Alix".into()))
2047        );
2048        assert_eq!(
2049            node_data
2050                .properties
2051                .get(&grafeo_common::types::PropertyKey::new("age")),
2052            Some(&Value::Int64(30))
2053        );
2054    }
2055
2056    #[test]
2057    fn test_set_property_no_input_returns_none() {
2058        let store = create_test_store();
2059
2060        let mut op = SetPropertyOperator::new_for_node(
2061            Arc::clone(&store),
2062            Box::new(EmptyInput),
2063            0,
2064            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
2065            vec![LogicalType::Int64],
2066        );
2067
2068        assert!(op.next().unwrap().is_none());
2069    }
2070
2071    // ── Error paths ──────────────────────────────────────────────
2072
2073    #[test]
2074    fn test_delete_node_without_detach_errors_when_edges_exist() {
2075        let store = create_test_store();
2076
2077        let n1 = store.create_node(&["Person"]);
2078        let n2 = store.create_node(&["Person"]);
2079        store.create_edge(n1, n2, "KNOWS");
2080
2081        let mut op = DeleteNodeOperator::new(
2082            Arc::clone(&store),
2083            MockInput::boxed(node_id_chunk(&[n1])),
2084            0,
2085            vec![LogicalType::Int64],
2086            false, // no detach
2087        );
2088
2089        let err = op.next().unwrap_err();
2090        match err {
2091            OperatorError::ConstraintViolation(msg) => {
2092                assert!(msg.contains("connected edge"), "unexpected message: {msg}");
2093            }
2094            other => panic!("expected ConstraintViolation, got {other:?}"),
2095        }
2096        // Node should still exist
2097        assert_eq!(store.node_count(), 2);
2098    }
2099
2100    // ── CreateNodeOperator with input ───────────────────────────
2101
2102    #[test]
2103    fn test_create_node_with_input_operator() {
2104        let store = create_test_store();
2105
2106        // Seed node to provide input rows
2107        let existing = store.create_node(&["Seed"]);
2108
2109        let mut op = CreateNodeOperator::new(
2110            Arc::clone(&store),
2111            Some(MockInput::boxed(node_id_chunk(&[existing]))),
2112            vec!["Created".to_string()],
2113            vec![(
2114                "source".to_string(),
2115                PropertySource::Constant(Value::String("from_input".into())),
2116            )],
2117            vec![LogicalType::Int64, LogicalType::Int64], // input col + output col
2118            1,                                            // output column for new node ID
2119        );
2120
2121        let chunk = op.next().unwrap().unwrap();
2122        assert_eq!(chunk.row_count(), 1);
2123
2124        // Should have created one new node (2 total: Seed + Created)
2125        assert_eq!(store.node_count(), 2);
2126
2127        // Exhausted
2128        assert!(op.next().unwrap().is_none());
2129    }
2130
2131    // ── CreateEdgeOperator with properties and output column ────
2132
2133    #[test]
2134    fn test_create_edge_with_properties_and_output_column() {
2135        let store = create_test_store();
2136
2137        let n1 = store.create_node(&["Person"]);
2138        let n2 = store.create_node(&["Person"]);
2139
2140        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
2141        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
2142        builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
2143        builder.advance_row();
2144
2145        let mut op = CreateEdgeOperator::new(
2146            Arc::clone(&store),
2147            MockInput::boxed(builder.finish()),
2148            0,
2149            1,
2150            "KNOWS".to_string(),
2151            vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
2152        )
2153        .with_properties(vec![(
2154            "since".to_string(),
2155            PropertySource::Constant(Value::Int64(2024)),
2156        )])
2157        .with_output_column(2);
2158
2159        let chunk = op.next().unwrap().unwrap();
2160        assert_eq!(chunk.row_count(), 1);
2161        assert_eq!(store.edge_count(), 1);
2162
2163        // Verify the output chunk contains the edge ID in column 2
2164        let edge_id_raw = chunk
2165            .column(2)
2166            .and_then(|c| c.get_int64(0))
2167            .expect("edge ID should be in output column 2");
2168        let edge_id = EdgeId(edge_id_raw as u64);
2169
2170        // Verify the edge has the property
2171        let edge = store.get_edge(edge_id).expect("edge should exist");
2172        assert_eq!(
2173            edge.properties
2174                .get(&grafeo_common::types::PropertyKey::new("since")),
2175            Some(&Value::Int64(2024))
2176        );
2177    }
2178
2179    // ── SetPropertyOperator with map replacement ────────────────
2180
2181    #[test]
2182    fn test_set_property_map_replace() {
2183        use std::collections::BTreeMap;
2184
2185        let store = create_test_store();
2186
2187        let node = store.create_node(&["Person"]);
2188        store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
2189
2190        let mut map = BTreeMap::new();
2191        map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
2192
2193        let mut op = SetPropertyOperator::new_for_node(
2194            Arc::clone(&store),
2195            MockInput::boxed(node_id_chunk(&[node])),
2196            0,
2197            vec![(
2198                "*".to_string(),
2199                PropertySource::Constant(Value::Map(Arc::new(map))),
2200            )],
2201            vec![LogicalType::Int64],
2202        )
2203        .with_replace(true);
2204
2205        op.next().unwrap().unwrap();
2206
2207        let node_data = store.get_node(node).unwrap();
2208        // Old property should be gone
2209        assert!(
2210            node_data
2211                .properties
2212                .get(&PropertyKey::new("old_prop"))
2213                .is_none()
2214        );
2215        // New property should exist
2216        assert_eq!(
2217            node_data.properties.get(&PropertyKey::new("new_key")),
2218            Some(&Value::String("new_val".into()))
2219        );
2220    }
2221
2222    // ── SetPropertyOperator with map merge (no replace) ─────────
2223
2224    #[test]
2225    fn test_set_property_map_merge() {
2226        use std::collections::BTreeMap;
2227
2228        let store = create_test_store();
2229
2230        let node = store.create_node(&["Person"]);
2231        store.set_node_property(node, "existing", Value::Int64(42));
2232
2233        let mut map = BTreeMap::new();
2234        map.insert(PropertyKey::new("added"), Value::String("hello".into()));
2235
2236        let mut op = SetPropertyOperator::new_for_node(
2237            Arc::clone(&store),
2238            MockInput::boxed(node_id_chunk(&[node])),
2239            0,
2240            vec![(
2241                "*".to_string(),
2242                PropertySource::Constant(Value::Map(Arc::new(map))),
2243            )],
2244            vec![LogicalType::Int64],
2245        ); // replace defaults to false
2246
2247        op.next().unwrap().unwrap();
2248
2249        let node_data = store.get_node(node).unwrap();
2250        // Existing property should still be there
2251        assert_eq!(
2252            node_data.properties.get(&PropertyKey::new("existing")),
2253            Some(&Value::Int64(42))
2254        );
2255        // New property should also exist
2256        assert_eq!(
2257            node_data.properties.get(&PropertyKey::new("added")),
2258            Some(&Value::String("hello".into()))
2259        );
2260    }
2261
2262    // ── PropertySource::PropertyAccess ──────────────────────────
2263
2264    #[test]
2265    fn test_property_source_property_access() {
2266        let store = create_test_store();
2267
2268        let source_node = store.create_node(&["Source"]);
2269        store.set_node_property(source_node, "name", Value::String("Alix".into()));
2270
2271        let target_node = store.create_node(&["Target"]);
2272
2273        // Build chunk: col 0 = source node ID (Node type for PropertyAccess), col 1 = target node ID
2274        let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
2275        builder.column_mut(0).unwrap().push_node_id(source_node);
2276        builder
2277            .column_mut(1)
2278            .unwrap()
2279            .push_int64(target_node.0 as i64);
2280        builder.advance_row();
2281
2282        let mut op = SetPropertyOperator::new_for_node(
2283            Arc::clone(&store),
2284            MockInput::boxed(builder.finish()),
2285            1, // entity column = target node
2286            vec![(
2287                "copied_name".to_string(),
2288                PropertySource::PropertyAccess {
2289                    column: 0,
2290                    property: "name".to_string(),
2291                },
2292            )],
2293            vec![LogicalType::Node, LogicalType::Int64],
2294        );
2295
2296        op.next().unwrap().unwrap();
2297
2298        let target_data = store.get_node(target_node).unwrap();
2299        assert_eq!(
2300            target_data.properties.get(&PropertyKey::new("copied_name")),
2301            Some(&Value::String("Alix".into()))
2302        );
2303    }
2304
2305    // ── ConstraintValidator integration ─────────────────────────
2306
2307    #[test]
2308    fn test_create_node_with_constraint_validator() {
2309        let store = create_test_store();
2310
2311        struct RejectAgeValidator;
2312        impl ConstraintValidator for RejectAgeValidator {
2313            fn validate_node_property(
2314                &self,
2315                _labels: &[String],
2316                key: &str,
2317                _value: &Value,
2318            ) -> Result<(), OperatorError> {
2319                if key == "forbidden" {
2320                    return Err(OperatorError::ConstraintViolation(
2321                        "property 'forbidden' is not allowed".to_string(),
2322                    ));
2323                }
2324                Ok(())
2325            }
2326            fn validate_node_complete(
2327                &self,
2328                _labels: &[String],
2329                _properties: &[(String, Value)],
2330            ) -> Result<(), OperatorError> {
2331                Ok(())
2332            }
2333            fn check_unique_node_property(
2334                &self,
2335                _labels: &[String],
2336                _key: &str,
2337                _value: &Value,
2338            ) -> Result<(), OperatorError> {
2339                Ok(())
2340            }
2341            fn validate_edge_property(
2342                &self,
2343                _edge_type: &str,
2344                _key: &str,
2345                _value: &Value,
2346            ) -> Result<(), OperatorError> {
2347                Ok(())
2348            }
2349            fn validate_edge_complete(
2350                &self,
2351                _edge_type: &str,
2352                _properties: &[(String, Value)],
2353            ) -> Result<(), OperatorError> {
2354                Ok(())
2355            }
2356        }
2357
2358        // Valid property should succeed
2359        let mut op = CreateNodeOperator::new(
2360            Arc::clone(&store),
2361            None,
2362            vec!["Thing".to_string()],
2363            vec![(
2364                "name".to_string(),
2365                PropertySource::Constant(Value::String("ok".into())),
2366            )],
2367            vec![LogicalType::Int64],
2368            0,
2369        )
2370        .with_validator(Arc::new(RejectAgeValidator));
2371
2372        assert!(op.next().is_ok());
2373        assert_eq!(store.node_count(), 1);
2374
2375        // Forbidden property should fail
2376        let mut op = CreateNodeOperator::new(
2377            Arc::clone(&store),
2378            None,
2379            vec!["Thing".to_string()],
2380            vec![(
2381                "forbidden".to_string(),
2382                PropertySource::Constant(Value::Int64(1)),
2383            )],
2384            vec![LogicalType::Int64],
2385            0,
2386        )
2387        .with_validator(Arc::new(RejectAgeValidator));
2388
2389        let err = op.next().unwrap_err();
2390        assert!(matches!(err, OperatorError::ConstraintViolation(_)));
2391        // Node count should still be 2 (the node is created before validation, but the error
2392        // propagates - this tests the validation logic fires)
2393    }
2394
2395    // ── Reset behavior ──────────────────────────────────────────
2396
2397    #[test]
2398    fn test_create_node_reset_allows_re_execution() {
2399        let store = create_test_store();
2400
2401        let mut op = CreateNodeOperator::new(
2402            Arc::clone(&store),
2403            None,
2404            vec!["Person".to_string()],
2405            vec![],
2406            vec![LogicalType::Int64],
2407            0,
2408        );
2409
2410        // First execution
2411        assert!(op.next().unwrap().is_some());
2412        assert!(op.next().unwrap().is_none());
2413
2414        // Reset and re-execute
2415        op.reset();
2416        assert!(op.next().unwrap().is_some());
2417
2418        assert_eq!(store.node_count(), 2);
2419    }
2420
2421    // ── Operator name() ──────────────────────────────────────────
2422
2423    #[test]
2424    fn test_operator_names() {
2425        let store = create_test_store();
2426
2427        let op = CreateNodeOperator::new(
2428            Arc::clone(&store),
2429            None,
2430            vec![],
2431            vec![],
2432            vec![LogicalType::Int64],
2433            0,
2434        );
2435        assert_eq!(op.name(), "CreateNode");
2436
2437        let op = CreateEdgeOperator::new(
2438            Arc::clone(&store),
2439            Box::new(EmptyInput),
2440            0,
2441            1,
2442            "R".to_string(),
2443            vec![LogicalType::Int64],
2444        );
2445        assert_eq!(op.name(), "CreateEdge");
2446
2447        let op = DeleteNodeOperator::new(
2448            Arc::clone(&store),
2449            Box::new(EmptyInput),
2450            0,
2451            vec![LogicalType::Int64],
2452            false,
2453        );
2454        assert_eq!(op.name(), "DeleteNode");
2455
2456        let op = DeleteEdgeOperator::new(
2457            Arc::clone(&store),
2458            Box::new(EmptyInput),
2459            0,
2460            vec![LogicalType::Int64],
2461        );
2462        assert_eq!(op.name(), "DeleteEdge");
2463
2464        let op = AddLabelOperator::new(
2465            Arc::clone(&store),
2466            Box::new(EmptyInput),
2467            0,
2468            vec!["L".to_string()],
2469            vec![LogicalType::Int64],
2470        );
2471        assert_eq!(op.name(), "AddLabel");
2472
2473        let op = RemoveLabelOperator::new(
2474            Arc::clone(&store),
2475            Box::new(EmptyInput),
2476            0,
2477            vec!["L".to_string()],
2478            vec![LogicalType::Int64],
2479        );
2480        assert_eq!(op.name(), "RemoveLabel");
2481
2482        let op = SetPropertyOperator::new_for_node(
2483            Arc::clone(&store),
2484            Box::new(EmptyInput),
2485            0,
2486            vec![],
2487            vec![LogicalType::Int64],
2488        );
2489        assert_eq!(op.name(), "SetProperty");
2490    }
2491}