Skip to main content

grafeo_core/execution/operators/
mutation.rs

1//! Mutation operators for creating and deleting graph elements.
2//!
3//! These operators modify the graph structure:
4//! - `CreateNodeOperator`: Creates new nodes
5//! - `CreateEdgeOperator`: Creates new edges
6//! - `DeleteNodeOperator`: Deletes nodes
7//! - `DeleteEdgeOperator`: Deletes edges
8
9use std::sync::Arc;
10
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19/// Trait for validating schema constraints during mutation operations.
20///
21/// Implementors check type definitions, NOT NULL, and UNIQUE constraints
22/// before data is written to the store.
23pub trait ConstraintValidator: Send + Sync {
24    /// Validates a single property value for a node with the given labels.
25    ///
26    /// Checks type compatibility and NOT NULL constraints.
27    fn validate_node_property(
28        &self,
29        labels: &[String],
30        key: &str,
31        value: &Value,
32    ) -> Result<(), OperatorError>;
33
34    /// Validates that all required properties are present after creating a node.
35    ///
36    /// Checks NOT NULL constraints for properties that were not explicitly set.
37    fn validate_node_complete(
38        &self,
39        labels: &[String],
40        properties: &[(String, Value)],
41    ) -> Result<(), OperatorError>;
42
43    /// Checks UNIQUE constraint for a node property value.
44    ///
45    /// Returns an error if a node with the same label already has this value.
46    fn check_unique_node_property(
47        &self,
48        labels: &[String],
49        key: &str,
50        value: &Value,
51    ) -> Result<(), OperatorError>;
52
53    /// Validates a single property value for an edge of the given type.
54    fn validate_edge_property(
55        &self,
56        edge_type: &str,
57        key: &str,
58        value: &Value,
59    ) -> Result<(), OperatorError>;
60
61    /// Validates that all required properties are present after creating an edge.
62    fn validate_edge_complete(
63        &self,
64        edge_type: &str,
65        properties: &[(String, Value)],
66    ) -> Result<(), OperatorError>;
67}
68
69/// Operator that creates new nodes.
70///
71/// For each input row, creates a new node with the specified labels
72/// and properties, then outputs the row with the new node.
73pub struct CreateNodeOperator {
74    /// The graph store to modify.
75    store: Arc<dyn GraphStoreMut>,
76    /// Input operator.
77    input: Option<Box<dyn Operator>>,
78    /// Labels for the new nodes.
79    labels: Vec<String>,
80    /// Properties to set (name -> column index or constant value).
81    properties: Vec<(String, PropertySource)>,
82    /// Output schema.
83    output_schema: Vec<LogicalType>,
84    /// Column index for the created node variable.
85    output_column: usize,
86    /// Whether this operator has been executed (for no-input case).
87    executed: bool,
88    /// Epoch for MVCC versioning.
89    viewing_epoch: Option<EpochId>,
90    /// Transaction ID for MVCC versioning.
91    transaction_id: Option<TransactionId>,
92    /// Optional constraint validator for schema enforcement.
93    validator: Option<Arc<dyn ConstraintValidator>>,
94}
95
96/// Source for a property value.
97#[derive(Debug, Clone)]
98pub enum PropertySource {
99    /// Get value from an input column.
100    Column(usize),
101    /// Use a constant value.
102    Constant(Value),
103    /// Access a named property from a map/node/edge in an input column.
104    PropertyAccess {
105        /// The column containing the map, node ID, or edge ID.
106        column: usize,
107        /// The property name to extract.
108        property: String,
109    },
110}
111
112impl PropertySource {
113    /// Resolves a property value from a data chunk row.
114    pub fn resolve(
115        &self,
116        chunk: &crate::execution::chunk::DataChunk,
117        row: usize,
118        store: &dyn GraphStore,
119    ) -> Value {
120        match self {
121            PropertySource::Column(col_idx) => chunk
122                .column(*col_idx)
123                .and_then(|c| c.get_value(row))
124                .unwrap_or(Value::Null),
125            PropertySource::Constant(v) => v.clone(),
126            PropertySource::PropertyAccess { column, property } => {
127                let Some(col) = chunk.column(*column) else {
128                    return Value::Null;
129                };
130                // Try node ID first, then edge ID, then map value
131                if let Some(node_id) = col.get_node_id(row) {
132                    store
133                        .get_node(node_id)
134                        .and_then(|node| node.get_property(property).cloned())
135                        .unwrap_or(Value::Null)
136                } else if let Some(edge_id) = col.get_edge_id(row) {
137                    store
138                        .get_edge(edge_id)
139                        .and_then(|edge| edge.get_property(property).cloned())
140                        .unwrap_or(Value::Null)
141                } else if let Some(Value::Map(map)) = col.get_value(row) {
142                    let key = PropertyKey::new(property);
143                    map.get(&key).cloned().unwrap_or(Value::Null)
144                } else {
145                    Value::Null
146                }
147            }
148        }
149    }
150}
151
152impl CreateNodeOperator {
153    /// Creates a new node creation operator.
154    ///
155    /// # Arguments
156    /// * `store` - The graph store to modify.
157    /// * `input` - Optional input operator (None for standalone CREATE).
158    /// * `labels` - Labels to assign to created nodes.
159    /// * `properties` - Properties to set on created nodes.
160    /// * `output_schema` - Schema of the output.
161    /// * `output_column` - Column index where the created node ID goes.
162    pub fn new(
163        store: Arc<dyn GraphStoreMut>,
164        input: Option<Box<dyn Operator>>,
165        labels: Vec<String>,
166        properties: Vec<(String, PropertySource)>,
167        output_schema: Vec<LogicalType>,
168        output_column: usize,
169    ) -> Self {
170        Self {
171            store,
172            input,
173            labels,
174            properties,
175            output_schema,
176            output_column,
177            executed: false,
178            viewing_epoch: None,
179            transaction_id: None,
180            validator: None,
181        }
182    }
183
184    /// Sets the transaction context for MVCC versioning.
185    pub fn with_transaction_context(
186        mut self,
187        epoch: EpochId,
188        transaction_id: Option<TransactionId>,
189    ) -> Self {
190        self.viewing_epoch = Some(epoch);
191        self.transaction_id = transaction_id;
192        self
193    }
194
195    /// Sets the constraint validator for schema enforcement.
196    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
197        self.validator = Some(validator);
198        self
199    }
200}
201
202impl CreateNodeOperator {
203    /// Validates and sets properties on a newly created node.
204    fn validate_and_set_properties(
205        &self,
206        node_id: NodeId,
207        resolved_props: &[(String, Value)],
208    ) -> Result<(), OperatorError> {
209        // Phase 1: Validate each property value
210        if let Some(ref validator) = self.validator {
211            for (name, value) in resolved_props {
212                validator.validate_node_property(&self.labels, name, value)?;
213                validator.check_unique_node_property(&self.labels, name, value)?;
214            }
215            // Phase 2: Validate completeness (NOT NULL checks for missing required properties)
216            validator.validate_node_complete(&self.labels, resolved_props)?;
217        }
218
219        // Phase 3: Write properties to the store
220        for (name, value) in resolved_props {
221            self.store.set_node_property(node_id, name, value.clone());
222        }
223        Ok(())
224    }
225}
226
227impl Operator for CreateNodeOperator {
228    fn next(&mut self) -> OperatorResult {
229        // Get transaction context for versioned creation
230        let epoch = self
231            .viewing_epoch
232            .unwrap_or_else(|| self.store.current_epoch());
233        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
234
235        if let Some(ref mut input) = self.input {
236            // For each input row, create a node
237            if let Some(chunk) = input.next()? {
238                let mut builder =
239                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
240
241                for row in chunk.selected_indices() {
242                    // Resolve all property values first (before creating node)
243                    let resolved_props: Vec<(String, Value)> = self
244                        .properties
245                        .iter()
246                        .map(|(name, source)| {
247                            let value =
248                                source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
249                            (name.clone(), value)
250                        })
251                        .collect();
252
253                    // Create the node with MVCC versioning
254                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
255                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
256
257                    // Validate and set properties
258                    self.validate_and_set_properties(node_id, &resolved_props)?;
259
260                    // Copy input columns to output
261                    for col_idx in 0..chunk.column_count() {
262                        if col_idx < self.output_column
263                            && let (Some(src), Some(dst)) =
264                                (chunk.column(col_idx), builder.column_mut(col_idx))
265                        {
266                            if let Some(val) = src.get_value(row) {
267                                dst.push_value(val);
268                            } else {
269                                dst.push_value(Value::Null);
270                            }
271                        }
272                    }
273
274                    // Add the new node ID
275                    if let Some(dst) = builder.column_mut(self.output_column) {
276                        dst.push_value(Value::Int64(node_id.0 as i64));
277                    }
278
279                    builder.advance_row();
280                }
281
282                return Ok(Some(builder.finish()));
283            }
284            Ok(None)
285        } else {
286            // No input - create a single node
287            if self.executed {
288                return Ok(None);
289            }
290            self.executed = true;
291
292            // Resolve constant properties
293            let resolved_props: Vec<(String, Value)> = self
294                .properties
295                .iter()
296                .filter_map(|(name, source)| {
297                    if let PropertySource::Constant(value) = source {
298                        Some((name.clone(), value.clone()))
299                    } else {
300                        None
301                    }
302                })
303                .collect();
304
305            // Create the node with MVCC versioning
306            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
307            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
308
309            // Validate and set properties
310            self.validate_and_set_properties(node_id, &resolved_props)?;
311
312            // Build output chunk with just the node ID
313            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
314            if let Some(dst) = builder.column_mut(self.output_column) {
315                dst.push_value(Value::Int64(node_id.0 as i64));
316            }
317            builder.advance_row();
318
319            Ok(Some(builder.finish()))
320        }
321    }
322
323    fn reset(&mut self) {
324        if let Some(ref mut input) = self.input {
325            input.reset();
326        }
327        self.executed = false;
328    }
329
330    fn name(&self) -> &'static str {
331        "CreateNode"
332    }
333}
334
335/// Operator that creates new edges.
336pub struct CreateEdgeOperator {
337    /// The graph store to modify.
338    store: Arc<dyn GraphStoreMut>,
339    /// Input operator.
340    input: Box<dyn Operator>,
341    /// Column index for the source node.
342    from_column: usize,
343    /// Column index for the target node.
344    to_column: usize,
345    /// Edge type.
346    edge_type: String,
347    /// Properties to set.
348    properties: Vec<(String, PropertySource)>,
349    /// Output schema.
350    output_schema: Vec<LogicalType>,
351    /// Column index for the created edge variable (if any).
352    output_column: Option<usize>,
353    /// Epoch for MVCC versioning.
354    viewing_epoch: Option<EpochId>,
355    /// Transaction ID for MVCC versioning.
356    transaction_id: Option<TransactionId>,
357    /// Optional constraint validator for schema enforcement.
358    validator: Option<Arc<dyn ConstraintValidator>>,
359}
360
361impl CreateEdgeOperator {
362    /// Creates a new edge creation operator.
363    ///
364    /// Use builder methods to set additional options:
365    /// - [`with_properties`](Self::with_properties) - set edge properties
366    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
367    /// - [`with_transaction_context`](Self::with_transaction_context) - set transaction context
368    pub fn new(
369        store: Arc<dyn GraphStoreMut>,
370        input: Box<dyn Operator>,
371        from_column: usize,
372        to_column: usize,
373        edge_type: String,
374        output_schema: Vec<LogicalType>,
375    ) -> Self {
376        Self {
377            store,
378            input,
379            from_column,
380            to_column,
381            edge_type,
382            properties: Vec::new(),
383            output_schema,
384            output_column: None,
385            viewing_epoch: None,
386            transaction_id: None,
387            validator: None,
388        }
389    }
390
391    /// Sets the properties to assign to created edges.
392    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
393        self.properties = properties;
394        self
395    }
396
397    /// Sets the output column for the created edge ID.
398    pub fn with_output_column(mut self, column: usize) -> Self {
399        self.output_column = Some(column);
400        self
401    }
402
403    /// Sets the transaction context for MVCC versioning.
404    pub fn with_transaction_context(
405        mut self,
406        epoch: EpochId,
407        transaction_id: Option<TransactionId>,
408    ) -> Self {
409        self.viewing_epoch = Some(epoch);
410        self.transaction_id = transaction_id;
411        self
412    }
413
414    /// Sets the constraint validator for schema enforcement.
415    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
416        self.validator = Some(validator);
417        self
418    }
419}
420
421impl Operator for CreateEdgeOperator {
422    fn next(&mut self) -> OperatorResult {
423        // Get transaction context for versioned creation
424        let epoch = self
425            .viewing_epoch
426            .unwrap_or_else(|| self.store.current_epoch());
427        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
428
429        if let Some(chunk) = self.input.next()? {
430            let mut builder =
431                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
432
433            for row in chunk.selected_indices() {
434                // Get source and target node IDs
435                let from_id = chunk
436                    .column(self.from_column)
437                    .and_then(|c| c.get_value(row))
438                    .ok_or_else(|| {
439                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
440                    })?;
441
442                let to_id = chunk
443                    .column(self.to_column)
444                    .and_then(|c| c.get_value(row))
445                    .ok_or_else(|| {
446                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
447                    })?;
448
449                // Extract node IDs
450                let from_node_id = match from_id {
451                    Value::Int64(id) => NodeId(id as u64),
452                    _ => {
453                        return Err(OperatorError::TypeMismatch {
454                            expected: "Int64 (node ID)".to_string(),
455                            found: format!("{from_id:?}"),
456                        });
457                    }
458                };
459
460                let to_node_id = match to_id {
461                    Value::Int64(id) => NodeId(id as u64),
462                    _ => {
463                        return Err(OperatorError::TypeMismatch {
464                            expected: "Int64 (node ID)".to_string(),
465                            found: format!("{to_id:?}"),
466                        });
467                    }
468                };
469
470                // Resolve property values
471                let resolved_props: Vec<(String, Value)> = self
472                    .properties
473                    .iter()
474                    .map(|(name, source)| {
475                        let value =
476                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
477                        (name.clone(), value)
478                    })
479                    .collect();
480
481                // Validate constraints before writing
482                if let Some(ref validator) = self.validator {
483                    for (name, value) in &resolved_props {
484                        validator.validate_edge_property(&self.edge_type, name, value)?;
485                    }
486                    validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
487                }
488
489                // Create the edge with MVCC versioning
490                let edge_id = self.store.create_edge_versioned(
491                    from_node_id,
492                    to_node_id,
493                    &self.edge_type,
494                    epoch,
495                    tx,
496                );
497
498                // Set properties
499                for (name, value) in resolved_props {
500                    self.store.set_edge_property(edge_id, &name, value);
501                }
502
503                // Copy input columns
504                for col_idx in 0..chunk.column_count() {
505                    if let (Some(src), Some(dst)) =
506                        (chunk.column(col_idx), builder.column_mut(col_idx))
507                    {
508                        if let Some(val) = src.get_value(row) {
509                            dst.push_value(val);
510                        } else {
511                            dst.push_value(Value::Null);
512                        }
513                    }
514                }
515
516                // Add edge ID if requested
517                if let Some(out_col) = self.output_column
518                    && let Some(dst) = builder.column_mut(out_col)
519                {
520                    dst.push_value(Value::Int64(edge_id.0 as i64));
521                }
522
523                builder.advance_row();
524            }
525
526            return Ok(Some(builder.finish()));
527        }
528        Ok(None)
529    }
530
531    fn reset(&mut self) {
532        self.input.reset();
533    }
534
535    fn name(&self) -> &'static str {
536        "CreateEdge"
537    }
538}
539
540/// Operator that deletes nodes.
541pub struct DeleteNodeOperator {
542    /// The graph store to modify.
543    store: Arc<dyn GraphStoreMut>,
544    /// Input operator.
545    input: Box<dyn Operator>,
546    /// Column index for the node to delete.
547    node_column: usize,
548    /// Output schema.
549    output_schema: Vec<LogicalType>,
550    /// Whether to detach (delete connected edges) before deleting.
551    detach: bool,
552    /// Epoch for MVCC versioning.
553    viewing_epoch: Option<EpochId>,
554    /// Transaction ID for MVCC versioning.
555    transaction_id: Option<TransactionId>,
556}
557
558impl DeleteNodeOperator {
559    /// Creates a new node deletion operator.
560    pub fn new(
561        store: Arc<dyn GraphStoreMut>,
562        input: Box<dyn Operator>,
563        node_column: usize,
564        output_schema: Vec<LogicalType>,
565        detach: bool,
566    ) -> Self {
567        Self {
568            store,
569            input,
570            node_column,
571            output_schema,
572            detach,
573            viewing_epoch: None,
574            transaction_id: None,
575        }
576    }
577
578    /// Sets the transaction context for MVCC versioning.
579    pub fn with_transaction_context(
580        mut self,
581        epoch: EpochId,
582        transaction_id: Option<TransactionId>,
583    ) -> Self {
584        self.viewing_epoch = Some(epoch);
585        self.transaction_id = transaction_id;
586        self
587    }
588}
589
590impl Operator for DeleteNodeOperator {
591    fn next(&mut self) -> OperatorResult {
592        // Get transaction context for versioned deletion
593        let epoch = self
594            .viewing_epoch
595            .unwrap_or_else(|| self.store.current_epoch());
596        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
597
598        if let Some(chunk) = self.input.next()? {
599            let mut builder =
600                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
601
602            for row in chunk.selected_indices() {
603                let node_val = chunk
604                    .column(self.node_column)
605                    .and_then(|c| c.get_value(row))
606                    .ok_or_else(|| {
607                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
608                    })?;
609
610                let node_id = match node_val {
611                    Value::Int64(id) => NodeId(id as u64),
612                    _ => {
613                        return Err(OperatorError::TypeMismatch {
614                            expected: "Int64 (node ID)".to_string(),
615                            found: format!("{node_val:?}"),
616                        });
617                    }
618                };
619
620                if self.detach {
621                    // Delete all connected edges first
622                    self.store.delete_node_edges(node_id);
623                } else {
624                    // NODETACH: check that node has no connected edges
625                    let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
626                    if degree > 0 {
627                        return Err(OperatorError::ConstraintViolation(format!(
628                            "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
629                            degree
630                        )));
631                    }
632                }
633
634                // Delete the node with MVCC versioning
635                self.store.delete_node_versioned(node_id, epoch, tx);
636
637                // Pass through all input columns so downstream RETURN can
638                // reference the variable (e.g., count(n) after DELETE n).
639                for col_idx in 0..chunk.column_count() {
640                    if let (Some(src), Some(dst)) =
641                        (chunk.column(col_idx), builder.column_mut(col_idx))
642                    {
643                        if let Some(val) = src.get_value(row) {
644                            dst.push_value(val);
645                        } else {
646                            dst.push_value(Value::Null);
647                        }
648                    }
649                }
650                builder.advance_row();
651            }
652
653            return Ok(Some(builder.finish()));
654        }
655        Ok(None)
656    }
657
658    fn reset(&mut self) {
659        self.input.reset();
660    }
661
662    fn name(&self) -> &'static str {
663        "DeleteNode"
664    }
665}
666
667/// Operator that deletes edges.
668pub struct DeleteEdgeOperator {
669    /// The graph store to modify.
670    store: Arc<dyn GraphStoreMut>,
671    /// Input operator.
672    input: Box<dyn Operator>,
673    /// Column index for the edge to delete.
674    edge_column: usize,
675    /// Output schema.
676    output_schema: Vec<LogicalType>,
677    /// Epoch for MVCC versioning.
678    viewing_epoch: Option<EpochId>,
679    /// Transaction ID for MVCC versioning.
680    transaction_id: Option<TransactionId>,
681}
682
683impl DeleteEdgeOperator {
684    /// Creates a new edge deletion operator.
685    pub fn new(
686        store: Arc<dyn GraphStoreMut>,
687        input: Box<dyn Operator>,
688        edge_column: usize,
689        output_schema: Vec<LogicalType>,
690    ) -> Self {
691        Self {
692            store,
693            input,
694            edge_column,
695            output_schema,
696            viewing_epoch: None,
697            transaction_id: None,
698        }
699    }
700
701    /// Sets the transaction context for MVCC versioning.
702    pub fn with_transaction_context(
703        mut self,
704        epoch: EpochId,
705        transaction_id: Option<TransactionId>,
706    ) -> Self {
707        self.viewing_epoch = Some(epoch);
708        self.transaction_id = transaction_id;
709        self
710    }
711}
712
713impl Operator for DeleteEdgeOperator {
714    fn next(&mut self) -> OperatorResult {
715        // Get transaction context for versioned deletion
716        let epoch = self
717            .viewing_epoch
718            .unwrap_or_else(|| self.store.current_epoch());
719        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
720
721        if let Some(chunk) = self.input.next()? {
722            let mut builder =
723                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
724
725            for row in chunk.selected_indices() {
726                let edge_val = chunk
727                    .column(self.edge_column)
728                    .and_then(|c| c.get_value(row))
729                    .ok_or_else(|| {
730                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
731                    })?;
732
733                let edge_id = match edge_val {
734                    Value::Int64(id) => EdgeId(id as u64),
735                    _ => {
736                        return Err(OperatorError::TypeMismatch {
737                            expected: "Int64 (edge ID)".to_string(),
738                            found: format!("{edge_val:?}"),
739                        });
740                    }
741                };
742
743                // Delete the edge with MVCC versioning
744                self.store.delete_edge_versioned(edge_id, epoch, tx);
745
746                // Pass through all input columns
747                for col_idx in 0..chunk.column_count() {
748                    if let (Some(src), Some(dst)) =
749                        (chunk.column(col_idx), builder.column_mut(col_idx))
750                    {
751                        if let Some(val) = src.get_value(row) {
752                            dst.push_value(val);
753                        } else {
754                            dst.push_value(Value::Null);
755                        }
756                    }
757                }
758                builder.advance_row();
759            }
760
761            return Ok(Some(builder.finish()));
762        }
763        Ok(None)
764    }
765
766    fn reset(&mut self) {
767        self.input.reset();
768    }
769
770    fn name(&self) -> &'static str {
771        "DeleteEdge"
772    }
773}
774
775/// Operator that adds labels to nodes.
776pub struct AddLabelOperator {
777    /// The graph store.
778    store: Arc<dyn GraphStoreMut>,
779    /// Child operator providing nodes.
780    input: Box<dyn Operator>,
781    /// Column index containing node IDs.
782    node_column: usize,
783    /// Labels to add.
784    labels: Vec<String>,
785    /// Output schema.
786    output_schema: Vec<LogicalType>,
787    /// Epoch for MVCC versioning.
788    viewing_epoch: Option<EpochId>,
789    /// Transaction ID for undo log tracking.
790    transaction_id: Option<TransactionId>,
791}
792
793impl AddLabelOperator {
794    /// Creates a new add label operator.
795    pub fn new(
796        store: Arc<dyn GraphStoreMut>,
797        input: Box<dyn Operator>,
798        node_column: usize,
799        labels: Vec<String>,
800        output_schema: Vec<LogicalType>,
801    ) -> Self {
802        Self {
803            store,
804            input,
805            node_column,
806            labels,
807            output_schema,
808            viewing_epoch: None,
809            transaction_id: None,
810        }
811    }
812
813    /// Sets the transaction context for versioned label mutations.
814    pub fn with_transaction_context(
815        mut self,
816        epoch: EpochId,
817        transaction_id: Option<TransactionId>,
818    ) -> Self {
819        self.viewing_epoch = Some(epoch);
820        self.transaction_id = transaction_id;
821        self
822    }
823}
824
825impl Operator for AddLabelOperator {
826    fn next(&mut self) -> OperatorResult {
827        if let Some(chunk) = self.input.next()? {
828            let mut updated_count = 0;
829
830            for row in chunk.selected_indices() {
831                let node_val = chunk
832                    .column(self.node_column)
833                    .and_then(|c| c.get_value(row))
834                    .ok_or_else(|| {
835                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
836                    })?;
837
838                let node_id = match node_val {
839                    Value::Int64(id) => NodeId(id as u64),
840                    _ => {
841                        return Err(OperatorError::TypeMismatch {
842                            expected: "Int64 (node ID)".to_string(),
843                            found: format!("{node_val:?}"),
844                        });
845                    }
846                };
847
848                // Add all labels
849                for label in &self.labels {
850                    let added = if let Some(tid) = self.transaction_id {
851                        self.store.add_label_versioned(node_id, label, tid)
852                    } else {
853                        self.store.add_label(node_id, label)
854                    };
855                    if added {
856                        updated_count += 1;
857                    }
858                }
859            }
860
861            // Return a chunk with the update count
862            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
863            if let Some(dst) = builder.column_mut(0) {
864                dst.push_value(Value::Int64(updated_count));
865            }
866            builder.advance_row();
867
868            return Ok(Some(builder.finish()));
869        }
870        Ok(None)
871    }
872
873    fn reset(&mut self) {
874        self.input.reset();
875    }
876
877    fn name(&self) -> &'static str {
878        "AddLabel"
879    }
880}
881
882/// Operator that removes labels from nodes.
883pub struct RemoveLabelOperator {
884    /// The graph store.
885    store: Arc<dyn GraphStoreMut>,
886    /// Child operator providing nodes.
887    input: Box<dyn Operator>,
888    /// Column index containing node IDs.
889    node_column: usize,
890    /// Labels to remove.
891    labels: Vec<String>,
892    /// Output schema.
893    output_schema: Vec<LogicalType>,
894    /// Epoch for MVCC versioning.
895    viewing_epoch: Option<EpochId>,
896    /// Transaction ID for undo log tracking.
897    transaction_id: Option<TransactionId>,
898}
899
900impl RemoveLabelOperator {
901    /// Creates a new remove label operator.
902    pub fn new(
903        store: Arc<dyn GraphStoreMut>,
904        input: Box<dyn Operator>,
905        node_column: usize,
906        labels: Vec<String>,
907        output_schema: Vec<LogicalType>,
908    ) -> Self {
909        Self {
910            store,
911            input,
912            node_column,
913            labels,
914            output_schema,
915            viewing_epoch: None,
916            transaction_id: None,
917        }
918    }
919
920    /// Sets the transaction context for versioned label mutations.
921    pub fn with_transaction_context(
922        mut self,
923        epoch: EpochId,
924        transaction_id: Option<TransactionId>,
925    ) -> Self {
926        self.viewing_epoch = Some(epoch);
927        self.transaction_id = transaction_id;
928        self
929    }
930}
931
932impl Operator for RemoveLabelOperator {
933    fn next(&mut self) -> OperatorResult {
934        if let Some(chunk) = self.input.next()? {
935            let mut updated_count = 0;
936
937            for row in chunk.selected_indices() {
938                let node_val = chunk
939                    .column(self.node_column)
940                    .and_then(|c| c.get_value(row))
941                    .ok_or_else(|| {
942                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
943                    })?;
944
945                let node_id = match node_val {
946                    Value::Int64(id) => NodeId(id as u64),
947                    _ => {
948                        return Err(OperatorError::TypeMismatch {
949                            expected: "Int64 (node ID)".to_string(),
950                            found: format!("{node_val:?}"),
951                        });
952                    }
953                };
954
955                // Remove all labels
956                for label in &self.labels {
957                    let removed = if let Some(tid) = self.transaction_id {
958                        self.store.remove_label_versioned(node_id, label, tid)
959                    } else {
960                        self.store.remove_label(node_id, label)
961                    };
962                    if removed {
963                        updated_count += 1;
964                    }
965                }
966            }
967
968            // Return a chunk with the update count
969            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
970            if let Some(dst) = builder.column_mut(0) {
971                dst.push_value(Value::Int64(updated_count));
972            }
973            builder.advance_row();
974
975            return Ok(Some(builder.finish()));
976        }
977        Ok(None)
978    }
979
980    fn reset(&mut self) {
981        self.input.reset();
982    }
983
984    fn name(&self) -> &'static str {
985        "RemoveLabel"
986    }
987}
988
989/// Operator that sets properties on nodes or edges.
990///
991/// This operator reads node/edge IDs from a column and sets the
992/// specified properties on each entity.
993pub struct SetPropertyOperator {
994    /// The graph store.
995    store: Arc<dyn GraphStoreMut>,
996    /// Child operator providing entities.
997    input: Box<dyn Operator>,
998    /// Column index containing entity IDs (node or edge).
999    entity_column: usize,
1000    /// Whether the entity is an edge (false = node).
1001    is_edge: bool,
1002    /// Properties to set (name -> source).
1003    properties: Vec<(String, PropertySource)>,
1004    /// Output schema.
1005    output_schema: Vec<LogicalType>,
1006    /// Whether to replace all properties (true) or merge (false) for map assignments.
1007    replace: bool,
1008    /// Optional constraint validator for schema enforcement.
1009    validator: Option<Arc<dyn ConstraintValidator>>,
1010    /// Entity labels (for node constraint validation).
1011    labels: Vec<String>,
1012    /// Edge type (for edge constraint validation).
1013    edge_type_name: Option<String>,
1014    /// Epoch for MVCC versioning.
1015    viewing_epoch: Option<EpochId>,
1016    /// Transaction ID for undo log tracking.
1017    transaction_id: Option<TransactionId>,
1018}
1019
1020impl SetPropertyOperator {
1021    /// Creates a new set property operator for nodes.
1022    pub fn new_for_node(
1023        store: Arc<dyn GraphStoreMut>,
1024        input: Box<dyn Operator>,
1025        node_column: usize,
1026        properties: Vec<(String, PropertySource)>,
1027        output_schema: Vec<LogicalType>,
1028    ) -> Self {
1029        Self {
1030            store,
1031            input,
1032            entity_column: node_column,
1033            is_edge: false,
1034            properties,
1035            output_schema,
1036            replace: false,
1037            validator: None,
1038            labels: Vec::new(),
1039            edge_type_name: None,
1040            viewing_epoch: None,
1041            transaction_id: None,
1042        }
1043    }
1044
1045    /// Creates a new set property operator for edges.
1046    pub fn new_for_edge(
1047        store: Arc<dyn GraphStoreMut>,
1048        input: Box<dyn Operator>,
1049        edge_column: usize,
1050        properties: Vec<(String, PropertySource)>,
1051        output_schema: Vec<LogicalType>,
1052    ) -> Self {
1053        Self {
1054            store,
1055            input,
1056            entity_column: edge_column,
1057            is_edge: true,
1058            properties,
1059            output_schema,
1060            replace: false,
1061            validator: None,
1062            labels: Vec::new(),
1063            edge_type_name: None,
1064            viewing_epoch: None,
1065            transaction_id: None,
1066        }
1067    }
1068
1069    /// Sets whether this operator replaces all properties (for map assignment).
1070    pub fn with_replace(mut self, replace: bool) -> Self {
1071        self.replace = replace;
1072        self
1073    }
1074
1075    /// Sets the constraint validator for schema enforcement.
1076    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1077        self.validator = Some(validator);
1078        self
1079    }
1080
1081    /// Sets the entity labels (for node constraint validation).
1082    pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1083        self.labels = labels;
1084        self
1085    }
1086
1087    /// Sets the edge type name (for edge constraint validation).
1088    pub fn with_edge_type(mut self, edge_type: String) -> Self {
1089        self.edge_type_name = Some(edge_type);
1090        self
1091    }
1092
1093    /// Sets the transaction context for versioned property mutations.
1094    ///
1095    /// When a transaction ID is provided, property changes are recorded in
1096    /// an undo log so they can be restored on rollback.
1097    pub fn with_transaction_context(
1098        mut self,
1099        epoch: EpochId,
1100        transaction_id: Option<TransactionId>,
1101    ) -> Self {
1102        self.viewing_epoch = Some(epoch);
1103        self.transaction_id = transaction_id;
1104        self
1105    }
1106}
1107
1108impl Operator for SetPropertyOperator {
1109    fn next(&mut self) -> OperatorResult {
1110        if let Some(chunk) = self.input.next()? {
1111            let mut builder =
1112                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1113
1114            for row in chunk.selected_indices() {
1115                let entity_val = chunk
1116                    .column(self.entity_column)
1117                    .and_then(|c| c.get_value(row))
1118                    .ok_or_else(|| {
1119                        OperatorError::ColumnNotFound(format!(
1120                            "entity column {}",
1121                            self.entity_column
1122                        ))
1123                    })?;
1124
1125                let entity_id = match entity_val {
1126                    Value::Int64(id) => id as u64,
1127                    _ => {
1128                        return Err(OperatorError::TypeMismatch {
1129                            expected: "Int64 (entity ID)".to_string(),
1130                            found: format!("{entity_val:?}"),
1131                        });
1132                    }
1133                };
1134
1135                // Resolve all property values
1136                let resolved_props: Vec<(String, Value)> = self
1137                    .properties
1138                    .iter()
1139                    .map(|(name, source)| {
1140                        let value =
1141                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1142                        (name.clone(), value)
1143                    })
1144                    .collect();
1145
1146                // Validate constraints before writing
1147                if let Some(ref validator) = self.validator {
1148                    if self.is_edge {
1149                        if let Some(ref et) = self.edge_type_name {
1150                            for (name, value) in &resolved_props {
1151                                validator.validate_edge_property(et, name, value)?;
1152                            }
1153                        }
1154                    } else {
1155                        for (name, value) in &resolved_props {
1156                            validator.validate_node_property(&self.labels, name, value)?;
1157                            validator.check_unique_node_property(&self.labels, name, value)?;
1158                        }
1159                    }
1160                }
1161
1162                // Write all properties (use versioned methods when inside a transaction)
1163                let tx_id = self.transaction_id;
1164                for (prop_name, value) in resolved_props {
1165                    if prop_name == "*" {
1166                        // Map assignment: value should be a Map
1167                        if let Value::Map(map) = value {
1168                            if self.replace {
1169                                // Replace: remove all existing properties first
1170                                if self.is_edge {
1171                                    if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1172                                        let keys: Vec<String> = edge
1173                                            .properties
1174                                            .iter()
1175                                            .map(|(k, _)| k.as_str().to_string())
1176                                            .collect();
1177                                        for key in keys {
1178                                            if let Some(tid) = tx_id {
1179                                                self.store.remove_edge_property_versioned(
1180                                                    EdgeId(entity_id),
1181                                                    &key,
1182                                                    tid,
1183                                                );
1184                                            } else {
1185                                                self.store
1186                                                    .remove_edge_property(EdgeId(entity_id), &key);
1187                                            }
1188                                        }
1189                                    }
1190                                } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1191                                    let keys: Vec<String> = node
1192                                        .properties
1193                                        .iter()
1194                                        .map(|(k, _)| k.as_str().to_string())
1195                                        .collect();
1196                                    for key in keys {
1197                                        if let Some(tid) = tx_id {
1198                                            self.store.remove_node_property_versioned(
1199                                                NodeId(entity_id),
1200                                                &key,
1201                                                tid,
1202                                            );
1203                                        } else {
1204                                            self.store
1205                                                .remove_node_property(NodeId(entity_id), &key);
1206                                        }
1207                                    }
1208                                }
1209                            }
1210                            // Set each map entry
1211                            for (key, val) in map.iter() {
1212                                if self.is_edge {
1213                                    if let Some(tid) = tx_id {
1214                                        self.store.set_edge_property_versioned(
1215                                            EdgeId(entity_id),
1216                                            key.as_str(),
1217                                            val.clone(),
1218                                            tid,
1219                                        );
1220                                    } else {
1221                                        self.store.set_edge_property(
1222                                            EdgeId(entity_id),
1223                                            key.as_str(),
1224                                            val.clone(),
1225                                        );
1226                                    }
1227                                } else if let Some(tid) = tx_id {
1228                                    self.store.set_node_property_versioned(
1229                                        NodeId(entity_id),
1230                                        key.as_str(),
1231                                        val.clone(),
1232                                        tid,
1233                                    );
1234                                } else {
1235                                    self.store.set_node_property(
1236                                        NodeId(entity_id),
1237                                        key.as_str(),
1238                                        val.clone(),
1239                                    );
1240                                }
1241                            }
1242                        }
1243                    } else if self.is_edge {
1244                        if let Some(tid) = tx_id {
1245                            self.store.set_edge_property_versioned(
1246                                EdgeId(entity_id),
1247                                &prop_name,
1248                                value,
1249                                tid,
1250                            );
1251                        } else {
1252                            self.store
1253                                .set_edge_property(EdgeId(entity_id), &prop_name, value);
1254                        }
1255                    } else if let Some(tid) = tx_id {
1256                        self.store.set_node_property_versioned(
1257                            NodeId(entity_id),
1258                            &prop_name,
1259                            value,
1260                            tid,
1261                        );
1262                    } else {
1263                        self.store
1264                            .set_node_property(NodeId(entity_id), &prop_name, value);
1265                    }
1266                }
1267
1268                // Copy input columns to output
1269                for col_idx in 0..chunk.column_count() {
1270                    if let (Some(src), Some(dst)) =
1271                        (chunk.column(col_idx), builder.column_mut(col_idx))
1272                    {
1273                        if let Some(val) = src.get_value(row) {
1274                            dst.push_value(val);
1275                        } else {
1276                            dst.push_value(Value::Null);
1277                        }
1278                    }
1279                }
1280
1281                builder.advance_row();
1282            }
1283
1284            return Ok(Some(builder.finish()));
1285        }
1286        Ok(None)
1287    }
1288
1289    fn reset(&mut self) {
1290        self.input.reset();
1291    }
1292
1293    fn name(&self) -> &'static str {
1294        "SetProperty"
1295    }
1296}
1297
1298#[cfg(test)]
1299mod tests {
1300    use super::*;
1301    use crate::execution::DataChunk;
1302    use crate::execution::chunk::DataChunkBuilder;
1303    use crate::graph::lpg::LpgStore;
1304
1305    // ── Helpers ────────────────────────────────────────────────────
1306
1307    fn create_test_store() -> Arc<dyn GraphStoreMut> {
1308        Arc::new(LpgStore::new().unwrap())
1309    }
1310
1311    struct MockInput {
1312        chunk: Option<DataChunk>,
1313    }
1314
1315    impl MockInput {
1316        fn boxed(chunk: DataChunk) -> Box<Self> {
1317            Box::new(Self { chunk: Some(chunk) })
1318        }
1319    }
1320
1321    impl Operator for MockInput {
1322        fn next(&mut self) -> OperatorResult {
1323            Ok(self.chunk.take())
1324        }
1325        fn reset(&mut self) {}
1326        fn name(&self) -> &'static str {
1327            "MockInput"
1328        }
1329    }
1330
1331    struct EmptyInput;
1332    impl Operator for EmptyInput {
1333        fn next(&mut self) -> OperatorResult {
1334            Ok(None)
1335        }
1336        fn reset(&mut self) {}
1337        fn name(&self) -> &'static str {
1338            "EmptyInput"
1339        }
1340    }
1341
1342    fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1343        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1344        for id in ids {
1345            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1346            builder.advance_row();
1347        }
1348        builder.finish()
1349    }
1350
1351    fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1352        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1353        for id in ids {
1354            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1355            builder.advance_row();
1356        }
1357        builder.finish()
1358    }
1359
1360    // ── CreateNodeOperator ──────────────────────────────────────
1361
1362    #[test]
1363    fn test_create_node_standalone() {
1364        let store = create_test_store();
1365
1366        let mut op = CreateNodeOperator::new(
1367            Arc::clone(&store),
1368            None,
1369            vec!["Person".to_string()],
1370            vec![(
1371                "name".to_string(),
1372                PropertySource::Constant(Value::String("Alix".into())),
1373            )],
1374            vec![LogicalType::Int64],
1375            0,
1376        );
1377
1378        let chunk = op.next().unwrap().unwrap();
1379        assert_eq!(chunk.row_count(), 1);
1380
1381        // Second call should return None (standalone executes once)
1382        assert!(op.next().unwrap().is_none());
1383
1384        assert_eq!(store.node_count(), 1);
1385    }
1386
1387    #[test]
1388    fn test_create_edge() {
1389        let store = create_test_store();
1390
1391        let node1 = store.create_node(&["Person"]);
1392        let node2 = store.create_node(&["Person"]);
1393
1394        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1395        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1396        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1397        builder.advance_row();
1398
1399        let mut op = CreateEdgeOperator::new(
1400            Arc::clone(&store),
1401            MockInput::boxed(builder.finish()),
1402            0,
1403            1,
1404            "KNOWS".to_string(),
1405            vec![LogicalType::Int64, LogicalType::Int64],
1406        );
1407
1408        let _chunk = op.next().unwrap().unwrap();
1409        assert_eq!(store.edge_count(), 1);
1410    }
1411
1412    #[test]
1413    fn test_delete_node() {
1414        let store = create_test_store();
1415
1416        let node_id = store.create_node(&["Person"]);
1417        assert_eq!(store.node_count(), 1);
1418
1419        let mut op = DeleteNodeOperator::new(
1420            Arc::clone(&store),
1421            MockInput::boxed(node_id_chunk(&[node_id])),
1422            0,
1423            vec![LogicalType::Node],
1424            false,
1425        );
1426
1427        let chunk = op.next().unwrap().unwrap();
1428        // Pass-through: output row contains the original node ID
1429        assert_eq!(chunk.row_count(), 1);
1430        assert_eq!(store.node_count(), 0);
1431    }
1432
1433    // ── DeleteEdgeOperator ───────────────────────────────────────
1434
1435    #[test]
1436    fn test_delete_edge() {
1437        let store = create_test_store();
1438
1439        let n1 = store.create_node(&["Person"]);
1440        let n2 = store.create_node(&["Person"]);
1441        let eid = store.create_edge(n1, n2, "KNOWS");
1442        assert_eq!(store.edge_count(), 1);
1443
1444        let mut op = DeleteEdgeOperator::new(
1445            Arc::clone(&store),
1446            MockInput::boxed(edge_id_chunk(&[eid])),
1447            0,
1448            vec![LogicalType::Node],
1449        );
1450
1451        let chunk = op.next().unwrap().unwrap();
1452        assert_eq!(chunk.row_count(), 1);
1453        assert_eq!(store.edge_count(), 0);
1454    }
1455
1456    #[test]
1457    fn test_delete_edge_no_input_returns_none() {
1458        let store = create_test_store();
1459
1460        let mut op = DeleteEdgeOperator::new(
1461            Arc::clone(&store),
1462            Box::new(EmptyInput),
1463            0,
1464            vec![LogicalType::Int64],
1465        );
1466
1467        assert!(op.next().unwrap().is_none());
1468    }
1469
1470    #[test]
1471    fn test_delete_multiple_edges() {
1472        let store = create_test_store();
1473
1474        let n1 = store.create_node(&["N"]);
1475        let n2 = store.create_node(&["N"]);
1476        let e1 = store.create_edge(n1, n2, "R");
1477        let e2 = store.create_edge(n2, n1, "S");
1478        assert_eq!(store.edge_count(), 2);
1479
1480        let mut op = DeleteEdgeOperator::new(
1481            Arc::clone(&store),
1482            MockInput::boxed(edge_id_chunk(&[e1, e2])),
1483            0,
1484            vec![LogicalType::Node],
1485        );
1486
1487        let chunk = op.next().unwrap().unwrap();
1488        assert_eq!(chunk.row_count(), 2);
1489        assert_eq!(store.edge_count(), 0);
1490    }
1491
1492    // ── DeleteNodeOperator with DETACH ───────────────────────────
1493
1494    #[test]
1495    fn test_delete_node_detach() {
1496        let store = create_test_store();
1497
1498        let n1 = store.create_node(&["Person"]);
1499        let n2 = store.create_node(&["Person"]);
1500        store.create_edge(n1, n2, "KNOWS");
1501        store.create_edge(n2, n1, "FOLLOWS");
1502        assert_eq!(store.edge_count(), 2);
1503
1504        let mut op = DeleteNodeOperator::new(
1505            Arc::clone(&store),
1506            MockInput::boxed(node_id_chunk(&[n1])),
1507            0,
1508            vec![LogicalType::Node],
1509            true, // detach = true
1510        );
1511
1512        let chunk = op.next().unwrap().unwrap();
1513        assert_eq!(chunk.row_count(), 1);
1514        assert_eq!(store.node_count(), 1);
1515        assert_eq!(store.edge_count(), 0); // edges detached
1516    }
1517
1518    // ── AddLabelOperator ─────────────────────────────────────────
1519
1520    #[test]
1521    fn test_add_label() {
1522        let store = create_test_store();
1523
1524        let node = store.create_node(&["Person"]);
1525
1526        let mut op = AddLabelOperator::new(
1527            Arc::clone(&store),
1528            MockInput::boxed(node_id_chunk(&[node])),
1529            0,
1530            vec!["Employee".to_string()],
1531            vec![LogicalType::Int64],
1532        );
1533
1534        let chunk = op.next().unwrap().unwrap();
1535        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1536        assert_eq!(updated, 1);
1537
1538        // Verify label was added
1539        let node_data = store.get_node(node).unwrap();
1540        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1541        assert!(labels.contains(&"Person"));
1542        assert!(labels.contains(&"Employee"));
1543    }
1544
1545    #[test]
1546    fn test_add_multiple_labels() {
1547        let store = create_test_store();
1548
1549        let node = store.create_node(&["Base"]);
1550
1551        let mut op = AddLabelOperator::new(
1552            Arc::clone(&store),
1553            MockInput::boxed(node_id_chunk(&[node])),
1554            0,
1555            vec!["LabelA".to_string(), "LabelB".to_string()],
1556            vec![LogicalType::Int64],
1557        );
1558
1559        let chunk = op.next().unwrap().unwrap();
1560        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1561        assert_eq!(updated, 2); // 2 labels added
1562
1563        let node_data = store.get_node(node).unwrap();
1564        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1565        assert!(labels.contains(&"LabelA"));
1566        assert!(labels.contains(&"LabelB"));
1567    }
1568
1569    #[test]
1570    fn test_add_label_no_input_returns_none() {
1571        let store = create_test_store();
1572
1573        let mut op = AddLabelOperator::new(
1574            Arc::clone(&store),
1575            Box::new(EmptyInput),
1576            0,
1577            vec!["Foo".to_string()],
1578            vec![LogicalType::Int64],
1579        );
1580
1581        assert!(op.next().unwrap().is_none());
1582    }
1583
1584    // ── RemoveLabelOperator ──────────────────────────────────────
1585
1586    #[test]
1587    fn test_remove_label() {
1588        let store = create_test_store();
1589
1590        let node = store.create_node(&["Person", "Employee"]);
1591
1592        let mut op = RemoveLabelOperator::new(
1593            Arc::clone(&store),
1594            MockInput::boxed(node_id_chunk(&[node])),
1595            0,
1596            vec!["Employee".to_string()],
1597            vec![LogicalType::Int64],
1598        );
1599
1600        let chunk = op.next().unwrap().unwrap();
1601        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1602        assert_eq!(updated, 1);
1603
1604        // Verify label was removed
1605        let node_data = store.get_node(node).unwrap();
1606        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1607        assert!(labels.contains(&"Person"));
1608        assert!(!labels.contains(&"Employee"));
1609    }
1610
1611    #[test]
1612    fn test_remove_nonexistent_label() {
1613        let store = create_test_store();
1614
1615        let node = store.create_node(&["Person"]);
1616
1617        let mut op = RemoveLabelOperator::new(
1618            Arc::clone(&store),
1619            MockInput::boxed(node_id_chunk(&[node])),
1620            0,
1621            vec!["NonExistent".to_string()],
1622            vec![LogicalType::Int64],
1623        );
1624
1625        let chunk = op.next().unwrap().unwrap();
1626        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1627        assert_eq!(updated, 0); // nothing removed
1628    }
1629
1630    // ── SetPropertyOperator ──────────────────────────────────────
1631
1632    #[test]
1633    fn test_set_node_property_constant() {
1634        let store = create_test_store();
1635
1636        let node = store.create_node(&["Person"]);
1637
1638        let mut op = SetPropertyOperator::new_for_node(
1639            Arc::clone(&store),
1640            MockInput::boxed(node_id_chunk(&[node])),
1641            0,
1642            vec![(
1643                "name".to_string(),
1644                PropertySource::Constant(Value::String("Alix".into())),
1645            )],
1646            vec![LogicalType::Int64],
1647        );
1648
1649        let chunk = op.next().unwrap().unwrap();
1650        assert_eq!(chunk.row_count(), 1);
1651
1652        // Verify property was set
1653        let node_data = store.get_node(node).unwrap();
1654        assert_eq!(
1655            node_data
1656                .properties
1657                .get(&grafeo_common::types::PropertyKey::new("name")),
1658            Some(&Value::String("Alix".into()))
1659        );
1660    }
1661
1662    #[test]
1663    fn test_set_node_property_from_column() {
1664        let store = create_test_store();
1665
1666        let node = store.create_node(&["Person"]);
1667
1668        // Input: column 0 = node ID, column 1 = property value
1669        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1670        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1671        builder
1672            .column_mut(1)
1673            .unwrap()
1674            .push_value(Value::String("Gus".into()));
1675        builder.advance_row();
1676
1677        let mut op = SetPropertyOperator::new_for_node(
1678            Arc::clone(&store),
1679            MockInput::boxed(builder.finish()),
1680            0,
1681            vec![("name".to_string(), PropertySource::Column(1))],
1682            vec![LogicalType::Int64, LogicalType::String],
1683        );
1684
1685        let chunk = op.next().unwrap().unwrap();
1686        assert_eq!(chunk.row_count(), 1);
1687
1688        let node_data = store.get_node(node).unwrap();
1689        assert_eq!(
1690            node_data
1691                .properties
1692                .get(&grafeo_common::types::PropertyKey::new("name")),
1693            Some(&Value::String("Gus".into()))
1694        );
1695    }
1696
1697    #[test]
1698    fn test_set_edge_property() {
1699        let store = create_test_store();
1700
1701        let n1 = store.create_node(&["N"]);
1702        let n2 = store.create_node(&["N"]);
1703        let eid = store.create_edge(n1, n2, "KNOWS");
1704
1705        let mut op = SetPropertyOperator::new_for_edge(
1706            Arc::clone(&store),
1707            MockInput::boxed(edge_id_chunk(&[eid])),
1708            0,
1709            vec![(
1710                "weight".to_string(),
1711                PropertySource::Constant(Value::Float64(0.75)),
1712            )],
1713            vec![LogicalType::Int64],
1714        );
1715
1716        let chunk = op.next().unwrap().unwrap();
1717        assert_eq!(chunk.row_count(), 1);
1718
1719        let edge_data = store.get_edge(eid).unwrap();
1720        assert_eq!(
1721            edge_data
1722                .properties
1723                .get(&grafeo_common::types::PropertyKey::new("weight")),
1724            Some(&Value::Float64(0.75))
1725        );
1726    }
1727
1728    #[test]
1729    fn test_set_multiple_properties() {
1730        let store = create_test_store();
1731
1732        let node = store.create_node(&["Person"]);
1733
1734        let mut op = SetPropertyOperator::new_for_node(
1735            Arc::clone(&store),
1736            MockInput::boxed(node_id_chunk(&[node])),
1737            0,
1738            vec![
1739                (
1740                    "name".to_string(),
1741                    PropertySource::Constant(Value::String("Alix".into())),
1742                ),
1743                (
1744                    "age".to_string(),
1745                    PropertySource::Constant(Value::Int64(30)),
1746                ),
1747            ],
1748            vec![LogicalType::Int64],
1749        );
1750
1751        op.next().unwrap().unwrap();
1752
1753        let node_data = store.get_node(node).unwrap();
1754        assert_eq!(
1755            node_data
1756                .properties
1757                .get(&grafeo_common::types::PropertyKey::new("name")),
1758            Some(&Value::String("Alix".into()))
1759        );
1760        assert_eq!(
1761            node_data
1762                .properties
1763                .get(&grafeo_common::types::PropertyKey::new("age")),
1764            Some(&Value::Int64(30))
1765        );
1766    }
1767
1768    #[test]
1769    fn test_set_property_no_input_returns_none() {
1770        let store = create_test_store();
1771
1772        let mut op = SetPropertyOperator::new_for_node(
1773            Arc::clone(&store),
1774            Box::new(EmptyInput),
1775            0,
1776            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
1777            vec![LogicalType::Int64],
1778        );
1779
1780        assert!(op.next().unwrap().is_none());
1781    }
1782
1783    // ── Error paths ──────────────────────────────────────────────
1784
1785    #[test]
1786    fn test_delete_node_without_detach_errors_when_edges_exist() {
1787        let store = create_test_store();
1788
1789        let n1 = store.create_node(&["Person"]);
1790        let n2 = store.create_node(&["Person"]);
1791        store.create_edge(n1, n2, "KNOWS");
1792
1793        let mut op = DeleteNodeOperator::new(
1794            Arc::clone(&store),
1795            MockInput::boxed(node_id_chunk(&[n1])),
1796            0,
1797            vec![LogicalType::Int64],
1798            false, // no detach
1799        );
1800
1801        let err = op.next().unwrap_err();
1802        match err {
1803            OperatorError::ConstraintViolation(msg) => {
1804                assert!(msg.contains("connected edge"), "unexpected message: {msg}");
1805            }
1806            other => panic!("expected ConstraintViolation, got {other:?}"),
1807        }
1808        // Node should still exist
1809        assert_eq!(store.node_count(), 2);
1810    }
1811
1812    // ── CreateNodeOperator with input ───────────────────────────
1813
1814    #[test]
1815    fn test_create_node_with_input_operator() {
1816        let store = create_test_store();
1817
1818        // Seed node to provide input rows
1819        let existing = store.create_node(&["Seed"]);
1820
1821        let mut op = CreateNodeOperator::new(
1822            Arc::clone(&store),
1823            Some(MockInput::boxed(node_id_chunk(&[existing]))),
1824            vec!["Created".to_string()],
1825            vec![(
1826                "source".to_string(),
1827                PropertySource::Constant(Value::String("from_input".into())),
1828            )],
1829            vec![LogicalType::Int64, LogicalType::Int64], // input col + output col
1830            1,                                            // output column for new node ID
1831        );
1832
1833        let chunk = op.next().unwrap().unwrap();
1834        assert_eq!(chunk.row_count(), 1);
1835
1836        // Should have created one new node (2 total: Seed + Created)
1837        assert_eq!(store.node_count(), 2);
1838
1839        // Exhausted
1840        assert!(op.next().unwrap().is_none());
1841    }
1842
1843    // ── CreateEdgeOperator with properties and output column ────
1844
1845    #[test]
1846    fn test_create_edge_with_properties_and_output_column() {
1847        let store = create_test_store();
1848
1849        let n1 = store.create_node(&["Person"]);
1850        let n2 = store.create_node(&["Person"]);
1851
1852        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1853        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
1854        builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
1855        builder.advance_row();
1856
1857        let mut op = CreateEdgeOperator::new(
1858            Arc::clone(&store),
1859            MockInput::boxed(builder.finish()),
1860            0,
1861            1,
1862            "KNOWS".to_string(),
1863            vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
1864        )
1865        .with_properties(vec![(
1866            "since".to_string(),
1867            PropertySource::Constant(Value::Int64(2024)),
1868        )])
1869        .with_output_column(2);
1870
1871        let chunk = op.next().unwrap().unwrap();
1872        assert_eq!(chunk.row_count(), 1);
1873        assert_eq!(store.edge_count(), 1);
1874
1875        // Verify the output chunk contains the edge ID in column 2
1876        let edge_id_raw = chunk
1877            .column(2)
1878            .and_then(|c| c.get_int64(0))
1879            .expect("edge ID should be in output column 2");
1880        let edge_id = EdgeId(edge_id_raw as u64);
1881
1882        // Verify the edge has the property
1883        let edge = store.get_edge(edge_id).expect("edge should exist");
1884        assert_eq!(
1885            edge.properties
1886                .get(&grafeo_common::types::PropertyKey::new("since")),
1887            Some(&Value::Int64(2024))
1888        );
1889    }
1890
1891    // ── SetPropertyOperator with map replacement ────────────────
1892
1893    #[test]
1894    fn test_set_property_map_replace() {
1895        use std::collections::BTreeMap;
1896
1897        let store = create_test_store();
1898
1899        let node = store.create_node(&["Person"]);
1900        store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
1901
1902        let mut map = BTreeMap::new();
1903        map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
1904
1905        let mut op = SetPropertyOperator::new_for_node(
1906            Arc::clone(&store),
1907            MockInput::boxed(node_id_chunk(&[node])),
1908            0,
1909            vec![(
1910                "*".to_string(),
1911                PropertySource::Constant(Value::Map(Arc::new(map))),
1912            )],
1913            vec![LogicalType::Int64],
1914        )
1915        .with_replace(true);
1916
1917        op.next().unwrap().unwrap();
1918
1919        let node_data = store.get_node(node).unwrap();
1920        // Old property should be gone
1921        assert!(
1922            node_data
1923                .properties
1924                .get(&PropertyKey::new("old_prop"))
1925                .is_none()
1926        );
1927        // New property should exist
1928        assert_eq!(
1929            node_data.properties.get(&PropertyKey::new("new_key")),
1930            Some(&Value::String("new_val".into()))
1931        );
1932    }
1933
1934    // ── SetPropertyOperator with map merge (no replace) ─────────
1935
1936    #[test]
1937    fn test_set_property_map_merge() {
1938        use std::collections::BTreeMap;
1939
1940        let store = create_test_store();
1941
1942        let node = store.create_node(&["Person"]);
1943        store.set_node_property(node, "existing", Value::Int64(42));
1944
1945        let mut map = BTreeMap::new();
1946        map.insert(PropertyKey::new("added"), Value::String("hello".into()));
1947
1948        let mut op = SetPropertyOperator::new_for_node(
1949            Arc::clone(&store),
1950            MockInput::boxed(node_id_chunk(&[node])),
1951            0,
1952            vec![(
1953                "*".to_string(),
1954                PropertySource::Constant(Value::Map(Arc::new(map))),
1955            )],
1956            vec![LogicalType::Int64],
1957        ); // replace defaults to false
1958
1959        op.next().unwrap().unwrap();
1960
1961        let node_data = store.get_node(node).unwrap();
1962        // Existing property should still be there
1963        assert_eq!(
1964            node_data.properties.get(&PropertyKey::new("existing")),
1965            Some(&Value::Int64(42))
1966        );
1967        // New property should also exist
1968        assert_eq!(
1969            node_data.properties.get(&PropertyKey::new("added")),
1970            Some(&Value::String("hello".into()))
1971        );
1972    }
1973
1974    // ── PropertySource::PropertyAccess ──────────────────────────
1975
1976    #[test]
1977    fn test_property_source_property_access() {
1978        let store = create_test_store();
1979
1980        let source_node = store.create_node(&["Source"]);
1981        store.set_node_property(source_node, "name", Value::String("Alix".into()));
1982
1983        let target_node = store.create_node(&["Target"]);
1984
1985        // Build chunk: col 0 = source node ID (Node type for PropertyAccess), col 1 = target node ID
1986        let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
1987        builder.column_mut(0).unwrap().push_node_id(source_node);
1988        builder
1989            .column_mut(1)
1990            .unwrap()
1991            .push_int64(target_node.0 as i64);
1992        builder.advance_row();
1993
1994        let mut op = SetPropertyOperator::new_for_node(
1995            Arc::clone(&store),
1996            MockInput::boxed(builder.finish()),
1997            1, // entity column = target node
1998            vec![(
1999                "copied_name".to_string(),
2000                PropertySource::PropertyAccess {
2001                    column: 0,
2002                    property: "name".to_string(),
2003                },
2004            )],
2005            vec![LogicalType::Node, LogicalType::Int64],
2006        );
2007
2008        op.next().unwrap().unwrap();
2009
2010        let target_data = store.get_node(target_node).unwrap();
2011        assert_eq!(
2012            target_data.properties.get(&PropertyKey::new("copied_name")),
2013            Some(&Value::String("Alix".into()))
2014        );
2015    }
2016
2017    // ── ConstraintValidator integration ─────────────────────────
2018
2019    #[test]
2020    fn test_create_node_with_constraint_validator() {
2021        let store = create_test_store();
2022
2023        struct RejectAgeValidator;
2024        impl ConstraintValidator for RejectAgeValidator {
2025            fn validate_node_property(
2026                &self,
2027                _labels: &[String],
2028                key: &str,
2029                _value: &Value,
2030            ) -> Result<(), OperatorError> {
2031                if key == "forbidden" {
2032                    return Err(OperatorError::ConstraintViolation(
2033                        "property 'forbidden' is not allowed".to_string(),
2034                    ));
2035                }
2036                Ok(())
2037            }
2038            fn validate_node_complete(
2039                &self,
2040                _labels: &[String],
2041                _properties: &[(String, Value)],
2042            ) -> Result<(), OperatorError> {
2043                Ok(())
2044            }
2045            fn check_unique_node_property(
2046                &self,
2047                _labels: &[String],
2048                _key: &str,
2049                _value: &Value,
2050            ) -> Result<(), OperatorError> {
2051                Ok(())
2052            }
2053            fn validate_edge_property(
2054                &self,
2055                _edge_type: &str,
2056                _key: &str,
2057                _value: &Value,
2058            ) -> Result<(), OperatorError> {
2059                Ok(())
2060            }
2061            fn validate_edge_complete(
2062                &self,
2063                _edge_type: &str,
2064                _properties: &[(String, Value)],
2065            ) -> Result<(), OperatorError> {
2066                Ok(())
2067            }
2068        }
2069
2070        // Valid property should succeed
2071        let mut op = CreateNodeOperator::new(
2072            Arc::clone(&store),
2073            None,
2074            vec!["Thing".to_string()],
2075            vec![(
2076                "name".to_string(),
2077                PropertySource::Constant(Value::String("ok".into())),
2078            )],
2079            vec![LogicalType::Int64],
2080            0,
2081        )
2082        .with_validator(Arc::new(RejectAgeValidator));
2083
2084        assert!(op.next().is_ok());
2085        assert_eq!(store.node_count(), 1);
2086
2087        // Forbidden property should fail
2088        let mut op = CreateNodeOperator::new(
2089            Arc::clone(&store),
2090            None,
2091            vec!["Thing".to_string()],
2092            vec![(
2093                "forbidden".to_string(),
2094                PropertySource::Constant(Value::Int64(1)),
2095            )],
2096            vec![LogicalType::Int64],
2097            0,
2098        )
2099        .with_validator(Arc::new(RejectAgeValidator));
2100
2101        let err = op.next().unwrap_err();
2102        assert!(matches!(err, OperatorError::ConstraintViolation(_)));
2103        // Node count should still be 2 (the node is created before validation, but the error
2104        // propagates - this tests the validation logic fires)
2105    }
2106
2107    // ── Reset behavior ──────────────────────────────────────────
2108
2109    #[test]
2110    fn test_create_node_reset_allows_re_execution() {
2111        let store = create_test_store();
2112
2113        let mut op = CreateNodeOperator::new(
2114            Arc::clone(&store),
2115            None,
2116            vec!["Person".to_string()],
2117            vec![],
2118            vec![LogicalType::Int64],
2119            0,
2120        );
2121
2122        // First execution
2123        assert!(op.next().unwrap().is_some());
2124        assert!(op.next().unwrap().is_none());
2125
2126        // Reset and re-execute
2127        op.reset();
2128        assert!(op.next().unwrap().is_some());
2129
2130        assert_eq!(store.node_count(), 2);
2131    }
2132
2133    // ── Operator name() ──────────────────────────────────────────
2134
2135    #[test]
2136    fn test_operator_names() {
2137        let store = create_test_store();
2138
2139        let op = CreateNodeOperator::new(
2140            Arc::clone(&store),
2141            None,
2142            vec![],
2143            vec![],
2144            vec![LogicalType::Int64],
2145            0,
2146        );
2147        assert_eq!(op.name(), "CreateNode");
2148
2149        let op = CreateEdgeOperator::new(
2150            Arc::clone(&store),
2151            Box::new(EmptyInput),
2152            0,
2153            1,
2154            "R".to_string(),
2155            vec![LogicalType::Int64],
2156        );
2157        assert_eq!(op.name(), "CreateEdge");
2158
2159        let op = DeleteNodeOperator::new(
2160            Arc::clone(&store),
2161            Box::new(EmptyInput),
2162            0,
2163            vec![LogicalType::Int64],
2164            false,
2165        );
2166        assert_eq!(op.name(), "DeleteNode");
2167
2168        let op = DeleteEdgeOperator::new(
2169            Arc::clone(&store),
2170            Box::new(EmptyInput),
2171            0,
2172            vec![LogicalType::Int64],
2173        );
2174        assert_eq!(op.name(), "DeleteEdge");
2175
2176        let op = AddLabelOperator::new(
2177            Arc::clone(&store),
2178            Box::new(EmptyInput),
2179            0,
2180            vec!["L".to_string()],
2181            vec![LogicalType::Int64],
2182        );
2183        assert_eq!(op.name(), "AddLabel");
2184
2185        let op = RemoveLabelOperator::new(
2186            Arc::clone(&store),
2187            Box::new(EmptyInput),
2188            0,
2189            vec!["L".to_string()],
2190            vec![LogicalType::Int64],
2191        );
2192        assert_eq!(op.name(), "RemoveLabel");
2193
2194        let op = SetPropertyOperator::new_for_node(
2195            Arc::clone(&store),
2196            Box::new(EmptyInput),
2197            0,
2198            vec![],
2199            vec![LogicalType::Int64],
2200        );
2201        assert_eq!(op.name(), "SetProperty");
2202    }
2203}