Skip to main content

grafeo_core/execution/operators/
mutation.rs

1//! Mutation operators for creating and deleting graph elements.
2//!
3//! These operators modify the graph structure:
4//! - `CreateNodeOperator`: Creates new nodes
5//! - `CreateEdgeOperator`: Creates new edges
6//! - `DeleteNodeOperator`: Deletes nodes
7//! - `DeleteEdgeOperator`: Deletes edges
8
9use std::sync::Arc;
10
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19/// Trait for validating schema constraints during mutation operations.
20///
21/// Implementors check type definitions, NOT NULL, and UNIQUE constraints
22/// before data is written to the store.
23pub trait ConstraintValidator: Send + Sync {
24    /// Validates a single property value for a node with the given labels.
25    ///
26    /// Checks type compatibility and NOT NULL constraints.
27    fn validate_node_property(
28        &self,
29        labels: &[String],
30        key: &str,
31        value: &Value,
32    ) -> Result<(), OperatorError>;
33
34    /// Validates that all required properties are present after creating a node.
35    ///
36    /// Checks NOT NULL constraints for properties that were not explicitly set.
37    fn validate_node_complete(
38        &self,
39        labels: &[String],
40        properties: &[(String, Value)],
41    ) -> Result<(), OperatorError>;
42
43    /// Checks UNIQUE constraint for a node property value.
44    ///
45    /// Returns an error if a node with the same label already has this value.
46    fn check_unique_node_property(
47        &self,
48        labels: &[String],
49        key: &str,
50        value: &Value,
51    ) -> Result<(), OperatorError>;
52
53    /// Validates a single property value for an edge of the given type.
54    fn validate_edge_property(
55        &self,
56        edge_type: &str,
57        key: &str,
58        value: &Value,
59    ) -> Result<(), OperatorError>;
60
61    /// Validates that all required properties are present after creating an edge.
62    fn validate_edge_complete(
63        &self,
64        edge_type: &str,
65        properties: &[(String, Value)],
66    ) -> Result<(), OperatorError>;
67
68    /// Validates that the node labels are allowed by the bound graph type.
69    fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
70        let _ = labels;
71        Ok(())
72    }
73
74    /// Validates that the edge type is allowed by the bound graph type.
75    fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
76        let _ = edge_type;
77        Ok(())
78    }
79
80    /// Validates that edge endpoints have the correct node type labels.
81    fn validate_edge_endpoints(
82        &self,
83        edge_type: &str,
84        source_labels: &[String],
85        target_labels: &[String],
86    ) -> Result<(), OperatorError> {
87        let _ = (edge_type, source_labels, target_labels);
88        Ok(())
89    }
90
91    /// Injects default values for properties that are defined in a type but
92    /// not explicitly provided.
93    fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
94        let _ = (labels, properties);
95    }
96}
97
98/// Operator that creates new nodes.
99///
100/// For each input row, creates a new node with the specified labels
101/// and properties, then outputs the row with the new node.
102pub struct CreateNodeOperator {
103    /// The graph store to modify.
104    store: Arc<dyn GraphStoreMut>,
105    /// Input operator.
106    input: Option<Box<dyn Operator>>,
107    /// Labels for the new nodes.
108    labels: Vec<String>,
109    /// Properties to set (name -> column index or constant value).
110    properties: Vec<(String, PropertySource)>,
111    /// Output schema.
112    output_schema: Vec<LogicalType>,
113    /// Column index for the created node variable.
114    output_column: usize,
115    /// Whether this operator has been executed (for no-input case).
116    executed: bool,
117    /// Epoch for MVCC versioning.
118    viewing_epoch: Option<EpochId>,
119    /// Transaction ID for MVCC versioning.
120    transaction_id: Option<TransactionId>,
121    /// Optional constraint validator for schema enforcement.
122    validator: Option<Arc<dyn ConstraintValidator>>,
123    /// Optional write tracker for conflict detection.
124    write_tracker: Option<SharedWriteTracker>,
125}
126
127/// Source for a property value.
128#[derive(Debug, Clone)]
129pub enum PropertySource {
130    /// Get value from an input column.
131    Column(usize),
132    /// Use a constant value.
133    Constant(Value),
134    /// Access a named property from a map/node/edge in an input column.
135    PropertyAccess {
136        /// The column containing the map, node ID, or edge ID.
137        column: usize,
138        /// The property name to extract.
139        property: String,
140    },
141}
142
143impl PropertySource {
144    /// Resolves a property value from a data chunk row.
145    pub fn resolve(
146        &self,
147        chunk: &crate::execution::chunk::DataChunk,
148        row: usize,
149        store: &dyn GraphStore,
150    ) -> Value {
151        match self {
152            PropertySource::Column(col_idx) => chunk
153                .column(*col_idx)
154                .and_then(|c| c.get_value(row))
155                .unwrap_or(Value::Null),
156            PropertySource::Constant(v) => v.clone(),
157            PropertySource::PropertyAccess { column, property } => {
158                let Some(col) = chunk.column(*column) else {
159                    return Value::Null;
160                };
161                // Try node ID first, then edge ID, then map value
162                if let Some(node_id) = col.get_node_id(row) {
163                    store
164                        .get_node(node_id)
165                        .and_then(|node| node.get_property(property).cloned())
166                        .unwrap_or(Value::Null)
167                } else if let Some(edge_id) = col.get_edge_id(row) {
168                    store
169                        .get_edge(edge_id)
170                        .and_then(|edge| edge.get_property(property).cloned())
171                        .unwrap_or(Value::Null)
172                } else if let Some(Value::Map(map)) = col.get_value(row) {
173                    let key = PropertyKey::new(property);
174                    map.get(&key).cloned().unwrap_or(Value::Null)
175                } else {
176                    Value::Null
177                }
178            }
179        }
180    }
181}
182
183impl CreateNodeOperator {
184    /// Creates a new node creation operator.
185    ///
186    /// # Arguments
187    /// * `store` - The graph store to modify.
188    /// * `input` - Optional input operator (None for standalone CREATE).
189    /// * `labels` - Labels to assign to created nodes.
190    /// * `properties` - Properties to set on created nodes.
191    /// * `output_schema` - Schema of the output.
192    /// * `output_column` - Column index where the created node ID goes.
193    pub fn new(
194        store: Arc<dyn GraphStoreMut>,
195        input: Option<Box<dyn Operator>>,
196        labels: Vec<String>,
197        properties: Vec<(String, PropertySource)>,
198        output_schema: Vec<LogicalType>,
199        output_column: usize,
200    ) -> Self {
201        Self {
202            store,
203            input,
204            labels,
205            properties,
206            output_schema,
207            output_column,
208            executed: false,
209            viewing_epoch: None,
210            transaction_id: None,
211            validator: None,
212            write_tracker: None,
213        }
214    }
215
216    /// Sets the transaction context for MVCC versioning.
217    pub fn with_transaction_context(
218        mut self,
219        epoch: EpochId,
220        transaction_id: Option<TransactionId>,
221    ) -> Self {
222        self.viewing_epoch = Some(epoch);
223        self.transaction_id = transaction_id;
224        self
225    }
226
227    /// Sets the constraint validator for schema enforcement.
228    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
229        self.validator = Some(validator);
230        self
231    }
232
233    /// Sets the write tracker for conflict detection.
234    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
235        self.write_tracker = Some(tracker);
236        self
237    }
238}
239
240impl CreateNodeOperator {
241    /// Validates and sets properties on a newly created node.
242    fn validate_and_set_properties(
243        &self,
244        node_id: NodeId,
245        resolved_props: &mut Vec<(String, Value)>,
246    ) -> Result<(), OperatorError> {
247        // Phase 0: Validate that node labels are allowed by the bound graph type
248        if let Some(ref validator) = self.validator {
249            validator.validate_node_labels_allowed(&self.labels)?;
250        }
251
252        // Phase 0.5: Inject defaults for properties not explicitly provided
253        if let Some(ref validator) = self.validator {
254            validator.inject_defaults(&self.labels, resolved_props);
255        }
256
257        // Phase 1: Validate each property value
258        if let Some(ref validator) = self.validator {
259            for (name, value) in resolved_props.iter() {
260                validator.validate_node_property(&self.labels, name, value)?;
261                validator.check_unique_node_property(&self.labels, name, value)?;
262            }
263            // Phase 2: Validate completeness (NOT NULL checks for missing required properties)
264            validator.validate_node_complete(&self.labels, resolved_props)?;
265        }
266
267        // Phase 3: Write properties to the store
268        for (name, value) in resolved_props.iter() {
269            self.store.set_node_property(node_id, name, value.clone());
270        }
271        Ok(())
272    }
273}
274
275impl Operator for CreateNodeOperator {
276    fn next(&mut self) -> OperatorResult {
277        // Get transaction context for versioned creation
278        let epoch = self
279            .viewing_epoch
280            .unwrap_or_else(|| self.store.current_epoch());
281        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
282
283        if let Some(ref mut input) = self.input {
284            // For each input row, create a node
285            if let Some(chunk) = input.next()? {
286                let mut builder =
287                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
288
289                for row in chunk.selected_indices() {
290                    // Resolve all property values first (before creating node)
291                    let mut resolved_props: Vec<(String, Value)> = self
292                        .properties
293                        .iter()
294                        .map(|(name, source)| {
295                            let value =
296                                source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
297                            (name.clone(), value)
298                        })
299                        .collect();
300
301                    // Create the node with MVCC versioning
302                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
303                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
304
305                    // Record write for conflict detection
306                    if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
307                        tracker.record_node_write(tid, node_id);
308                    }
309
310                    // Validate and set properties
311                    self.validate_and_set_properties(node_id, &mut resolved_props)?;
312
313                    // Copy input columns to output
314                    for col_idx in 0..chunk.column_count() {
315                        if col_idx < self.output_column
316                            && let (Some(src), Some(dst)) =
317                                (chunk.column(col_idx), builder.column_mut(col_idx))
318                        {
319                            if let Some(val) = src.get_value(row) {
320                                dst.push_value(val);
321                            } else {
322                                dst.push_value(Value::Null);
323                            }
324                        }
325                    }
326
327                    // Add the new node ID
328                    if let Some(dst) = builder.column_mut(self.output_column) {
329                        dst.push_value(Value::Int64(node_id.0 as i64));
330                    }
331
332                    builder.advance_row();
333                }
334
335                return Ok(Some(builder.finish()));
336            }
337            Ok(None)
338        } else {
339            // No input - create a single node
340            if self.executed {
341                return Ok(None);
342            }
343            self.executed = true;
344
345            // Resolve constant properties
346            let mut resolved_props: Vec<(String, Value)> = self
347                .properties
348                .iter()
349                .filter_map(|(name, source)| {
350                    if let PropertySource::Constant(value) = source {
351                        Some((name.clone(), value.clone()))
352                    } else {
353                        None
354                    }
355                })
356                .collect();
357
358            // Create the node with MVCC versioning
359            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
360            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
361
362            // Record write for conflict detection
363            if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
364                tracker.record_node_write(tid, node_id);
365            }
366
367            // Validate and set properties
368            self.validate_and_set_properties(node_id, &mut resolved_props)?;
369
370            // Build output chunk with just the node ID
371            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
372            if let Some(dst) = builder.column_mut(self.output_column) {
373                dst.push_value(Value::Int64(node_id.0 as i64));
374            }
375            builder.advance_row();
376
377            Ok(Some(builder.finish()))
378        }
379    }
380
381    fn reset(&mut self) {
382        if let Some(ref mut input) = self.input {
383            input.reset();
384        }
385        self.executed = false;
386    }
387
388    fn name(&self) -> &'static str {
389        "CreateNode"
390    }
391}
392
393/// Operator that creates new edges.
394pub struct CreateEdgeOperator {
395    /// The graph store to modify.
396    store: Arc<dyn GraphStoreMut>,
397    /// Input operator.
398    input: Box<dyn Operator>,
399    /// Column index for the source node.
400    from_column: usize,
401    /// Column index for the target node.
402    to_column: usize,
403    /// Edge type.
404    edge_type: String,
405    /// Properties to set.
406    properties: Vec<(String, PropertySource)>,
407    /// Output schema.
408    output_schema: Vec<LogicalType>,
409    /// Column index for the created edge variable (if any).
410    output_column: Option<usize>,
411    /// Epoch for MVCC versioning.
412    viewing_epoch: Option<EpochId>,
413    /// Transaction ID for MVCC versioning.
414    transaction_id: Option<TransactionId>,
415    /// Optional constraint validator for schema enforcement.
416    validator: Option<Arc<dyn ConstraintValidator>>,
417    /// Optional write tracker for conflict detection.
418    write_tracker: Option<SharedWriteTracker>,
419}
420
421impl CreateEdgeOperator {
422    /// Creates a new edge creation operator.
423    ///
424    /// Use builder methods to set additional options:
425    /// - [`with_properties`](Self::with_properties) - set edge properties
426    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
427    /// - [`with_transaction_context`](Self::with_transaction_context) - set transaction context
428    pub fn new(
429        store: Arc<dyn GraphStoreMut>,
430        input: Box<dyn Operator>,
431        from_column: usize,
432        to_column: usize,
433        edge_type: String,
434        output_schema: Vec<LogicalType>,
435    ) -> Self {
436        Self {
437            store,
438            input,
439            from_column,
440            to_column,
441            edge_type,
442            properties: Vec::new(),
443            output_schema,
444            output_column: None,
445            viewing_epoch: None,
446            transaction_id: None,
447            validator: None,
448            write_tracker: None,
449        }
450    }
451
452    /// Sets the properties to assign to created edges.
453    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
454        self.properties = properties;
455        self
456    }
457
458    /// Sets the output column for the created edge ID.
459    pub fn with_output_column(mut self, column: usize) -> Self {
460        self.output_column = Some(column);
461        self
462    }
463
464    /// Sets the transaction context for MVCC versioning.
465    pub fn with_transaction_context(
466        mut self,
467        epoch: EpochId,
468        transaction_id: Option<TransactionId>,
469    ) -> Self {
470        self.viewing_epoch = Some(epoch);
471        self.transaction_id = transaction_id;
472        self
473    }
474
475    /// Sets the constraint validator for schema enforcement.
476    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
477        self.validator = Some(validator);
478        self
479    }
480
481    /// Sets the write tracker for conflict detection.
482    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
483        self.write_tracker = Some(tracker);
484        self
485    }
486}
487
488impl Operator for CreateEdgeOperator {
489    fn next(&mut self) -> OperatorResult {
490        // Get transaction context for versioned creation
491        let epoch = self
492            .viewing_epoch
493            .unwrap_or_else(|| self.store.current_epoch());
494        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
495
496        if let Some(chunk) = self.input.next()? {
497            let mut builder =
498                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
499
500            for row in chunk.selected_indices() {
501                // Get source and target node IDs
502                let from_id = chunk
503                    .column(self.from_column)
504                    .and_then(|c| c.get_value(row))
505                    .ok_or_else(|| {
506                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
507                    })?;
508
509                let to_id = chunk
510                    .column(self.to_column)
511                    .and_then(|c| c.get_value(row))
512                    .ok_or_else(|| {
513                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
514                    })?;
515
516                // Extract node IDs
517                let from_node_id = match from_id {
518                    Value::Int64(id) => NodeId(id as u64),
519                    _ => {
520                        return Err(OperatorError::TypeMismatch {
521                            expected: "Int64 (node ID)".to_string(),
522                            found: format!("{from_id:?}"),
523                        });
524                    }
525                };
526
527                let to_node_id = match to_id {
528                    Value::Int64(id) => NodeId(id as u64),
529                    _ => {
530                        return Err(OperatorError::TypeMismatch {
531                            expected: "Int64 (node ID)".to_string(),
532                            found: format!("{to_id:?}"),
533                        });
534                    }
535                };
536
537                // Validate graph type and edge endpoint constraints
538                if let Some(ref validator) = self.validator {
539                    validator.validate_edge_type_allowed(&self.edge_type)?;
540
541                    // Look up source and target node labels for endpoint validation
542                    let source_labels: Vec<String> = self
543                        .store
544                        .get_node(from_node_id)
545                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
546                        .unwrap_or_default();
547                    let target_labels: Vec<String> = self
548                        .store
549                        .get_node(to_node_id)
550                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
551                        .unwrap_or_default();
552                    validator.validate_edge_endpoints(
553                        &self.edge_type,
554                        &source_labels,
555                        &target_labels,
556                    )?;
557                }
558
559                // Resolve property values
560                let resolved_props: Vec<(String, Value)> = self
561                    .properties
562                    .iter()
563                    .map(|(name, source)| {
564                        let value =
565                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
566                        (name.clone(), value)
567                    })
568                    .collect();
569
570                // Validate constraints before writing
571                if let Some(ref validator) = self.validator {
572                    for (name, value) in &resolved_props {
573                        validator.validate_edge_property(&self.edge_type, name, value)?;
574                    }
575                    validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
576                }
577
578                // Create the edge with MVCC versioning
579                let edge_id = self.store.create_edge_versioned(
580                    from_node_id,
581                    to_node_id,
582                    &self.edge_type,
583                    epoch,
584                    tx,
585                );
586
587                // Record write for conflict detection
588                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
589                    tracker.record_edge_write(tid, edge_id);
590                }
591
592                // Set properties
593                for (name, value) in resolved_props {
594                    self.store.set_edge_property(edge_id, &name, value);
595                }
596
597                // Copy input columns
598                for col_idx in 0..chunk.column_count() {
599                    if let (Some(src), Some(dst)) =
600                        (chunk.column(col_idx), builder.column_mut(col_idx))
601                    {
602                        if let Some(val) = src.get_value(row) {
603                            dst.push_value(val);
604                        } else {
605                            dst.push_value(Value::Null);
606                        }
607                    }
608                }
609
610                // Add edge ID if requested
611                if let Some(out_col) = self.output_column
612                    && let Some(dst) = builder.column_mut(out_col)
613                {
614                    dst.push_value(Value::Int64(edge_id.0 as i64));
615                }
616
617                builder.advance_row();
618            }
619
620            return Ok(Some(builder.finish()));
621        }
622        Ok(None)
623    }
624
625    fn reset(&mut self) {
626        self.input.reset();
627    }
628
629    fn name(&self) -> &'static str {
630        "CreateEdge"
631    }
632}
633
634/// Operator that deletes nodes.
635pub struct DeleteNodeOperator {
636    /// The graph store to modify.
637    store: Arc<dyn GraphStoreMut>,
638    /// Input operator.
639    input: Box<dyn Operator>,
640    /// Column index for the node to delete.
641    node_column: usize,
642    /// Output schema.
643    output_schema: Vec<LogicalType>,
644    /// Whether to detach (delete connected edges) before deleting.
645    detach: bool,
646    /// Epoch for MVCC versioning.
647    viewing_epoch: Option<EpochId>,
648    /// Transaction ID for MVCC versioning.
649    transaction_id: Option<TransactionId>,
650    /// Optional write tracker for conflict detection.
651    write_tracker: Option<SharedWriteTracker>,
652}
653
654impl DeleteNodeOperator {
655    /// Creates a new node deletion operator.
656    pub fn new(
657        store: Arc<dyn GraphStoreMut>,
658        input: Box<dyn Operator>,
659        node_column: usize,
660        output_schema: Vec<LogicalType>,
661        detach: bool,
662    ) -> Self {
663        Self {
664            store,
665            input,
666            node_column,
667            output_schema,
668            detach,
669            viewing_epoch: None,
670            transaction_id: None,
671            write_tracker: None,
672        }
673    }
674
675    /// Sets the transaction context for MVCC versioning.
676    pub fn with_transaction_context(
677        mut self,
678        epoch: EpochId,
679        transaction_id: Option<TransactionId>,
680    ) -> Self {
681        self.viewing_epoch = Some(epoch);
682        self.transaction_id = transaction_id;
683        self
684    }
685
686    /// Sets the write tracker for conflict detection.
687    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
688        self.write_tracker = Some(tracker);
689        self
690    }
691}
692
693impl Operator for DeleteNodeOperator {
694    fn next(&mut self) -> OperatorResult {
695        // Get transaction context for versioned deletion
696        let epoch = self
697            .viewing_epoch
698            .unwrap_or_else(|| self.store.current_epoch());
699        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
700
701        if let Some(chunk) = self.input.next()? {
702            let mut builder =
703                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
704
705            for row in chunk.selected_indices() {
706                let node_val = chunk
707                    .column(self.node_column)
708                    .and_then(|c| c.get_value(row))
709                    .ok_or_else(|| {
710                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
711                    })?;
712
713                let node_id = match node_val {
714                    Value::Int64(id) => NodeId(id as u64),
715                    _ => {
716                        return Err(OperatorError::TypeMismatch {
717                            expected: "Int64 (node ID)".to_string(),
718                            found: format!("{node_val:?}"),
719                        });
720                    }
721                };
722
723                if self.detach {
724                    // Delete all connected edges first, using versioned deletion
725                    // so rollback can restore them
726                    let outgoing = self
727                        .store
728                        .edges_from(node_id, crate::graph::Direction::Outgoing);
729                    let incoming = self
730                        .store
731                        .edges_from(node_id, crate::graph::Direction::Incoming);
732                    for (_, edge_id) in outgoing.into_iter().chain(incoming) {
733                        self.store.delete_edge_versioned(edge_id, epoch, tx);
734                        if let (Some(tracker), Some(tid)) =
735                            (&self.write_tracker, self.transaction_id)
736                        {
737                            tracker.record_edge_write(tid, edge_id);
738                        }
739                    }
740                } else {
741                    // NODETACH: check that node has no connected edges
742                    let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
743                    if degree > 0 {
744                        return Err(OperatorError::ConstraintViolation(format!(
745                            "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
746                            degree
747                        )));
748                    }
749                }
750
751                // Delete the node with MVCC versioning
752                self.store.delete_node_versioned(node_id, epoch, tx);
753
754                // Record write for conflict detection
755                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
756                    tracker.record_node_write(tid, node_id);
757                }
758
759                // Pass through all input columns so downstream RETURN can
760                // reference the variable (e.g., count(n) after DELETE n).
761                for col_idx in 0..chunk.column_count() {
762                    if let (Some(src), Some(dst)) =
763                        (chunk.column(col_idx), builder.column_mut(col_idx))
764                    {
765                        if let Some(val) = src.get_value(row) {
766                            dst.push_value(val);
767                        } else {
768                            dst.push_value(Value::Null);
769                        }
770                    }
771                }
772                builder.advance_row();
773            }
774
775            return Ok(Some(builder.finish()));
776        }
777        Ok(None)
778    }
779
780    fn reset(&mut self) {
781        self.input.reset();
782    }
783
784    fn name(&self) -> &'static str {
785        "DeleteNode"
786    }
787}
788
789/// Operator that deletes edges.
790pub struct DeleteEdgeOperator {
791    /// The graph store to modify.
792    store: Arc<dyn GraphStoreMut>,
793    /// Input operator.
794    input: Box<dyn Operator>,
795    /// Column index for the edge to delete.
796    edge_column: usize,
797    /// Output schema.
798    output_schema: Vec<LogicalType>,
799    /// Epoch for MVCC versioning.
800    viewing_epoch: Option<EpochId>,
801    /// Transaction ID for MVCC versioning.
802    transaction_id: Option<TransactionId>,
803    /// Optional write tracker for conflict detection.
804    write_tracker: Option<SharedWriteTracker>,
805}
806
807impl DeleteEdgeOperator {
808    /// Creates a new edge deletion operator.
809    pub fn new(
810        store: Arc<dyn GraphStoreMut>,
811        input: Box<dyn Operator>,
812        edge_column: usize,
813        output_schema: Vec<LogicalType>,
814    ) -> Self {
815        Self {
816            store,
817            input,
818            edge_column,
819            output_schema,
820            viewing_epoch: None,
821            transaction_id: None,
822            write_tracker: None,
823        }
824    }
825
826    /// Sets the transaction context for MVCC versioning.
827    pub fn with_transaction_context(
828        mut self,
829        epoch: EpochId,
830        transaction_id: Option<TransactionId>,
831    ) -> Self {
832        self.viewing_epoch = Some(epoch);
833        self.transaction_id = transaction_id;
834        self
835    }
836
837    /// Sets the write tracker for conflict detection.
838    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
839        self.write_tracker = Some(tracker);
840        self
841    }
842}
843
844impl Operator for DeleteEdgeOperator {
845    fn next(&mut self) -> OperatorResult {
846        // Get transaction context for versioned deletion
847        let epoch = self
848            .viewing_epoch
849            .unwrap_or_else(|| self.store.current_epoch());
850        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
851
852        if let Some(chunk) = self.input.next()? {
853            let mut builder =
854                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
855
856            for row in chunk.selected_indices() {
857                let edge_val = chunk
858                    .column(self.edge_column)
859                    .and_then(|c| c.get_value(row))
860                    .ok_or_else(|| {
861                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
862                    })?;
863
864                let edge_id = match edge_val {
865                    Value::Int64(id) => EdgeId(id as u64),
866                    _ => {
867                        return Err(OperatorError::TypeMismatch {
868                            expected: "Int64 (edge ID)".to_string(),
869                            found: format!("{edge_val:?}"),
870                        });
871                    }
872                };
873
874                // Delete the edge with MVCC versioning
875                self.store.delete_edge_versioned(edge_id, epoch, tx);
876
877                // Record write for conflict detection
878                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
879                    tracker.record_edge_write(tid, edge_id);
880                }
881
882                // Pass through all input columns
883                for col_idx in 0..chunk.column_count() {
884                    if let (Some(src), Some(dst)) =
885                        (chunk.column(col_idx), builder.column_mut(col_idx))
886                    {
887                        if let Some(val) = src.get_value(row) {
888                            dst.push_value(val);
889                        } else {
890                            dst.push_value(Value::Null);
891                        }
892                    }
893                }
894                builder.advance_row();
895            }
896
897            return Ok(Some(builder.finish()));
898        }
899        Ok(None)
900    }
901
902    fn reset(&mut self) {
903        self.input.reset();
904    }
905
906    fn name(&self) -> &'static str {
907        "DeleteEdge"
908    }
909}
910
911/// Operator that adds labels to nodes.
912pub struct AddLabelOperator {
913    /// The graph store.
914    store: Arc<dyn GraphStoreMut>,
915    /// Child operator providing nodes.
916    input: Box<dyn Operator>,
917    /// Column index containing node IDs.
918    node_column: usize,
919    /// Labels to add.
920    labels: Vec<String>,
921    /// Output schema.
922    output_schema: Vec<LogicalType>,
923    /// Epoch for MVCC versioning.
924    viewing_epoch: Option<EpochId>,
925    /// Transaction ID for undo log tracking.
926    transaction_id: Option<TransactionId>,
927    /// Optional write tracker for conflict detection.
928    write_tracker: Option<SharedWriteTracker>,
929}
930
931impl AddLabelOperator {
932    /// Creates a new add label operator.
933    pub fn new(
934        store: Arc<dyn GraphStoreMut>,
935        input: Box<dyn Operator>,
936        node_column: usize,
937        labels: Vec<String>,
938        output_schema: Vec<LogicalType>,
939    ) -> Self {
940        Self {
941            store,
942            input,
943            node_column,
944            labels,
945            output_schema,
946            viewing_epoch: None,
947            transaction_id: None,
948            write_tracker: None,
949        }
950    }
951
952    /// Sets the transaction context for versioned label mutations.
953    pub fn with_transaction_context(
954        mut self,
955        epoch: EpochId,
956        transaction_id: Option<TransactionId>,
957    ) -> Self {
958        self.viewing_epoch = Some(epoch);
959        self.transaction_id = transaction_id;
960        self
961    }
962
963    /// Sets the write tracker for conflict detection.
964    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
965        self.write_tracker = Some(tracker);
966        self
967    }
968}
969
970impl Operator for AddLabelOperator {
971    fn next(&mut self) -> OperatorResult {
972        if let Some(chunk) = self.input.next()? {
973            let mut updated_count = 0;
974
975            for row in chunk.selected_indices() {
976                let node_val = chunk
977                    .column(self.node_column)
978                    .and_then(|c| c.get_value(row))
979                    .ok_or_else(|| {
980                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
981                    })?;
982
983                let node_id = match node_val {
984                    Value::Int64(id) => NodeId(id as u64),
985                    _ => {
986                        return Err(OperatorError::TypeMismatch {
987                            expected: "Int64 (node ID)".to_string(),
988                            found: format!("{node_val:?}"),
989                        });
990                    }
991                };
992
993                // Record write for conflict detection
994                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
995                    tracker.record_node_write(tid, node_id);
996                }
997
998                // Add all labels
999                for label in &self.labels {
1000                    let added = if let Some(tid) = self.transaction_id {
1001                        self.store.add_label_versioned(node_id, label, tid)
1002                    } else {
1003                        self.store.add_label(node_id, label)
1004                    };
1005                    if added {
1006                        updated_count += 1;
1007                    }
1008                }
1009            }
1010
1011            // Return a chunk with the update count
1012            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
1013            if let Some(dst) = builder.column_mut(0) {
1014                dst.push_value(Value::Int64(updated_count));
1015            }
1016            builder.advance_row();
1017
1018            return Ok(Some(builder.finish()));
1019        }
1020        Ok(None)
1021    }
1022
1023    fn reset(&mut self) {
1024        self.input.reset();
1025    }
1026
1027    fn name(&self) -> &'static str {
1028        "AddLabel"
1029    }
1030}
1031
1032/// Operator that removes labels from nodes.
1033pub struct RemoveLabelOperator {
1034    /// The graph store.
1035    store: Arc<dyn GraphStoreMut>,
1036    /// Child operator providing nodes.
1037    input: Box<dyn Operator>,
1038    /// Column index containing node IDs.
1039    node_column: usize,
1040    /// Labels to remove.
1041    labels: Vec<String>,
1042    /// Output schema.
1043    output_schema: Vec<LogicalType>,
1044    /// Epoch for MVCC versioning.
1045    viewing_epoch: Option<EpochId>,
1046    /// Transaction ID for undo log tracking.
1047    transaction_id: Option<TransactionId>,
1048    /// Optional write tracker for conflict detection.
1049    write_tracker: Option<SharedWriteTracker>,
1050}
1051
1052impl RemoveLabelOperator {
1053    /// Creates a new remove label operator.
1054    pub fn new(
1055        store: Arc<dyn GraphStoreMut>,
1056        input: Box<dyn Operator>,
1057        node_column: usize,
1058        labels: Vec<String>,
1059        output_schema: Vec<LogicalType>,
1060    ) -> Self {
1061        Self {
1062            store,
1063            input,
1064            node_column,
1065            labels,
1066            output_schema,
1067            viewing_epoch: None,
1068            transaction_id: None,
1069            write_tracker: None,
1070        }
1071    }
1072
1073    /// Sets the transaction context for versioned label mutations.
1074    pub fn with_transaction_context(
1075        mut self,
1076        epoch: EpochId,
1077        transaction_id: Option<TransactionId>,
1078    ) -> Self {
1079        self.viewing_epoch = Some(epoch);
1080        self.transaction_id = transaction_id;
1081        self
1082    }
1083
1084    /// Sets the write tracker for conflict detection.
1085    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1086        self.write_tracker = Some(tracker);
1087        self
1088    }
1089}
1090
1091impl Operator for RemoveLabelOperator {
1092    fn next(&mut self) -> OperatorResult {
1093        if let Some(chunk) = self.input.next()? {
1094            let mut updated_count = 0;
1095
1096            for row in chunk.selected_indices() {
1097                let node_val = chunk
1098                    .column(self.node_column)
1099                    .and_then(|c| c.get_value(row))
1100                    .ok_or_else(|| {
1101                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1102                    })?;
1103
1104                let node_id = match node_val {
1105                    Value::Int64(id) => NodeId(id as u64),
1106                    _ => {
1107                        return Err(OperatorError::TypeMismatch {
1108                            expected: "Int64 (node ID)".to_string(),
1109                            found: format!("{node_val:?}"),
1110                        });
1111                    }
1112                };
1113
1114                // Record write for conflict detection
1115                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1116                    tracker.record_node_write(tid, node_id);
1117                }
1118
1119                // Remove all labels
1120                for label in &self.labels {
1121                    let removed = if let Some(tid) = self.transaction_id {
1122                        self.store.remove_label_versioned(node_id, label, tid)
1123                    } else {
1124                        self.store.remove_label(node_id, label)
1125                    };
1126                    if removed {
1127                        updated_count += 1;
1128                    }
1129                }
1130            }
1131
1132            // Return a chunk with the update count
1133            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
1134            if let Some(dst) = builder.column_mut(0) {
1135                dst.push_value(Value::Int64(updated_count));
1136            }
1137            builder.advance_row();
1138
1139            return Ok(Some(builder.finish()));
1140        }
1141        Ok(None)
1142    }
1143
1144    fn reset(&mut self) {
1145        self.input.reset();
1146    }
1147
1148    fn name(&self) -> &'static str {
1149        "RemoveLabel"
1150    }
1151}
1152
1153/// Operator that sets properties on nodes or edges.
1154///
1155/// This operator reads node/edge IDs from a column and sets the
1156/// specified properties on each entity.
1157pub struct SetPropertyOperator {
1158    /// The graph store.
1159    store: Arc<dyn GraphStoreMut>,
1160    /// Child operator providing entities.
1161    input: Box<dyn Operator>,
1162    /// Column index containing entity IDs (node or edge).
1163    entity_column: usize,
1164    /// Whether the entity is an edge (false = node).
1165    is_edge: bool,
1166    /// Properties to set (name -> source).
1167    properties: Vec<(String, PropertySource)>,
1168    /// Output schema.
1169    output_schema: Vec<LogicalType>,
1170    /// Whether to replace all properties (true) or merge (false) for map assignments.
1171    replace: bool,
1172    /// Optional constraint validator for schema enforcement.
1173    validator: Option<Arc<dyn ConstraintValidator>>,
1174    /// Entity labels (for node constraint validation).
1175    labels: Vec<String>,
1176    /// Edge type (for edge constraint validation).
1177    edge_type_name: Option<String>,
1178    /// Epoch for MVCC versioning.
1179    viewing_epoch: Option<EpochId>,
1180    /// Transaction ID for undo log tracking.
1181    transaction_id: Option<TransactionId>,
1182    /// Optional write tracker for conflict detection.
1183    write_tracker: Option<SharedWriteTracker>,
1184}
1185
1186impl SetPropertyOperator {
1187    /// Creates a new set property operator for nodes.
1188    pub fn new_for_node(
1189        store: Arc<dyn GraphStoreMut>,
1190        input: Box<dyn Operator>,
1191        node_column: usize,
1192        properties: Vec<(String, PropertySource)>,
1193        output_schema: Vec<LogicalType>,
1194    ) -> Self {
1195        Self {
1196            store,
1197            input,
1198            entity_column: node_column,
1199            is_edge: false,
1200            properties,
1201            output_schema,
1202            replace: false,
1203            validator: None,
1204            labels: Vec::new(),
1205            edge_type_name: None,
1206            viewing_epoch: None,
1207            transaction_id: None,
1208            write_tracker: None,
1209        }
1210    }
1211
1212    /// Creates a new set property operator for edges.
1213    pub fn new_for_edge(
1214        store: Arc<dyn GraphStoreMut>,
1215        input: Box<dyn Operator>,
1216        edge_column: usize,
1217        properties: Vec<(String, PropertySource)>,
1218        output_schema: Vec<LogicalType>,
1219    ) -> Self {
1220        Self {
1221            store,
1222            input,
1223            entity_column: edge_column,
1224            is_edge: true,
1225            properties,
1226            output_schema,
1227            replace: false,
1228            validator: None,
1229            labels: Vec::new(),
1230            edge_type_name: None,
1231            viewing_epoch: None,
1232            transaction_id: None,
1233            write_tracker: None,
1234        }
1235    }
1236
1237    /// Sets whether this operator replaces all properties (for map assignment).
1238    pub fn with_replace(mut self, replace: bool) -> Self {
1239        self.replace = replace;
1240        self
1241    }
1242
1243    /// Sets the constraint validator for schema enforcement.
1244    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1245        self.validator = Some(validator);
1246        self
1247    }
1248
1249    /// Sets the entity labels (for node constraint validation).
1250    pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1251        self.labels = labels;
1252        self
1253    }
1254
1255    /// Sets the edge type name (for edge constraint validation).
1256    pub fn with_edge_type(mut self, edge_type: String) -> Self {
1257        self.edge_type_name = Some(edge_type);
1258        self
1259    }
1260
1261    /// Sets the transaction context for versioned property mutations.
1262    ///
1263    /// When a transaction ID is provided, property changes are recorded in
1264    /// an undo log so they can be restored on rollback.
1265    pub fn with_transaction_context(
1266        mut self,
1267        epoch: EpochId,
1268        transaction_id: Option<TransactionId>,
1269    ) -> Self {
1270        self.viewing_epoch = Some(epoch);
1271        self.transaction_id = transaction_id;
1272        self
1273    }
1274
1275    /// Sets the write tracker for conflict detection.
1276    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1277        self.write_tracker = Some(tracker);
1278        self
1279    }
1280}
1281
1282impl Operator for SetPropertyOperator {
1283    fn next(&mut self) -> OperatorResult {
1284        if let Some(chunk) = self.input.next()? {
1285            let mut builder =
1286                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1287
1288            for row in chunk.selected_indices() {
1289                let entity_val = chunk
1290                    .column(self.entity_column)
1291                    .and_then(|c| c.get_value(row))
1292                    .ok_or_else(|| {
1293                        OperatorError::ColumnNotFound(format!(
1294                            "entity column {}",
1295                            self.entity_column
1296                        ))
1297                    })?;
1298
1299                let entity_id = match entity_val {
1300                    Value::Int64(id) => id as u64,
1301                    _ => {
1302                        return Err(OperatorError::TypeMismatch {
1303                            expected: "Int64 (entity ID)".to_string(),
1304                            found: format!("{entity_val:?}"),
1305                        });
1306                    }
1307                };
1308
1309                // Record write for conflict detection
1310                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1311                    if self.is_edge {
1312                        tracker.record_edge_write(tid, EdgeId(entity_id));
1313                    } else {
1314                        tracker.record_node_write(tid, NodeId(entity_id));
1315                    }
1316                }
1317
1318                // Resolve all property values
1319                let resolved_props: Vec<(String, Value)> = self
1320                    .properties
1321                    .iter()
1322                    .map(|(name, source)| {
1323                        let value =
1324                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1325                        (name.clone(), value)
1326                    })
1327                    .collect();
1328
1329                // Validate constraints before writing
1330                if let Some(ref validator) = self.validator {
1331                    if self.is_edge {
1332                        if let Some(ref et) = self.edge_type_name {
1333                            for (name, value) in &resolved_props {
1334                                validator.validate_edge_property(et, name, value)?;
1335                            }
1336                        }
1337                    } else {
1338                        for (name, value) in &resolved_props {
1339                            validator.validate_node_property(&self.labels, name, value)?;
1340                            validator.check_unique_node_property(&self.labels, name, value)?;
1341                        }
1342                    }
1343                }
1344
1345                // Write all properties (use versioned methods when inside a transaction)
1346                let tx_id = self.transaction_id;
1347                for (prop_name, value) in resolved_props {
1348                    if prop_name == "*" {
1349                        // Map assignment: value should be a Map
1350                        if let Value::Map(map) = value {
1351                            if self.replace {
1352                                // Replace: remove all existing properties first
1353                                if self.is_edge {
1354                                    if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1355                                        let keys: Vec<String> = edge
1356                                            .properties
1357                                            .iter()
1358                                            .map(|(k, _)| k.as_str().to_string())
1359                                            .collect();
1360                                        for key in keys {
1361                                            if let Some(tid) = tx_id {
1362                                                self.store.remove_edge_property_versioned(
1363                                                    EdgeId(entity_id),
1364                                                    &key,
1365                                                    tid,
1366                                                );
1367                                            } else {
1368                                                self.store
1369                                                    .remove_edge_property(EdgeId(entity_id), &key);
1370                                            }
1371                                        }
1372                                    }
1373                                } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1374                                    let keys: Vec<String> = node
1375                                        .properties
1376                                        .iter()
1377                                        .map(|(k, _)| k.as_str().to_string())
1378                                        .collect();
1379                                    for key in keys {
1380                                        if let Some(tid) = tx_id {
1381                                            self.store.remove_node_property_versioned(
1382                                                NodeId(entity_id),
1383                                                &key,
1384                                                tid,
1385                                            );
1386                                        } else {
1387                                            self.store
1388                                                .remove_node_property(NodeId(entity_id), &key);
1389                                        }
1390                                    }
1391                                }
1392                            }
1393                            // Set each map entry
1394                            for (key, val) in map.iter() {
1395                                if self.is_edge {
1396                                    if let Some(tid) = tx_id {
1397                                        self.store.set_edge_property_versioned(
1398                                            EdgeId(entity_id),
1399                                            key.as_str(),
1400                                            val.clone(),
1401                                            tid,
1402                                        );
1403                                    } else {
1404                                        self.store.set_edge_property(
1405                                            EdgeId(entity_id),
1406                                            key.as_str(),
1407                                            val.clone(),
1408                                        );
1409                                    }
1410                                } else if let Some(tid) = tx_id {
1411                                    self.store.set_node_property_versioned(
1412                                        NodeId(entity_id),
1413                                        key.as_str(),
1414                                        val.clone(),
1415                                        tid,
1416                                    );
1417                                } else {
1418                                    self.store.set_node_property(
1419                                        NodeId(entity_id),
1420                                        key.as_str(),
1421                                        val.clone(),
1422                                    );
1423                                }
1424                            }
1425                        }
1426                    } else if self.is_edge {
1427                        if let Some(tid) = tx_id {
1428                            self.store.set_edge_property_versioned(
1429                                EdgeId(entity_id),
1430                                &prop_name,
1431                                value,
1432                                tid,
1433                            );
1434                        } else {
1435                            self.store
1436                                .set_edge_property(EdgeId(entity_id), &prop_name, value);
1437                        }
1438                    } else if let Some(tid) = tx_id {
1439                        self.store.set_node_property_versioned(
1440                            NodeId(entity_id),
1441                            &prop_name,
1442                            value,
1443                            tid,
1444                        );
1445                    } else {
1446                        self.store
1447                            .set_node_property(NodeId(entity_id), &prop_name, value);
1448                    }
1449                }
1450
1451                // Copy input columns to output
1452                for col_idx in 0..chunk.column_count() {
1453                    if let (Some(src), Some(dst)) =
1454                        (chunk.column(col_idx), builder.column_mut(col_idx))
1455                    {
1456                        if let Some(val) = src.get_value(row) {
1457                            dst.push_value(val);
1458                        } else {
1459                            dst.push_value(Value::Null);
1460                        }
1461                    }
1462                }
1463
1464                builder.advance_row();
1465            }
1466
1467            return Ok(Some(builder.finish()));
1468        }
1469        Ok(None)
1470    }
1471
1472    fn reset(&mut self) {
1473        self.input.reset();
1474    }
1475
1476    fn name(&self) -> &'static str {
1477        "SetProperty"
1478    }
1479}
1480
1481#[cfg(test)]
1482mod tests {
1483    use super::*;
1484    use crate::execution::DataChunk;
1485    use crate::execution::chunk::DataChunkBuilder;
1486    use crate::graph::lpg::LpgStore;
1487
1488    // ── Helpers ────────────────────────────────────────────────────
1489
1490    fn create_test_store() -> Arc<dyn GraphStoreMut> {
1491        Arc::new(LpgStore::new().unwrap())
1492    }
1493
1494    struct MockInput {
1495        chunk: Option<DataChunk>,
1496    }
1497
1498    impl MockInput {
1499        fn boxed(chunk: DataChunk) -> Box<Self> {
1500            Box::new(Self { chunk: Some(chunk) })
1501        }
1502    }
1503
1504    impl Operator for MockInput {
1505        fn next(&mut self) -> OperatorResult {
1506            Ok(self.chunk.take())
1507        }
1508        fn reset(&mut self) {}
1509        fn name(&self) -> &'static str {
1510            "MockInput"
1511        }
1512    }
1513
1514    struct EmptyInput;
1515    impl Operator for EmptyInput {
1516        fn next(&mut self) -> OperatorResult {
1517            Ok(None)
1518        }
1519        fn reset(&mut self) {}
1520        fn name(&self) -> &'static str {
1521            "EmptyInput"
1522        }
1523    }
1524
1525    fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1526        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1527        for id in ids {
1528            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1529            builder.advance_row();
1530        }
1531        builder.finish()
1532    }
1533
1534    fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1535        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1536        for id in ids {
1537            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1538            builder.advance_row();
1539        }
1540        builder.finish()
1541    }
1542
1543    // ── CreateNodeOperator ──────────────────────────────────────
1544
1545    #[test]
1546    fn test_create_node_standalone() {
1547        let store = create_test_store();
1548
1549        let mut op = CreateNodeOperator::new(
1550            Arc::clone(&store),
1551            None,
1552            vec!["Person".to_string()],
1553            vec![(
1554                "name".to_string(),
1555                PropertySource::Constant(Value::String("Alix".into())),
1556            )],
1557            vec![LogicalType::Int64],
1558            0,
1559        );
1560
1561        let chunk = op.next().unwrap().unwrap();
1562        assert_eq!(chunk.row_count(), 1);
1563
1564        // Second call should return None (standalone executes once)
1565        assert!(op.next().unwrap().is_none());
1566
1567        assert_eq!(store.node_count(), 1);
1568    }
1569
1570    #[test]
1571    fn test_create_edge() {
1572        let store = create_test_store();
1573
1574        let node1 = store.create_node(&["Person"]);
1575        let node2 = store.create_node(&["Person"]);
1576
1577        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1578        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1579        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1580        builder.advance_row();
1581
1582        let mut op = CreateEdgeOperator::new(
1583            Arc::clone(&store),
1584            MockInput::boxed(builder.finish()),
1585            0,
1586            1,
1587            "KNOWS".to_string(),
1588            vec![LogicalType::Int64, LogicalType::Int64],
1589        );
1590
1591        let _chunk = op.next().unwrap().unwrap();
1592        assert_eq!(store.edge_count(), 1);
1593    }
1594
1595    #[test]
1596    fn test_delete_node() {
1597        let store = create_test_store();
1598
1599        let node_id = store.create_node(&["Person"]);
1600        assert_eq!(store.node_count(), 1);
1601
1602        let mut op = DeleteNodeOperator::new(
1603            Arc::clone(&store),
1604            MockInput::boxed(node_id_chunk(&[node_id])),
1605            0,
1606            vec![LogicalType::Node],
1607            false,
1608        );
1609
1610        let chunk = op.next().unwrap().unwrap();
1611        // Pass-through: output row contains the original node ID
1612        assert_eq!(chunk.row_count(), 1);
1613        assert_eq!(store.node_count(), 0);
1614    }
1615
1616    // ── DeleteEdgeOperator ───────────────────────────────────────
1617
1618    #[test]
1619    fn test_delete_edge() {
1620        let store = create_test_store();
1621
1622        let n1 = store.create_node(&["Person"]);
1623        let n2 = store.create_node(&["Person"]);
1624        let eid = store.create_edge(n1, n2, "KNOWS");
1625        assert_eq!(store.edge_count(), 1);
1626
1627        let mut op = DeleteEdgeOperator::new(
1628            Arc::clone(&store),
1629            MockInput::boxed(edge_id_chunk(&[eid])),
1630            0,
1631            vec![LogicalType::Node],
1632        );
1633
1634        let chunk = op.next().unwrap().unwrap();
1635        assert_eq!(chunk.row_count(), 1);
1636        assert_eq!(store.edge_count(), 0);
1637    }
1638
1639    #[test]
1640    fn test_delete_edge_no_input_returns_none() {
1641        let store = create_test_store();
1642
1643        let mut op = DeleteEdgeOperator::new(
1644            Arc::clone(&store),
1645            Box::new(EmptyInput),
1646            0,
1647            vec![LogicalType::Int64],
1648        );
1649
1650        assert!(op.next().unwrap().is_none());
1651    }
1652
1653    #[test]
1654    fn test_delete_multiple_edges() {
1655        let store = create_test_store();
1656
1657        let n1 = store.create_node(&["N"]);
1658        let n2 = store.create_node(&["N"]);
1659        let e1 = store.create_edge(n1, n2, "R");
1660        let e2 = store.create_edge(n2, n1, "S");
1661        assert_eq!(store.edge_count(), 2);
1662
1663        let mut op = DeleteEdgeOperator::new(
1664            Arc::clone(&store),
1665            MockInput::boxed(edge_id_chunk(&[e1, e2])),
1666            0,
1667            vec![LogicalType::Node],
1668        );
1669
1670        let chunk = op.next().unwrap().unwrap();
1671        assert_eq!(chunk.row_count(), 2);
1672        assert_eq!(store.edge_count(), 0);
1673    }
1674
1675    // ── DeleteNodeOperator with DETACH ───────────────────────────
1676
1677    #[test]
1678    fn test_delete_node_detach() {
1679        let store = create_test_store();
1680
1681        let n1 = store.create_node(&["Person"]);
1682        let n2 = store.create_node(&["Person"]);
1683        store.create_edge(n1, n2, "KNOWS");
1684        store.create_edge(n2, n1, "FOLLOWS");
1685        assert_eq!(store.edge_count(), 2);
1686
1687        let mut op = DeleteNodeOperator::new(
1688            Arc::clone(&store),
1689            MockInput::boxed(node_id_chunk(&[n1])),
1690            0,
1691            vec![LogicalType::Node],
1692            true, // detach = true
1693        );
1694
1695        let chunk = op.next().unwrap().unwrap();
1696        assert_eq!(chunk.row_count(), 1);
1697        assert_eq!(store.node_count(), 1);
1698        assert_eq!(store.edge_count(), 0); // edges detached
1699    }
1700
1701    // ── AddLabelOperator ─────────────────────────────────────────
1702
1703    #[test]
1704    fn test_add_label() {
1705        let store = create_test_store();
1706
1707        let node = store.create_node(&["Person"]);
1708
1709        let mut op = AddLabelOperator::new(
1710            Arc::clone(&store),
1711            MockInput::boxed(node_id_chunk(&[node])),
1712            0,
1713            vec!["Employee".to_string()],
1714            vec![LogicalType::Int64],
1715        );
1716
1717        let chunk = op.next().unwrap().unwrap();
1718        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1719        assert_eq!(updated, 1);
1720
1721        // Verify label was added
1722        let node_data = store.get_node(node).unwrap();
1723        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1724        assert!(labels.contains(&"Person"));
1725        assert!(labels.contains(&"Employee"));
1726    }
1727
1728    #[test]
1729    fn test_add_multiple_labels() {
1730        let store = create_test_store();
1731
1732        let node = store.create_node(&["Base"]);
1733
1734        let mut op = AddLabelOperator::new(
1735            Arc::clone(&store),
1736            MockInput::boxed(node_id_chunk(&[node])),
1737            0,
1738            vec!["LabelA".to_string(), "LabelB".to_string()],
1739            vec![LogicalType::Int64],
1740        );
1741
1742        let chunk = op.next().unwrap().unwrap();
1743        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1744        assert_eq!(updated, 2); // 2 labels added
1745
1746        let node_data = store.get_node(node).unwrap();
1747        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1748        assert!(labels.contains(&"LabelA"));
1749        assert!(labels.contains(&"LabelB"));
1750    }
1751
1752    #[test]
1753    fn test_add_label_no_input_returns_none() {
1754        let store = create_test_store();
1755
1756        let mut op = AddLabelOperator::new(
1757            Arc::clone(&store),
1758            Box::new(EmptyInput),
1759            0,
1760            vec!["Foo".to_string()],
1761            vec![LogicalType::Int64],
1762        );
1763
1764        assert!(op.next().unwrap().is_none());
1765    }
1766
1767    // ── RemoveLabelOperator ──────────────────────────────────────
1768
1769    #[test]
1770    fn test_remove_label() {
1771        let store = create_test_store();
1772
1773        let node = store.create_node(&["Person", "Employee"]);
1774
1775        let mut op = RemoveLabelOperator::new(
1776            Arc::clone(&store),
1777            MockInput::boxed(node_id_chunk(&[node])),
1778            0,
1779            vec!["Employee".to_string()],
1780            vec![LogicalType::Int64],
1781        );
1782
1783        let chunk = op.next().unwrap().unwrap();
1784        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1785        assert_eq!(updated, 1);
1786
1787        // Verify label was removed
1788        let node_data = store.get_node(node).unwrap();
1789        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1790        assert!(labels.contains(&"Person"));
1791        assert!(!labels.contains(&"Employee"));
1792    }
1793
1794    #[test]
1795    fn test_remove_nonexistent_label() {
1796        let store = create_test_store();
1797
1798        let node = store.create_node(&["Person"]);
1799
1800        let mut op = RemoveLabelOperator::new(
1801            Arc::clone(&store),
1802            MockInput::boxed(node_id_chunk(&[node])),
1803            0,
1804            vec!["NonExistent".to_string()],
1805            vec![LogicalType::Int64],
1806        );
1807
1808        let chunk = op.next().unwrap().unwrap();
1809        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1810        assert_eq!(updated, 0); // nothing removed
1811    }
1812
1813    // ── SetPropertyOperator ──────────────────────────────────────
1814
1815    #[test]
1816    fn test_set_node_property_constant() {
1817        let store = create_test_store();
1818
1819        let node = store.create_node(&["Person"]);
1820
1821        let mut op = SetPropertyOperator::new_for_node(
1822            Arc::clone(&store),
1823            MockInput::boxed(node_id_chunk(&[node])),
1824            0,
1825            vec![(
1826                "name".to_string(),
1827                PropertySource::Constant(Value::String("Alix".into())),
1828            )],
1829            vec![LogicalType::Int64],
1830        );
1831
1832        let chunk = op.next().unwrap().unwrap();
1833        assert_eq!(chunk.row_count(), 1);
1834
1835        // Verify property was set
1836        let node_data = store.get_node(node).unwrap();
1837        assert_eq!(
1838            node_data
1839                .properties
1840                .get(&grafeo_common::types::PropertyKey::new("name")),
1841            Some(&Value::String("Alix".into()))
1842        );
1843    }
1844
1845    #[test]
1846    fn test_set_node_property_from_column() {
1847        let store = create_test_store();
1848
1849        let node = store.create_node(&["Person"]);
1850
1851        // Input: column 0 = node ID, column 1 = property value
1852        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1853        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1854        builder
1855            .column_mut(1)
1856            .unwrap()
1857            .push_value(Value::String("Gus".into()));
1858        builder.advance_row();
1859
1860        let mut op = SetPropertyOperator::new_for_node(
1861            Arc::clone(&store),
1862            MockInput::boxed(builder.finish()),
1863            0,
1864            vec![("name".to_string(), PropertySource::Column(1))],
1865            vec![LogicalType::Int64, LogicalType::String],
1866        );
1867
1868        let chunk = op.next().unwrap().unwrap();
1869        assert_eq!(chunk.row_count(), 1);
1870
1871        let node_data = store.get_node(node).unwrap();
1872        assert_eq!(
1873            node_data
1874                .properties
1875                .get(&grafeo_common::types::PropertyKey::new("name")),
1876            Some(&Value::String("Gus".into()))
1877        );
1878    }
1879
1880    #[test]
1881    fn test_set_edge_property() {
1882        let store = create_test_store();
1883
1884        let n1 = store.create_node(&["N"]);
1885        let n2 = store.create_node(&["N"]);
1886        let eid = store.create_edge(n1, n2, "KNOWS");
1887
1888        let mut op = SetPropertyOperator::new_for_edge(
1889            Arc::clone(&store),
1890            MockInput::boxed(edge_id_chunk(&[eid])),
1891            0,
1892            vec![(
1893                "weight".to_string(),
1894                PropertySource::Constant(Value::Float64(0.75)),
1895            )],
1896            vec![LogicalType::Int64],
1897        );
1898
1899        let chunk = op.next().unwrap().unwrap();
1900        assert_eq!(chunk.row_count(), 1);
1901
1902        let edge_data = store.get_edge(eid).unwrap();
1903        assert_eq!(
1904            edge_data
1905                .properties
1906                .get(&grafeo_common::types::PropertyKey::new("weight")),
1907            Some(&Value::Float64(0.75))
1908        );
1909    }
1910
1911    #[test]
1912    fn test_set_multiple_properties() {
1913        let store = create_test_store();
1914
1915        let node = store.create_node(&["Person"]);
1916
1917        let mut op = SetPropertyOperator::new_for_node(
1918            Arc::clone(&store),
1919            MockInput::boxed(node_id_chunk(&[node])),
1920            0,
1921            vec![
1922                (
1923                    "name".to_string(),
1924                    PropertySource::Constant(Value::String("Alix".into())),
1925                ),
1926                (
1927                    "age".to_string(),
1928                    PropertySource::Constant(Value::Int64(30)),
1929                ),
1930            ],
1931            vec![LogicalType::Int64],
1932        );
1933
1934        op.next().unwrap().unwrap();
1935
1936        let node_data = store.get_node(node).unwrap();
1937        assert_eq!(
1938            node_data
1939                .properties
1940                .get(&grafeo_common::types::PropertyKey::new("name")),
1941            Some(&Value::String("Alix".into()))
1942        );
1943        assert_eq!(
1944            node_data
1945                .properties
1946                .get(&grafeo_common::types::PropertyKey::new("age")),
1947            Some(&Value::Int64(30))
1948        );
1949    }
1950
1951    #[test]
1952    fn test_set_property_no_input_returns_none() {
1953        let store = create_test_store();
1954
1955        let mut op = SetPropertyOperator::new_for_node(
1956            Arc::clone(&store),
1957            Box::new(EmptyInput),
1958            0,
1959            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
1960            vec![LogicalType::Int64],
1961        );
1962
1963        assert!(op.next().unwrap().is_none());
1964    }
1965
1966    // ── Error paths ──────────────────────────────────────────────
1967
1968    #[test]
1969    fn test_delete_node_without_detach_errors_when_edges_exist() {
1970        let store = create_test_store();
1971
1972        let n1 = store.create_node(&["Person"]);
1973        let n2 = store.create_node(&["Person"]);
1974        store.create_edge(n1, n2, "KNOWS");
1975
1976        let mut op = DeleteNodeOperator::new(
1977            Arc::clone(&store),
1978            MockInput::boxed(node_id_chunk(&[n1])),
1979            0,
1980            vec![LogicalType::Int64],
1981            false, // no detach
1982        );
1983
1984        let err = op.next().unwrap_err();
1985        match err {
1986            OperatorError::ConstraintViolation(msg) => {
1987                assert!(msg.contains("connected edge"), "unexpected message: {msg}");
1988            }
1989            other => panic!("expected ConstraintViolation, got {other:?}"),
1990        }
1991        // Node should still exist
1992        assert_eq!(store.node_count(), 2);
1993    }
1994
1995    // ── CreateNodeOperator with input ───────────────────────────
1996
1997    #[test]
1998    fn test_create_node_with_input_operator() {
1999        let store = create_test_store();
2000
2001        // Seed node to provide input rows
2002        let existing = store.create_node(&["Seed"]);
2003
2004        let mut op = CreateNodeOperator::new(
2005            Arc::clone(&store),
2006            Some(MockInput::boxed(node_id_chunk(&[existing]))),
2007            vec!["Created".to_string()],
2008            vec![(
2009                "source".to_string(),
2010                PropertySource::Constant(Value::String("from_input".into())),
2011            )],
2012            vec![LogicalType::Int64, LogicalType::Int64], // input col + output col
2013            1,                                            // output column for new node ID
2014        );
2015
2016        let chunk = op.next().unwrap().unwrap();
2017        assert_eq!(chunk.row_count(), 1);
2018
2019        // Should have created one new node (2 total: Seed + Created)
2020        assert_eq!(store.node_count(), 2);
2021
2022        // Exhausted
2023        assert!(op.next().unwrap().is_none());
2024    }
2025
2026    // ── CreateEdgeOperator with properties and output column ────
2027
2028    #[test]
2029    fn test_create_edge_with_properties_and_output_column() {
2030        let store = create_test_store();
2031
2032        let n1 = store.create_node(&["Person"]);
2033        let n2 = store.create_node(&["Person"]);
2034
2035        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
2036        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
2037        builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
2038        builder.advance_row();
2039
2040        let mut op = CreateEdgeOperator::new(
2041            Arc::clone(&store),
2042            MockInput::boxed(builder.finish()),
2043            0,
2044            1,
2045            "KNOWS".to_string(),
2046            vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
2047        )
2048        .with_properties(vec![(
2049            "since".to_string(),
2050            PropertySource::Constant(Value::Int64(2024)),
2051        )])
2052        .with_output_column(2);
2053
2054        let chunk = op.next().unwrap().unwrap();
2055        assert_eq!(chunk.row_count(), 1);
2056        assert_eq!(store.edge_count(), 1);
2057
2058        // Verify the output chunk contains the edge ID in column 2
2059        let edge_id_raw = chunk
2060            .column(2)
2061            .and_then(|c| c.get_int64(0))
2062            .expect("edge ID should be in output column 2");
2063        let edge_id = EdgeId(edge_id_raw as u64);
2064
2065        // Verify the edge has the property
2066        let edge = store.get_edge(edge_id).expect("edge should exist");
2067        assert_eq!(
2068            edge.properties
2069                .get(&grafeo_common::types::PropertyKey::new("since")),
2070            Some(&Value::Int64(2024))
2071        );
2072    }
2073
2074    // ── SetPropertyOperator with map replacement ────────────────
2075
2076    #[test]
2077    fn test_set_property_map_replace() {
2078        use std::collections::BTreeMap;
2079
2080        let store = create_test_store();
2081
2082        let node = store.create_node(&["Person"]);
2083        store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
2084
2085        let mut map = BTreeMap::new();
2086        map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
2087
2088        let mut op = SetPropertyOperator::new_for_node(
2089            Arc::clone(&store),
2090            MockInput::boxed(node_id_chunk(&[node])),
2091            0,
2092            vec![(
2093                "*".to_string(),
2094                PropertySource::Constant(Value::Map(Arc::new(map))),
2095            )],
2096            vec![LogicalType::Int64],
2097        )
2098        .with_replace(true);
2099
2100        op.next().unwrap().unwrap();
2101
2102        let node_data = store.get_node(node).unwrap();
2103        // Old property should be gone
2104        assert!(
2105            node_data
2106                .properties
2107                .get(&PropertyKey::new("old_prop"))
2108                .is_none()
2109        );
2110        // New property should exist
2111        assert_eq!(
2112            node_data.properties.get(&PropertyKey::new("new_key")),
2113            Some(&Value::String("new_val".into()))
2114        );
2115    }
2116
2117    // ── SetPropertyOperator with map merge (no replace) ─────────
2118
2119    #[test]
2120    fn test_set_property_map_merge() {
2121        use std::collections::BTreeMap;
2122
2123        let store = create_test_store();
2124
2125        let node = store.create_node(&["Person"]);
2126        store.set_node_property(node, "existing", Value::Int64(42));
2127
2128        let mut map = BTreeMap::new();
2129        map.insert(PropertyKey::new("added"), Value::String("hello".into()));
2130
2131        let mut op = SetPropertyOperator::new_for_node(
2132            Arc::clone(&store),
2133            MockInput::boxed(node_id_chunk(&[node])),
2134            0,
2135            vec![(
2136                "*".to_string(),
2137                PropertySource::Constant(Value::Map(Arc::new(map))),
2138            )],
2139            vec![LogicalType::Int64],
2140        ); // replace defaults to false
2141
2142        op.next().unwrap().unwrap();
2143
2144        let node_data = store.get_node(node).unwrap();
2145        // Existing property should still be there
2146        assert_eq!(
2147            node_data.properties.get(&PropertyKey::new("existing")),
2148            Some(&Value::Int64(42))
2149        );
2150        // New property should also exist
2151        assert_eq!(
2152            node_data.properties.get(&PropertyKey::new("added")),
2153            Some(&Value::String("hello".into()))
2154        );
2155    }
2156
2157    // ── PropertySource::PropertyAccess ──────────────────────────
2158
2159    #[test]
2160    fn test_property_source_property_access() {
2161        let store = create_test_store();
2162
2163        let source_node = store.create_node(&["Source"]);
2164        store.set_node_property(source_node, "name", Value::String("Alix".into()));
2165
2166        let target_node = store.create_node(&["Target"]);
2167
2168        // Build chunk: col 0 = source node ID (Node type for PropertyAccess), col 1 = target node ID
2169        let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
2170        builder.column_mut(0).unwrap().push_node_id(source_node);
2171        builder
2172            .column_mut(1)
2173            .unwrap()
2174            .push_int64(target_node.0 as i64);
2175        builder.advance_row();
2176
2177        let mut op = SetPropertyOperator::new_for_node(
2178            Arc::clone(&store),
2179            MockInput::boxed(builder.finish()),
2180            1, // entity column = target node
2181            vec![(
2182                "copied_name".to_string(),
2183                PropertySource::PropertyAccess {
2184                    column: 0,
2185                    property: "name".to_string(),
2186                },
2187            )],
2188            vec![LogicalType::Node, LogicalType::Int64],
2189        );
2190
2191        op.next().unwrap().unwrap();
2192
2193        let target_data = store.get_node(target_node).unwrap();
2194        assert_eq!(
2195            target_data.properties.get(&PropertyKey::new("copied_name")),
2196            Some(&Value::String("Alix".into()))
2197        );
2198    }
2199
2200    // ── ConstraintValidator integration ─────────────────────────
2201
2202    #[test]
2203    fn test_create_node_with_constraint_validator() {
2204        let store = create_test_store();
2205
2206        struct RejectAgeValidator;
2207        impl ConstraintValidator for RejectAgeValidator {
2208            fn validate_node_property(
2209                &self,
2210                _labels: &[String],
2211                key: &str,
2212                _value: &Value,
2213            ) -> Result<(), OperatorError> {
2214                if key == "forbidden" {
2215                    return Err(OperatorError::ConstraintViolation(
2216                        "property 'forbidden' is not allowed".to_string(),
2217                    ));
2218                }
2219                Ok(())
2220            }
2221            fn validate_node_complete(
2222                &self,
2223                _labels: &[String],
2224                _properties: &[(String, Value)],
2225            ) -> Result<(), OperatorError> {
2226                Ok(())
2227            }
2228            fn check_unique_node_property(
2229                &self,
2230                _labels: &[String],
2231                _key: &str,
2232                _value: &Value,
2233            ) -> Result<(), OperatorError> {
2234                Ok(())
2235            }
2236            fn validate_edge_property(
2237                &self,
2238                _edge_type: &str,
2239                _key: &str,
2240                _value: &Value,
2241            ) -> Result<(), OperatorError> {
2242                Ok(())
2243            }
2244            fn validate_edge_complete(
2245                &self,
2246                _edge_type: &str,
2247                _properties: &[(String, Value)],
2248            ) -> Result<(), OperatorError> {
2249                Ok(())
2250            }
2251        }
2252
2253        // Valid property should succeed
2254        let mut op = CreateNodeOperator::new(
2255            Arc::clone(&store),
2256            None,
2257            vec!["Thing".to_string()],
2258            vec![(
2259                "name".to_string(),
2260                PropertySource::Constant(Value::String("ok".into())),
2261            )],
2262            vec![LogicalType::Int64],
2263            0,
2264        )
2265        .with_validator(Arc::new(RejectAgeValidator));
2266
2267        assert!(op.next().is_ok());
2268        assert_eq!(store.node_count(), 1);
2269
2270        // Forbidden property should fail
2271        let mut op = CreateNodeOperator::new(
2272            Arc::clone(&store),
2273            None,
2274            vec!["Thing".to_string()],
2275            vec![(
2276                "forbidden".to_string(),
2277                PropertySource::Constant(Value::Int64(1)),
2278            )],
2279            vec![LogicalType::Int64],
2280            0,
2281        )
2282        .with_validator(Arc::new(RejectAgeValidator));
2283
2284        let err = op.next().unwrap_err();
2285        assert!(matches!(err, OperatorError::ConstraintViolation(_)));
2286        // Node count should still be 2 (the node is created before validation, but the error
2287        // propagates - this tests the validation logic fires)
2288    }
2289
2290    // ── Reset behavior ──────────────────────────────────────────
2291
2292    #[test]
2293    fn test_create_node_reset_allows_re_execution() {
2294        let store = create_test_store();
2295
2296        let mut op = CreateNodeOperator::new(
2297            Arc::clone(&store),
2298            None,
2299            vec!["Person".to_string()],
2300            vec![],
2301            vec![LogicalType::Int64],
2302            0,
2303        );
2304
2305        // First execution
2306        assert!(op.next().unwrap().is_some());
2307        assert!(op.next().unwrap().is_none());
2308
2309        // Reset and re-execute
2310        op.reset();
2311        assert!(op.next().unwrap().is_some());
2312
2313        assert_eq!(store.node_count(), 2);
2314    }
2315
2316    // ── Operator name() ──────────────────────────────────────────
2317
2318    #[test]
2319    fn test_operator_names() {
2320        let store = create_test_store();
2321
2322        let op = CreateNodeOperator::new(
2323            Arc::clone(&store),
2324            None,
2325            vec![],
2326            vec![],
2327            vec![LogicalType::Int64],
2328            0,
2329        );
2330        assert_eq!(op.name(), "CreateNode");
2331
2332        let op = CreateEdgeOperator::new(
2333            Arc::clone(&store),
2334            Box::new(EmptyInput),
2335            0,
2336            1,
2337            "R".to_string(),
2338            vec![LogicalType::Int64],
2339        );
2340        assert_eq!(op.name(), "CreateEdge");
2341
2342        let op = DeleteNodeOperator::new(
2343            Arc::clone(&store),
2344            Box::new(EmptyInput),
2345            0,
2346            vec![LogicalType::Int64],
2347            false,
2348        );
2349        assert_eq!(op.name(), "DeleteNode");
2350
2351        let op = DeleteEdgeOperator::new(
2352            Arc::clone(&store),
2353            Box::new(EmptyInput),
2354            0,
2355            vec![LogicalType::Int64],
2356        );
2357        assert_eq!(op.name(), "DeleteEdge");
2358
2359        let op = AddLabelOperator::new(
2360            Arc::clone(&store),
2361            Box::new(EmptyInput),
2362            0,
2363            vec!["L".to_string()],
2364            vec![LogicalType::Int64],
2365        );
2366        assert_eq!(op.name(), "AddLabel");
2367
2368        let op = RemoveLabelOperator::new(
2369            Arc::clone(&store),
2370            Box::new(EmptyInput),
2371            0,
2372            vec!["L".to_string()],
2373            vec![LogicalType::Int64],
2374        );
2375        assert_eq!(op.name(), "RemoveLabel");
2376
2377        let op = SetPropertyOperator::new_for_node(
2378            Arc::clone(&store),
2379            Box::new(EmptyInput),
2380            0,
2381            vec![],
2382            vec![LogicalType::Int64],
2383        );
2384        assert_eq!(op.name(), "SetProperty");
2385    }
2386}