Skip to main content

grafeo_core/execution/operators/
mutation.rs

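//! Mutation operators for creating, deleting, and updating graph elements.
//!
//! These operators modify the graph:
//! - `CreateNodeOperator`: Creates new nodes
//! - `CreateEdgeOperator`: Creates new edges
//! - `DeleteNodeOperator`: Deletes nodes
//! - `DeleteEdgeOperator`: Deletes edges
//! - `AddLabelOperator`: Adds labels to nodes
//! - `RemoveLabelOperator`: Removes labels from nodes
//! - `SetPropertyOperator`: Sets properties on nodes or edges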
8
9use std::sync::Arc;
10
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19/// Trait for validating schema constraints during mutation operations.
20///
21/// Implementors check type definitions, NOT NULL, and UNIQUE constraints
22/// before data is written to the store.
23pub trait ConstraintValidator: Send + Sync {
24    /// Validates a single property value for a node with the given labels.
25    ///
26    /// Checks type compatibility and NOT NULL constraints.
27    fn validate_node_property(
28        &self,
29        labels: &[String],
30        key: &str,
31        value: &Value,
32    ) -> Result<(), OperatorError>;
33
34    /// Validates that all required properties are present after creating a node.
35    ///
36    /// Checks NOT NULL constraints for properties that were not explicitly set.
37    fn validate_node_complete(
38        &self,
39        labels: &[String],
40        properties: &[(String, Value)],
41    ) -> Result<(), OperatorError>;
42
43    /// Checks UNIQUE constraint for a node property value.
44    ///
45    /// Returns an error if a node with the same label already has this value.
46    fn check_unique_node_property(
47        &self,
48        labels: &[String],
49        key: &str,
50        value: &Value,
51    ) -> Result<(), OperatorError>;
52
53    /// Validates a single property value for an edge of the given type.
54    fn validate_edge_property(
55        &self,
56        edge_type: &str,
57        key: &str,
58        value: &Value,
59    ) -> Result<(), OperatorError>;
60
61    /// Validates that all required properties are present after creating an edge.
62    fn validate_edge_complete(
63        &self,
64        edge_type: &str,
65        properties: &[(String, Value)],
66    ) -> Result<(), OperatorError>;
67}
68
69/// Operator that creates new nodes.
70///
71/// For each input row, creates a new node with the specified labels
72/// and properties, then outputs the row with the new node.
73pub struct CreateNodeOperator {
74    /// The graph store to modify.
75    store: Arc<dyn GraphStoreMut>,
76    /// Input operator.
77    input: Option<Box<dyn Operator>>,
78    /// Labels for the new nodes.
79    labels: Vec<String>,
80    /// Properties to set (name -> column index or constant value).
81    properties: Vec<(String, PropertySource)>,
82    /// Output schema.
83    output_schema: Vec<LogicalType>,
84    /// Column index for the created node variable.
85    output_column: usize,
86    /// Whether this operator has been executed (for no-input case).
87    executed: bool,
88    /// Epoch for MVCC versioning.
89    viewing_epoch: Option<EpochId>,
90    /// Transaction ID for MVCC versioning.
91    transaction_id: Option<TransactionId>,
92    /// Optional constraint validator for schema enforcement.
93    validator: Option<Arc<dyn ConstraintValidator>>,
94}
95
96/// Source for a property value.
97#[derive(Debug, Clone)]
98pub enum PropertySource {
99    /// Get value from an input column.
100    Column(usize),
101    /// Use a constant value.
102    Constant(Value),
103    /// Access a named property from a map/node/edge in an input column.
104    PropertyAccess {
105        /// The column containing the map, node ID, or edge ID.
106        column: usize,
107        /// The property name to extract.
108        property: String,
109    },
110}
111
112impl PropertySource {
113    /// Resolves a property value from a data chunk row.
114    pub fn resolve(
115        &self,
116        chunk: &crate::execution::chunk::DataChunk,
117        row: usize,
118        store: &dyn GraphStore,
119    ) -> Value {
120        match self {
121            PropertySource::Column(col_idx) => chunk
122                .column(*col_idx)
123                .and_then(|c| c.get_value(row))
124                .unwrap_or(Value::Null),
125            PropertySource::Constant(v) => v.clone(),
126            PropertySource::PropertyAccess { column, property } => {
127                let Some(col) = chunk.column(*column) else {
128                    return Value::Null;
129                };
130                // Try node ID first, then edge ID, then map value
131                if let Some(node_id) = col.get_node_id(row) {
132                    store
133                        .get_node(node_id)
134                        .and_then(|node| node.get_property(property).cloned())
135                        .unwrap_or(Value::Null)
136                } else if let Some(edge_id) = col.get_edge_id(row) {
137                    store
138                        .get_edge(edge_id)
139                        .and_then(|edge| edge.get_property(property).cloned())
140                        .unwrap_or(Value::Null)
141                } else if let Some(Value::Map(map)) = col.get_value(row) {
142                    let key = PropertyKey::new(property);
143                    map.get(&key).cloned().unwrap_or(Value::Null)
144                } else {
145                    Value::Null
146                }
147            }
148        }
149    }
150}
151
152impl CreateNodeOperator {
153    /// Creates a new node creation operator.
154    ///
155    /// # Arguments
156    /// * `store` - The graph store to modify.
157    /// * `input` - Optional input operator (None for standalone CREATE).
158    /// * `labels` - Labels to assign to created nodes.
159    /// * `properties` - Properties to set on created nodes.
160    /// * `output_schema` - Schema of the output.
161    /// * `output_column` - Column index where the created node ID goes.
162    pub fn new(
163        store: Arc<dyn GraphStoreMut>,
164        input: Option<Box<dyn Operator>>,
165        labels: Vec<String>,
166        properties: Vec<(String, PropertySource)>,
167        output_schema: Vec<LogicalType>,
168        output_column: usize,
169    ) -> Self {
170        Self {
171            store,
172            input,
173            labels,
174            properties,
175            output_schema,
176            output_column,
177            executed: false,
178            viewing_epoch: None,
179            transaction_id: None,
180            validator: None,
181        }
182    }
183
184    /// Sets the transaction context for MVCC versioning.
185    pub fn with_transaction_context(
186        mut self,
187        epoch: EpochId,
188        transaction_id: Option<TransactionId>,
189    ) -> Self {
190        self.viewing_epoch = Some(epoch);
191        self.transaction_id = transaction_id;
192        self
193    }
194
195    /// Sets the constraint validator for schema enforcement.
196    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
197        self.validator = Some(validator);
198        self
199    }
200}
201
202impl CreateNodeOperator {
203    /// Validates and sets properties on a newly created node.
204    fn validate_and_set_properties(
205        &self,
206        node_id: NodeId,
207        resolved_props: &[(String, Value)],
208    ) -> Result<(), OperatorError> {
209        // Phase 1: Validate each property value
210        if let Some(ref validator) = self.validator {
211            for (name, value) in resolved_props {
212                validator.validate_node_property(&self.labels, name, value)?;
213                validator.check_unique_node_property(&self.labels, name, value)?;
214            }
215            // Phase 2: Validate completeness (NOT NULL checks for missing required properties)
216            validator.validate_node_complete(&self.labels, resolved_props)?;
217        }
218
219        // Phase 3: Write properties to the store
220        for (name, value) in resolved_props {
221            self.store.set_node_property(node_id, name, value.clone());
222        }
223        Ok(())
224    }
225}
226
227impl Operator for CreateNodeOperator {
228    fn next(&mut self) -> OperatorResult {
229        // Get transaction context for versioned creation
230        let epoch = self
231            .viewing_epoch
232            .unwrap_or_else(|| self.store.current_epoch());
233        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
234
235        if let Some(ref mut input) = self.input {
236            // For each input row, create a node
237            if let Some(chunk) = input.next()? {
238                let mut builder =
239                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
240
241                for row in chunk.selected_indices() {
242                    // Resolve all property values first (before creating node)
243                    let resolved_props: Vec<(String, Value)> = self
244                        .properties
245                        .iter()
246                        .map(|(name, source)| {
247                            let value =
248                                source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
249                            (name.clone(), value)
250                        })
251                        .collect();
252
253                    // Create the node with MVCC versioning
254                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
255                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
256
257                    // Validate and set properties
258                    self.validate_and_set_properties(node_id, &resolved_props)?;
259
260                    // Copy input columns to output
261                    for col_idx in 0..chunk.column_count() {
262                        if col_idx < self.output_column
263                            && let (Some(src), Some(dst)) =
264                                (chunk.column(col_idx), builder.column_mut(col_idx))
265                        {
266                            if let Some(val) = src.get_value(row) {
267                                dst.push_value(val);
268                            } else {
269                                dst.push_value(Value::Null);
270                            }
271                        }
272                    }
273
274                    // Add the new node ID
275                    if let Some(dst) = builder.column_mut(self.output_column) {
276                        dst.push_value(Value::Int64(node_id.0 as i64));
277                    }
278
279                    builder.advance_row();
280                }
281
282                return Ok(Some(builder.finish()));
283            }
284            Ok(None)
285        } else {
286            // No input - create a single node
287            if self.executed {
288                return Ok(None);
289            }
290            self.executed = true;
291
292            // Resolve constant properties
293            let resolved_props: Vec<(String, Value)> = self
294                .properties
295                .iter()
296                .filter_map(|(name, source)| {
297                    if let PropertySource::Constant(value) = source {
298                        Some((name.clone(), value.clone()))
299                    } else {
300                        None
301                    }
302                })
303                .collect();
304
305            // Create the node with MVCC versioning
306            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
307            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
308
309            // Validate and set properties
310            self.validate_and_set_properties(node_id, &resolved_props)?;
311
312            // Build output chunk with just the node ID
313            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
314            if let Some(dst) = builder.column_mut(self.output_column) {
315                dst.push_value(Value::Int64(node_id.0 as i64));
316            }
317            builder.advance_row();
318
319            Ok(Some(builder.finish()))
320        }
321    }
322
323    fn reset(&mut self) {
324        if let Some(ref mut input) = self.input {
325            input.reset();
326        }
327        self.executed = false;
328    }
329
330    fn name(&self) -> &'static str {
331        "CreateNode"
332    }
333}
334
335/// Operator that creates new edges.
336pub struct CreateEdgeOperator {
337    /// The graph store to modify.
338    store: Arc<dyn GraphStoreMut>,
339    /// Input operator.
340    input: Box<dyn Operator>,
341    /// Column index for the source node.
342    from_column: usize,
343    /// Column index for the target node.
344    to_column: usize,
345    /// Edge type.
346    edge_type: String,
347    /// Properties to set.
348    properties: Vec<(String, PropertySource)>,
349    /// Output schema.
350    output_schema: Vec<LogicalType>,
351    /// Column index for the created edge variable (if any).
352    output_column: Option<usize>,
353    /// Epoch for MVCC versioning.
354    viewing_epoch: Option<EpochId>,
355    /// Transaction ID for MVCC versioning.
356    transaction_id: Option<TransactionId>,
357    /// Optional constraint validator for schema enforcement.
358    validator: Option<Arc<dyn ConstraintValidator>>,
359}
360
361impl CreateEdgeOperator {
362    /// Creates a new edge creation operator.
363    ///
364    /// Use builder methods to set additional options:
365    /// - [`with_properties`](Self::with_properties) - set edge properties
366    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
367    /// - [`with_transaction_context`](Self::with_transaction_context) - set transaction context
368    pub fn new(
369        store: Arc<dyn GraphStoreMut>,
370        input: Box<dyn Operator>,
371        from_column: usize,
372        to_column: usize,
373        edge_type: String,
374        output_schema: Vec<LogicalType>,
375    ) -> Self {
376        Self {
377            store,
378            input,
379            from_column,
380            to_column,
381            edge_type,
382            properties: Vec::new(),
383            output_schema,
384            output_column: None,
385            viewing_epoch: None,
386            transaction_id: None,
387            validator: None,
388        }
389    }
390
391    /// Sets the properties to assign to created edges.
392    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
393        self.properties = properties;
394        self
395    }
396
397    /// Sets the output column for the created edge ID.
398    pub fn with_output_column(mut self, column: usize) -> Self {
399        self.output_column = Some(column);
400        self
401    }
402
403    /// Sets the transaction context for MVCC versioning.
404    pub fn with_transaction_context(
405        mut self,
406        epoch: EpochId,
407        transaction_id: Option<TransactionId>,
408    ) -> Self {
409        self.viewing_epoch = Some(epoch);
410        self.transaction_id = transaction_id;
411        self
412    }
413
414    /// Sets the constraint validator for schema enforcement.
415    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
416        self.validator = Some(validator);
417        self
418    }
419}
420
421impl Operator for CreateEdgeOperator {
422    fn next(&mut self) -> OperatorResult {
423        // Get transaction context for versioned creation
424        let epoch = self
425            .viewing_epoch
426            .unwrap_or_else(|| self.store.current_epoch());
427        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
428
429        if let Some(chunk) = self.input.next()? {
430            let mut builder =
431                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
432
433            for row in chunk.selected_indices() {
434                // Get source and target node IDs
435                let from_id = chunk
436                    .column(self.from_column)
437                    .and_then(|c| c.get_value(row))
438                    .ok_or_else(|| {
439                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
440                    })?;
441
442                let to_id = chunk
443                    .column(self.to_column)
444                    .and_then(|c| c.get_value(row))
445                    .ok_or_else(|| {
446                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
447                    })?;
448
449                // Extract node IDs
450                let from_node_id = match from_id {
451                    Value::Int64(id) => NodeId(id as u64),
452                    _ => {
453                        return Err(OperatorError::TypeMismatch {
454                            expected: "Int64 (node ID)".to_string(),
455                            found: format!("{from_id:?}"),
456                        });
457                    }
458                };
459
460                let to_node_id = match to_id {
461                    Value::Int64(id) => NodeId(id as u64),
462                    _ => {
463                        return Err(OperatorError::TypeMismatch {
464                            expected: "Int64 (node ID)".to_string(),
465                            found: format!("{to_id:?}"),
466                        });
467                    }
468                };
469
470                // Resolve property values
471                let resolved_props: Vec<(String, Value)> = self
472                    .properties
473                    .iter()
474                    .map(|(name, source)| {
475                        let value =
476                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
477                        (name.clone(), value)
478                    })
479                    .collect();
480
481                // Validate constraints before writing
482                if let Some(ref validator) = self.validator {
483                    for (name, value) in &resolved_props {
484                        validator.validate_edge_property(&self.edge_type, name, value)?;
485                    }
486                    validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
487                }
488
489                // Create the edge with MVCC versioning
490                let edge_id = self.store.create_edge_versioned(
491                    from_node_id,
492                    to_node_id,
493                    &self.edge_type,
494                    epoch,
495                    tx,
496                );
497
498                // Set properties
499                for (name, value) in resolved_props {
500                    self.store.set_edge_property(edge_id, &name, value);
501                }
502
503                // Copy input columns
504                for col_idx in 0..chunk.column_count() {
505                    if let (Some(src), Some(dst)) =
506                        (chunk.column(col_idx), builder.column_mut(col_idx))
507                    {
508                        if let Some(val) = src.get_value(row) {
509                            dst.push_value(val);
510                        } else {
511                            dst.push_value(Value::Null);
512                        }
513                    }
514                }
515
516                // Add edge ID if requested
517                if let Some(out_col) = self.output_column
518                    && let Some(dst) = builder.column_mut(out_col)
519                {
520                    dst.push_value(Value::Int64(edge_id.0 as i64));
521                }
522
523                builder.advance_row();
524            }
525
526            return Ok(Some(builder.finish()));
527        }
528        Ok(None)
529    }
530
531    fn reset(&mut self) {
532        self.input.reset();
533    }
534
535    fn name(&self) -> &'static str {
536        "CreateEdge"
537    }
538}
539
540/// Operator that deletes nodes.
541pub struct DeleteNodeOperator {
542    /// The graph store to modify.
543    store: Arc<dyn GraphStoreMut>,
544    /// Input operator.
545    input: Box<dyn Operator>,
546    /// Column index for the node to delete.
547    node_column: usize,
548    /// Output schema.
549    output_schema: Vec<LogicalType>,
550    /// Whether to detach (delete connected edges) before deleting.
551    detach: bool,
552    /// Epoch for MVCC versioning.
553    viewing_epoch: Option<EpochId>,
554    /// Transaction ID for MVCC versioning.
555    transaction_id: Option<TransactionId>,
556}
557
558impl DeleteNodeOperator {
559    /// Creates a new node deletion operator.
560    pub fn new(
561        store: Arc<dyn GraphStoreMut>,
562        input: Box<dyn Operator>,
563        node_column: usize,
564        output_schema: Vec<LogicalType>,
565        detach: bool,
566    ) -> Self {
567        Self {
568            store,
569            input,
570            node_column,
571            output_schema,
572            detach,
573            viewing_epoch: None,
574            transaction_id: None,
575        }
576    }
577
578    /// Sets the transaction context for MVCC versioning.
579    pub fn with_transaction_context(
580        mut self,
581        epoch: EpochId,
582        transaction_id: Option<TransactionId>,
583    ) -> Self {
584        self.viewing_epoch = Some(epoch);
585        self.transaction_id = transaction_id;
586        self
587    }
588}
589
590impl Operator for DeleteNodeOperator {
591    fn next(&mut self) -> OperatorResult {
592        // Get transaction context for versioned deletion
593        let epoch = self
594            .viewing_epoch
595            .unwrap_or_else(|| self.store.current_epoch());
596        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
597
598        if let Some(chunk) = self.input.next()? {
599            let mut deleted_count = 0;
600
601            for row in chunk.selected_indices() {
602                let node_val = chunk
603                    .column(self.node_column)
604                    .and_then(|c| c.get_value(row))
605                    .ok_or_else(|| {
606                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
607                    })?;
608
609                let node_id = match node_val {
610                    Value::Int64(id) => NodeId(id as u64),
611                    _ => {
612                        return Err(OperatorError::TypeMismatch {
613                            expected: "Int64 (node ID)".to_string(),
614                            found: format!("{node_val:?}"),
615                        });
616                    }
617                };
618
619                if self.detach {
620                    // Delete all connected edges first
621                    self.store.delete_node_edges(node_id);
622                } else {
623                    // NODETACH: check that node has no connected edges
624                    let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
625                    if degree > 0 {
626                        return Err(OperatorError::ConstraintViolation(format!(
627                            "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
628                            degree
629                        )));
630                    }
631                }
632
633                // Delete the node with MVCC versioning
634                if self.store.delete_node_versioned(node_id, epoch, tx) {
635                    deleted_count += 1;
636                }
637            }
638
639            // Return a chunk with the delete count
640            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
641            if let Some(dst) = builder.column_mut(0) {
642                dst.push_value(Value::Int64(deleted_count));
643            }
644            builder.advance_row();
645
646            return Ok(Some(builder.finish()));
647        }
648        Ok(None)
649    }
650
651    fn reset(&mut self) {
652        self.input.reset();
653    }
654
655    fn name(&self) -> &'static str {
656        "DeleteNode"
657    }
658}
659
660/// Operator that deletes edges.
661pub struct DeleteEdgeOperator {
662    /// The graph store to modify.
663    store: Arc<dyn GraphStoreMut>,
664    /// Input operator.
665    input: Box<dyn Operator>,
666    /// Column index for the edge to delete.
667    edge_column: usize,
668    /// Output schema.
669    output_schema: Vec<LogicalType>,
670    /// Epoch for MVCC versioning.
671    viewing_epoch: Option<EpochId>,
672    /// Transaction ID for MVCC versioning.
673    transaction_id: Option<TransactionId>,
674}
675
676impl DeleteEdgeOperator {
677    /// Creates a new edge deletion operator.
678    pub fn new(
679        store: Arc<dyn GraphStoreMut>,
680        input: Box<dyn Operator>,
681        edge_column: usize,
682        output_schema: Vec<LogicalType>,
683    ) -> Self {
684        Self {
685            store,
686            input,
687            edge_column,
688            output_schema,
689            viewing_epoch: None,
690            transaction_id: None,
691        }
692    }
693
694    /// Sets the transaction context for MVCC versioning.
695    pub fn with_transaction_context(
696        mut self,
697        epoch: EpochId,
698        transaction_id: Option<TransactionId>,
699    ) -> Self {
700        self.viewing_epoch = Some(epoch);
701        self.transaction_id = transaction_id;
702        self
703    }
704}
705
706impl Operator for DeleteEdgeOperator {
707    fn next(&mut self) -> OperatorResult {
708        // Get transaction context for versioned deletion
709        let epoch = self
710            .viewing_epoch
711            .unwrap_or_else(|| self.store.current_epoch());
712        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
713
714        if let Some(chunk) = self.input.next()? {
715            let mut deleted_count = 0;
716
717            for row in chunk.selected_indices() {
718                let edge_val = chunk
719                    .column(self.edge_column)
720                    .and_then(|c| c.get_value(row))
721                    .ok_or_else(|| {
722                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
723                    })?;
724
725                let edge_id = match edge_val {
726                    Value::Int64(id) => EdgeId(id as u64),
727                    _ => {
728                        return Err(OperatorError::TypeMismatch {
729                            expected: "Int64 (edge ID)".to_string(),
730                            found: format!("{edge_val:?}"),
731                        });
732                    }
733                };
734
735                // Delete the edge with MVCC versioning
736                if self.store.delete_edge_versioned(edge_id, epoch, tx) {
737                    deleted_count += 1;
738                }
739            }
740
741            // Return a chunk with the delete count
742            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
743            if let Some(dst) = builder.column_mut(0) {
744                dst.push_value(Value::Int64(deleted_count));
745            }
746            builder.advance_row();
747
748            return Ok(Some(builder.finish()));
749        }
750        Ok(None)
751    }
752
753    fn reset(&mut self) {
754        self.input.reset();
755    }
756
757    fn name(&self) -> &'static str {
758        "DeleteEdge"
759    }
760}
761
762/// Operator that adds labels to nodes.
763pub struct AddLabelOperator {
764    /// The graph store.
765    store: Arc<dyn GraphStoreMut>,
766    /// Child operator providing nodes.
767    input: Box<dyn Operator>,
768    /// Column index containing node IDs.
769    node_column: usize,
770    /// Labels to add.
771    labels: Vec<String>,
772    /// Output schema.
773    output_schema: Vec<LogicalType>,
774}
775
776impl AddLabelOperator {
777    /// Creates a new add label operator.
778    pub fn new(
779        store: Arc<dyn GraphStoreMut>,
780        input: Box<dyn Operator>,
781        node_column: usize,
782        labels: Vec<String>,
783        output_schema: Vec<LogicalType>,
784    ) -> Self {
785        Self {
786            store,
787            input,
788            node_column,
789            labels,
790            output_schema,
791        }
792    }
793}
794
795impl Operator for AddLabelOperator {
796    fn next(&mut self) -> OperatorResult {
797        if let Some(chunk) = self.input.next()? {
798            let mut updated_count = 0;
799
800            for row in chunk.selected_indices() {
801                let node_val = chunk
802                    .column(self.node_column)
803                    .and_then(|c| c.get_value(row))
804                    .ok_or_else(|| {
805                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
806                    })?;
807
808                let node_id = match node_val {
809                    Value::Int64(id) => NodeId(id as u64),
810                    _ => {
811                        return Err(OperatorError::TypeMismatch {
812                            expected: "Int64 (node ID)".to_string(),
813                            found: format!("{node_val:?}"),
814                        });
815                    }
816                };
817
818                // Add all labels
819                for label in &self.labels {
820                    if self.store.add_label(node_id, label) {
821                        updated_count += 1;
822                    }
823                }
824            }
825
826            // Return a chunk with the update count
827            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
828            if let Some(dst) = builder.column_mut(0) {
829                dst.push_value(Value::Int64(updated_count));
830            }
831            builder.advance_row();
832
833            return Ok(Some(builder.finish()));
834        }
835        Ok(None)
836    }
837
838    fn reset(&mut self) {
839        self.input.reset();
840    }
841
842    fn name(&self) -> &'static str {
843        "AddLabel"
844    }
845}
846
847/// Operator that removes labels from nodes.
848pub struct RemoveLabelOperator {
849    /// The graph store.
850    store: Arc<dyn GraphStoreMut>,
851    /// Child operator providing nodes.
852    input: Box<dyn Operator>,
853    /// Column index containing node IDs.
854    node_column: usize,
855    /// Labels to remove.
856    labels: Vec<String>,
857    /// Output schema.
858    output_schema: Vec<LogicalType>,
859}
860
861impl RemoveLabelOperator {
862    /// Creates a new remove label operator.
863    pub fn new(
864        store: Arc<dyn GraphStoreMut>,
865        input: Box<dyn Operator>,
866        node_column: usize,
867        labels: Vec<String>,
868        output_schema: Vec<LogicalType>,
869    ) -> Self {
870        Self {
871            store,
872            input,
873            node_column,
874            labels,
875            output_schema,
876        }
877    }
878}
879
880impl Operator for RemoveLabelOperator {
881    fn next(&mut self) -> OperatorResult {
882        if let Some(chunk) = self.input.next()? {
883            let mut updated_count = 0;
884
885            for row in chunk.selected_indices() {
886                let node_val = chunk
887                    .column(self.node_column)
888                    .and_then(|c| c.get_value(row))
889                    .ok_or_else(|| {
890                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
891                    })?;
892
893                let node_id = match node_val {
894                    Value::Int64(id) => NodeId(id as u64),
895                    _ => {
896                        return Err(OperatorError::TypeMismatch {
897                            expected: "Int64 (node ID)".to_string(),
898                            found: format!("{node_val:?}"),
899                        });
900                    }
901                };
902
903                // Remove all labels
904                for label in &self.labels {
905                    if self.store.remove_label(node_id, label) {
906                        updated_count += 1;
907                    }
908                }
909            }
910
911            // Return a chunk with the update count
912            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
913            if let Some(dst) = builder.column_mut(0) {
914                dst.push_value(Value::Int64(updated_count));
915            }
916            builder.advance_row();
917
918            return Ok(Some(builder.finish()));
919        }
920        Ok(None)
921    }
922
923    fn reset(&mut self) {
924        self.input.reset();
925    }
926
927    fn name(&self) -> &'static str {
928        "RemoveLabel"
929    }
930}
931
932/// Operator that sets properties on nodes or edges.
933///
934/// This operator reads node/edge IDs from a column and sets the
935/// specified properties on each entity.
936pub struct SetPropertyOperator {
937    /// The graph store.
938    store: Arc<dyn GraphStoreMut>,
939    /// Child operator providing entities.
940    input: Box<dyn Operator>,
941    /// Column index containing entity IDs (node or edge).
942    entity_column: usize,
943    /// Whether the entity is an edge (false = node).
944    is_edge: bool,
945    /// Properties to set (name -> source).
946    properties: Vec<(String, PropertySource)>,
947    /// Output schema.
948    output_schema: Vec<LogicalType>,
949    /// Whether to replace all properties (true) or merge (false) for map assignments.
950    replace: bool,
951    /// Optional constraint validator for schema enforcement.
952    validator: Option<Arc<dyn ConstraintValidator>>,
953    /// Entity labels (for node constraint validation).
954    labels: Vec<String>,
955    /// Edge type (for edge constraint validation).
956    edge_type_name: Option<String>,
957}
958
959impl SetPropertyOperator {
960    /// Creates a new set property operator for nodes.
961    pub fn new_for_node(
962        store: Arc<dyn GraphStoreMut>,
963        input: Box<dyn Operator>,
964        node_column: usize,
965        properties: Vec<(String, PropertySource)>,
966        output_schema: Vec<LogicalType>,
967    ) -> Self {
968        Self {
969            store,
970            input,
971            entity_column: node_column,
972            is_edge: false,
973            properties,
974            output_schema,
975            replace: false,
976            validator: None,
977            labels: Vec::new(),
978            edge_type_name: None,
979        }
980    }
981
982    /// Creates a new set property operator for edges.
983    pub fn new_for_edge(
984        store: Arc<dyn GraphStoreMut>,
985        input: Box<dyn Operator>,
986        edge_column: usize,
987        properties: Vec<(String, PropertySource)>,
988        output_schema: Vec<LogicalType>,
989    ) -> Self {
990        Self {
991            store,
992            input,
993            entity_column: edge_column,
994            is_edge: true,
995            properties,
996            output_schema,
997            replace: false,
998            validator: None,
999            labels: Vec::new(),
1000            edge_type_name: None,
1001        }
1002    }
1003
1004    /// Sets whether this operator replaces all properties (for map assignment).
1005    pub fn with_replace(mut self, replace: bool) -> Self {
1006        self.replace = replace;
1007        self
1008    }
1009
1010    /// Sets the constraint validator for schema enforcement.
1011    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1012        self.validator = Some(validator);
1013        self
1014    }
1015
1016    /// Sets the entity labels (for node constraint validation).
1017    pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1018        self.labels = labels;
1019        self
1020    }
1021
1022    /// Sets the edge type name (for edge constraint validation).
1023    pub fn with_edge_type(mut self, edge_type: String) -> Self {
1024        self.edge_type_name = Some(edge_type);
1025        self
1026    }
1027}
1028
1029impl Operator for SetPropertyOperator {
1030    fn next(&mut self) -> OperatorResult {
1031        if let Some(chunk) = self.input.next()? {
1032            let mut builder =
1033                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1034
1035            for row in chunk.selected_indices() {
1036                let entity_val = chunk
1037                    .column(self.entity_column)
1038                    .and_then(|c| c.get_value(row))
1039                    .ok_or_else(|| {
1040                        OperatorError::ColumnNotFound(format!(
1041                            "entity column {}",
1042                            self.entity_column
1043                        ))
1044                    })?;
1045
1046                let entity_id = match entity_val {
1047                    Value::Int64(id) => id as u64,
1048                    _ => {
1049                        return Err(OperatorError::TypeMismatch {
1050                            expected: "Int64 (entity ID)".to_string(),
1051                            found: format!("{entity_val:?}"),
1052                        });
1053                    }
1054                };
1055
1056                // Resolve all property values
1057                let resolved_props: Vec<(String, Value)> = self
1058                    .properties
1059                    .iter()
1060                    .map(|(name, source)| {
1061                        let value =
1062                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1063                        (name.clone(), value)
1064                    })
1065                    .collect();
1066
1067                // Validate constraints before writing
1068                if let Some(ref validator) = self.validator {
1069                    if self.is_edge {
1070                        if let Some(ref et) = self.edge_type_name {
1071                            for (name, value) in &resolved_props {
1072                                validator.validate_edge_property(et, name, value)?;
1073                            }
1074                        }
1075                    } else {
1076                        for (name, value) in &resolved_props {
1077                            validator.validate_node_property(&self.labels, name, value)?;
1078                            validator.check_unique_node_property(&self.labels, name, value)?;
1079                        }
1080                    }
1081                }
1082
1083                // Write all properties
1084                for (prop_name, value) in resolved_props {
1085                    if prop_name == "*" {
1086                        // Map assignment: value should be a Map
1087                        if let Value::Map(map) = value {
1088                            if self.replace {
1089                                // Replace: remove all existing properties first
1090                                if self.is_edge {
1091                                    if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1092                                        let keys: Vec<String> = edge
1093                                            .properties
1094                                            .iter()
1095                                            .map(|(k, _)| k.as_str().to_string())
1096                                            .collect();
1097                                        for key in keys {
1098                                            self.store
1099                                                .remove_edge_property(EdgeId(entity_id), &key);
1100                                        }
1101                                    }
1102                                } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1103                                    let keys: Vec<String> = node
1104                                        .properties
1105                                        .iter()
1106                                        .map(|(k, _)| k.as_str().to_string())
1107                                        .collect();
1108                                    for key in keys {
1109                                        self.store.remove_node_property(NodeId(entity_id), &key);
1110                                    }
1111                                }
1112                            }
1113                            // Set each map entry
1114                            for (key, val) in map.iter() {
1115                                if self.is_edge {
1116                                    self.store.set_edge_property(
1117                                        EdgeId(entity_id),
1118                                        key.as_str(),
1119                                        val.clone(),
1120                                    );
1121                                } else {
1122                                    self.store.set_node_property(
1123                                        NodeId(entity_id),
1124                                        key.as_str(),
1125                                        val.clone(),
1126                                    );
1127                                }
1128                            }
1129                        }
1130                    } else if self.is_edge {
1131                        self.store
1132                            .set_edge_property(EdgeId(entity_id), &prop_name, value);
1133                    } else {
1134                        self.store
1135                            .set_node_property(NodeId(entity_id), &prop_name, value);
1136                    }
1137                }
1138
1139                // Copy input columns to output
1140                for col_idx in 0..chunk.column_count() {
1141                    if let (Some(src), Some(dst)) =
1142                        (chunk.column(col_idx), builder.column_mut(col_idx))
1143                    {
1144                        if let Some(val) = src.get_value(row) {
1145                            dst.push_value(val);
1146                        } else {
1147                            dst.push_value(Value::Null);
1148                        }
1149                    }
1150                }
1151
1152                builder.advance_row();
1153            }
1154
1155            return Ok(Some(builder.finish()));
1156        }
1157        Ok(None)
1158    }
1159
1160    fn reset(&mut self) {
1161        self.input.reset();
1162    }
1163
1164    fn name(&self) -> &'static str {
1165        "SetProperty"
1166    }
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171    use super::*;
1172    use crate::execution::DataChunk;
1173    use crate::execution::chunk::DataChunkBuilder;
1174    use crate::graph::lpg::LpgStore;
1175
1176    // ── Helpers ────────────────────────────────────────────────────
1177
1178    fn create_test_store() -> Arc<dyn GraphStoreMut> {
1179        Arc::new(LpgStore::new().unwrap())
1180    }
1181
1182    struct MockInput {
1183        chunk: Option<DataChunk>,
1184    }
1185
1186    impl MockInput {
1187        fn boxed(chunk: DataChunk) -> Box<Self> {
1188            Box::new(Self { chunk: Some(chunk) })
1189        }
1190    }
1191
1192    impl Operator for MockInput {
1193        fn next(&mut self) -> OperatorResult {
1194            Ok(self.chunk.take())
1195        }
1196        fn reset(&mut self) {}
1197        fn name(&self) -> &'static str {
1198            "MockInput"
1199        }
1200    }
1201
1202    struct EmptyInput;
1203    impl Operator for EmptyInput {
1204        fn next(&mut self) -> OperatorResult {
1205            Ok(None)
1206        }
1207        fn reset(&mut self) {}
1208        fn name(&self) -> &'static str {
1209            "EmptyInput"
1210        }
1211    }
1212
1213    fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1214        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1215        for id in ids {
1216            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1217            builder.advance_row();
1218        }
1219        builder.finish()
1220    }
1221
1222    fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1223        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1224        for id in ids {
1225            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1226            builder.advance_row();
1227        }
1228        builder.finish()
1229    }
1230
1231    // ── CreateNodeOperator ──────────────────────────────────────
1232
1233    #[test]
1234    fn test_create_node_standalone() {
1235        let store = create_test_store();
1236
1237        let mut op = CreateNodeOperator::new(
1238            Arc::clone(&store),
1239            None,
1240            vec!["Person".to_string()],
1241            vec![(
1242                "name".to_string(),
1243                PropertySource::Constant(Value::String("Alix".into())),
1244            )],
1245            vec![LogicalType::Int64],
1246            0,
1247        );
1248
1249        let chunk = op.next().unwrap().unwrap();
1250        assert_eq!(chunk.row_count(), 1);
1251
1252        // Second call should return None (standalone executes once)
1253        assert!(op.next().unwrap().is_none());
1254
1255        assert_eq!(store.node_count(), 1);
1256    }
1257
1258    #[test]
1259    fn test_create_edge() {
1260        let store = create_test_store();
1261
1262        let node1 = store.create_node(&["Person"]);
1263        let node2 = store.create_node(&["Person"]);
1264
1265        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1266        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1267        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1268        builder.advance_row();
1269
1270        let mut op = CreateEdgeOperator::new(
1271            Arc::clone(&store),
1272            MockInput::boxed(builder.finish()),
1273            0,
1274            1,
1275            "KNOWS".to_string(),
1276            vec![LogicalType::Int64, LogicalType::Int64],
1277        );
1278
1279        let _chunk = op.next().unwrap().unwrap();
1280        assert_eq!(store.edge_count(), 1);
1281    }
1282
1283    #[test]
1284    fn test_delete_node() {
1285        let store = create_test_store();
1286
1287        let node_id = store.create_node(&["Person"]);
1288        assert_eq!(store.node_count(), 1);
1289
1290        let mut op = DeleteNodeOperator::new(
1291            Arc::clone(&store),
1292            MockInput::boxed(node_id_chunk(&[node_id])),
1293            0,
1294            vec![LogicalType::Int64],
1295            false,
1296        );
1297
1298        let chunk = op.next().unwrap().unwrap();
1299        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1300        assert_eq!(deleted, 1);
1301        assert_eq!(store.node_count(), 0);
1302    }
1303
1304    // ── DeleteEdgeOperator ───────────────────────────────────────
1305
1306    #[test]
1307    fn test_delete_edge() {
1308        let store = create_test_store();
1309
1310        let n1 = store.create_node(&["Person"]);
1311        let n2 = store.create_node(&["Person"]);
1312        let eid = store.create_edge(n1, n2, "KNOWS");
1313        assert_eq!(store.edge_count(), 1);
1314
1315        let mut op = DeleteEdgeOperator::new(
1316            Arc::clone(&store),
1317            MockInput::boxed(edge_id_chunk(&[eid])),
1318            0,
1319            vec![LogicalType::Int64],
1320        );
1321
1322        let chunk = op.next().unwrap().unwrap();
1323        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1324        assert_eq!(deleted, 1);
1325        assert_eq!(store.edge_count(), 0);
1326    }
1327
1328    #[test]
1329    fn test_delete_edge_no_input_returns_none() {
1330        let store = create_test_store();
1331
1332        let mut op = DeleteEdgeOperator::new(
1333            Arc::clone(&store),
1334            Box::new(EmptyInput),
1335            0,
1336            vec![LogicalType::Int64],
1337        );
1338
1339        assert!(op.next().unwrap().is_none());
1340    }
1341
1342    #[test]
1343    fn test_delete_multiple_edges() {
1344        let store = create_test_store();
1345
1346        let n1 = store.create_node(&["N"]);
1347        let n2 = store.create_node(&["N"]);
1348        let e1 = store.create_edge(n1, n2, "R");
1349        let e2 = store.create_edge(n2, n1, "S");
1350        assert_eq!(store.edge_count(), 2);
1351
1352        let mut op = DeleteEdgeOperator::new(
1353            Arc::clone(&store),
1354            MockInput::boxed(edge_id_chunk(&[e1, e2])),
1355            0,
1356            vec![LogicalType::Int64],
1357        );
1358
1359        let chunk = op.next().unwrap().unwrap();
1360        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1361        assert_eq!(deleted, 2);
1362        assert_eq!(store.edge_count(), 0);
1363    }
1364
1365    // ── DeleteNodeOperator with DETACH ───────────────────────────
1366
1367    #[test]
1368    fn test_delete_node_detach() {
1369        let store = create_test_store();
1370
1371        let n1 = store.create_node(&["Person"]);
1372        let n2 = store.create_node(&["Person"]);
1373        store.create_edge(n1, n2, "KNOWS");
1374        store.create_edge(n2, n1, "FOLLOWS");
1375        assert_eq!(store.edge_count(), 2);
1376
1377        let mut op = DeleteNodeOperator::new(
1378            Arc::clone(&store),
1379            MockInput::boxed(node_id_chunk(&[n1])),
1380            0,
1381            vec![LogicalType::Int64],
1382            true, // detach = true
1383        );
1384
1385        let chunk = op.next().unwrap().unwrap();
1386        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1387        assert_eq!(deleted, 1);
1388        assert_eq!(store.node_count(), 1);
1389        assert_eq!(store.edge_count(), 0); // edges detached
1390    }
1391
1392    // ── AddLabelOperator ─────────────────────────────────────────
1393
1394    #[test]
1395    fn test_add_label() {
1396        let store = create_test_store();
1397
1398        let node = store.create_node(&["Person"]);
1399
1400        let mut op = AddLabelOperator::new(
1401            Arc::clone(&store),
1402            MockInput::boxed(node_id_chunk(&[node])),
1403            0,
1404            vec!["Employee".to_string()],
1405            vec![LogicalType::Int64],
1406        );
1407
1408        let chunk = op.next().unwrap().unwrap();
1409        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1410        assert_eq!(updated, 1);
1411
1412        // Verify label was added
1413        let node_data = store.get_node(node).unwrap();
1414        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1415        assert!(labels.contains(&"Person"));
1416        assert!(labels.contains(&"Employee"));
1417    }
1418
1419    #[test]
1420    fn test_add_multiple_labels() {
1421        let store = create_test_store();
1422
1423        let node = store.create_node(&["Base"]);
1424
1425        let mut op = AddLabelOperator::new(
1426            Arc::clone(&store),
1427            MockInput::boxed(node_id_chunk(&[node])),
1428            0,
1429            vec!["LabelA".to_string(), "LabelB".to_string()],
1430            vec![LogicalType::Int64],
1431        );
1432
1433        let chunk = op.next().unwrap().unwrap();
1434        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1435        assert_eq!(updated, 2); // 2 labels added
1436
1437        let node_data = store.get_node(node).unwrap();
1438        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1439        assert!(labels.contains(&"LabelA"));
1440        assert!(labels.contains(&"LabelB"));
1441    }
1442
1443    #[test]
1444    fn test_add_label_no_input_returns_none() {
1445        let store = create_test_store();
1446
1447        let mut op = AddLabelOperator::new(
1448            Arc::clone(&store),
1449            Box::new(EmptyInput),
1450            0,
1451            vec!["Foo".to_string()],
1452            vec![LogicalType::Int64],
1453        );
1454
1455        assert!(op.next().unwrap().is_none());
1456    }
1457
1458    // ── RemoveLabelOperator ──────────────────────────────────────
1459
1460    #[test]
1461    fn test_remove_label() {
1462        let store = create_test_store();
1463
1464        let node = store.create_node(&["Person", "Employee"]);
1465
1466        let mut op = RemoveLabelOperator::new(
1467            Arc::clone(&store),
1468            MockInput::boxed(node_id_chunk(&[node])),
1469            0,
1470            vec!["Employee".to_string()],
1471            vec![LogicalType::Int64],
1472        );
1473
1474        let chunk = op.next().unwrap().unwrap();
1475        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1476        assert_eq!(updated, 1);
1477
1478        // Verify label was removed
1479        let node_data = store.get_node(node).unwrap();
1480        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1481        assert!(labels.contains(&"Person"));
1482        assert!(!labels.contains(&"Employee"));
1483    }
1484
1485    #[test]
1486    fn test_remove_nonexistent_label() {
1487        let store = create_test_store();
1488
1489        let node = store.create_node(&["Person"]);
1490
1491        let mut op = RemoveLabelOperator::new(
1492            Arc::clone(&store),
1493            MockInput::boxed(node_id_chunk(&[node])),
1494            0,
1495            vec!["NonExistent".to_string()],
1496            vec![LogicalType::Int64],
1497        );
1498
1499        let chunk = op.next().unwrap().unwrap();
1500        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1501        assert_eq!(updated, 0); // nothing removed
1502    }
1503
1504    // ── SetPropertyOperator ──────────────────────────────────────
1505
1506    #[test]
1507    fn test_set_node_property_constant() {
1508        let store = create_test_store();
1509
1510        let node = store.create_node(&["Person"]);
1511
1512        let mut op = SetPropertyOperator::new_for_node(
1513            Arc::clone(&store),
1514            MockInput::boxed(node_id_chunk(&[node])),
1515            0,
1516            vec![(
1517                "name".to_string(),
1518                PropertySource::Constant(Value::String("Alix".into())),
1519            )],
1520            vec![LogicalType::Int64],
1521        );
1522
1523        let chunk = op.next().unwrap().unwrap();
1524        assert_eq!(chunk.row_count(), 1);
1525
1526        // Verify property was set
1527        let node_data = store.get_node(node).unwrap();
1528        assert_eq!(
1529            node_data
1530                .properties
1531                .get(&grafeo_common::types::PropertyKey::new("name")),
1532            Some(&Value::String("Alix".into()))
1533        );
1534    }
1535
1536    #[test]
1537    fn test_set_node_property_from_column() {
1538        let store = create_test_store();
1539
1540        let node = store.create_node(&["Person"]);
1541
1542        // Input: column 0 = node ID, column 1 = property value
1543        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1544        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1545        builder
1546            .column_mut(1)
1547            .unwrap()
1548            .push_value(Value::String("Gus".into()));
1549        builder.advance_row();
1550
1551        let mut op = SetPropertyOperator::new_for_node(
1552            Arc::clone(&store),
1553            MockInput::boxed(builder.finish()),
1554            0,
1555            vec![("name".to_string(), PropertySource::Column(1))],
1556            vec![LogicalType::Int64, LogicalType::String],
1557        );
1558
1559        let chunk = op.next().unwrap().unwrap();
1560        assert_eq!(chunk.row_count(), 1);
1561
1562        let node_data = store.get_node(node).unwrap();
1563        assert_eq!(
1564            node_data
1565                .properties
1566                .get(&grafeo_common::types::PropertyKey::new("name")),
1567            Some(&Value::String("Gus".into()))
1568        );
1569    }
1570
1571    #[test]
1572    fn test_set_edge_property() {
1573        let store = create_test_store();
1574
1575        let n1 = store.create_node(&["N"]);
1576        let n2 = store.create_node(&["N"]);
1577        let eid = store.create_edge(n1, n2, "KNOWS");
1578
1579        let mut op = SetPropertyOperator::new_for_edge(
1580            Arc::clone(&store),
1581            MockInput::boxed(edge_id_chunk(&[eid])),
1582            0,
1583            vec![(
1584                "weight".to_string(),
1585                PropertySource::Constant(Value::Float64(0.75)),
1586            )],
1587            vec![LogicalType::Int64],
1588        );
1589
1590        let chunk = op.next().unwrap().unwrap();
1591        assert_eq!(chunk.row_count(), 1);
1592
1593        let edge_data = store.get_edge(eid).unwrap();
1594        assert_eq!(
1595            edge_data
1596                .properties
1597                .get(&grafeo_common::types::PropertyKey::new("weight")),
1598            Some(&Value::Float64(0.75))
1599        );
1600    }
1601
1602    #[test]
1603    fn test_set_multiple_properties() {
1604        let store = create_test_store();
1605
1606        let node = store.create_node(&["Person"]);
1607
1608        let mut op = SetPropertyOperator::new_for_node(
1609            Arc::clone(&store),
1610            MockInput::boxed(node_id_chunk(&[node])),
1611            0,
1612            vec![
1613                (
1614                    "name".to_string(),
1615                    PropertySource::Constant(Value::String("Alix".into())),
1616                ),
1617                (
1618                    "age".to_string(),
1619                    PropertySource::Constant(Value::Int64(30)),
1620                ),
1621            ],
1622            vec![LogicalType::Int64],
1623        );
1624
1625        op.next().unwrap().unwrap();
1626
1627        let node_data = store.get_node(node).unwrap();
1628        assert_eq!(
1629            node_data
1630                .properties
1631                .get(&grafeo_common::types::PropertyKey::new("name")),
1632            Some(&Value::String("Alix".into()))
1633        );
1634        assert_eq!(
1635            node_data
1636                .properties
1637                .get(&grafeo_common::types::PropertyKey::new("age")),
1638            Some(&Value::Int64(30))
1639        );
1640    }
1641
1642    #[test]
1643    fn test_set_property_no_input_returns_none() {
1644        let store = create_test_store();
1645
1646        let mut op = SetPropertyOperator::new_for_node(
1647            Arc::clone(&store),
1648            Box::new(EmptyInput),
1649            0,
1650            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
1651            vec![LogicalType::Int64],
1652        );
1653
1654        assert!(op.next().unwrap().is_none());
1655    }
1656
1657    // ── Error paths ──────────────────────────────────────────────
1658
1659    #[test]
1660    fn test_delete_node_without_detach_errors_when_edges_exist() {
1661        let store = create_test_store();
1662
1663        let n1 = store.create_node(&["Person"]);
1664        let n2 = store.create_node(&["Person"]);
1665        store.create_edge(n1, n2, "KNOWS");
1666
1667        let mut op = DeleteNodeOperator::new(
1668            Arc::clone(&store),
1669            MockInput::boxed(node_id_chunk(&[n1])),
1670            0,
1671            vec![LogicalType::Int64],
1672            false, // no detach
1673        );
1674
1675        let err = op.next().unwrap_err();
1676        match err {
1677            OperatorError::ConstraintViolation(msg) => {
1678                assert!(msg.contains("connected edge"), "unexpected message: {msg}");
1679            }
1680            other => panic!("expected ConstraintViolation, got {other:?}"),
1681        }
1682        // Node should still exist
1683        assert_eq!(store.node_count(), 2);
1684    }
1685
1686    // ── CreateNodeOperator with input ───────────────────────────
1687
1688    #[test]
1689    fn test_create_node_with_input_operator() {
1690        let store = create_test_store();
1691
1692        // Seed node to provide input rows
1693        let existing = store.create_node(&["Seed"]);
1694
1695        let mut op = CreateNodeOperator::new(
1696            Arc::clone(&store),
1697            Some(MockInput::boxed(node_id_chunk(&[existing]))),
1698            vec!["Created".to_string()],
1699            vec![(
1700                "source".to_string(),
1701                PropertySource::Constant(Value::String("from_input".into())),
1702            )],
1703            vec![LogicalType::Int64, LogicalType::Int64], // input col + output col
1704            1,                                            // output column for new node ID
1705        );
1706
1707        let chunk = op.next().unwrap().unwrap();
1708        assert_eq!(chunk.row_count(), 1);
1709
1710        // Should have created one new node (2 total: Seed + Created)
1711        assert_eq!(store.node_count(), 2);
1712
1713        // Exhausted
1714        assert!(op.next().unwrap().is_none());
1715    }
1716
1717    // ── CreateEdgeOperator with properties and output column ────
1718
1719    #[test]
1720    fn test_create_edge_with_properties_and_output_column() {
1721        let store = create_test_store();
1722
1723        let n1 = store.create_node(&["Person"]);
1724        let n2 = store.create_node(&["Person"]);
1725
1726        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1727        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
1728        builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
1729        builder.advance_row();
1730
1731        let mut op = CreateEdgeOperator::new(
1732            Arc::clone(&store),
1733            MockInput::boxed(builder.finish()),
1734            0,
1735            1,
1736            "KNOWS".to_string(),
1737            vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
1738        )
1739        .with_properties(vec![(
1740            "since".to_string(),
1741            PropertySource::Constant(Value::Int64(2024)),
1742        )])
1743        .with_output_column(2);
1744
1745        let chunk = op.next().unwrap().unwrap();
1746        assert_eq!(chunk.row_count(), 1);
1747        assert_eq!(store.edge_count(), 1);
1748
1749        // Verify the output chunk contains the edge ID in column 2
1750        let edge_id_raw = chunk
1751            .column(2)
1752            .and_then(|c| c.get_int64(0))
1753            .expect("edge ID should be in output column 2");
1754        let edge_id = EdgeId(edge_id_raw as u64);
1755
1756        // Verify the edge has the property
1757        let edge = store.get_edge(edge_id).expect("edge should exist");
1758        assert_eq!(
1759            edge.properties
1760                .get(&grafeo_common::types::PropertyKey::new("since")),
1761            Some(&Value::Int64(2024))
1762        );
1763    }
1764
1765    // ── SetPropertyOperator with map replacement ────────────────
1766
1767    #[test]
1768    fn test_set_property_map_replace() {
1769        use std::collections::BTreeMap;
1770
1771        let store = create_test_store();
1772
1773        let node = store.create_node(&["Person"]);
1774        store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
1775
1776        let mut map = BTreeMap::new();
1777        map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
1778
1779        let mut op = SetPropertyOperator::new_for_node(
1780            Arc::clone(&store),
1781            MockInput::boxed(node_id_chunk(&[node])),
1782            0,
1783            vec![(
1784                "*".to_string(),
1785                PropertySource::Constant(Value::Map(Arc::new(map))),
1786            )],
1787            vec![LogicalType::Int64],
1788        )
1789        .with_replace(true);
1790
1791        op.next().unwrap().unwrap();
1792
1793        let node_data = store.get_node(node).unwrap();
1794        // Old property should be gone
1795        assert!(
1796            node_data
1797                .properties
1798                .get(&PropertyKey::new("old_prop"))
1799                .is_none()
1800        );
1801        // New property should exist
1802        assert_eq!(
1803            node_data.properties.get(&PropertyKey::new("new_key")),
1804            Some(&Value::String("new_val".into()))
1805        );
1806    }
1807
1808    // ── SetPropertyOperator with map merge (no replace) ─────────
1809
1810    #[test]
1811    fn test_set_property_map_merge() {
1812        use std::collections::BTreeMap;
1813
1814        let store = create_test_store();
1815
1816        let node = store.create_node(&["Person"]);
1817        store.set_node_property(node, "existing", Value::Int64(42));
1818
1819        let mut map = BTreeMap::new();
1820        map.insert(PropertyKey::new("added"), Value::String("hello".into()));
1821
1822        let mut op = SetPropertyOperator::new_for_node(
1823            Arc::clone(&store),
1824            MockInput::boxed(node_id_chunk(&[node])),
1825            0,
1826            vec![(
1827                "*".to_string(),
1828                PropertySource::Constant(Value::Map(Arc::new(map))),
1829            )],
1830            vec![LogicalType::Int64],
1831        ); // replace defaults to false
1832
1833        op.next().unwrap().unwrap();
1834
1835        let node_data = store.get_node(node).unwrap();
1836        // Existing property should still be there
1837        assert_eq!(
1838            node_data.properties.get(&PropertyKey::new("existing")),
1839            Some(&Value::Int64(42))
1840        );
1841        // New property should also exist
1842        assert_eq!(
1843            node_data.properties.get(&PropertyKey::new("added")),
1844            Some(&Value::String("hello".into()))
1845        );
1846    }
1847
1848    // ── PropertySource::PropertyAccess ──────────────────────────
1849
1850    #[test]
1851    fn test_property_source_property_access() {
1852        let store = create_test_store();
1853
1854        let source_node = store.create_node(&["Source"]);
1855        store.set_node_property(source_node, "name", Value::String("Alix".into()));
1856
1857        let target_node = store.create_node(&["Target"]);
1858
1859        // Build chunk: col 0 = source node ID (Node type for PropertyAccess), col 1 = target node ID
1860        let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
1861        builder.column_mut(0).unwrap().push_node_id(source_node);
1862        builder
1863            .column_mut(1)
1864            .unwrap()
1865            .push_int64(target_node.0 as i64);
1866        builder.advance_row();
1867
1868        let mut op = SetPropertyOperator::new_for_node(
1869            Arc::clone(&store),
1870            MockInput::boxed(builder.finish()),
1871            1, // entity column = target node
1872            vec![(
1873                "copied_name".to_string(),
1874                PropertySource::PropertyAccess {
1875                    column: 0,
1876                    property: "name".to_string(),
1877                },
1878            )],
1879            vec![LogicalType::Node, LogicalType::Int64],
1880        );
1881
1882        op.next().unwrap().unwrap();
1883
1884        let target_data = store.get_node(target_node).unwrap();
1885        assert_eq!(
1886            target_data.properties.get(&PropertyKey::new("copied_name")),
1887            Some(&Value::String("Alix".into()))
1888        );
1889    }
1890
1891    // ── ConstraintValidator integration ─────────────────────────
1892
1893    #[test]
1894    fn test_create_node_with_constraint_validator() {
1895        let store = create_test_store();
1896
1897        struct RejectAgeValidator;
1898        impl ConstraintValidator for RejectAgeValidator {
1899            fn validate_node_property(
1900                &self,
1901                _labels: &[String],
1902                key: &str,
1903                _value: &Value,
1904            ) -> Result<(), OperatorError> {
1905                if key == "forbidden" {
1906                    return Err(OperatorError::ConstraintViolation(
1907                        "property 'forbidden' is not allowed".to_string(),
1908                    ));
1909                }
1910                Ok(())
1911            }
1912            fn validate_node_complete(
1913                &self,
1914                _labels: &[String],
1915                _properties: &[(String, Value)],
1916            ) -> Result<(), OperatorError> {
1917                Ok(())
1918            }
1919            fn check_unique_node_property(
1920                &self,
1921                _labels: &[String],
1922                _key: &str,
1923                _value: &Value,
1924            ) -> Result<(), OperatorError> {
1925                Ok(())
1926            }
1927            fn validate_edge_property(
1928                &self,
1929                _edge_type: &str,
1930                _key: &str,
1931                _value: &Value,
1932            ) -> Result<(), OperatorError> {
1933                Ok(())
1934            }
1935            fn validate_edge_complete(
1936                &self,
1937                _edge_type: &str,
1938                _properties: &[(String, Value)],
1939            ) -> Result<(), OperatorError> {
1940                Ok(())
1941            }
1942        }
1943
1944        // Valid property should succeed
1945        let mut op = CreateNodeOperator::new(
1946            Arc::clone(&store),
1947            None,
1948            vec!["Thing".to_string()],
1949            vec![(
1950                "name".to_string(),
1951                PropertySource::Constant(Value::String("ok".into())),
1952            )],
1953            vec![LogicalType::Int64],
1954            0,
1955        )
1956        .with_validator(Arc::new(RejectAgeValidator));
1957
1958        assert!(op.next().is_ok());
1959        assert_eq!(store.node_count(), 1);
1960
1961        // Forbidden property should fail
1962        let mut op = CreateNodeOperator::new(
1963            Arc::clone(&store),
1964            None,
1965            vec!["Thing".to_string()],
1966            vec![(
1967                "forbidden".to_string(),
1968                PropertySource::Constant(Value::Int64(1)),
1969            )],
1970            vec![LogicalType::Int64],
1971            0,
1972        )
1973        .with_validator(Arc::new(RejectAgeValidator));
1974
1975        let err = op.next().unwrap_err();
1976        assert!(matches!(err, OperatorError::ConstraintViolation(_)));
1977        // Node count should still be 2 (the node is created before validation, but the error
1978        // propagates - this tests the validation logic fires)
1979    }
1980
1981    // ── Reset behavior ──────────────────────────────────────────
1982
1983    #[test]
1984    fn test_create_node_reset_allows_re_execution() {
1985        let store = create_test_store();
1986
1987        let mut op = CreateNodeOperator::new(
1988            Arc::clone(&store),
1989            None,
1990            vec!["Person".to_string()],
1991            vec![],
1992            vec![LogicalType::Int64],
1993            0,
1994        );
1995
1996        // First execution
1997        assert!(op.next().unwrap().is_some());
1998        assert!(op.next().unwrap().is_none());
1999
2000        // Reset and re-execute
2001        op.reset();
2002        assert!(op.next().unwrap().is_some());
2003
2004        assert_eq!(store.node_count(), 2);
2005    }
2006
2007    // ── Operator name() ──────────────────────────────────────────
2008
2009    #[test]
2010    fn test_operator_names() {
2011        let store = create_test_store();
2012
2013        let op = CreateNodeOperator::new(
2014            Arc::clone(&store),
2015            None,
2016            vec![],
2017            vec![],
2018            vec![LogicalType::Int64],
2019            0,
2020        );
2021        assert_eq!(op.name(), "CreateNode");
2022
2023        let op = CreateEdgeOperator::new(
2024            Arc::clone(&store),
2025            Box::new(EmptyInput),
2026            0,
2027            1,
2028            "R".to_string(),
2029            vec![LogicalType::Int64],
2030        );
2031        assert_eq!(op.name(), "CreateEdge");
2032
2033        let op = DeleteNodeOperator::new(
2034            Arc::clone(&store),
2035            Box::new(EmptyInput),
2036            0,
2037            vec![LogicalType::Int64],
2038            false,
2039        );
2040        assert_eq!(op.name(), "DeleteNode");
2041
2042        let op = DeleteEdgeOperator::new(
2043            Arc::clone(&store),
2044            Box::new(EmptyInput),
2045            0,
2046            vec![LogicalType::Int64],
2047        );
2048        assert_eq!(op.name(), "DeleteEdge");
2049
2050        let op = AddLabelOperator::new(
2051            Arc::clone(&store),
2052            Box::new(EmptyInput),
2053            0,
2054            vec!["L".to_string()],
2055            vec![LogicalType::Int64],
2056        );
2057        assert_eq!(op.name(), "AddLabel");
2058
2059        let op = RemoveLabelOperator::new(
2060            Arc::clone(&store),
2061            Box::new(EmptyInput),
2062            0,
2063            vec!["L".to_string()],
2064            vec![LogicalType::Int64],
2065        );
2066        assert_eq!(op.name(), "RemoveLabel");
2067
2068        let op = SetPropertyOperator::new_for_node(
2069            Arc::clone(&store),
2070            Box::new(EmptyInput),
2071            0,
2072            vec![],
2073            vec![LogicalType::Int64],
2074        );
2075        assert_eq!(op.name(), "SetProperty");
2076    }
2077}