Skip to main content

grafeo_core/execution/operators/
mutation.rs

//! Mutation operators for creating and deleting graph elements.
//!
//! These operators modify the graph structure:
//! - `CreateNodeOperator`: Creates new nodes
//! - `CreateEdgeOperator`: Creates new edges
//! - `DeleteNodeOperator`: Deletes nodes
//! - `DeleteEdgeOperator`: Deletes edges
8
9use std::sync::Arc;
10
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19/// Trait for validating schema constraints during mutation operations.
20///
21/// Implementors check type definitions, NOT NULL, and UNIQUE constraints
22/// before data is written to the store.
23pub trait ConstraintValidator: Send + Sync {
24    /// Validates a single property value for a node with the given labels.
25    ///
26    /// Checks type compatibility and NOT NULL constraints.
27    fn validate_node_property(
28        &self,
29        labels: &[String],
30        key: &str,
31        value: &Value,
32    ) -> Result<(), OperatorError>;
33
34    /// Validates that all required properties are present after creating a node.
35    ///
36    /// Checks NOT NULL constraints for properties that were not explicitly set.
37    fn validate_node_complete(
38        &self,
39        labels: &[String],
40        properties: &[(String, Value)],
41    ) -> Result<(), OperatorError>;
42
43    /// Checks UNIQUE constraint for a node property value.
44    ///
45    /// Returns an error if a node with the same label already has this value.
46    fn check_unique_node_property(
47        &self,
48        labels: &[String],
49        key: &str,
50        value: &Value,
51    ) -> Result<(), OperatorError>;
52
53    /// Validates a single property value for an edge of the given type.
54    fn validate_edge_property(
55        &self,
56        edge_type: &str,
57        key: &str,
58        value: &Value,
59    ) -> Result<(), OperatorError>;
60
61    /// Validates that all required properties are present after creating an edge.
62    fn validate_edge_complete(
63        &self,
64        edge_type: &str,
65        properties: &[(String, Value)],
66    ) -> Result<(), OperatorError>;
67
68    /// Validates that the node labels are allowed by the bound graph type.
69    fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
70        let _ = labels;
71        Ok(())
72    }
73
74    /// Validates that the edge type is allowed by the bound graph type.
75    fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
76        let _ = edge_type;
77        Ok(())
78    }
79
80    /// Validates that edge endpoints have the correct node type labels.
81    fn validate_edge_endpoints(
82        &self,
83        edge_type: &str,
84        source_labels: &[String],
85        target_labels: &[String],
86    ) -> Result<(), OperatorError> {
87        let _ = (edge_type, source_labels, target_labels);
88        Ok(())
89    }
90
91    /// Injects default values for properties that are defined in a type but
92    /// not explicitly provided.
93    fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
94        let _ = (labels, properties);
95    }
96}
97
98/// Operator that creates new nodes.
99///
100/// For each input row, creates a new node with the specified labels
101/// and properties, then outputs the row with the new node.
102pub struct CreateNodeOperator {
103    /// The graph store to modify.
104    store: Arc<dyn GraphStoreMut>,
105    /// Input operator.
106    input: Option<Box<dyn Operator>>,
107    /// Labels for the new nodes.
108    labels: Vec<String>,
109    /// Properties to set (name -> column index or constant value).
110    properties: Vec<(String, PropertySource)>,
111    /// Output schema.
112    output_schema: Vec<LogicalType>,
113    /// Column index for the created node variable.
114    output_column: usize,
115    /// Whether this operator has been executed (for no-input case).
116    executed: bool,
117    /// Epoch for MVCC versioning.
118    viewing_epoch: Option<EpochId>,
119    /// Transaction ID for MVCC versioning.
120    transaction_id: Option<TransactionId>,
121    /// Optional constraint validator for schema enforcement.
122    validator: Option<Arc<dyn ConstraintValidator>>,
123    /// Optional write tracker for conflict detection.
124    write_tracker: Option<SharedWriteTracker>,
125}
126
127/// Source for a property value.
128#[derive(Debug, Clone)]
129pub enum PropertySource {
130    /// Get value from an input column.
131    Column(usize),
132    /// Use a constant value.
133    Constant(Value),
134    /// Access a named property from a map/node/edge in an input column.
135    PropertyAccess {
136        /// The column containing the map, node ID, or edge ID.
137        column: usize,
138        /// The property name to extract.
139        property: String,
140    },
141}
142
143impl PropertySource {
144    /// Resolves a property value from a data chunk row.
145    pub fn resolve(
146        &self,
147        chunk: &crate::execution::chunk::DataChunk,
148        row: usize,
149        store: &dyn GraphStore,
150    ) -> Value {
151        match self {
152            PropertySource::Column(col_idx) => chunk
153                .column(*col_idx)
154                .and_then(|c| c.get_value(row))
155                .unwrap_or(Value::Null),
156            PropertySource::Constant(v) => v.clone(),
157            PropertySource::PropertyAccess { column, property } => {
158                let Some(col) = chunk.column(*column) else {
159                    return Value::Null;
160                };
161                // Try node ID first, then edge ID, then map value
162                if let Some(node_id) = col.get_node_id(row) {
163                    store
164                        .get_node(node_id)
165                        .and_then(|node| node.get_property(property).cloned())
166                        .unwrap_or(Value::Null)
167                } else if let Some(edge_id) = col.get_edge_id(row) {
168                    store
169                        .get_edge(edge_id)
170                        .and_then(|edge| edge.get_property(property).cloned())
171                        .unwrap_or(Value::Null)
172                } else if let Some(Value::Map(map)) = col.get_value(row) {
173                    let key = PropertyKey::new(property);
174                    map.get(&key).cloned().unwrap_or(Value::Null)
175                } else {
176                    Value::Null
177                }
178            }
179        }
180    }
181}
182
183impl CreateNodeOperator {
184    /// Creates a new node creation operator.
185    ///
186    /// # Arguments
187    /// * `store` - The graph store to modify.
188    /// * `input` - Optional input operator (None for standalone CREATE).
189    /// * `labels` - Labels to assign to created nodes.
190    /// * `properties` - Properties to set on created nodes.
191    /// * `output_schema` - Schema of the output.
192    /// * `output_column` - Column index where the created node ID goes.
193    pub fn new(
194        store: Arc<dyn GraphStoreMut>,
195        input: Option<Box<dyn Operator>>,
196        labels: Vec<String>,
197        properties: Vec<(String, PropertySource)>,
198        output_schema: Vec<LogicalType>,
199        output_column: usize,
200    ) -> Self {
201        Self {
202            store,
203            input,
204            labels,
205            properties,
206            output_schema,
207            output_column,
208            executed: false,
209            viewing_epoch: None,
210            transaction_id: None,
211            validator: None,
212            write_tracker: None,
213        }
214    }
215
216    /// Sets the transaction context for MVCC versioning.
217    pub fn with_transaction_context(
218        mut self,
219        epoch: EpochId,
220        transaction_id: Option<TransactionId>,
221    ) -> Self {
222        self.viewing_epoch = Some(epoch);
223        self.transaction_id = transaction_id;
224        self
225    }
226
227    /// Sets the constraint validator for schema enforcement.
228    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
229        self.validator = Some(validator);
230        self
231    }
232
233    /// Sets the write tracker for conflict detection.
234    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
235        self.write_tracker = Some(tracker);
236        self
237    }
238}
239
240impl CreateNodeOperator {
241    /// Validates and sets properties on a newly created node.
242    fn validate_and_set_properties(
243        &self,
244        node_id: NodeId,
245        resolved_props: &mut Vec<(String, Value)>,
246    ) -> Result<(), OperatorError> {
247        // Phase 0: Validate that node labels are allowed by the bound graph type
248        if let Some(ref validator) = self.validator {
249            validator.validate_node_labels_allowed(&self.labels)?;
250        }
251
252        // Phase 0.5: Inject defaults for properties not explicitly provided
253        if let Some(ref validator) = self.validator {
254            validator.inject_defaults(&self.labels, resolved_props);
255        }
256
257        // Phase 1: Validate each property value
258        if let Some(ref validator) = self.validator {
259            for (name, value) in resolved_props.iter() {
260                validator.validate_node_property(&self.labels, name, value)?;
261                validator.check_unique_node_property(&self.labels, name, value)?;
262            }
263            // Phase 2: Validate completeness (NOT NULL checks for missing required properties)
264            validator.validate_node_complete(&self.labels, resolved_props)?;
265        }
266
267        // Phase 3: Write properties to the store
268        if let Some(tid) = self.transaction_id {
269            for (name, value) in resolved_props.iter() {
270                self.store
271                    .set_node_property_versioned(node_id, name, value.clone(), tid);
272            }
273        } else {
274            for (name, value) in resolved_props.iter() {
275                self.store.set_node_property(node_id, name, value.clone());
276            }
277        }
278        Ok(())
279    }
280}
281
282impl Operator for CreateNodeOperator {
283    fn next(&mut self) -> OperatorResult {
284        // Get transaction context for versioned creation
285        let epoch = self
286            .viewing_epoch
287            .unwrap_or_else(|| self.store.current_epoch());
288        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
289
290        if let Some(ref mut input) = self.input {
291            // For each input row, create a node
292            if let Some(chunk) = input.next()? {
293                let mut builder =
294                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
295
296                for row in chunk.selected_indices() {
297                    // Resolve all property values first (before creating node)
298                    let mut resolved_props: Vec<(String, Value)> = self
299                        .properties
300                        .iter()
301                        .map(|(name, source)| {
302                            let value =
303                                source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
304                            (name.clone(), value)
305                        })
306                        .collect();
307
308                    // Create the node with MVCC versioning
309                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
310                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
311
312                    // Record write for conflict detection
313                    if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
314                        tracker.record_node_write(tid, node_id)?;
315                    }
316
317                    // Validate and set properties
318                    self.validate_and_set_properties(node_id, &mut resolved_props)?;
319
320                    // Copy input columns to output
321                    for col_idx in 0..chunk.column_count() {
322                        if col_idx < self.output_column
323                            && let (Some(src), Some(dst)) =
324                                (chunk.column(col_idx), builder.column_mut(col_idx))
325                        {
326                            if let Some(val) = src.get_value(row) {
327                                dst.push_value(val);
328                            } else {
329                                dst.push_value(Value::Null);
330                            }
331                        }
332                    }
333
334                    // Add the new node ID
335                    if let Some(dst) = builder.column_mut(self.output_column) {
336                        dst.push_value(Value::Int64(node_id.0 as i64));
337                    }
338
339                    builder.advance_row();
340                }
341
342                return Ok(Some(builder.finish()));
343            }
344            Ok(None)
345        } else {
346            // No input - create a single node
347            if self.executed {
348                return Ok(None);
349            }
350            self.executed = true;
351
352            // Resolve constant properties
353            let mut resolved_props: Vec<(String, Value)> = self
354                .properties
355                .iter()
356                .filter_map(|(name, source)| {
357                    if let PropertySource::Constant(value) = source {
358                        Some((name.clone(), value.clone()))
359                    } else {
360                        None
361                    }
362                })
363                .collect();
364
365            // Create the node with MVCC versioning
366            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
367            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
368
369            // Record write for conflict detection
370            if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
371                tracker.record_node_write(tid, node_id)?;
372            }
373
374            // Validate and set properties
375            self.validate_and_set_properties(node_id, &mut resolved_props)?;
376
377            // Build output chunk with just the node ID
378            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
379            if let Some(dst) = builder.column_mut(self.output_column) {
380                dst.push_value(Value::Int64(node_id.0 as i64));
381            }
382            builder.advance_row();
383
384            Ok(Some(builder.finish()))
385        }
386    }
387
388    fn reset(&mut self) {
389        if let Some(ref mut input) = self.input {
390            input.reset();
391        }
392        self.executed = false;
393    }
394
395    fn name(&self) -> &'static str {
396        "CreateNode"
397    }
398}
399
400/// Operator that creates new edges.
401pub struct CreateEdgeOperator {
402    /// The graph store to modify.
403    store: Arc<dyn GraphStoreMut>,
404    /// Input operator.
405    input: Box<dyn Operator>,
406    /// Column index for the source node.
407    from_column: usize,
408    /// Column index for the target node.
409    to_column: usize,
410    /// Edge type.
411    edge_type: String,
412    /// Properties to set.
413    properties: Vec<(String, PropertySource)>,
414    /// Output schema.
415    output_schema: Vec<LogicalType>,
416    /// Column index for the created edge variable (if any).
417    output_column: Option<usize>,
418    /// Epoch for MVCC versioning.
419    viewing_epoch: Option<EpochId>,
420    /// Transaction ID for MVCC versioning.
421    transaction_id: Option<TransactionId>,
422    /// Optional constraint validator for schema enforcement.
423    validator: Option<Arc<dyn ConstraintValidator>>,
424    /// Optional write tracker for conflict detection.
425    write_tracker: Option<SharedWriteTracker>,
426}
427
428impl CreateEdgeOperator {
429    /// Creates a new edge creation operator.
430    ///
431    /// Use builder methods to set additional options:
432    /// - [`with_properties`](Self::with_properties) - set edge properties
433    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
434    /// - [`with_transaction_context`](Self::with_transaction_context) - set transaction context
435    pub fn new(
436        store: Arc<dyn GraphStoreMut>,
437        input: Box<dyn Operator>,
438        from_column: usize,
439        to_column: usize,
440        edge_type: String,
441        output_schema: Vec<LogicalType>,
442    ) -> Self {
443        Self {
444            store,
445            input,
446            from_column,
447            to_column,
448            edge_type,
449            properties: Vec::new(),
450            output_schema,
451            output_column: None,
452            viewing_epoch: None,
453            transaction_id: None,
454            validator: None,
455            write_tracker: None,
456        }
457    }
458
459    /// Sets the properties to assign to created edges.
460    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
461        self.properties = properties;
462        self
463    }
464
465    /// Sets the output column for the created edge ID.
466    pub fn with_output_column(mut self, column: usize) -> Self {
467        self.output_column = Some(column);
468        self
469    }
470
471    /// Sets the transaction context for MVCC versioning.
472    pub fn with_transaction_context(
473        mut self,
474        epoch: EpochId,
475        transaction_id: Option<TransactionId>,
476    ) -> Self {
477        self.viewing_epoch = Some(epoch);
478        self.transaction_id = transaction_id;
479        self
480    }
481
482    /// Sets the constraint validator for schema enforcement.
483    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
484        self.validator = Some(validator);
485        self
486    }
487
488    /// Sets the write tracker for conflict detection.
489    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
490        self.write_tracker = Some(tracker);
491        self
492    }
493}
494
495impl Operator for CreateEdgeOperator {
496    fn next(&mut self) -> OperatorResult {
497        // Get transaction context for versioned creation
498        let epoch = self
499            .viewing_epoch
500            .unwrap_or_else(|| self.store.current_epoch());
501        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
502
503        if let Some(chunk) = self.input.next()? {
504            let mut builder =
505                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
506
507            for row in chunk.selected_indices() {
508                // Get source and target node IDs
509                let from_id = chunk
510                    .column(self.from_column)
511                    .and_then(|c| c.get_value(row))
512                    .ok_or_else(|| {
513                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
514                    })?;
515
516                let to_id = chunk
517                    .column(self.to_column)
518                    .and_then(|c| c.get_value(row))
519                    .ok_or_else(|| {
520                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
521                    })?;
522
523                // Extract node IDs
524                let from_node_id = match from_id {
525                    Value::Int64(id) => NodeId(id as u64),
526                    _ => {
527                        return Err(OperatorError::TypeMismatch {
528                            expected: "Int64 (node ID)".to_string(),
529                            found: format!("{from_id:?}"),
530                        });
531                    }
532                };
533
534                let to_node_id = match to_id {
535                    Value::Int64(id) => NodeId(id as u64),
536                    _ => {
537                        return Err(OperatorError::TypeMismatch {
538                            expected: "Int64 (node ID)".to_string(),
539                            found: format!("{to_id:?}"),
540                        });
541                    }
542                };
543
544                // Validate graph type and edge endpoint constraints
545                if let Some(ref validator) = self.validator {
546                    validator.validate_edge_type_allowed(&self.edge_type)?;
547
548                    // Look up source and target node labels for endpoint validation
549                    let source_labels: Vec<String> = self
550                        .store
551                        .get_node(from_node_id)
552                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
553                        .unwrap_or_default();
554                    let target_labels: Vec<String> = self
555                        .store
556                        .get_node(to_node_id)
557                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
558                        .unwrap_or_default();
559                    validator.validate_edge_endpoints(
560                        &self.edge_type,
561                        &source_labels,
562                        &target_labels,
563                    )?;
564                }
565
566                // Resolve property values
567                let resolved_props: Vec<(String, Value)> = self
568                    .properties
569                    .iter()
570                    .map(|(name, source)| {
571                        let value =
572                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
573                        (name.clone(), value)
574                    })
575                    .collect();
576
577                // Validate constraints before writing
578                if let Some(ref validator) = self.validator {
579                    for (name, value) in &resolved_props {
580                        validator.validate_edge_property(&self.edge_type, name, value)?;
581                    }
582                    validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
583                }
584
585                // Create the edge with MVCC versioning
586                let edge_id = self.store.create_edge_versioned(
587                    from_node_id,
588                    to_node_id,
589                    &self.edge_type,
590                    epoch,
591                    tx,
592                );
593
594                // Record write for conflict detection
595                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
596                    tracker.record_edge_write(tid, edge_id)?;
597                }
598
599                // Set properties
600                if let Some(tid) = self.transaction_id {
601                    for (name, value) in resolved_props {
602                        self.store
603                            .set_edge_property_versioned(edge_id, &name, value, tid);
604                    }
605                } else {
606                    for (name, value) in resolved_props {
607                        self.store.set_edge_property(edge_id, &name, value);
608                    }
609                }
610
611                // Copy input columns
612                for col_idx in 0..chunk.column_count() {
613                    if let (Some(src), Some(dst)) =
614                        (chunk.column(col_idx), builder.column_mut(col_idx))
615                    {
616                        if let Some(val) = src.get_value(row) {
617                            dst.push_value(val);
618                        } else {
619                            dst.push_value(Value::Null);
620                        }
621                    }
622                }
623
624                // Add edge ID if requested
625                if let Some(out_col) = self.output_column
626                    && let Some(dst) = builder.column_mut(out_col)
627                {
628                    dst.push_value(Value::Int64(edge_id.0 as i64));
629                }
630
631                builder.advance_row();
632            }
633
634            return Ok(Some(builder.finish()));
635        }
636        Ok(None)
637    }
638
639    fn reset(&mut self) {
640        self.input.reset();
641    }
642
643    fn name(&self) -> &'static str {
644        "CreateEdge"
645    }
646}
647
648/// Operator that deletes nodes.
649pub struct DeleteNodeOperator {
650    /// The graph store to modify.
651    store: Arc<dyn GraphStoreMut>,
652    /// Input operator.
653    input: Box<dyn Operator>,
654    /// Column index for the node to delete.
655    node_column: usize,
656    /// Output schema.
657    output_schema: Vec<LogicalType>,
658    /// Whether to detach (delete connected edges) before deleting.
659    detach: bool,
660    /// Epoch for MVCC versioning.
661    viewing_epoch: Option<EpochId>,
662    /// Transaction ID for MVCC versioning.
663    transaction_id: Option<TransactionId>,
664    /// Optional write tracker for conflict detection.
665    write_tracker: Option<SharedWriteTracker>,
666}
667
668impl DeleteNodeOperator {
669    /// Creates a new node deletion operator.
670    pub fn new(
671        store: Arc<dyn GraphStoreMut>,
672        input: Box<dyn Operator>,
673        node_column: usize,
674        output_schema: Vec<LogicalType>,
675        detach: bool,
676    ) -> Self {
677        Self {
678            store,
679            input,
680            node_column,
681            output_schema,
682            detach,
683            viewing_epoch: None,
684            transaction_id: None,
685            write_tracker: None,
686        }
687    }
688
689    /// Sets the transaction context for MVCC versioning.
690    pub fn with_transaction_context(
691        mut self,
692        epoch: EpochId,
693        transaction_id: Option<TransactionId>,
694    ) -> Self {
695        self.viewing_epoch = Some(epoch);
696        self.transaction_id = transaction_id;
697        self
698    }
699
700    /// Sets the write tracker for conflict detection.
701    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
702        self.write_tracker = Some(tracker);
703        self
704    }
705}
706
707impl Operator for DeleteNodeOperator {
708    fn next(&mut self) -> OperatorResult {
709        // Get transaction context for versioned deletion
710        let epoch = self
711            .viewing_epoch
712            .unwrap_or_else(|| self.store.current_epoch());
713        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
714
715        if let Some(chunk) = self.input.next()? {
716            let mut builder =
717                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
718
719            for row in chunk.selected_indices() {
720                let node_val = chunk
721                    .column(self.node_column)
722                    .and_then(|c| c.get_value(row))
723                    .ok_or_else(|| {
724                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
725                    })?;
726
727                let node_id = match node_val {
728                    Value::Int64(id) => NodeId(id as u64),
729                    _ => {
730                        return Err(OperatorError::TypeMismatch {
731                            expected: "Int64 (node ID)".to_string(),
732                            found: format!("{node_val:?}"),
733                        });
734                    }
735                };
736
737                if self.detach {
738                    // Delete all connected edges first, using versioned deletion
739                    // so rollback can restore them
740                    let outgoing = self
741                        .store
742                        .edges_from(node_id, crate::graph::Direction::Outgoing);
743                    let incoming = self
744                        .store
745                        .edges_from(node_id, crate::graph::Direction::Incoming);
746                    for (_, edge_id) in outgoing.into_iter().chain(incoming) {
747                        self.store.delete_edge_versioned(edge_id, epoch, tx);
748                        if let (Some(tracker), Some(tid)) =
749                            (&self.write_tracker, self.transaction_id)
750                        {
751                            tracker.record_edge_write(tid, edge_id)?;
752                        }
753                    }
754                } else {
755                    // NODETACH: check that node has no connected edges
756                    let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
757                    if degree > 0 {
758                        return Err(OperatorError::ConstraintViolation(format!(
759                            "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
760                            degree
761                        )));
762                    }
763                }
764
765                // Delete the node with MVCC versioning
766                self.store.delete_node_versioned(node_id, epoch, tx);
767
768                // Record write for conflict detection
769                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
770                    tracker.record_node_write(tid, node_id)?;
771                }
772
773                // Pass through all input columns so downstream RETURN can
774                // reference the variable (e.g., count(n) after DELETE n).
775                for col_idx in 0..chunk.column_count() {
776                    if let (Some(src), Some(dst)) =
777                        (chunk.column(col_idx), builder.column_mut(col_idx))
778                    {
779                        if let Some(val) = src.get_value(row) {
780                            dst.push_value(val);
781                        } else {
782                            dst.push_value(Value::Null);
783                        }
784                    }
785                }
786                builder.advance_row();
787            }
788
789            return Ok(Some(builder.finish()));
790        }
791        Ok(None)
792    }
793
794    fn reset(&mut self) {
795        self.input.reset();
796    }
797
798    fn name(&self) -> &'static str {
799        "DeleteNode"
800    }
801}
802
/// Operator that deletes edges.
///
/// For every selected row of its input, the operator reads an edge ID from
/// `edge_column`, deletes that edge through the store's versioned delete, and
/// copies the row's input columns into the output chunk.
pub struct DeleteEdgeOperator {
    /// The graph store to modify.
    store: Arc<dyn GraphStoreMut>,
    /// Input operator.
    input: Box<dyn Operator>,
    /// Column index for the edge to delete; the values must be `Value::Int64`.
    edge_column: usize,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Epoch for MVCC versioning; when unset, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning; when unset, `TransactionId::SYSTEM` is used.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection.
    ///
    /// Writes are recorded only when a transaction ID is set as well.
    write_tracker: Option<SharedWriteTracker>,
}
820
821impl DeleteEdgeOperator {
822    /// Creates a new edge deletion operator.
823    pub fn new(
824        store: Arc<dyn GraphStoreMut>,
825        input: Box<dyn Operator>,
826        edge_column: usize,
827        output_schema: Vec<LogicalType>,
828    ) -> Self {
829        Self {
830            store,
831            input,
832            edge_column,
833            output_schema,
834            viewing_epoch: None,
835            transaction_id: None,
836            write_tracker: None,
837        }
838    }
839
840    /// Sets the transaction context for MVCC versioning.
841    pub fn with_transaction_context(
842        mut self,
843        epoch: EpochId,
844        transaction_id: Option<TransactionId>,
845    ) -> Self {
846        self.viewing_epoch = Some(epoch);
847        self.transaction_id = transaction_id;
848        self
849    }
850
851    /// Sets the write tracker for conflict detection.
852    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
853        self.write_tracker = Some(tracker);
854        self
855    }
856}
857
858impl Operator for DeleteEdgeOperator {
859    fn next(&mut self) -> OperatorResult {
860        // Get transaction context for versioned deletion
861        let epoch = self
862            .viewing_epoch
863            .unwrap_or_else(|| self.store.current_epoch());
864        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
865
866        if let Some(chunk) = self.input.next()? {
867            let mut builder =
868                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
869
870            for row in chunk.selected_indices() {
871                let edge_val = chunk
872                    .column(self.edge_column)
873                    .and_then(|c| c.get_value(row))
874                    .ok_or_else(|| {
875                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
876                    })?;
877
878                let edge_id = match edge_val {
879                    Value::Int64(id) => EdgeId(id as u64),
880                    _ => {
881                        return Err(OperatorError::TypeMismatch {
882                            expected: "Int64 (edge ID)".to_string(),
883                            found: format!("{edge_val:?}"),
884                        });
885                    }
886                };
887
888                // Delete the edge with MVCC versioning
889                self.store.delete_edge_versioned(edge_id, epoch, tx);
890
891                // Record write for conflict detection
892                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
893                    tracker.record_edge_write(tid, edge_id)?;
894                }
895
896                // Pass through all input columns
897                for col_idx in 0..chunk.column_count() {
898                    if let (Some(src), Some(dst)) =
899                        (chunk.column(col_idx), builder.column_mut(col_idx))
900                    {
901                        if let Some(val) = src.get_value(row) {
902                            dst.push_value(val);
903                        } else {
904                            dst.push_value(Value::Null);
905                        }
906                    }
907                }
908                builder.advance_row();
909            }
910
911            return Ok(Some(builder.finish()));
912        }
913        Ok(None)
914    }
915
916    fn reset(&mut self) {
917        self.input.reset();
918    }
919
920    fn name(&self) -> &'static str {
921        "DeleteEdge"
922    }
923}
924
/// Operator that adds labels to nodes.
///
/// For each input chunk, every label in `labels` is added to every selected
/// node, and a single-row chunk carrying the number of labels actually added
/// is emitted.
pub struct AddLabelOperator {
    /// The graph store.
    store: Arc<dyn GraphStoreMut>,
    /// Child operator providing nodes.
    input: Box<dyn Operator>,
    /// Column index containing node IDs; the values must be `Value::Int64`.
    node_column: usize,
    /// Labels to add.
    labels: Vec<String>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Epoch for MVCC versioning.
    ///
    /// Stored by `with_transaction_context`; this operator's `next` does not read it.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for undo log tracking.
    ///
    /// When set, labels are added through `add_label_versioned`; otherwise
    /// through `add_label`.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection.
    ///
    /// Writes are recorded only when a transaction ID is set as well.
    write_tracker: Option<SharedWriteTracker>,
}
944
945impl AddLabelOperator {
946    /// Creates a new add label operator.
947    pub fn new(
948        store: Arc<dyn GraphStoreMut>,
949        input: Box<dyn Operator>,
950        node_column: usize,
951        labels: Vec<String>,
952        output_schema: Vec<LogicalType>,
953    ) -> Self {
954        Self {
955            store,
956            input,
957            node_column,
958            labels,
959            output_schema,
960            viewing_epoch: None,
961            transaction_id: None,
962            write_tracker: None,
963        }
964    }
965
966    /// Sets the transaction context for versioned label mutations.
967    pub fn with_transaction_context(
968        mut self,
969        epoch: EpochId,
970        transaction_id: Option<TransactionId>,
971    ) -> Self {
972        self.viewing_epoch = Some(epoch);
973        self.transaction_id = transaction_id;
974        self
975    }
976
977    /// Sets the write tracker for conflict detection.
978    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
979        self.write_tracker = Some(tracker);
980        self
981    }
982}
983
984impl Operator for AddLabelOperator {
985    fn next(&mut self) -> OperatorResult {
986        if let Some(chunk) = self.input.next()? {
987            let mut updated_count = 0;
988
989            for row in chunk.selected_indices() {
990                let node_val = chunk
991                    .column(self.node_column)
992                    .and_then(|c| c.get_value(row))
993                    .ok_or_else(|| {
994                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
995                    })?;
996
997                let node_id = match node_val {
998                    Value::Int64(id) => NodeId(id as u64),
999                    _ => {
1000                        return Err(OperatorError::TypeMismatch {
1001                            expected: "Int64 (node ID)".to_string(),
1002                            found: format!("{node_val:?}"),
1003                        });
1004                    }
1005                };
1006
1007                // Record write for conflict detection
1008                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1009                    tracker.record_node_write(tid, node_id)?;
1010                }
1011
1012                // Add all labels
1013                for label in &self.labels {
1014                    let added = if let Some(tid) = self.transaction_id {
1015                        self.store.add_label_versioned(node_id, label, tid)
1016                    } else {
1017                        self.store.add_label(node_id, label)
1018                    };
1019                    if added {
1020                        updated_count += 1;
1021                    }
1022                }
1023            }
1024
1025            // Return a chunk with the update count
1026            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
1027            if let Some(dst) = builder.column_mut(0) {
1028                dst.push_value(Value::Int64(updated_count));
1029            }
1030            builder.advance_row();
1031
1032            return Ok(Some(builder.finish()));
1033        }
1034        Ok(None)
1035    }
1036
1037    fn reset(&mut self) {
1038        self.input.reset();
1039    }
1040
1041    fn name(&self) -> &'static str {
1042        "AddLabel"
1043    }
1044}
1045
/// Operator that removes labels from nodes.
///
/// For each input chunk, every label in `labels` is removed from every
/// selected node, and a single-row chunk carrying the number of labels
/// actually removed is emitted.
pub struct RemoveLabelOperator {
    /// The graph store.
    store: Arc<dyn GraphStoreMut>,
    /// Child operator providing nodes.
    input: Box<dyn Operator>,
    /// Column index containing node IDs; the values must be `Value::Int64`.
    node_column: usize,
    /// Labels to remove.
    labels: Vec<String>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Epoch for MVCC versioning.
    ///
    /// Stored by `with_transaction_context`; this operator's `next` does not read it.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for undo log tracking.
    ///
    /// When set, labels are removed through `remove_label_versioned`;
    /// otherwise through `remove_label`.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection.
    ///
    /// Writes are recorded only when a transaction ID is set as well.
    write_tracker: Option<SharedWriteTracker>,
}
1065
1066impl RemoveLabelOperator {
1067    /// Creates a new remove label operator.
1068    pub fn new(
1069        store: Arc<dyn GraphStoreMut>,
1070        input: Box<dyn Operator>,
1071        node_column: usize,
1072        labels: Vec<String>,
1073        output_schema: Vec<LogicalType>,
1074    ) -> Self {
1075        Self {
1076            store,
1077            input,
1078            node_column,
1079            labels,
1080            output_schema,
1081            viewing_epoch: None,
1082            transaction_id: None,
1083            write_tracker: None,
1084        }
1085    }
1086
1087    /// Sets the transaction context for versioned label mutations.
1088    pub fn with_transaction_context(
1089        mut self,
1090        epoch: EpochId,
1091        transaction_id: Option<TransactionId>,
1092    ) -> Self {
1093        self.viewing_epoch = Some(epoch);
1094        self.transaction_id = transaction_id;
1095        self
1096    }
1097
1098    /// Sets the write tracker for conflict detection.
1099    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1100        self.write_tracker = Some(tracker);
1101        self
1102    }
1103}
1104
1105impl Operator for RemoveLabelOperator {
1106    fn next(&mut self) -> OperatorResult {
1107        if let Some(chunk) = self.input.next()? {
1108            let mut updated_count = 0;
1109
1110            for row in chunk.selected_indices() {
1111                let node_val = chunk
1112                    .column(self.node_column)
1113                    .and_then(|c| c.get_value(row))
1114                    .ok_or_else(|| {
1115                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1116                    })?;
1117
1118                let node_id = match node_val {
1119                    Value::Int64(id) => NodeId(id as u64),
1120                    _ => {
1121                        return Err(OperatorError::TypeMismatch {
1122                            expected: "Int64 (node ID)".to_string(),
1123                            found: format!("{node_val:?}"),
1124                        });
1125                    }
1126                };
1127
1128                // Record write for conflict detection
1129                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1130                    tracker.record_node_write(tid, node_id)?;
1131                }
1132
1133                // Remove all labels
1134                for label in &self.labels {
1135                    let removed = if let Some(tid) = self.transaction_id {
1136                        self.store.remove_label_versioned(node_id, label, tid)
1137                    } else {
1138                        self.store.remove_label(node_id, label)
1139                    };
1140                    if removed {
1141                        updated_count += 1;
1142                    }
1143                }
1144            }
1145
1146            // Return a chunk with the update count
1147            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
1148            if let Some(dst) = builder.column_mut(0) {
1149                dst.push_value(Value::Int64(updated_count));
1150            }
1151            builder.advance_row();
1152
1153            return Ok(Some(builder.finish()));
1154        }
1155        Ok(None)
1156    }
1157
1158    fn reset(&mut self) {
1159        self.input.reset();
1160    }
1161
1162    fn name(&self) -> &'static str {
1163        "RemoveLabel"
1164    }
1165}
1166
/// Operator that sets properties on nodes or edges.
///
/// This operator reads node/edge IDs from a column and sets the
/// specified properties on each entity. Input columns are copied through to
/// the output chunk row by row.
pub struct SetPropertyOperator {
    /// The graph store.
    store: Arc<dyn GraphStoreMut>,
    /// Child operator providing entities.
    input: Box<dyn Operator>,
    /// Column index containing entity IDs (node or edge); values must be `Value::Int64`.
    entity_column: usize,
    /// Whether the entity is an edge (false = node).
    is_edge: bool,
    /// Properties to set (name -> source).
    ///
    /// The name `*` marks a map assignment: the resolved value is expected to
    /// be a `Value::Map` whose entries are written as individual properties.
    properties: Vec<(String, PropertySource)>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Whether to replace all properties (true) or merge (false) for map assignments.
    ///
    /// With `true`, all existing properties of the entity are removed before
    /// the map entries are written.
    replace: bool,
    /// Optional constraint validator for schema enforcement.
    validator: Option<Arc<dyn ConstraintValidator>>,
    /// Entity labels (for node constraint validation).
    labels: Vec<String>,
    /// Edge type (for edge constraint validation).
    ///
    /// Edge properties are validated only when this is set.
    edge_type_name: Option<String>,
    /// Epoch for MVCC versioning.
    ///
    /// Stored by `with_transaction_context`; this operator's `next` does not read it.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for undo log tracking.
    ///
    /// When set, the `*_versioned` store methods are used for all writes.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection.
    ///
    /// Writes are recorded only when a transaction ID is set as well.
    write_tracker: Option<SharedWriteTracker>,
}
1199
1200impl SetPropertyOperator {
1201    /// Creates a new set property operator for nodes.
1202    pub fn new_for_node(
1203        store: Arc<dyn GraphStoreMut>,
1204        input: Box<dyn Operator>,
1205        node_column: usize,
1206        properties: Vec<(String, PropertySource)>,
1207        output_schema: Vec<LogicalType>,
1208    ) -> Self {
1209        Self {
1210            store,
1211            input,
1212            entity_column: node_column,
1213            is_edge: false,
1214            properties,
1215            output_schema,
1216            replace: false,
1217            validator: None,
1218            labels: Vec::new(),
1219            edge_type_name: None,
1220            viewing_epoch: None,
1221            transaction_id: None,
1222            write_tracker: None,
1223        }
1224    }
1225
1226    /// Creates a new set property operator for edges.
1227    pub fn new_for_edge(
1228        store: Arc<dyn GraphStoreMut>,
1229        input: Box<dyn Operator>,
1230        edge_column: usize,
1231        properties: Vec<(String, PropertySource)>,
1232        output_schema: Vec<LogicalType>,
1233    ) -> Self {
1234        Self {
1235            store,
1236            input,
1237            entity_column: edge_column,
1238            is_edge: true,
1239            properties,
1240            output_schema,
1241            replace: false,
1242            validator: None,
1243            labels: Vec::new(),
1244            edge_type_name: None,
1245            viewing_epoch: None,
1246            transaction_id: None,
1247            write_tracker: None,
1248        }
1249    }
1250
1251    /// Sets whether this operator replaces all properties (for map assignment).
1252    pub fn with_replace(mut self, replace: bool) -> Self {
1253        self.replace = replace;
1254        self
1255    }
1256
1257    /// Sets the constraint validator for schema enforcement.
1258    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1259        self.validator = Some(validator);
1260        self
1261    }
1262
1263    /// Sets the entity labels (for node constraint validation).
1264    pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1265        self.labels = labels;
1266        self
1267    }
1268
1269    /// Sets the edge type name (for edge constraint validation).
1270    pub fn with_edge_type(mut self, edge_type: String) -> Self {
1271        self.edge_type_name = Some(edge_type);
1272        self
1273    }
1274
1275    /// Sets the transaction context for versioned property mutations.
1276    ///
1277    /// When a transaction ID is provided, property changes are recorded in
1278    /// an undo log so they can be restored on rollback.
1279    pub fn with_transaction_context(
1280        mut self,
1281        epoch: EpochId,
1282        transaction_id: Option<TransactionId>,
1283    ) -> Self {
1284        self.viewing_epoch = Some(epoch);
1285        self.transaction_id = transaction_id;
1286        self
1287    }
1288
1289    /// Sets the write tracker for conflict detection.
1290    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1291        self.write_tracker = Some(tracker);
1292        self
1293    }
1294}
1295
1296impl Operator for SetPropertyOperator {
1297    fn next(&mut self) -> OperatorResult {
1298        if let Some(chunk) = self.input.next()? {
1299            let mut builder =
1300                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1301
1302            for row in chunk.selected_indices() {
1303                let entity_val = chunk
1304                    .column(self.entity_column)
1305                    .and_then(|c| c.get_value(row))
1306                    .ok_or_else(|| {
1307                        OperatorError::ColumnNotFound(format!(
1308                            "entity column {}",
1309                            self.entity_column
1310                        ))
1311                    })?;
1312
1313                let entity_id = match entity_val {
1314                    Value::Int64(id) => id as u64,
1315                    _ => {
1316                        return Err(OperatorError::TypeMismatch {
1317                            expected: "Int64 (entity ID)".to_string(),
1318                            found: format!("{entity_val:?}"),
1319                        });
1320                    }
1321                };
1322
1323                // Record write for conflict detection
1324                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1325                    if self.is_edge {
1326                        tracker.record_edge_write(tid, EdgeId(entity_id))?;
1327                    } else {
1328                        tracker.record_node_write(tid, NodeId(entity_id))?;
1329                    }
1330                }
1331
1332                // Resolve all property values
1333                let resolved_props: Vec<(String, Value)> = self
1334                    .properties
1335                    .iter()
1336                    .map(|(name, source)| {
1337                        let value =
1338                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1339                        (name.clone(), value)
1340                    })
1341                    .collect();
1342
1343                // Validate constraints before writing
1344                if let Some(ref validator) = self.validator {
1345                    if self.is_edge {
1346                        if let Some(ref et) = self.edge_type_name {
1347                            for (name, value) in &resolved_props {
1348                                validator.validate_edge_property(et, name, value)?;
1349                            }
1350                        }
1351                    } else {
1352                        for (name, value) in &resolved_props {
1353                            validator.validate_node_property(&self.labels, name, value)?;
1354                            validator.check_unique_node_property(&self.labels, name, value)?;
1355                        }
1356                    }
1357                }
1358
1359                // Write all properties (use versioned methods when inside a transaction)
1360                let tx_id = self.transaction_id;
1361                for (prop_name, value) in resolved_props {
1362                    if prop_name == "*" {
1363                        // Map assignment: value should be a Map
1364                        if let Value::Map(map) = value {
1365                            if self.replace {
1366                                // Replace: remove all existing properties first
1367                                if self.is_edge {
1368                                    if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1369                                        let keys: Vec<String> = edge
1370                                            .properties
1371                                            .iter()
1372                                            .map(|(k, _)| k.as_str().to_string())
1373                                            .collect();
1374                                        for key in keys {
1375                                            if let Some(tid) = tx_id {
1376                                                self.store.remove_edge_property_versioned(
1377                                                    EdgeId(entity_id),
1378                                                    &key,
1379                                                    tid,
1380                                                );
1381                                            } else {
1382                                                self.store
1383                                                    .remove_edge_property(EdgeId(entity_id), &key);
1384                                            }
1385                                        }
1386                                    }
1387                                } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1388                                    let keys: Vec<String> = node
1389                                        .properties
1390                                        .iter()
1391                                        .map(|(k, _)| k.as_str().to_string())
1392                                        .collect();
1393                                    for key in keys {
1394                                        if let Some(tid) = tx_id {
1395                                            self.store.remove_node_property_versioned(
1396                                                NodeId(entity_id),
1397                                                &key,
1398                                                tid,
1399                                            );
1400                                        } else {
1401                                            self.store
1402                                                .remove_node_property(NodeId(entity_id), &key);
1403                                        }
1404                                    }
1405                                }
1406                            }
1407                            // Set each map entry
1408                            for (key, val) in map.iter() {
1409                                if self.is_edge {
1410                                    if let Some(tid) = tx_id {
1411                                        self.store.set_edge_property_versioned(
1412                                            EdgeId(entity_id),
1413                                            key.as_str(),
1414                                            val.clone(),
1415                                            tid,
1416                                        );
1417                                    } else {
1418                                        self.store.set_edge_property(
1419                                            EdgeId(entity_id),
1420                                            key.as_str(),
1421                                            val.clone(),
1422                                        );
1423                                    }
1424                                } else if let Some(tid) = tx_id {
1425                                    self.store.set_node_property_versioned(
1426                                        NodeId(entity_id),
1427                                        key.as_str(),
1428                                        val.clone(),
1429                                        tid,
1430                                    );
1431                                } else {
1432                                    self.store.set_node_property(
1433                                        NodeId(entity_id),
1434                                        key.as_str(),
1435                                        val.clone(),
1436                                    );
1437                                }
1438                            }
1439                        }
1440                    } else if self.is_edge {
1441                        if let Some(tid) = tx_id {
1442                            self.store.set_edge_property_versioned(
1443                                EdgeId(entity_id),
1444                                &prop_name,
1445                                value,
1446                                tid,
1447                            );
1448                        } else {
1449                            self.store
1450                                .set_edge_property(EdgeId(entity_id), &prop_name, value);
1451                        }
1452                    } else if let Some(tid) = tx_id {
1453                        self.store.set_node_property_versioned(
1454                            NodeId(entity_id),
1455                            &prop_name,
1456                            value,
1457                            tid,
1458                        );
1459                    } else {
1460                        self.store
1461                            .set_node_property(NodeId(entity_id), &prop_name, value);
1462                    }
1463                }
1464
1465                // Copy input columns to output
1466                for col_idx in 0..chunk.column_count() {
1467                    if let (Some(src), Some(dst)) =
1468                        (chunk.column(col_idx), builder.column_mut(col_idx))
1469                    {
1470                        if let Some(val) = src.get_value(row) {
1471                            dst.push_value(val);
1472                        } else {
1473                            dst.push_value(Value::Null);
1474                        }
1475                    }
1476                }
1477
1478                builder.advance_row();
1479            }
1480
1481            return Ok(Some(builder.finish()));
1482        }
1483        Ok(None)
1484    }
1485
1486    fn reset(&mut self) {
1487        self.input.reset();
1488    }
1489
1490    fn name(&self) -> &'static str {
1491        "SetProperty"
1492    }
1493}
1494
1495#[cfg(test)]
1496mod tests {
1497    use super::*;
1498    use crate::execution::DataChunk;
1499    use crate::execution::chunk::DataChunkBuilder;
1500    use crate::graph::lpg::LpgStore;
1501
1502    // ── Helpers ────────────────────────────────────────────────────
1503
1504    fn create_test_store() -> Arc<dyn GraphStoreMut> {
1505        Arc::new(LpgStore::new().unwrap())
1506    }
1507
    /// Test input operator that yields one pre-built chunk and then nothing.
    struct MockInput {
        /// The chunk to hand out; becomes `None` once `next` has taken it.
        chunk: Option<DataChunk>,
    }
1511
1512    impl MockInput {
1513        fn boxed(chunk: DataChunk) -> Box<Self> {
1514            Box::new(Self { chunk: Some(chunk) })
1515        }
1516    }
1517
1518    impl Operator for MockInput {
1519        fn next(&mut self) -> OperatorResult {
1520            Ok(self.chunk.take())
1521        }
1522        fn reset(&mut self) {}
1523        fn name(&self) -> &'static str {
1524            "MockInput"
1525        }
1526    }
1527
    /// Test input operator that never produces a chunk.
    struct EmptyInput;
    impl Operator for EmptyInput {
        /// Always reports exhausted input.
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }
        /// No-op: there is no state to reset.
        fn reset(&mut self) {}
        fn name(&self) -> &'static str {
            "EmptyInput"
        }
    }
1538
1539    fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1540        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1541        for id in ids {
1542            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1543            builder.advance_row();
1544        }
1545        builder.finish()
1546    }
1547
1548    fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1549        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1550        for id in ids {
1551            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1552            builder.advance_row();
1553        }
1554        builder.finish()
1555    }
1556
1557    // ── CreateNodeOperator ──────────────────────────────────────
1558
1559    #[test]
1560    fn test_create_node_standalone() {
1561        let store = create_test_store();
1562
1563        let mut op = CreateNodeOperator::new(
1564            Arc::clone(&store),
1565            None,
1566            vec!["Person".to_string()],
1567            vec![(
1568                "name".to_string(),
1569                PropertySource::Constant(Value::String("Alix".into())),
1570            )],
1571            vec![LogicalType::Int64],
1572            0,
1573        );
1574
1575        let chunk = op.next().unwrap().unwrap();
1576        assert_eq!(chunk.row_count(), 1);
1577
1578        // Second call should return None (standalone executes once)
1579        assert!(op.next().unwrap().is_none());
1580
1581        assert_eq!(store.node_count(), 1);
1582    }
1583
1584    #[test]
1585    fn test_create_edge() {
1586        let store = create_test_store();
1587
1588        let node1 = store.create_node(&["Person"]);
1589        let node2 = store.create_node(&["Person"]);
1590
1591        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1592        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1593        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1594        builder.advance_row();
1595
1596        let mut op = CreateEdgeOperator::new(
1597            Arc::clone(&store),
1598            MockInput::boxed(builder.finish()),
1599            0,
1600            1,
1601            "KNOWS".to_string(),
1602            vec![LogicalType::Int64, LogicalType::Int64],
1603        );
1604
1605        let _chunk = op.next().unwrap().unwrap();
1606        assert_eq!(store.edge_count(), 1);
1607    }
1608
1609    #[test]
1610    fn test_delete_node() {
1611        let store = create_test_store();
1612
1613        let node_id = store.create_node(&["Person"]);
1614        assert_eq!(store.node_count(), 1);
1615
1616        let mut op = DeleteNodeOperator::new(
1617            Arc::clone(&store),
1618            MockInput::boxed(node_id_chunk(&[node_id])),
1619            0,
1620            vec![LogicalType::Node],
1621            false,
1622        );
1623
1624        let chunk = op.next().unwrap().unwrap();
1625        // Pass-through: output row contains the original node ID
1626        assert_eq!(chunk.row_count(), 1);
1627        assert_eq!(store.node_count(), 0);
1628    }
1629
1630    // ── DeleteEdgeOperator ───────────────────────────────────────
1631
1632    #[test]
1633    fn test_delete_edge() {
1634        let store = create_test_store();
1635
1636        let n1 = store.create_node(&["Person"]);
1637        let n2 = store.create_node(&["Person"]);
1638        let eid = store.create_edge(n1, n2, "KNOWS");
1639        assert_eq!(store.edge_count(), 1);
1640
1641        let mut op = DeleteEdgeOperator::new(
1642            Arc::clone(&store),
1643            MockInput::boxed(edge_id_chunk(&[eid])),
1644            0,
1645            vec![LogicalType::Node],
1646        );
1647
1648        let chunk = op.next().unwrap().unwrap();
1649        assert_eq!(chunk.row_count(), 1);
1650        assert_eq!(store.edge_count(), 0);
1651    }
1652
1653    #[test]
1654    fn test_delete_edge_no_input_returns_none() {
1655        let store = create_test_store();
1656
1657        let mut op = DeleteEdgeOperator::new(
1658            Arc::clone(&store),
1659            Box::new(EmptyInput),
1660            0,
1661            vec![LogicalType::Int64],
1662        );
1663
1664        assert!(op.next().unwrap().is_none());
1665    }
1666
1667    #[test]
1668    fn test_delete_multiple_edges() {
1669        let store = create_test_store();
1670
1671        let n1 = store.create_node(&["N"]);
1672        let n2 = store.create_node(&["N"]);
1673        let e1 = store.create_edge(n1, n2, "R");
1674        let e2 = store.create_edge(n2, n1, "S");
1675        assert_eq!(store.edge_count(), 2);
1676
1677        let mut op = DeleteEdgeOperator::new(
1678            Arc::clone(&store),
1679            MockInput::boxed(edge_id_chunk(&[e1, e2])),
1680            0,
1681            vec![LogicalType::Node],
1682        );
1683
1684        let chunk = op.next().unwrap().unwrap();
1685        assert_eq!(chunk.row_count(), 2);
1686        assert_eq!(store.edge_count(), 0);
1687    }
1688
1689    // ── DeleteNodeOperator with DETACH ───────────────────────────
1690
1691    #[test]
1692    fn test_delete_node_detach() {
1693        let store = create_test_store();
1694
1695        let n1 = store.create_node(&["Person"]);
1696        let n2 = store.create_node(&["Person"]);
1697        store.create_edge(n1, n2, "KNOWS");
1698        store.create_edge(n2, n1, "FOLLOWS");
1699        assert_eq!(store.edge_count(), 2);
1700
1701        let mut op = DeleteNodeOperator::new(
1702            Arc::clone(&store),
1703            MockInput::boxed(node_id_chunk(&[n1])),
1704            0,
1705            vec![LogicalType::Node],
1706            true, // detach = true
1707        );
1708
1709        let chunk = op.next().unwrap().unwrap();
1710        assert_eq!(chunk.row_count(), 1);
1711        assert_eq!(store.node_count(), 1);
1712        assert_eq!(store.edge_count(), 0); // edges detached
1713    }
1714
1715    // ── AddLabelOperator ─────────────────────────────────────────
1716
1717    #[test]
1718    fn test_add_label() {
1719        let store = create_test_store();
1720
1721        let node = store.create_node(&["Person"]);
1722
1723        let mut op = AddLabelOperator::new(
1724            Arc::clone(&store),
1725            MockInput::boxed(node_id_chunk(&[node])),
1726            0,
1727            vec!["Employee".to_string()],
1728            vec![LogicalType::Int64],
1729        );
1730
1731        let chunk = op.next().unwrap().unwrap();
1732        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1733        assert_eq!(updated, 1);
1734
1735        // Verify label was added
1736        let node_data = store.get_node(node).unwrap();
1737        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1738        assert!(labels.contains(&"Person"));
1739        assert!(labels.contains(&"Employee"));
1740    }
1741
1742    #[test]
1743    fn test_add_multiple_labels() {
1744        let store = create_test_store();
1745
1746        let node = store.create_node(&["Base"]);
1747
1748        let mut op = AddLabelOperator::new(
1749            Arc::clone(&store),
1750            MockInput::boxed(node_id_chunk(&[node])),
1751            0,
1752            vec!["LabelA".to_string(), "LabelB".to_string()],
1753            vec![LogicalType::Int64],
1754        );
1755
1756        let chunk = op.next().unwrap().unwrap();
1757        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1758        assert_eq!(updated, 2); // 2 labels added
1759
1760        let node_data = store.get_node(node).unwrap();
1761        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1762        assert!(labels.contains(&"LabelA"));
1763        assert!(labels.contains(&"LabelB"));
1764    }
1765
1766    #[test]
1767    fn test_add_label_no_input_returns_none() {
1768        let store = create_test_store();
1769
1770        let mut op = AddLabelOperator::new(
1771            Arc::clone(&store),
1772            Box::new(EmptyInput),
1773            0,
1774            vec!["Foo".to_string()],
1775            vec![LogicalType::Int64],
1776        );
1777
1778        assert!(op.next().unwrap().is_none());
1779    }
1780
1781    // ── RemoveLabelOperator ──────────────────────────────────────
1782
1783    #[test]
1784    fn test_remove_label() {
1785        let store = create_test_store();
1786
1787        let node = store.create_node(&["Person", "Employee"]);
1788
1789        let mut op = RemoveLabelOperator::new(
1790            Arc::clone(&store),
1791            MockInput::boxed(node_id_chunk(&[node])),
1792            0,
1793            vec!["Employee".to_string()],
1794            vec![LogicalType::Int64],
1795        );
1796
1797        let chunk = op.next().unwrap().unwrap();
1798        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1799        assert_eq!(updated, 1);
1800
1801        // Verify label was removed
1802        let node_data = store.get_node(node).unwrap();
1803        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1804        assert!(labels.contains(&"Person"));
1805        assert!(!labels.contains(&"Employee"));
1806    }
1807
1808    #[test]
1809    fn test_remove_nonexistent_label() {
1810        let store = create_test_store();
1811
1812        let node = store.create_node(&["Person"]);
1813
1814        let mut op = RemoveLabelOperator::new(
1815            Arc::clone(&store),
1816            MockInput::boxed(node_id_chunk(&[node])),
1817            0,
1818            vec!["NonExistent".to_string()],
1819            vec![LogicalType::Int64],
1820        );
1821
1822        let chunk = op.next().unwrap().unwrap();
1823        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1824        assert_eq!(updated, 0); // nothing removed
1825    }
1826
1827    // ── SetPropertyOperator ──────────────────────────────────────
1828
1829    #[test]
1830    fn test_set_node_property_constant() {
1831        let store = create_test_store();
1832
1833        let node = store.create_node(&["Person"]);
1834
1835        let mut op = SetPropertyOperator::new_for_node(
1836            Arc::clone(&store),
1837            MockInput::boxed(node_id_chunk(&[node])),
1838            0,
1839            vec![(
1840                "name".to_string(),
1841                PropertySource::Constant(Value::String("Alix".into())),
1842            )],
1843            vec![LogicalType::Int64],
1844        );
1845
1846        let chunk = op.next().unwrap().unwrap();
1847        assert_eq!(chunk.row_count(), 1);
1848
1849        // Verify property was set
1850        let node_data = store.get_node(node).unwrap();
1851        assert_eq!(
1852            node_data
1853                .properties
1854                .get(&grafeo_common::types::PropertyKey::new("name")),
1855            Some(&Value::String("Alix".into()))
1856        );
1857    }
1858
1859    #[test]
1860    fn test_set_node_property_from_column() {
1861        let store = create_test_store();
1862
1863        let node = store.create_node(&["Person"]);
1864
1865        // Input: column 0 = node ID, column 1 = property value
1866        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1867        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1868        builder
1869            .column_mut(1)
1870            .unwrap()
1871            .push_value(Value::String("Gus".into()));
1872        builder.advance_row();
1873
1874        let mut op = SetPropertyOperator::new_for_node(
1875            Arc::clone(&store),
1876            MockInput::boxed(builder.finish()),
1877            0,
1878            vec![("name".to_string(), PropertySource::Column(1))],
1879            vec![LogicalType::Int64, LogicalType::String],
1880        );
1881
1882        let chunk = op.next().unwrap().unwrap();
1883        assert_eq!(chunk.row_count(), 1);
1884
1885        let node_data = store.get_node(node).unwrap();
1886        assert_eq!(
1887            node_data
1888                .properties
1889                .get(&grafeo_common::types::PropertyKey::new("name")),
1890            Some(&Value::String("Gus".into()))
1891        );
1892    }
1893
1894    #[test]
1895    fn test_set_edge_property() {
1896        let store = create_test_store();
1897
1898        let n1 = store.create_node(&["N"]);
1899        let n2 = store.create_node(&["N"]);
1900        let eid = store.create_edge(n1, n2, "KNOWS");
1901
1902        let mut op = SetPropertyOperator::new_for_edge(
1903            Arc::clone(&store),
1904            MockInput::boxed(edge_id_chunk(&[eid])),
1905            0,
1906            vec![(
1907                "weight".to_string(),
1908                PropertySource::Constant(Value::Float64(0.75)),
1909            )],
1910            vec![LogicalType::Int64],
1911        );
1912
1913        let chunk = op.next().unwrap().unwrap();
1914        assert_eq!(chunk.row_count(), 1);
1915
1916        let edge_data = store.get_edge(eid).unwrap();
1917        assert_eq!(
1918            edge_data
1919                .properties
1920                .get(&grafeo_common::types::PropertyKey::new("weight")),
1921            Some(&Value::Float64(0.75))
1922        );
1923    }
1924
1925    #[test]
1926    fn test_set_multiple_properties() {
1927        let store = create_test_store();
1928
1929        let node = store.create_node(&["Person"]);
1930
1931        let mut op = SetPropertyOperator::new_for_node(
1932            Arc::clone(&store),
1933            MockInput::boxed(node_id_chunk(&[node])),
1934            0,
1935            vec![
1936                (
1937                    "name".to_string(),
1938                    PropertySource::Constant(Value::String("Alix".into())),
1939                ),
1940                (
1941                    "age".to_string(),
1942                    PropertySource::Constant(Value::Int64(30)),
1943                ),
1944            ],
1945            vec![LogicalType::Int64],
1946        );
1947
1948        op.next().unwrap().unwrap();
1949
1950        let node_data = store.get_node(node).unwrap();
1951        assert_eq!(
1952            node_data
1953                .properties
1954                .get(&grafeo_common::types::PropertyKey::new("name")),
1955            Some(&Value::String("Alix".into()))
1956        );
1957        assert_eq!(
1958            node_data
1959                .properties
1960                .get(&grafeo_common::types::PropertyKey::new("age")),
1961            Some(&Value::Int64(30))
1962        );
1963    }
1964
1965    #[test]
1966    fn test_set_property_no_input_returns_none() {
1967        let store = create_test_store();
1968
1969        let mut op = SetPropertyOperator::new_for_node(
1970            Arc::clone(&store),
1971            Box::new(EmptyInput),
1972            0,
1973            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
1974            vec![LogicalType::Int64],
1975        );
1976
1977        assert!(op.next().unwrap().is_none());
1978    }
1979
1980    // ── Error paths ──────────────────────────────────────────────
1981
1982    #[test]
1983    fn test_delete_node_without_detach_errors_when_edges_exist() {
1984        let store = create_test_store();
1985
1986        let n1 = store.create_node(&["Person"]);
1987        let n2 = store.create_node(&["Person"]);
1988        store.create_edge(n1, n2, "KNOWS");
1989
1990        let mut op = DeleteNodeOperator::new(
1991            Arc::clone(&store),
1992            MockInput::boxed(node_id_chunk(&[n1])),
1993            0,
1994            vec![LogicalType::Int64],
1995            false, // no detach
1996        );
1997
1998        let err = op.next().unwrap_err();
1999        match err {
2000            OperatorError::ConstraintViolation(msg) => {
2001                assert!(msg.contains("connected edge"), "unexpected message: {msg}");
2002            }
2003            other => panic!("expected ConstraintViolation, got {other:?}"),
2004        }
2005        // Node should still exist
2006        assert_eq!(store.node_count(), 2);
2007    }
2008
2009    // ── CreateNodeOperator with input ───────────────────────────
2010
2011    #[test]
2012    fn test_create_node_with_input_operator() {
2013        let store = create_test_store();
2014
2015        // Seed node to provide input rows
2016        let existing = store.create_node(&["Seed"]);
2017
2018        let mut op = CreateNodeOperator::new(
2019            Arc::clone(&store),
2020            Some(MockInput::boxed(node_id_chunk(&[existing]))),
2021            vec!["Created".to_string()],
2022            vec![(
2023                "source".to_string(),
2024                PropertySource::Constant(Value::String("from_input".into())),
2025            )],
2026            vec![LogicalType::Int64, LogicalType::Int64], // input col + output col
2027            1,                                            // output column for new node ID
2028        );
2029
2030        let chunk = op.next().unwrap().unwrap();
2031        assert_eq!(chunk.row_count(), 1);
2032
2033        // Should have created one new node (2 total: Seed + Created)
2034        assert_eq!(store.node_count(), 2);
2035
2036        // Exhausted
2037        assert!(op.next().unwrap().is_none());
2038    }
2039
2040    // ── CreateEdgeOperator with properties and output column ────
2041
2042    #[test]
2043    fn test_create_edge_with_properties_and_output_column() {
2044        let store = create_test_store();
2045
2046        let n1 = store.create_node(&["Person"]);
2047        let n2 = store.create_node(&["Person"]);
2048
2049        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
2050        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
2051        builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
2052        builder.advance_row();
2053
2054        let mut op = CreateEdgeOperator::new(
2055            Arc::clone(&store),
2056            MockInput::boxed(builder.finish()),
2057            0,
2058            1,
2059            "KNOWS".to_string(),
2060            vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
2061        )
2062        .with_properties(vec![(
2063            "since".to_string(),
2064            PropertySource::Constant(Value::Int64(2024)),
2065        )])
2066        .with_output_column(2);
2067
2068        let chunk = op.next().unwrap().unwrap();
2069        assert_eq!(chunk.row_count(), 1);
2070        assert_eq!(store.edge_count(), 1);
2071
2072        // Verify the output chunk contains the edge ID in column 2
2073        let edge_id_raw = chunk
2074            .column(2)
2075            .and_then(|c| c.get_int64(0))
2076            .expect("edge ID should be in output column 2");
2077        let edge_id = EdgeId(edge_id_raw as u64);
2078
2079        // Verify the edge has the property
2080        let edge = store.get_edge(edge_id).expect("edge should exist");
2081        assert_eq!(
2082            edge.properties
2083                .get(&grafeo_common::types::PropertyKey::new("since")),
2084            Some(&Value::Int64(2024))
2085        );
2086    }
2087
2088    // ── SetPropertyOperator with map replacement ────────────────
2089
2090    #[test]
2091    fn test_set_property_map_replace() {
2092        use std::collections::BTreeMap;
2093
2094        let store = create_test_store();
2095
2096        let node = store.create_node(&["Person"]);
2097        store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
2098
2099        let mut map = BTreeMap::new();
2100        map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
2101
2102        let mut op = SetPropertyOperator::new_for_node(
2103            Arc::clone(&store),
2104            MockInput::boxed(node_id_chunk(&[node])),
2105            0,
2106            vec![(
2107                "*".to_string(),
2108                PropertySource::Constant(Value::Map(Arc::new(map))),
2109            )],
2110            vec![LogicalType::Int64],
2111        )
2112        .with_replace(true);
2113
2114        op.next().unwrap().unwrap();
2115
2116        let node_data = store.get_node(node).unwrap();
2117        // Old property should be gone
2118        assert!(
2119            node_data
2120                .properties
2121                .get(&PropertyKey::new("old_prop"))
2122                .is_none()
2123        );
2124        // New property should exist
2125        assert_eq!(
2126            node_data.properties.get(&PropertyKey::new("new_key")),
2127            Some(&Value::String("new_val".into()))
2128        );
2129    }
2130
2131    // ── SetPropertyOperator with map merge (no replace) ─────────
2132
2133    #[test]
2134    fn test_set_property_map_merge() {
2135        use std::collections::BTreeMap;
2136
2137        let store = create_test_store();
2138
2139        let node = store.create_node(&["Person"]);
2140        store.set_node_property(node, "existing", Value::Int64(42));
2141
2142        let mut map = BTreeMap::new();
2143        map.insert(PropertyKey::new("added"), Value::String("hello".into()));
2144
2145        let mut op = SetPropertyOperator::new_for_node(
2146            Arc::clone(&store),
2147            MockInput::boxed(node_id_chunk(&[node])),
2148            0,
2149            vec![(
2150                "*".to_string(),
2151                PropertySource::Constant(Value::Map(Arc::new(map))),
2152            )],
2153            vec![LogicalType::Int64],
2154        ); // replace defaults to false
2155
2156        op.next().unwrap().unwrap();
2157
2158        let node_data = store.get_node(node).unwrap();
2159        // Existing property should still be there
2160        assert_eq!(
2161            node_data.properties.get(&PropertyKey::new("existing")),
2162            Some(&Value::Int64(42))
2163        );
2164        // New property should also exist
2165        assert_eq!(
2166            node_data.properties.get(&PropertyKey::new("added")),
2167            Some(&Value::String("hello".into()))
2168        );
2169    }
2170
2171    // ── PropertySource::PropertyAccess ──────────────────────────
2172
2173    #[test]
2174    fn test_property_source_property_access() {
2175        let store = create_test_store();
2176
2177        let source_node = store.create_node(&["Source"]);
2178        store.set_node_property(source_node, "name", Value::String("Alix".into()));
2179
2180        let target_node = store.create_node(&["Target"]);
2181
2182        // Build chunk: col 0 = source node ID (Node type for PropertyAccess), col 1 = target node ID
2183        let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
2184        builder.column_mut(0).unwrap().push_node_id(source_node);
2185        builder
2186            .column_mut(1)
2187            .unwrap()
2188            .push_int64(target_node.0 as i64);
2189        builder.advance_row();
2190
2191        let mut op = SetPropertyOperator::new_for_node(
2192            Arc::clone(&store),
2193            MockInput::boxed(builder.finish()),
2194            1, // entity column = target node
2195            vec![(
2196                "copied_name".to_string(),
2197                PropertySource::PropertyAccess {
2198                    column: 0,
2199                    property: "name".to_string(),
2200                },
2201            )],
2202            vec![LogicalType::Node, LogicalType::Int64],
2203        );
2204
2205        op.next().unwrap().unwrap();
2206
2207        let target_data = store.get_node(target_node).unwrap();
2208        assert_eq!(
2209            target_data.properties.get(&PropertyKey::new("copied_name")),
2210            Some(&Value::String("Alix".into()))
2211        );
2212    }
2213
2214    // ── ConstraintValidator integration ─────────────────────────
2215
2216    #[test]
2217    fn test_create_node_with_constraint_validator() {
2218        let store = create_test_store();
2219
2220        struct RejectAgeValidator;
2221        impl ConstraintValidator for RejectAgeValidator {
2222            fn validate_node_property(
2223                &self,
2224                _labels: &[String],
2225                key: &str,
2226                _value: &Value,
2227            ) -> Result<(), OperatorError> {
2228                if key == "forbidden" {
2229                    return Err(OperatorError::ConstraintViolation(
2230                        "property 'forbidden' is not allowed".to_string(),
2231                    ));
2232                }
2233                Ok(())
2234            }
2235            fn validate_node_complete(
2236                &self,
2237                _labels: &[String],
2238                _properties: &[(String, Value)],
2239            ) -> Result<(), OperatorError> {
2240                Ok(())
2241            }
2242            fn check_unique_node_property(
2243                &self,
2244                _labels: &[String],
2245                _key: &str,
2246                _value: &Value,
2247            ) -> Result<(), OperatorError> {
2248                Ok(())
2249            }
2250            fn validate_edge_property(
2251                &self,
2252                _edge_type: &str,
2253                _key: &str,
2254                _value: &Value,
2255            ) -> Result<(), OperatorError> {
2256                Ok(())
2257            }
2258            fn validate_edge_complete(
2259                &self,
2260                _edge_type: &str,
2261                _properties: &[(String, Value)],
2262            ) -> Result<(), OperatorError> {
2263                Ok(())
2264            }
2265        }
2266
2267        // Valid property should succeed
2268        let mut op = CreateNodeOperator::new(
2269            Arc::clone(&store),
2270            None,
2271            vec!["Thing".to_string()],
2272            vec![(
2273                "name".to_string(),
2274                PropertySource::Constant(Value::String("ok".into())),
2275            )],
2276            vec![LogicalType::Int64],
2277            0,
2278        )
2279        .with_validator(Arc::new(RejectAgeValidator));
2280
2281        assert!(op.next().is_ok());
2282        assert_eq!(store.node_count(), 1);
2283
2284        // Forbidden property should fail
2285        let mut op = CreateNodeOperator::new(
2286            Arc::clone(&store),
2287            None,
2288            vec!["Thing".to_string()],
2289            vec![(
2290                "forbidden".to_string(),
2291                PropertySource::Constant(Value::Int64(1)),
2292            )],
2293            vec![LogicalType::Int64],
2294            0,
2295        )
2296        .with_validator(Arc::new(RejectAgeValidator));
2297
2298        let err = op.next().unwrap_err();
2299        assert!(matches!(err, OperatorError::ConstraintViolation(_)));
2300        // Node count should still be 2 (the node is created before validation, but the error
2301        // propagates - this tests the validation logic fires)
2302    }
2303
2304    // ── Reset behavior ──────────────────────────────────────────
2305
2306    #[test]
2307    fn test_create_node_reset_allows_re_execution() {
2308        let store = create_test_store();
2309
2310        let mut op = CreateNodeOperator::new(
2311            Arc::clone(&store),
2312            None,
2313            vec!["Person".to_string()],
2314            vec![],
2315            vec![LogicalType::Int64],
2316            0,
2317        );
2318
2319        // First execution
2320        assert!(op.next().unwrap().is_some());
2321        assert!(op.next().unwrap().is_none());
2322
2323        // Reset and re-execute
2324        op.reset();
2325        assert!(op.next().unwrap().is_some());
2326
2327        assert_eq!(store.node_count(), 2);
2328    }
2329
2330    // ── Operator name() ──────────────────────────────────────────
2331
2332    #[test]
2333    fn test_operator_names() {
2334        let store = create_test_store();
2335
2336        let op = CreateNodeOperator::new(
2337            Arc::clone(&store),
2338            None,
2339            vec![],
2340            vec![],
2341            vec![LogicalType::Int64],
2342            0,
2343        );
2344        assert_eq!(op.name(), "CreateNode");
2345
2346        let op = CreateEdgeOperator::new(
2347            Arc::clone(&store),
2348            Box::new(EmptyInput),
2349            0,
2350            1,
2351            "R".to_string(),
2352            vec![LogicalType::Int64],
2353        );
2354        assert_eq!(op.name(), "CreateEdge");
2355
2356        let op = DeleteNodeOperator::new(
2357            Arc::clone(&store),
2358            Box::new(EmptyInput),
2359            0,
2360            vec![LogicalType::Int64],
2361            false,
2362        );
2363        assert_eq!(op.name(), "DeleteNode");
2364
2365        let op = DeleteEdgeOperator::new(
2366            Arc::clone(&store),
2367            Box::new(EmptyInput),
2368            0,
2369            vec![LogicalType::Int64],
2370        );
2371        assert_eq!(op.name(), "DeleteEdge");
2372
2373        let op = AddLabelOperator::new(
2374            Arc::clone(&store),
2375            Box::new(EmptyInput),
2376            0,
2377            vec!["L".to_string()],
2378            vec![LogicalType::Int64],
2379        );
2380        assert_eq!(op.name(), "AddLabel");
2381
2382        let op = RemoveLabelOperator::new(
2383            Arc::clone(&store),
2384            Box::new(EmptyInput),
2385            0,
2386            vec!["L".to_string()],
2387            vec![LogicalType::Int64],
2388        );
2389        assert_eq!(op.name(), "RemoveLabel");
2390
2391        let op = SetPropertyOperator::new_for_node(
2392            Arc::clone(&store),
2393            Box::new(EmptyInput),
2394            0,
2395            vec![],
2396            vec![LogicalType::Int64],
2397        );
2398        assert_eq!(op.name(), "SetProperty");
2399    }
2400}