Skip to main content

grafeo_core/execution/operators/
mutation.rs

1//! Mutation operators for creating and deleting graph elements.
2//!
3//! These operators modify the graph structure:
4//! - `CreateNodeOperator`: Creates new nodes
5//! - `CreateEdgeOperator`: Creates new edges
6//! - `DeleteNodeOperator`: Deletes nodes
7//! - `DeleteEdgeOperator`: Deletes edges
8
9use std::sync::Arc;
10
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19/// Trait for validating schema constraints during mutation operations.
20///
21/// Implementors check type definitions, NOT NULL, and UNIQUE constraints
22/// before data is written to the store.
23pub trait ConstraintValidator: Send + Sync {
24    /// Validates a single property value for a node with the given labels.
25    ///
26    /// Checks type compatibility and NOT NULL constraints.
27    fn validate_node_property(
28        &self,
29        labels: &[String],
30        key: &str,
31        value: &Value,
32    ) -> Result<(), OperatorError>;
33
34    /// Validates that all required properties are present after creating a node.
35    ///
36    /// Checks NOT NULL constraints for properties that were not explicitly set.
37    fn validate_node_complete(
38        &self,
39        labels: &[String],
40        properties: &[(String, Value)],
41    ) -> Result<(), OperatorError>;
42
43    /// Checks UNIQUE constraint for a node property value.
44    ///
45    /// Returns an error if a node with the same label already has this value.
46    fn check_unique_node_property(
47        &self,
48        labels: &[String],
49        key: &str,
50        value: &Value,
51    ) -> Result<(), OperatorError>;
52
53    /// Validates a single property value for an edge of the given type.
54    fn validate_edge_property(
55        &self,
56        edge_type: &str,
57        key: &str,
58        value: &Value,
59    ) -> Result<(), OperatorError>;
60
61    /// Validates that all required properties are present after creating an edge.
62    fn validate_edge_complete(
63        &self,
64        edge_type: &str,
65        properties: &[(String, Value)],
66    ) -> Result<(), OperatorError>;
67
68    /// Validates that the node labels are allowed by the bound graph type.
69    fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
70        let _ = labels;
71        Ok(())
72    }
73
74    /// Validates that the edge type is allowed by the bound graph type.
75    fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
76        let _ = edge_type;
77        Ok(())
78    }
79
80    /// Validates that edge endpoints have the correct node type labels.
81    fn validate_edge_endpoints(
82        &self,
83        edge_type: &str,
84        source_labels: &[String],
85        target_labels: &[String],
86    ) -> Result<(), OperatorError> {
87        let _ = (edge_type, source_labels, target_labels);
88        Ok(())
89    }
90
91    /// Injects default values for properties that are defined in a type but
92    /// not explicitly provided.
93    fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
94        let _ = (labels, properties);
95    }
96}
97
98/// Operator that creates new nodes.
99///
100/// For each input row, creates a new node with the specified labels
101/// and properties, then outputs the row with the new node.
102pub struct CreateNodeOperator {
103    /// The graph store to modify.
104    store: Arc<dyn GraphStoreMut>,
105    /// Input operator.
106    input: Option<Box<dyn Operator>>,
107    /// Labels for the new nodes.
108    labels: Vec<String>,
109    /// Properties to set (name -> column index or constant value).
110    properties: Vec<(String, PropertySource)>,
111    /// Output schema.
112    output_schema: Vec<LogicalType>,
113    /// Column index for the created node variable.
114    output_column: usize,
115    /// Whether this operator has been executed (for no-input case).
116    executed: bool,
117    /// Epoch for MVCC versioning.
118    viewing_epoch: Option<EpochId>,
119    /// Transaction ID for MVCC versioning.
120    transaction_id: Option<TransactionId>,
121    /// Optional constraint validator for schema enforcement.
122    validator: Option<Arc<dyn ConstraintValidator>>,
123    /// Optional write tracker for conflict detection.
124    write_tracker: Option<SharedWriteTracker>,
125}
126
127/// Source for a property value.
128#[derive(Debug, Clone)]
129pub enum PropertySource {
130    /// Get value from an input column.
131    Column(usize),
132    /// Use a constant value.
133    Constant(Value),
134    /// Access a named property from a map/node/edge in an input column.
135    PropertyAccess {
136        /// The column containing the map, node ID, or edge ID.
137        column: usize,
138        /// The property name to extract.
139        property: String,
140    },
141}
142
143impl PropertySource {
144    /// Resolves a property value from a data chunk row.
145    pub fn resolve(
146        &self,
147        chunk: &crate::execution::chunk::DataChunk,
148        row: usize,
149        store: &dyn GraphStore,
150    ) -> Value {
151        match self {
152            PropertySource::Column(col_idx) => chunk
153                .column(*col_idx)
154                .and_then(|c| c.get_value(row))
155                .unwrap_or(Value::Null),
156            PropertySource::Constant(v) => v.clone(),
157            PropertySource::PropertyAccess { column, property } => {
158                let Some(col) = chunk.column(*column) else {
159                    return Value::Null;
160                };
161                // Try node ID first, then edge ID, then map value
162                if let Some(node_id) = col.get_node_id(row) {
163                    store
164                        .get_node(node_id)
165                        .and_then(|node| node.get_property(property).cloned())
166                        .unwrap_or(Value::Null)
167                } else if let Some(edge_id) = col.get_edge_id(row) {
168                    store
169                        .get_edge(edge_id)
170                        .and_then(|edge| edge.get_property(property).cloned())
171                        .unwrap_or(Value::Null)
172                } else if let Some(Value::Map(map)) = col.get_value(row) {
173                    let key = PropertyKey::new(property);
174                    map.get(&key).cloned().unwrap_or(Value::Null)
175                } else {
176                    Value::Null
177                }
178            }
179        }
180    }
181}
182
183impl CreateNodeOperator {
184    /// Creates a new node creation operator.
185    ///
186    /// # Arguments
187    /// * `store` - The graph store to modify.
188    /// * `input` - Optional input operator (None for standalone CREATE).
189    /// * `labels` - Labels to assign to created nodes.
190    /// * `properties` - Properties to set on created nodes.
191    /// * `output_schema` - Schema of the output.
192    /// * `output_column` - Column index where the created node ID goes.
193    pub fn new(
194        store: Arc<dyn GraphStoreMut>,
195        input: Option<Box<dyn Operator>>,
196        labels: Vec<String>,
197        properties: Vec<(String, PropertySource)>,
198        output_schema: Vec<LogicalType>,
199        output_column: usize,
200    ) -> Self {
201        Self {
202            store,
203            input,
204            labels,
205            properties,
206            output_schema,
207            output_column,
208            executed: false,
209            viewing_epoch: None,
210            transaction_id: None,
211            validator: None,
212            write_tracker: None,
213        }
214    }
215
216    /// Sets the transaction context for MVCC versioning.
217    pub fn with_transaction_context(
218        mut self,
219        epoch: EpochId,
220        transaction_id: Option<TransactionId>,
221    ) -> Self {
222        self.viewing_epoch = Some(epoch);
223        self.transaction_id = transaction_id;
224        self
225    }
226
227    /// Sets the constraint validator for schema enforcement.
228    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
229        self.validator = Some(validator);
230        self
231    }
232
233    /// Sets the write tracker for conflict detection.
234    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
235        self.write_tracker = Some(tracker);
236        self
237    }
238}
239
240impl CreateNodeOperator {
241    /// Validates and sets properties on a newly created node.
242    fn validate_and_set_properties(
243        &self,
244        node_id: NodeId,
245        resolved_props: &mut Vec<(String, Value)>,
246    ) -> Result<(), OperatorError> {
247        // Phase 0: Validate that node labels are allowed by the bound graph type
248        if let Some(ref validator) = self.validator {
249            validator.validate_node_labels_allowed(&self.labels)?;
250        }
251
252        // Phase 0.5: Inject defaults for properties not explicitly provided
253        if let Some(ref validator) = self.validator {
254            validator.inject_defaults(&self.labels, resolved_props);
255        }
256
257        // Phase 1: Validate each property value
258        if let Some(ref validator) = self.validator {
259            for (name, value) in resolved_props.iter() {
260                validator.validate_node_property(&self.labels, name, value)?;
261                validator.check_unique_node_property(&self.labels, name, value)?;
262            }
263            // Phase 2: Validate completeness (NOT NULL checks for missing required properties)
264            validator.validate_node_complete(&self.labels, resolved_props)?;
265        }
266
267        // Phase 3: Write properties to the store
268        if let Some(tid) = self.transaction_id {
269            for (name, value) in resolved_props.iter() {
270                self.store
271                    .set_node_property_versioned(node_id, name, value.clone(), tid);
272            }
273        } else {
274            for (name, value) in resolved_props.iter() {
275                self.store.set_node_property(node_id, name, value.clone());
276            }
277        }
278        Ok(())
279    }
280}
281
282impl Operator for CreateNodeOperator {
283    fn next(&mut self) -> OperatorResult {
284        // Get transaction context for versioned creation
285        let epoch = self
286            .viewing_epoch
287            .unwrap_or_else(|| self.store.current_epoch());
288        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
289
290        if let Some(ref mut input) = self.input {
291            // For each input row, create a node
292            if let Some(chunk) = input.next()? {
293                let mut builder =
294                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
295
296                for row in chunk.selected_indices() {
297                    // Resolve all property values first (before creating node)
298                    let mut resolved_props: Vec<(String, Value)> = self
299                        .properties
300                        .iter()
301                        .map(|(name, source)| {
302                            let value =
303                                source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
304                            (name.clone(), value)
305                        })
306                        .collect();
307
308                    // Create the node with MVCC versioning
309                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
310                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
311
312                    // Record write for conflict detection
313                    if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
314                        tracker.record_node_write(tid, node_id)?;
315                    }
316
317                    // Validate and set properties
318                    self.validate_and_set_properties(node_id, &mut resolved_props)?;
319
320                    // Copy input columns to output
321                    for col_idx in 0..chunk.column_count() {
322                        if col_idx < self.output_column
323                            && let (Some(src), Some(dst)) =
324                                (chunk.column(col_idx), builder.column_mut(col_idx))
325                        {
326                            if let Some(val) = src.get_value(row) {
327                                dst.push_value(val);
328                            } else {
329                                dst.push_value(Value::Null);
330                            }
331                        }
332                    }
333
334                    // Add the new node ID
335                    if let Some(dst) = builder.column_mut(self.output_column) {
336                        dst.push_value(Value::Int64(node_id.0 as i64));
337                    }
338
339                    builder.advance_row();
340                }
341
342                return Ok(Some(builder.finish()));
343            }
344            Ok(None)
345        } else {
346            // No input - create a single node
347            if self.executed {
348                return Ok(None);
349            }
350            self.executed = true;
351
352            // Resolve constant properties
353            let mut resolved_props: Vec<(String, Value)> = self
354                .properties
355                .iter()
356                .filter_map(|(name, source)| {
357                    if let PropertySource::Constant(value) = source {
358                        Some((name.clone(), value.clone()))
359                    } else {
360                        None
361                    }
362                })
363                .collect();
364
365            // Create the node with MVCC versioning
366            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
367            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
368
369            // Record write for conflict detection
370            if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
371                tracker.record_node_write(tid, node_id)?;
372            }
373
374            // Validate and set properties
375            self.validate_and_set_properties(node_id, &mut resolved_props)?;
376
377            // Build output chunk with just the node ID
378            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
379            if let Some(dst) = builder.column_mut(self.output_column) {
380                dst.push_value(Value::Int64(node_id.0 as i64));
381            }
382            builder.advance_row();
383
384            Ok(Some(builder.finish()))
385        }
386    }
387
388    fn reset(&mut self) {
389        if let Some(ref mut input) = self.input {
390            input.reset();
391        }
392        self.executed = false;
393    }
394
395    fn name(&self) -> &'static str {
396        "CreateNode"
397    }
398}
399
400/// Operator that creates new edges.
401pub struct CreateEdgeOperator {
402    /// The graph store to modify.
403    store: Arc<dyn GraphStoreMut>,
404    /// Input operator.
405    input: Box<dyn Operator>,
406    /// Column index for the source node.
407    from_column: usize,
408    /// Column index for the target node.
409    to_column: usize,
410    /// Edge type.
411    edge_type: String,
412    /// Properties to set.
413    properties: Vec<(String, PropertySource)>,
414    /// Output schema.
415    output_schema: Vec<LogicalType>,
416    /// Column index for the created edge variable (if any).
417    output_column: Option<usize>,
418    /// Epoch for MVCC versioning.
419    viewing_epoch: Option<EpochId>,
420    /// Transaction ID for MVCC versioning.
421    transaction_id: Option<TransactionId>,
422    /// Optional constraint validator for schema enforcement.
423    validator: Option<Arc<dyn ConstraintValidator>>,
424    /// Optional write tracker for conflict detection.
425    write_tracker: Option<SharedWriteTracker>,
426}
427
428impl CreateEdgeOperator {
429    /// Creates a new edge creation operator.
430    ///
431    /// Use builder methods to set additional options:
432    /// - [`with_properties`](Self::with_properties) - set edge properties
433    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
434    /// - [`with_transaction_context`](Self::with_transaction_context) - set transaction context
435    pub fn new(
436        store: Arc<dyn GraphStoreMut>,
437        input: Box<dyn Operator>,
438        from_column: usize,
439        to_column: usize,
440        edge_type: String,
441        output_schema: Vec<LogicalType>,
442    ) -> Self {
443        Self {
444            store,
445            input,
446            from_column,
447            to_column,
448            edge_type,
449            properties: Vec::new(),
450            output_schema,
451            output_column: None,
452            viewing_epoch: None,
453            transaction_id: None,
454            validator: None,
455            write_tracker: None,
456        }
457    }
458
459    /// Sets the properties to assign to created edges.
460    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
461        self.properties = properties;
462        self
463    }
464
465    /// Sets the output column for the created edge ID.
466    pub fn with_output_column(mut self, column: usize) -> Self {
467        self.output_column = Some(column);
468        self
469    }
470
471    /// Sets the transaction context for MVCC versioning.
472    pub fn with_transaction_context(
473        mut self,
474        epoch: EpochId,
475        transaction_id: Option<TransactionId>,
476    ) -> Self {
477        self.viewing_epoch = Some(epoch);
478        self.transaction_id = transaction_id;
479        self
480    }
481
482    /// Sets the constraint validator for schema enforcement.
483    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
484        self.validator = Some(validator);
485        self
486    }
487
488    /// Sets the write tracker for conflict detection.
489    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
490        self.write_tracker = Some(tracker);
491        self
492    }
493}
494
495impl Operator for CreateEdgeOperator {
496    fn next(&mut self) -> OperatorResult {
497        // Get transaction context for versioned creation
498        let epoch = self
499            .viewing_epoch
500            .unwrap_or_else(|| self.store.current_epoch());
501        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
502
503        if let Some(chunk) = self.input.next()? {
504            let mut builder =
505                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
506
507            for row in chunk.selected_indices() {
508                // Get source and target node IDs
509                let from_id = chunk
510                    .column(self.from_column)
511                    .and_then(|c| c.get_value(row))
512                    .ok_or_else(|| {
513                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
514                    })?;
515
516                let to_id = chunk
517                    .column(self.to_column)
518                    .and_then(|c| c.get_value(row))
519                    .ok_or_else(|| {
520                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
521                    })?;
522
523                // Extract node IDs
524                let from_node_id = match from_id {
525                    Value::Int64(id) => NodeId(id as u64),
526                    _ => {
527                        return Err(OperatorError::TypeMismatch {
528                            expected: "Int64 (node ID)".to_string(),
529                            found: format!("{from_id:?}"),
530                        });
531                    }
532                };
533
534                let to_node_id = match to_id {
535                    Value::Int64(id) => NodeId(id as u64),
536                    _ => {
537                        return Err(OperatorError::TypeMismatch {
538                            expected: "Int64 (node ID)".to_string(),
539                            found: format!("{to_id:?}"),
540                        });
541                    }
542                };
543
544                // Validate graph type and edge endpoint constraints
545                if let Some(ref validator) = self.validator {
546                    validator.validate_edge_type_allowed(&self.edge_type)?;
547
548                    // Look up source and target node labels for endpoint validation
549                    let source_labels: Vec<String> = self
550                        .store
551                        .get_node(from_node_id)
552                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
553                        .unwrap_or_default();
554                    let target_labels: Vec<String> = self
555                        .store
556                        .get_node(to_node_id)
557                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
558                        .unwrap_or_default();
559                    validator.validate_edge_endpoints(
560                        &self.edge_type,
561                        &source_labels,
562                        &target_labels,
563                    )?;
564                }
565
566                // Resolve property values
567                let resolved_props: Vec<(String, Value)> = self
568                    .properties
569                    .iter()
570                    .map(|(name, source)| {
571                        let value =
572                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
573                        (name.clone(), value)
574                    })
575                    .collect();
576
577                // Validate constraints before writing
578                if let Some(ref validator) = self.validator {
579                    for (name, value) in &resolved_props {
580                        validator.validate_edge_property(&self.edge_type, name, value)?;
581                    }
582                    validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
583                }
584
585                // Create the edge with MVCC versioning
586                let edge_id = self.store.create_edge_versioned(
587                    from_node_id,
588                    to_node_id,
589                    &self.edge_type,
590                    epoch,
591                    tx,
592                );
593
594                // Record write for conflict detection
595                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
596                    tracker.record_edge_write(tid, edge_id)?;
597                }
598
599                // Set properties
600                if let Some(tid) = self.transaction_id {
601                    for (name, value) in resolved_props {
602                        self.store
603                            .set_edge_property_versioned(edge_id, &name, value, tid);
604                    }
605                } else {
606                    for (name, value) in resolved_props {
607                        self.store.set_edge_property(edge_id, &name, value);
608                    }
609                }
610
611                // Copy input columns
612                for col_idx in 0..chunk.column_count() {
613                    if let (Some(src), Some(dst)) =
614                        (chunk.column(col_idx), builder.column_mut(col_idx))
615                    {
616                        if let Some(val) = src.get_value(row) {
617                            dst.push_value(val);
618                        } else {
619                            dst.push_value(Value::Null);
620                        }
621                    }
622                }
623
624                // Add edge ID if requested
625                if let Some(out_col) = self.output_column
626                    && let Some(dst) = builder.column_mut(out_col)
627                {
628                    dst.push_value(Value::Int64(edge_id.0 as i64));
629                }
630
631                builder.advance_row();
632            }
633
634            return Ok(Some(builder.finish()));
635        }
636        Ok(None)
637    }
638
639    fn reset(&mut self) {
640        self.input.reset();
641    }
642
643    fn name(&self) -> &'static str {
644        "CreateEdge"
645    }
646}
647
648/// Operator that deletes nodes.
649pub struct DeleteNodeOperator {
650    /// The graph store to modify.
651    store: Arc<dyn GraphStoreMut>,
652    /// Input operator.
653    input: Box<dyn Operator>,
654    /// Column index for the node to delete.
655    node_column: usize,
656    /// Output schema.
657    output_schema: Vec<LogicalType>,
658    /// Whether to detach (delete connected edges) before deleting.
659    detach: bool,
660    /// Epoch for MVCC versioning.
661    viewing_epoch: Option<EpochId>,
662    /// Transaction ID for MVCC versioning.
663    transaction_id: Option<TransactionId>,
664    /// Optional write tracker for conflict detection.
665    write_tracker: Option<SharedWriteTracker>,
666}
667
668impl DeleteNodeOperator {
669    /// Creates a new node deletion operator.
670    pub fn new(
671        store: Arc<dyn GraphStoreMut>,
672        input: Box<dyn Operator>,
673        node_column: usize,
674        output_schema: Vec<LogicalType>,
675        detach: bool,
676    ) -> Self {
677        Self {
678            store,
679            input,
680            node_column,
681            output_schema,
682            detach,
683            viewing_epoch: None,
684            transaction_id: None,
685            write_tracker: None,
686        }
687    }
688
689    /// Sets the transaction context for MVCC versioning.
690    pub fn with_transaction_context(
691        mut self,
692        epoch: EpochId,
693        transaction_id: Option<TransactionId>,
694    ) -> Self {
695        self.viewing_epoch = Some(epoch);
696        self.transaction_id = transaction_id;
697        self
698    }
699
700    /// Sets the write tracker for conflict detection.
701    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
702        self.write_tracker = Some(tracker);
703        self
704    }
705}
706
707impl Operator for DeleteNodeOperator {
708    fn next(&mut self) -> OperatorResult {
709        // Get transaction context for versioned deletion
710        let epoch = self
711            .viewing_epoch
712            .unwrap_or_else(|| self.store.current_epoch());
713        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
714
715        if let Some(chunk) = self.input.next()? {
716            let mut builder =
717                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
718
719            for row in chunk.selected_indices() {
720                let node_val = chunk
721                    .column(self.node_column)
722                    .and_then(|c| c.get_value(row))
723                    .ok_or_else(|| {
724                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
725                    })?;
726
727                let node_id = match node_val {
728                    Value::Int64(id) => NodeId(id as u64),
729                    _ => {
730                        return Err(OperatorError::TypeMismatch {
731                            expected: "Int64 (node ID)".to_string(),
732                            found: format!("{node_val:?}"),
733                        });
734                    }
735                };
736
737                if self.detach {
738                    // Delete all connected edges first, using versioned deletion
739                    // so rollback can restore them
740                    let outgoing = self
741                        .store
742                        .edges_from(node_id, crate::graph::Direction::Outgoing);
743                    let incoming = self
744                        .store
745                        .edges_from(node_id, crate::graph::Direction::Incoming);
746                    for (_, edge_id) in outgoing.into_iter().chain(incoming) {
747                        self.store.delete_edge_versioned(edge_id, epoch, tx);
748                        if let (Some(tracker), Some(tid)) =
749                            (&self.write_tracker, self.transaction_id)
750                        {
751                            tracker.record_edge_write(tid, edge_id)?;
752                        }
753                    }
754                } else {
755                    // NODETACH: check that node has no connected edges
756                    let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
757                    if degree > 0 {
758                        return Err(OperatorError::ConstraintViolation(format!(
759                            "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
760                            degree
761                        )));
762                    }
763                }
764
765                // Delete the node with MVCC versioning
766                self.store.delete_node_versioned(node_id, epoch, tx);
767
768                // Record write for conflict detection
769                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
770                    tracker.record_node_write(tid, node_id)?;
771                }
772
773                // Pass through all input columns so downstream RETURN can
774                // reference the variable (e.g., count(n) after DELETE n).
775                for col_idx in 0..chunk.column_count() {
776                    if let (Some(src), Some(dst)) =
777                        (chunk.column(col_idx), builder.column_mut(col_idx))
778                    {
779                        if let Some(val) = src.get_value(row) {
780                            dst.push_value(val);
781                        } else {
782                            dst.push_value(Value::Null);
783                        }
784                    }
785                }
786                builder.advance_row();
787            }
788
789            return Ok(Some(builder.finish()));
790        }
791        Ok(None)
792    }
793
794    fn reset(&mut self) {
795        self.input.reset();
796    }
797
798    fn name(&self) -> &'static str {
799        "DeleteNode"
800    }
801}
802
/// Operator that deletes edges.
///
/// For every selected row of each input chunk, `next` reads an edge ID from
/// `edge_column`, deletes that edge through the store's versioned delete, and
/// copies the row's input columns to the output chunk.
pub struct DeleteEdgeOperator {
    /// The graph store to modify.
    store: Arc<dyn GraphStoreMut>,
    /// Input operator that supplies the chunks of edges to delete.
    input: Box<dyn Operator>,
    /// Column index for the edge to delete; `next` requires `Value::Int64` values here.
    edge_column: usize,
    /// Output schema used to build each output chunk.
    output_schema: Vec<LogicalType>,
    /// Epoch for MVCC versioning; `next` falls back to the store's current epoch when `None`.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning; `next` falls back to
    /// `TransactionId::SYSTEM` for the delete when `None`.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection; writes are recorded only
    /// when a transaction ID is also set.
    write_tracker: Option<SharedWriteTracker>,
}
820
821impl DeleteEdgeOperator {
822    /// Creates a new edge deletion operator.
823    pub fn new(
824        store: Arc<dyn GraphStoreMut>,
825        input: Box<dyn Operator>,
826        edge_column: usize,
827        output_schema: Vec<LogicalType>,
828    ) -> Self {
829        Self {
830            store,
831            input,
832            edge_column,
833            output_schema,
834            viewing_epoch: None,
835            transaction_id: None,
836            write_tracker: None,
837        }
838    }
839
840    /// Sets the transaction context for MVCC versioning.
841    pub fn with_transaction_context(
842        mut self,
843        epoch: EpochId,
844        transaction_id: Option<TransactionId>,
845    ) -> Self {
846        self.viewing_epoch = Some(epoch);
847        self.transaction_id = transaction_id;
848        self
849    }
850
851    /// Sets the write tracker for conflict detection.
852    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
853        self.write_tracker = Some(tracker);
854        self
855    }
856}
857
858impl Operator for DeleteEdgeOperator {
859    fn next(&mut self) -> OperatorResult {
860        // Get transaction context for versioned deletion
861        let epoch = self
862            .viewing_epoch
863            .unwrap_or_else(|| self.store.current_epoch());
864        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
865
866        if let Some(chunk) = self.input.next()? {
867            let mut builder =
868                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
869
870            for row in chunk.selected_indices() {
871                let edge_val = chunk
872                    .column(self.edge_column)
873                    .and_then(|c| c.get_value(row))
874                    .ok_or_else(|| {
875                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
876                    })?;
877
878                let edge_id = match edge_val {
879                    Value::Int64(id) => EdgeId(id as u64),
880                    _ => {
881                        return Err(OperatorError::TypeMismatch {
882                            expected: "Int64 (edge ID)".to_string(),
883                            found: format!("{edge_val:?}"),
884                        });
885                    }
886                };
887
888                // Delete the edge with MVCC versioning
889                self.store.delete_edge_versioned(edge_id, epoch, tx);
890
891                // Record write for conflict detection
892                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
893                    tracker.record_edge_write(tid, edge_id)?;
894                }
895
896                // Pass through all input columns
897                for col_idx in 0..chunk.column_count() {
898                    if let (Some(src), Some(dst)) =
899                        (chunk.column(col_idx), builder.column_mut(col_idx))
900                    {
901                        if let Some(val) = src.get_value(row) {
902                            dst.push_value(val);
903                        } else {
904                            dst.push_value(Value::Null);
905                        }
906                    }
907                }
908                builder.advance_row();
909            }
910
911            return Ok(Some(builder.finish()));
912        }
913        Ok(None)
914    }
915
916    fn reset(&mut self) {
917        self.input.reset();
918    }
919
920    fn name(&self) -> &'static str {
921        "DeleteEdge"
922    }
923}
924
/// Operator that adds labels to nodes.
///
/// For every selected row of each input chunk, `next` reads a node ID from
/// `node_column`, adds each of `labels` to that node, copies the input
/// columns through, and writes the number of labels actually added into
/// `count_column`.
pub struct AddLabelOperator {
    /// The graph store.
    store: Arc<dyn GraphStoreMut>,
    /// Child operator providing nodes.
    input: Box<dyn Operator>,
    /// Column index containing node IDs; `next` requires `Value::Int64` values here.
    node_column: usize,
    /// Labels to add.
    labels: Vec<String>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Column index for the update count (last column); derived from
    /// `output_schema` in `new`.
    count_column: usize,
    /// Epoch for MVCC versioning; stored by `with_transaction_context`.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for undo log tracking; when set, `next` uses the
    /// store's versioned label call instead of the plain one.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection; writes are recorded only
    /// when a transaction ID is also set.
    write_tracker: Option<SharedWriteTracker>,
}
946
947impl AddLabelOperator {
948    /// Creates a new add label operator.
949    pub fn new(
950        store: Arc<dyn GraphStoreMut>,
951        input: Box<dyn Operator>,
952        node_column: usize,
953        labels: Vec<String>,
954        output_schema: Vec<LogicalType>,
955    ) -> Self {
956        let count_column = output_schema.len() - 1;
957        Self {
958            store,
959            input,
960            node_column,
961            labels,
962            count_column,
963            output_schema,
964            viewing_epoch: None,
965            transaction_id: None,
966            write_tracker: None,
967        }
968    }
969
970    /// Sets the transaction context for versioned label mutations.
971    pub fn with_transaction_context(
972        mut self,
973        epoch: EpochId,
974        transaction_id: Option<TransactionId>,
975    ) -> Self {
976        self.viewing_epoch = Some(epoch);
977        self.transaction_id = transaction_id;
978        self
979    }
980
981    /// Sets the write tracker for conflict detection.
982    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
983        self.write_tracker = Some(tracker);
984        self
985    }
986}
987
988impl Operator for AddLabelOperator {
989    fn next(&mut self) -> OperatorResult {
990        if let Some(chunk) = self.input.next()? {
991            let mut builder =
992                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
993
994            for row in chunk.selected_indices() {
995                let node_val = chunk
996                    .column(self.node_column)
997                    .and_then(|c| c.get_value(row))
998                    .ok_or_else(|| {
999                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1000                    })?;
1001
1002                let node_id = match node_val {
1003                    Value::Int64(id) => NodeId(id as u64),
1004                    _ => {
1005                        return Err(OperatorError::TypeMismatch {
1006                            expected: "Int64 (node ID)".to_string(),
1007                            found: format!("{node_val:?}"),
1008                        });
1009                    }
1010                };
1011
1012                // Record write for conflict detection
1013                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1014                    tracker.record_node_write(tid, node_id)?;
1015                }
1016
1017                // Add all labels
1018                let mut row_count: i64 = 0;
1019                for label in &self.labels {
1020                    let added = if let Some(tid) = self.transaction_id {
1021                        self.store.add_label_versioned(node_id, label, tid)
1022                    } else {
1023                        self.store.add_label(node_id, label)
1024                    };
1025                    if added {
1026                        row_count += 1;
1027                    }
1028                }
1029
1030                // Copy input columns to output (pass-through)
1031                for col_idx in 0..chunk.column_count() {
1032                    if let (Some(src), Some(dst)) =
1033                        (chunk.column(col_idx), builder.column_mut(col_idx))
1034                    {
1035                        if let Some(val) = src.get_value(row) {
1036                            dst.push_value(val);
1037                        } else {
1038                            dst.push_value(Value::Null);
1039                        }
1040                    }
1041                }
1042                // Append the update count column
1043                if let Some(dst) = builder.column_mut(self.count_column) {
1044                    dst.push_value(Value::Int64(row_count));
1045                }
1046
1047                builder.advance_row();
1048            }
1049
1050            return Ok(Some(builder.finish()));
1051        }
1052        Ok(None)
1053    }
1054
1055    fn reset(&mut self) {
1056        self.input.reset();
1057    }
1058
1059    fn name(&self) -> &'static str {
1060        "AddLabel"
1061    }
1062}
1063
/// Operator that removes labels from nodes.
///
/// For every selected row of each input chunk, `next` reads a node ID from
/// `node_column`, removes each of `labels` from that node, copies the input
/// columns through, and writes the number of labels actually removed into
/// `count_column`.
pub struct RemoveLabelOperator {
    /// The graph store.
    store: Arc<dyn GraphStoreMut>,
    /// Child operator providing nodes.
    input: Box<dyn Operator>,
    /// Column index containing node IDs; `next` requires `Value::Int64` values here.
    node_column: usize,
    /// Labels to remove.
    labels: Vec<String>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Column index for the update count (last column); derived from
    /// `output_schema` in `new`.
    count_column: usize,
    /// Epoch for MVCC versioning; stored by `with_transaction_context`.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for undo log tracking; when set, `next` uses the
    /// store's versioned label call instead of the plain one.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection; writes are recorded only
    /// when a transaction ID is also set.
    write_tracker: Option<SharedWriteTracker>,
}
1085
1086impl RemoveLabelOperator {
1087    /// Creates a new remove label operator.
1088    pub fn new(
1089        store: Arc<dyn GraphStoreMut>,
1090        input: Box<dyn Operator>,
1091        node_column: usize,
1092        labels: Vec<String>,
1093        output_schema: Vec<LogicalType>,
1094    ) -> Self {
1095        let count_column = output_schema.len() - 1;
1096        Self {
1097            store,
1098            input,
1099            node_column,
1100            labels,
1101            count_column,
1102            output_schema,
1103            viewing_epoch: None,
1104            transaction_id: None,
1105            write_tracker: None,
1106        }
1107    }
1108
1109    /// Sets the transaction context for versioned label mutations.
1110    pub fn with_transaction_context(
1111        mut self,
1112        epoch: EpochId,
1113        transaction_id: Option<TransactionId>,
1114    ) -> Self {
1115        self.viewing_epoch = Some(epoch);
1116        self.transaction_id = transaction_id;
1117        self
1118    }
1119
1120    /// Sets the write tracker for conflict detection.
1121    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1122        self.write_tracker = Some(tracker);
1123        self
1124    }
1125}
1126
1127impl Operator for RemoveLabelOperator {
1128    fn next(&mut self) -> OperatorResult {
1129        if let Some(chunk) = self.input.next()? {
1130            let mut builder =
1131                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1132
1133            for row in chunk.selected_indices() {
1134                let node_val = chunk
1135                    .column(self.node_column)
1136                    .and_then(|c| c.get_value(row))
1137                    .ok_or_else(|| {
1138                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1139                    })?;
1140
1141                let node_id = match node_val {
1142                    Value::Int64(id) => NodeId(id as u64),
1143                    _ => {
1144                        return Err(OperatorError::TypeMismatch {
1145                            expected: "Int64 (node ID)".to_string(),
1146                            found: format!("{node_val:?}"),
1147                        });
1148                    }
1149                };
1150
1151                // Record write for conflict detection
1152                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1153                    tracker.record_node_write(tid, node_id)?;
1154                }
1155
1156                // Remove all labels
1157                let mut row_count: i64 = 0;
1158                for label in &self.labels {
1159                    let removed = if let Some(tid) = self.transaction_id {
1160                        self.store.remove_label_versioned(node_id, label, tid)
1161                    } else {
1162                        self.store.remove_label(node_id, label)
1163                    };
1164                    if removed {
1165                        row_count += 1;
1166                    }
1167                }
1168
1169                // Copy input columns to output (pass-through)
1170                for col_idx in 0..chunk.column_count() {
1171                    if let (Some(src), Some(dst)) =
1172                        (chunk.column(col_idx), builder.column_mut(col_idx))
1173                    {
1174                        if let Some(val) = src.get_value(row) {
1175                            dst.push_value(val);
1176                        } else {
1177                            dst.push_value(Value::Null);
1178                        }
1179                    }
1180                }
1181                // Append the update count column
1182                if let Some(dst) = builder.column_mut(self.count_column) {
1183                    dst.push_value(Value::Int64(row_count));
1184                }
1185
1186                builder.advance_row();
1187            }
1188
1189            return Ok(Some(builder.finish()));
1190        }
1191        Ok(None)
1192    }
1193
1194    fn reset(&mut self) {
1195        self.input.reset();
1196    }
1197
1198    fn name(&self) -> &'static str {
1199        "RemoveLabel"
1200    }
1201}
1202
/// Operator that sets properties on nodes or edges.
///
/// This operator reads node/edge IDs from a column and sets the
/// specified properties on each entity. A property named `"*"` is treated
/// as a map assignment: its resolved value is expected to be a map whose
/// entries are written as individual properties.
pub struct SetPropertyOperator {
    /// The graph store.
    store: Arc<dyn GraphStoreMut>,
    /// Child operator providing entities.
    input: Box<dyn Operator>,
    /// Column index containing entity IDs (node or edge); `next` requires
    /// `Value::Int64` values here.
    entity_column: usize,
    /// Whether the entity is an edge (false = node).
    is_edge: bool,
    /// Properties to set (name -> source); each source is resolved per row.
    properties: Vec<(String, PropertySource)>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Whether to replace all properties (true) or merge (false) for map assignments.
    /// When true, the entity's existing properties are removed before the map
    /// entries are written.
    replace: bool,
    /// Optional constraint validator for schema enforcement; consulted before
    /// any property of a row is written.
    validator: Option<Arc<dyn ConstraintValidator>>,
    /// Entity labels (for node constraint validation).
    labels: Vec<String>,
    /// Edge type (for edge constraint validation); edge properties are not
    /// validated while this is `None`.
    edge_type_name: Option<String>,
    /// Epoch for MVCC versioning; stored by `with_transaction_context`.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for undo log tracking; when set, `next` uses the
    /// store's versioned property calls instead of the plain ones.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker for conflict detection; writes are recorded only
    /// when a transaction ID is also set.
    write_tracker: Option<SharedWriteTracker>,
}
1235
1236impl SetPropertyOperator {
1237    /// Creates a new set property operator for nodes.
1238    pub fn new_for_node(
1239        store: Arc<dyn GraphStoreMut>,
1240        input: Box<dyn Operator>,
1241        node_column: usize,
1242        properties: Vec<(String, PropertySource)>,
1243        output_schema: Vec<LogicalType>,
1244    ) -> Self {
1245        Self {
1246            store,
1247            input,
1248            entity_column: node_column,
1249            is_edge: false,
1250            properties,
1251            output_schema,
1252            replace: false,
1253            validator: None,
1254            labels: Vec::new(),
1255            edge_type_name: None,
1256            viewing_epoch: None,
1257            transaction_id: None,
1258            write_tracker: None,
1259        }
1260    }
1261
1262    /// Creates a new set property operator for edges.
1263    pub fn new_for_edge(
1264        store: Arc<dyn GraphStoreMut>,
1265        input: Box<dyn Operator>,
1266        edge_column: usize,
1267        properties: Vec<(String, PropertySource)>,
1268        output_schema: Vec<LogicalType>,
1269    ) -> Self {
1270        Self {
1271            store,
1272            input,
1273            entity_column: edge_column,
1274            is_edge: true,
1275            properties,
1276            output_schema,
1277            replace: false,
1278            validator: None,
1279            labels: Vec::new(),
1280            edge_type_name: None,
1281            viewing_epoch: None,
1282            transaction_id: None,
1283            write_tracker: None,
1284        }
1285    }
1286
1287    /// Sets whether this operator replaces all properties (for map assignment).
1288    pub fn with_replace(mut self, replace: bool) -> Self {
1289        self.replace = replace;
1290        self
1291    }
1292
1293    /// Sets the constraint validator for schema enforcement.
1294    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1295        self.validator = Some(validator);
1296        self
1297    }
1298
1299    /// Sets the entity labels (for node constraint validation).
1300    pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1301        self.labels = labels;
1302        self
1303    }
1304
1305    /// Sets the edge type name (for edge constraint validation).
1306    pub fn with_edge_type(mut self, edge_type: String) -> Self {
1307        self.edge_type_name = Some(edge_type);
1308        self
1309    }
1310
1311    /// Sets the transaction context for versioned property mutations.
1312    ///
1313    /// When a transaction ID is provided, property changes are recorded in
1314    /// an undo log so they can be restored on rollback.
1315    pub fn with_transaction_context(
1316        mut self,
1317        epoch: EpochId,
1318        transaction_id: Option<TransactionId>,
1319    ) -> Self {
1320        self.viewing_epoch = Some(epoch);
1321        self.transaction_id = transaction_id;
1322        self
1323    }
1324
1325    /// Sets the write tracker for conflict detection.
1326    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1327        self.write_tracker = Some(tracker);
1328        self
1329    }
1330}
1331
1332impl Operator for SetPropertyOperator {
1333    fn next(&mut self) -> OperatorResult {
1334        if let Some(chunk) = self.input.next()? {
1335            let mut builder =
1336                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1337
1338            for row in chunk.selected_indices() {
1339                let entity_val = chunk
1340                    .column(self.entity_column)
1341                    .and_then(|c| c.get_value(row))
1342                    .ok_or_else(|| {
1343                        OperatorError::ColumnNotFound(format!(
1344                            "entity column {}",
1345                            self.entity_column
1346                        ))
1347                    })?;
1348
1349                let entity_id = match entity_val {
1350                    Value::Int64(id) => id as u64,
1351                    _ => {
1352                        return Err(OperatorError::TypeMismatch {
1353                            expected: "Int64 (entity ID)".to_string(),
1354                            found: format!("{entity_val:?}"),
1355                        });
1356                    }
1357                };
1358
1359                // Record write for conflict detection
1360                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1361                    if self.is_edge {
1362                        tracker.record_edge_write(tid, EdgeId(entity_id))?;
1363                    } else {
1364                        tracker.record_node_write(tid, NodeId(entity_id))?;
1365                    }
1366                }
1367
1368                // Resolve all property values
1369                let resolved_props: Vec<(String, Value)> = self
1370                    .properties
1371                    .iter()
1372                    .map(|(name, source)| {
1373                        let value =
1374                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1375                        (name.clone(), value)
1376                    })
1377                    .collect();
1378
1379                // Validate constraints before writing
1380                if let Some(ref validator) = self.validator {
1381                    if self.is_edge {
1382                        if let Some(ref et) = self.edge_type_name {
1383                            for (name, value) in &resolved_props {
1384                                validator.validate_edge_property(et, name, value)?;
1385                            }
1386                        }
1387                    } else {
1388                        for (name, value) in &resolved_props {
1389                            validator.validate_node_property(&self.labels, name, value)?;
1390                            validator.check_unique_node_property(&self.labels, name, value)?;
1391                        }
1392                    }
1393                }
1394
1395                // Write all properties (use versioned methods when inside a transaction)
1396                let tx_id = self.transaction_id;
1397                for (prop_name, value) in resolved_props {
1398                    if prop_name == "*" {
1399                        // Map assignment: value should be a Map
1400                        if let Value::Map(map) = value {
1401                            if self.replace {
1402                                // Replace: remove all existing properties first
1403                                if self.is_edge {
1404                                    if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1405                                        let keys: Vec<String> = edge
1406                                            .properties
1407                                            .iter()
1408                                            .map(|(k, _)| k.as_str().to_string())
1409                                            .collect();
1410                                        for key in keys {
1411                                            if let Some(tid) = tx_id {
1412                                                self.store.remove_edge_property_versioned(
1413                                                    EdgeId(entity_id),
1414                                                    &key,
1415                                                    tid,
1416                                                );
1417                                            } else {
1418                                                self.store
1419                                                    .remove_edge_property(EdgeId(entity_id), &key);
1420                                            }
1421                                        }
1422                                    }
1423                                } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1424                                    let keys: Vec<String> = node
1425                                        .properties
1426                                        .iter()
1427                                        .map(|(k, _)| k.as_str().to_string())
1428                                        .collect();
1429                                    for key in keys {
1430                                        if let Some(tid) = tx_id {
1431                                            self.store.remove_node_property_versioned(
1432                                                NodeId(entity_id),
1433                                                &key,
1434                                                tid,
1435                                            );
1436                                        } else {
1437                                            self.store
1438                                                .remove_node_property(NodeId(entity_id), &key);
1439                                        }
1440                                    }
1441                                }
1442                            }
1443                            // Set each map entry
1444                            for (key, val) in map.iter() {
1445                                if self.is_edge {
1446                                    if let Some(tid) = tx_id {
1447                                        self.store.set_edge_property_versioned(
1448                                            EdgeId(entity_id),
1449                                            key.as_str(),
1450                                            val.clone(),
1451                                            tid,
1452                                        );
1453                                    } else {
1454                                        self.store.set_edge_property(
1455                                            EdgeId(entity_id),
1456                                            key.as_str(),
1457                                            val.clone(),
1458                                        );
1459                                    }
1460                                } else if let Some(tid) = tx_id {
1461                                    self.store.set_node_property_versioned(
1462                                        NodeId(entity_id),
1463                                        key.as_str(),
1464                                        val.clone(),
1465                                        tid,
1466                                    );
1467                                } else {
1468                                    self.store.set_node_property(
1469                                        NodeId(entity_id),
1470                                        key.as_str(),
1471                                        val.clone(),
1472                                    );
1473                                }
1474                            }
1475                        }
1476                    } else if self.is_edge {
1477                        if let Some(tid) = tx_id {
1478                            self.store.set_edge_property_versioned(
1479                                EdgeId(entity_id),
1480                                &prop_name,
1481                                value,
1482                                tid,
1483                            );
1484                        } else {
1485                            self.store
1486                                .set_edge_property(EdgeId(entity_id), &prop_name, value);
1487                        }
1488                    } else if let Some(tid) = tx_id {
1489                        self.store.set_node_property_versioned(
1490                            NodeId(entity_id),
1491                            &prop_name,
1492                            value,
1493                            tid,
1494                        );
1495                    } else {
1496                        self.store
1497                            .set_node_property(NodeId(entity_id), &prop_name, value);
1498                    }
1499                }
1500
1501                // Copy input columns to output
1502                for col_idx in 0..chunk.column_count() {
1503                    if let (Some(src), Some(dst)) =
1504                        (chunk.column(col_idx), builder.column_mut(col_idx))
1505                    {
1506                        if let Some(val) = src.get_value(row) {
1507                            dst.push_value(val);
1508                        } else {
1509                            dst.push_value(Value::Null);
1510                        }
1511                    }
1512                }
1513
1514                builder.advance_row();
1515            }
1516
1517            return Ok(Some(builder.finish()));
1518        }
1519        Ok(None)
1520    }
1521
1522    fn reset(&mut self) {
1523        self.input.reset();
1524    }
1525
1526    fn name(&self) -> &'static str {
1527        "SetProperty"
1528    }
1529}
1530
1531#[cfg(test)]
1532mod tests {
1533    use super::*;
1534    use crate::execution::DataChunk;
1535    use crate::execution::chunk::DataChunkBuilder;
1536    use crate::graph::lpg::LpgStore;
1537
1538    // ── Helpers ────────────────────────────────────────────────────
1539
1540    fn create_test_store() -> Arc<dyn GraphStoreMut> {
1541        Arc::new(LpgStore::new().unwrap())
1542    }
1543
    /// Test input operator that hands out a single prepared chunk.
    struct MockInput {
        /// The chunk to emit; taken (left as `None`) by the first `next` call.
        chunk: Option<DataChunk>,
    }
1547
    impl MockInput {
        /// Wraps `chunk` in a boxed `MockInput` ready to be passed as an input operator.
        fn boxed(chunk: DataChunk) -> Box<Self> {
            Box::new(Self { chunk: Some(chunk) })
        }
    }
1553
    impl Operator for MockInput {
        /// Returns the stored chunk on the first call and `Ok(None)` afterwards.
        fn next(&mut self) -> OperatorResult {
            Ok(self.chunk.take())
        }
        /// Does nothing; an already-emitted chunk is not restored.
        fn reset(&mut self) {}
        /// Returns the static operator name `"MockInput"`.
        fn name(&self) -> &'static str {
            "MockInput"
        }
    }
1563
    /// Test input operator that never produces a chunk.
    struct EmptyInput;
    impl Operator for EmptyInput {
        /// Always returns `Ok(None)`.
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }
        /// Does nothing; there is no state to reset.
        fn reset(&mut self) {}
        /// Returns the static operator name `"EmptyInput"`.
        fn name(&self) -> &'static str {
            "EmptyInput"
        }
    }
1574
1575    fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1576        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1577        for id in ids {
1578            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1579            builder.advance_row();
1580        }
1581        builder.finish()
1582    }
1583
1584    fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1585        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1586        for id in ids {
1587            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1588            builder.advance_row();
1589        }
1590        builder.finish()
1591    }
1592
1593    // ── CreateNodeOperator ──────────────────────────────────────
1594
1595    #[test]
1596    fn test_create_node_standalone() {
1597        let store = create_test_store();
1598
1599        let mut op = CreateNodeOperator::new(
1600            Arc::clone(&store),
1601            None,
1602            vec!["Person".to_string()],
1603            vec![(
1604                "name".to_string(),
1605                PropertySource::Constant(Value::String("Alix".into())),
1606            )],
1607            vec![LogicalType::Int64],
1608            0,
1609        );
1610
1611        let chunk = op.next().unwrap().unwrap();
1612        assert_eq!(chunk.row_count(), 1);
1613
1614        // Second call should return None (standalone executes once)
1615        assert!(op.next().unwrap().is_none());
1616
1617        assert_eq!(store.node_count(), 1);
1618    }
1619
1620    #[test]
1621    fn test_create_edge() {
1622        let store = create_test_store();
1623
1624        let node1 = store.create_node(&["Person"]);
1625        let node2 = store.create_node(&["Person"]);
1626
1627        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1628        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1629        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1630        builder.advance_row();
1631
1632        let mut op = CreateEdgeOperator::new(
1633            Arc::clone(&store),
1634            MockInput::boxed(builder.finish()),
1635            0,
1636            1,
1637            "KNOWS".to_string(),
1638            vec![LogicalType::Int64, LogicalType::Int64],
1639        );
1640
1641        let _chunk = op.next().unwrap().unwrap();
1642        assert_eq!(store.edge_count(), 1);
1643    }
1644
1645    #[test]
1646    fn test_delete_node() {
1647        let store = create_test_store();
1648
1649        let node_id = store.create_node(&["Person"]);
1650        assert_eq!(store.node_count(), 1);
1651
1652        let mut op = DeleteNodeOperator::new(
1653            Arc::clone(&store),
1654            MockInput::boxed(node_id_chunk(&[node_id])),
1655            0,
1656            vec![LogicalType::Node],
1657            false,
1658        );
1659
1660        let chunk = op.next().unwrap().unwrap();
1661        // Pass-through: output row contains the original node ID
1662        assert_eq!(chunk.row_count(), 1);
1663        assert_eq!(store.node_count(), 0);
1664    }
1665
1666    // ── DeleteEdgeOperator ───────────────────────────────────────
1667
1668    #[test]
1669    fn test_delete_edge() {
1670        let store = create_test_store();
1671
1672        let n1 = store.create_node(&["Person"]);
1673        let n2 = store.create_node(&["Person"]);
1674        let eid = store.create_edge(n1, n2, "KNOWS");
1675        assert_eq!(store.edge_count(), 1);
1676
1677        let mut op = DeleteEdgeOperator::new(
1678            Arc::clone(&store),
1679            MockInput::boxed(edge_id_chunk(&[eid])),
1680            0,
1681            vec![LogicalType::Node],
1682        );
1683
1684        let chunk = op.next().unwrap().unwrap();
1685        assert_eq!(chunk.row_count(), 1);
1686        assert_eq!(store.edge_count(), 0);
1687    }
1688
1689    #[test]
1690    fn test_delete_edge_no_input_returns_none() {
1691        let store = create_test_store();
1692
1693        let mut op = DeleteEdgeOperator::new(
1694            Arc::clone(&store),
1695            Box::new(EmptyInput),
1696            0,
1697            vec![LogicalType::Int64],
1698        );
1699
1700        assert!(op.next().unwrap().is_none());
1701    }
1702
1703    #[test]
1704    fn test_delete_multiple_edges() {
1705        let store = create_test_store();
1706
1707        let n1 = store.create_node(&["N"]);
1708        let n2 = store.create_node(&["N"]);
1709        let e1 = store.create_edge(n1, n2, "R");
1710        let e2 = store.create_edge(n2, n1, "S");
1711        assert_eq!(store.edge_count(), 2);
1712
1713        let mut op = DeleteEdgeOperator::new(
1714            Arc::clone(&store),
1715            MockInput::boxed(edge_id_chunk(&[e1, e2])),
1716            0,
1717            vec![LogicalType::Node],
1718        );
1719
1720        let chunk = op.next().unwrap().unwrap();
1721        assert_eq!(chunk.row_count(), 2);
1722        assert_eq!(store.edge_count(), 0);
1723    }
1724
1725    // ── DeleteNodeOperator with DETACH ───────────────────────────
1726
1727    #[test]
1728    fn test_delete_node_detach() {
1729        let store = create_test_store();
1730
1731        let n1 = store.create_node(&["Person"]);
1732        let n2 = store.create_node(&["Person"]);
1733        store.create_edge(n1, n2, "KNOWS");
1734        store.create_edge(n2, n1, "FOLLOWS");
1735        assert_eq!(store.edge_count(), 2);
1736
1737        let mut op = DeleteNodeOperator::new(
1738            Arc::clone(&store),
1739            MockInput::boxed(node_id_chunk(&[n1])),
1740            0,
1741            vec![LogicalType::Node],
1742            true, // detach = true
1743        );
1744
1745        let chunk = op.next().unwrap().unwrap();
1746        assert_eq!(chunk.row_count(), 1);
1747        assert_eq!(store.node_count(), 1);
1748        assert_eq!(store.edge_count(), 0); // edges detached
1749    }
1750
1751    // ── AddLabelOperator ─────────────────────────────────────────
1752
1753    #[test]
1754    fn test_add_label() {
1755        let store = create_test_store();
1756
1757        let node = store.create_node(&["Person"]);
1758
1759        let mut op = AddLabelOperator::new(
1760            Arc::clone(&store),
1761            MockInput::boxed(node_id_chunk(&[node])),
1762            0,
1763            vec!["Employee".to_string()],
1764            vec![LogicalType::Int64, LogicalType::Int64],
1765        );
1766
1767        let chunk = op.next().unwrap().unwrap();
1768        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1769        assert_eq!(updated, 1);
1770
1771        // Verify label was added
1772        let node_data = store.get_node(node).unwrap();
1773        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1774        assert!(labels.contains(&"Person"));
1775        assert!(labels.contains(&"Employee"));
1776    }
1777
1778    #[test]
1779    fn test_add_multiple_labels() {
1780        let store = create_test_store();
1781
1782        let node = store.create_node(&["Base"]);
1783
1784        let mut op = AddLabelOperator::new(
1785            Arc::clone(&store),
1786            MockInput::boxed(node_id_chunk(&[node])),
1787            0,
1788            vec!["LabelA".to_string(), "LabelB".to_string()],
1789            vec![LogicalType::Int64, LogicalType::Int64],
1790        );
1791
1792        let chunk = op.next().unwrap().unwrap();
1793        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1794        assert_eq!(updated, 2); // 2 labels added
1795
1796        let node_data = store.get_node(node).unwrap();
1797        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1798        assert!(labels.contains(&"LabelA"));
1799        assert!(labels.contains(&"LabelB"));
1800    }
1801
1802    #[test]
1803    fn test_add_label_no_input_returns_none() {
1804        let store = create_test_store();
1805
1806        let mut op = AddLabelOperator::new(
1807            Arc::clone(&store),
1808            Box::new(EmptyInput),
1809            0,
1810            vec!["Foo".to_string()],
1811            vec![LogicalType::Int64, LogicalType::Int64],
1812        );
1813
1814        assert!(op.next().unwrap().is_none());
1815    }
1816
1817    // ── RemoveLabelOperator ──────────────────────────────────────
1818
1819    #[test]
1820    fn test_remove_label() {
1821        let store = create_test_store();
1822
1823        let node = store.create_node(&["Person", "Employee"]);
1824
1825        let mut op = RemoveLabelOperator::new(
1826            Arc::clone(&store),
1827            MockInput::boxed(node_id_chunk(&[node])),
1828            0,
1829            vec!["Employee".to_string()],
1830            vec![LogicalType::Int64, LogicalType::Int64],
1831        );
1832
1833        let chunk = op.next().unwrap().unwrap();
1834        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1835        assert_eq!(updated, 1);
1836
1837        // Verify label was removed
1838        let node_data = store.get_node(node).unwrap();
1839        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1840        assert!(labels.contains(&"Person"));
1841        assert!(!labels.contains(&"Employee"));
1842    }
1843
1844    #[test]
1845    fn test_remove_nonexistent_label() {
1846        let store = create_test_store();
1847
1848        let node = store.create_node(&["Person"]);
1849
1850        let mut op = RemoveLabelOperator::new(
1851            Arc::clone(&store),
1852            MockInput::boxed(node_id_chunk(&[node])),
1853            0,
1854            vec!["NonExistent".to_string()],
1855            vec![LogicalType::Int64, LogicalType::Int64],
1856        );
1857
1858        let chunk = op.next().unwrap().unwrap();
1859        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1860        assert_eq!(updated, 0); // nothing removed
1861    }
1862
1863    // ── SetPropertyOperator ──────────────────────────────────────
1864
1865    #[test]
1866    fn test_set_node_property_constant() {
1867        let store = create_test_store();
1868
1869        let node = store.create_node(&["Person"]);
1870
1871        let mut op = SetPropertyOperator::new_for_node(
1872            Arc::clone(&store),
1873            MockInput::boxed(node_id_chunk(&[node])),
1874            0,
1875            vec![(
1876                "name".to_string(),
1877                PropertySource::Constant(Value::String("Alix".into())),
1878            )],
1879            vec![LogicalType::Int64],
1880        );
1881
1882        let chunk = op.next().unwrap().unwrap();
1883        assert_eq!(chunk.row_count(), 1);
1884
1885        // Verify property was set
1886        let node_data = store.get_node(node).unwrap();
1887        assert_eq!(
1888            node_data
1889                .properties
1890                .get(&grafeo_common::types::PropertyKey::new("name")),
1891            Some(&Value::String("Alix".into()))
1892        );
1893    }
1894
1895    #[test]
1896    fn test_set_node_property_from_column() {
1897        let store = create_test_store();
1898
1899        let node = store.create_node(&["Person"]);
1900
1901        // Input: column 0 = node ID, column 1 = property value
1902        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1903        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1904        builder
1905            .column_mut(1)
1906            .unwrap()
1907            .push_value(Value::String("Gus".into()));
1908        builder.advance_row();
1909
1910        let mut op = SetPropertyOperator::new_for_node(
1911            Arc::clone(&store),
1912            MockInput::boxed(builder.finish()),
1913            0,
1914            vec![("name".to_string(), PropertySource::Column(1))],
1915            vec![LogicalType::Int64, LogicalType::String],
1916        );
1917
1918        let chunk = op.next().unwrap().unwrap();
1919        assert_eq!(chunk.row_count(), 1);
1920
1921        let node_data = store.get_node(node).unwrap();
1922        assert_eq!(
1923            node_data
1924                .properties
1925                .get(&grafeo_common::types::PropertyKey::new("name")),
1926            Some(&Value::String("Gus".into()))
1927        );
1928    }
1929
1930    #[test]
1931    fn test_set_edge_property() {
1932        let store = create_test_store();
1933
1934        let n1 = store.create_node(&["N"]);
1935        let n2 = store.create_node(&["N"]);
1936        let eid = store.create_edge(n1, n2, "KNOWS");
1937
1938        let mut op = SetPropertyOperator::new_for_edge(
1939            Arc::clone(&store),
1940            MockInput::boxed(edge_id_chunk(&[eid])),
1941            0,
1942            vec![(
1943                "weight".to_string(),
1944                PropertySource::Constant(Value::Float64(0.75)),
1945            )],
1946            vec![LogicalType::Int64],
1947        );
1948
1949        let chunk = op.next().unwrap().unwrap();
1950        assert_eq!(chunk.row_count(), 1);
1951
1952        let edge_data = store.get_edge(eid).unwrap();
1953        assert_eq!(
1954            edge_data
1955                .properties
1956                .get(&grafeo_common::types::PropertyKey::new("weight")),
1957            Some(&Value::Float64(0.75))
1958        );
1959    }
1960
1961    #[test]
1962    fn test_set_multiple_properties() {
1963        let store = create_test_store();
1964
1965        let node = store.create_node(&["Person"]);
1966
1967        let mut op = SetPropertyOperator::new_for_node(
1968            Arc::clone(&store),
1969            MockInput::boxed(node_id_chunk(&[node])),
1970            0,
1971            vec![
1972                (
1973                    "name".to_string(),
1974                    PropertySource::Constant(Value::String("Alix".into())),
1975                ),
1976                (
1977                    "age".to_string(),
1978                    PropertySource::Constant(Value::Int64(30)),
1979                ),
1980            ],
1981            vec![LogicalType::Int64],
1982        );
1983
1984        op.next().unwrap().unwrap();
1985
1986        let node_data = store.get_node(node).unwrap();
1987        assert_eq!(
1988            node_data
1989                .properties
1990                .get(&grafeo_common::types::PropertyKey::new("name")),
1991            Some(&Value::String("Alix".into()))
1992        );
1993        assert_eq!(
1994            node_data
1995                .properties
1996                .get(&grafeo_common::types::PropertyKey::new("age")),
1997            Some(&Value::Int64(30))
1998        );
1999    }
2000
2001    #[test]
2002    fn test_set_property_no_input_returns_none() {
2003        let store = create_test_store();
2004
2005        let mut op = SetPropertyOperator::new_for_node(
2006            Arc::clone(&store),
2007            Box::new(EmptyInput),
2008            0,
2009            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
2010            vec![LogicalType::Int64],
2011        );
2012
2013        assert!(op.next().unwrap().is_none());
2014    }
2015
2016    // ── Error paths ──────────────────────────────────────────────
2017
2018    #[test]
2019    fn test_delete_node_without_detach_errors_when_edges_exist() {
2020        let store = create_test_store();
2021
2022        let n1 = store.create_node(&["Person"]);
2023        let n2 = store.create_node(&["Person"]);
2024        store.create_edge(n1, n2, "KNOWS");
2025
2026        let mut op = DeleteNodeOperator::new(
2027            Arc::clone(&store),
2028            MockInput::boxed(node_id_chunk(&[n1])),
2029            0,
2030            vec![LogicalType::Int64],
2031            false, // no detach
2032        );
2033
2034        let err = op.next().unwrap_err();
2035        match err {
2036            OperatorError::ConstraintViolation(msg) => {
2037                assert!(msg.contains("connected edge"), "unexpected message: {msg}");
2038            }
2039            other => panic!("expected ConstraintViolation, got {other:?}"),
2040        }
2041        // Node should still exist
2042        assert_eq!(store.node_count(), 2);
2043    }
2044
2045    // ── CreateNodeOperator with input ───────────────────────────
2046
2047    #[test]
2048    fn test_create_node_with_input_operator() {
2049        let store = create_test_store();
2050
2051        // Seed node to provide input rows
2052        let existing = store.create_node(&["Seed"]);
2053
2054        let mut op = CreateNodeOperator::new(
2055            Arc::clone(&store),
2056            Some(MockInput::boxed(node_id_chunk(&[existing]))),
2057            vec!["Created".to_string()],
2058            vec![(
2059                "source".to_string(),
2060                PropertySource::Constant(Value::String("from_input".into())),
2061            )],
2062            vec![LogicalType::Int64, LogicalType::Int64], // input col + output col
2063            1,                                            // output column for new node ID
2064        );
2065
2066        let chunk = op.next().unwrap().unwrap();
2067        assert_eq!(chunk.row_count(), 1);
2068
2069        // Should have created one new node (2 total: Seed + Created)
2070        assert_eq!(store.node_count(), 2);
2071
2072        // Exhausted
2073        assert!(op.next().unwrap().is_none());
2074    }
2075
2076    // ── CreateEdgeOperator with properties and output column ────
2077
2078    #[test]
2079    fn test_create_edge_with_properties_and_output_column() {
2080        let store = create_test_store();
2081
2082        let n1 = store.create_node(&["Person"]);
2083        let n2 = store.create_node(&["Person"]);
2084
2085        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
2086        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
2087        builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
2088        builder.advance_row();
2089
2090        let mut op = CreateEdgeOperator::new(
2091            Arc::clone(&store),
2092            MockInput::boxed(builder.finish()),
2093            0,
2094            1,
2095            "KNOWS".to_string(),
2096            vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
2097        )
2098        .with_properties(vec![(
2099            "since".to_string(),
2100            PropertySource::Constant(Value::Int64(2024)),
2101        )])
2102        .with_output_column(2);
2103
2104        let chunk = op.next().unwrap().unwrap();
2105        assert_eq!(chunk.row_count(), 1);
2106        assert_eq!(store.edge_count(), 1);
2107
2108        // Verify the output chunk contains the edge ID in column 2
2109        let edge_id_raw = chunk
2110            .column(2)
2111            .and_then(|c| c.get_int64(0))
2112            .expect("edge ID should be in output column 2");
2113        let edge_id = EdgeId(edge_id_raw as u64);
2114
2115        // Verify the edge has the property
2116        let edge = store.get_edge(edge_id).expect("edge should exist");
2117        assert_eq!(
2118            edge.properties
2119                .get(&grafeo_common::types::PropertyKey::new("since")),
2120            Some(&Value::Int64(2024))
2121        );
2122    }
2123
2124    // ── SetPropertyOperator with map replacement ────────────────
2125
2126    #[test]
2127    fn test_set_property_map_replace() {
2128        use std::collections::BTreeMap;
2129
2130        let store = create_test_store();
2131
2132        let node = store.create_node(&["Person"]);
2133        store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
2134
2135        let mut map = BTreeMap::new();
2136        map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
2137
2138        let mut op = SetPropertyOperator::new_for_node(
2139            Arc::clone(&store),
2140            MockInput::boxed(node_id_chunk(&[node])),
2141            0,
2142            vec![(
2143                "*".to_string(),
2144                PropertySource::Constant(Value::Map(Arc::new(map))),
2145            )],
2146            vec![LogicalType::Int64],
2147        )
2148        .with_replace(true);
2149
2150        op.next().unwrap().unwrap();
2151
2152        let node_data = store.get_node(node).unwrap();
2153        // Old property should be gone
2154        assert!(
2155            node_data
2156                .properties
2157                .get(&PropertyKey::new("old_prop"))
2158                .is_none()
2159        );
2160        // New property should exist
2161        assert_eq!(
2162            node_data.properties.get(&PropertyKey::new("new_key")),
2163            Some(&Value::String("new_val".into()))
2164        );
2165    }
2166
2167    // ── SetPropertyOperator with map merge (no replace) ─────────
2168
2169    #[test]
2170    fn test_set_property_map_merge() {
2171        use std::collections::BTreeMap;
2172
2173        let store = create_test_store();
2174
2175        let node = store.create_node(&["Person"]);
2176        store.set_node_property(node, "existing", Value::Int64(42));
2177
2178        let mut map = BTreeMap::new();
2179        map.insert(PropertyKey::new("added"), Value::String("hello".into()));
2180
2181        let mut op = SetPropertyOperator::new_for_node(
2182            Arc::clone(&store),
2183            MockInput::boxed(node_id_chunk(&[node])),
2184            0,
2185            vec![(
2186                "*".to_string(),
2187                PropertySource::Constant(Value::Map(Arc::new(map))),
2188            )],
2189            vec![LogicalType::Int64],
2190        ); // replace defaults to false
2191
2192        op.next().unwrap().unwrap();
2193
2194        let node_data = store.get_node(node).unwrap();
2195        // Existing property should still be there
2196        assert_eq!(
2197            node_data.properties.get(&PropertyKey::new("existing")),
2198            Some(&Value::Int64(42))
2199        );
2200        // New property should also exist
2201        assert_eq!(
2202            node_data.properties.get(&PropertyKey::new("added")),
2203            Some(&Value::String("hello".into()))
2204        );
2205    }
2206
2207    // ── PropertySource::PropertyAccess ──────────────────────────
2208
2209    #[test]
2210    fn test_property_source_property_access() {
2211        let store = create_test_store();
2212
2213        let source_node = store.create_node(&["Source"]);
2214        store.set_node_property(source_node, "name", Value::String("Alix".into()));
2215
2216        let target_node = store.create_node(&["Target"]);
2217
2218        // Build chunk: col 0 = source node ID (Node type for PropertyAccess), col 1 = target node ID
2219        let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
2220        builder.column_mut(0).unwrap().push_node_id(source_node);
2221        builder
2222            .column_mut(1)
2223            .unwrap()
2224            .push_int64(target_node.0 as i64);
2225        builder.advance_row();
2226
2227        let mut op = SetPropertyOperator::new_for_node(
2228            Arc::clone(&store),
2229            MockInput::boxed(builder.finish()),
2230            1, // entity column = target node
2231            vec![(
2232                "copied_name".to_string(),
2233                PropertySource::PropertyAccess {
2234                    column: 0,
2235                    property: "name".to_string(),
2236                },
2237            )],
2238            vec![LogicalType::Node, LogicalType::Int64],
2239        );
2240
2241        op.next().unwrap().unwrap();
2242
2243        let target_data = store.get_node(target_node).unwrap();
2244        assert_eq!(
2245            target_data.properties.get(&PropertyKey::new("copied_name")),
2246            Some(&Value::String("Alix".into()))
2247        );
2248    }
2249
2250    // ── ConstraintValidator integration ─────────────────────────
2251
2252    #[test]
2253    fn test_create_node_with_constraint_validator() {
2254        let store = create_test_store();
2255
2256        struct RejectAgeValidator;
2257        impl ConstraintValidator for RejectAgeValidator {
2258            fn validate_node_property(
2259                &self,
2260                _labels: &[String],
2261                key: &str,
2262                _value: &Value,
2263            ) -> Result<(), OperatorError> {
2264                if key == "forbidden" {
2265                    return Err(OperatorError::ConstraintViolation(
2266                        "property 'forbidden' is not allowed".to_string(),
2267                    ));
2268                }
2269                Ok(())
2270            }
2271            fn validate_node_complete(
2272                &self,
2273                _labels: &[String],
2274                _properties: &[(String, Value)],
2275            ) -> Result<(), OperatorError> {
2276                Ok(())
2277            }
2278            fn check_unique_node_property(
2279                &self,
2280                _labels: &[String],
2281                _key: &str,
2282                _value: &Value,
2283            ) -> Result<(), OperatorError> {
2284                Ok(())
2285            }
2286            fn validate_edge_property(
2287                &self,
2288                _edge_type: &str,
2289                _key: &str,
2290                _value: &Value,
2291            ) -> Result<(), OperatorError> {
2292                Ok(())
2293            }
2294            fn validate_edge_complete(
2295                &self,
2296                _edge_type: &str,
2297                _properties: &[(String, Value)],
2298            ) -> Result<(), OperatorError> {
2299                Ok(())
2300            }
2301        }
2302
2303        // Valid property should succeed
2304        let mut op = CreateNodeOperator::new(
2305            Arc::clone(&store),
2306            None,
2307            vec!["Thing".to_string()],
2308            vec![(
2309                "name".to_string(),
2310                PropertySource::Constant(Value::String("ok".into())),
2311            )],
2312            vec![LogicalType::Int64],
2313            0,
2314        )
2315        .with_validator(Arc::new(RejectAgeValidator));
2316
2317        assert!(op.next().is_ok());
2318        assert_eq!(store.node_count(), 1);
2319
2320        // Forbidden property should fail
2321        let mut op = CreateNodeOperator::new(
2322            Arc::clone(&store),
2323            None,
2324            vec!["Thing".to_string()],
2325            vec![(
2326                "forbidden".to_string(),
2327                PropertySource::Constant(Value::Int64(1)),
2328            )],
2329            vec![LogicalType::Int64],
2330            0,
2331        )
2332        .with_validator(Arc::new(RejectAgeValidator));
2333
2334        let err = op.next().unwrap_err();
2335        assert!(matches!(err, OperatorError::ConstraintViolation(_)));
2336        // Node count should still be 2 (the node is created before validation, but the error
2337        // propagates - this tests the validation logic fires)
2338    }
2339
2340    // ── Reset behavior ──────────────────────────────────────────
2341
2342    #[test]
2343    fn test_create_node_reset_allows_re_execution() {
2344        let store = create_test_store();
2345
2346        let mut op = CreateNodeOperator::new(
2347            Arc::clone(&store),
2348            None,
2349            vec!["Person".to_string()],
2350            vec![],
2351            vec![LogicalType::Int64],
2352            0,
2353        );
2354
2355        // First execution
2356        assert!(op.next().unwrap().is_some());
2357        assert!(op.next().unwrap().is_none());
2358
2359        // Reset and re-execute
2360        op.reset();
2361        assert!(op.next().unwrap().is_some());
2362
2363        assert_eq!(store.node_count(), 2);
2364    }
2365
2366    // ── Operator name() ──────────────────────────────────────────
2367
2368    #[test]
2369    fn test_operator_names() {
2370        let store = create_test_store();
2371
2372        let op = CreateNodeOperator::new(
2373            Arc::clone(&store),
2374            None,
2375            vec![],
2376            vec![],
2377            vec![LogicalType::Int64],
2378            0,
2379        );
2380        assert_eq!(op.name(), "CreateNode");
2381
2382        let op = CreateEdgeOperator::new(
2383            Arc::clone(&store),
2384            Box::new(EmptyInput),
2385            0,
2386            1,
2387            "R".to_string(),
2388            vec![LogicalType::Int64],
2389        );
2390        assert_eq!(op.name(), "CreateEdge");
2391
2392        let op = DeleteNodeOperator::new(
2393            Arc::clone(&store),
2394            Box::new(EmptyInput),
2395            0,
2396            vec![LogicalType::Int64],
2397            false,
2398        );
2399        assert_eq!(op.name(), "DeleteNode");
2400
2401        let op = DeleteEdgeOperator::new(
2402            Arc::clone(&store),
2403            Box::new(EmptyInput),
2404            0,
2405            vec![LogicalType::Int64],
2406        );
2407        assert_eq!(op.name(), "DeleteEdge");
2408
2409        let op = AddLabelOperator::new(
2410            Arc::clone(&store),
2411            Box::new(EmptyInput),
2412            0,
2413            vec!["L".to_string()],
2414            vec![LogicalType::Int64],
2415        );
2416        assert_eq!(op.name(), "AddLabel");
2417
2418        let op = RemoveLabelOperator::new(
2419            Arc::clone(&store),
2420            Box::new(EmptyInput),
2421            0,
2422            vec!["L".to_string()],
2423            vec![LogicalType::Int64],
2424        );
2425        assert_eq!(op.name(), "RemoveLabel");
2426
2427        let op = SetPropertyOperator::new_for_node(
2428            Arc::clone(&store),
2429            Box::new(EmptyInput),
2430            0,
2431            vec![],
2432            vec![LogicalType::Int64],
2433        );
2434        assert_eq!(op.name(), "SetProperty");
2435    }
2436}