Skip to main content

grafeo_core/execution/operators/
mutation.rs

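//! Mutation operators for creating, deleting, and updating graph elements.
//!
//! These operators modify the graph:
//! - `CreateNodeOperator`: Creates new nodes
//! - `CreateEdgeOperator`: Creates new edges
//! - `DeleteNodeOperator`: Deletes nodes
//! - `DeleteEdgeOperator`: Deletes edges
//! - `AddLabelOperator`: Adds labels to nodes
//! - `RemoveLabelOperator`: Removes labels from nodes
//! - `SetPropertyOperator`: Sets properties on nodes or edges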
8
9use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::{GraphStore, GraphStoreMut};
16
17/// Trait for validating schema constraints during mutation operations.
18///
19/// Implementors check type definitions, NOT NULL, and UNIQUE constraints
20/// before data is written to the store.
21pub trait ConstraintValidator: Send + Sync {
22    /// Validates a single property value for a node with the given labels.
23    ///
24    /// Checks type compatibility and NOT NULL constraints.
25    fn validate_node_property(
26        &self,
27        labels: &[String],
28        key: &str,
29        value: &Value,
30    ) -> Result<(), OperatorError>;
31
32    /// Validates that all required properties are present after creating a node.
33    ///
34    /// Checks NOT NULL constraints for properties that were not explicitly set.
35    fn validate_node_complete(
36        &self,
37        labels: &[String],
38        properties: &[(String, Value)],
39    ) -> Result<(), OperatorError>;
40
41    /// Checks UNIQUE constraint for a node property value.
42    ///
43    /// Returns an error if a node with the same label already has this value.
44    fn check_unique_node_property(
45        &self,
46        labels: &[String],
47        key: &str,
48        value: &Value,
49    ) -> Result<(), OperatorError>;
50
51    /// Validates a single property value for an edge of the given type.
52    fn validate_edge_property(
53        &self,
54        edge_type: &str,
55        key: &str,
56        value: &Value,
57    ) -> Result<(), OperatorError>;
58
59    /// Validates that all required properties are present after creating an edge.
60    fn validate_edge_complete(
61        &self,
62        edge_type: &str,
63        properties: &[(String, Value)],
64    ) -> Result<(), OperatorError>;
65}
66
67/// Operator that creates new nodes.
68///
69/// For each input row, creates a new node with the specified labels
70/// and properties, then outputs the row with the new node.
71pub struct CreateNodeOperator {
72    /// The graph store to modify.
73    store: Arc<dyn GraphStoreMut>,
74    /// Input operator.
75    input: Option<Box<dyn Operator>>,
76    /// Labels for the new nodes.
77    labels: Vec<String>,
78    /// Properties to set (name -> column index or constant value).
79    properties: Vec<(String, PropertySource)>,
80    /// Output schema.
81    output_schema: Vec<LogicalType>,
82    /// Column index for the created node variable.
83    output_column: usize,
84    /// Whether this operator has been executed (for no-input case).
85    executed: bool,
86    /// Epoch for MVCC versioning.
87    viewing_epoch: Option<EpochId>,
88    /// Transaction ID for MVCC versioning.
89    tx_id: Option<TxId>,
90    /// Optional constraint validator for schema enforcement.
91    validator: Option<Arc<dyn ConstraintValidator>>,
92}
93
94/// Source for a property value.
95#[derive(Debug, Clone)]
96pub enum PropertySource {
97    /// Get value from an input column.
98    Column(usize),
99    /// Use a constant value.
100    Constant(Value),
101    /// Access a named property from a map/node/edge in an input column.
102    PropertyAccess {
103        /// The column containing the map, node ID, or edge ID.
104        column: usize,
105        /// The property name to extract.
106        property: String,
107    },
108}
109
110impl PropertySource {
111    /// Resolves a property value from a data chunk row.
112    pub fn resolve(
113        &self,
114        chunk: &crate::execution::chunk::DataChunk,
115        row: usize,
116        store: &dyn GraphStore,
117    ) -> Value {
118        match self {
119            PropertySource::Column(col_idx) => chunk
120                .column(*col_idx)
121                .and_then(|c| c.get_value(row))
122                .unwrap_or(Value::Null),
123            PropertySource::Constant(v) => v.clone(),
124            PropertySource::PropertyAccess { column, property } => {
125                let Some(col) = chunk.column(*column) else {
126                    return Value::Null;
127                };
128                // Try node ID first, then edge ID, then map value
129                if let Some(node_id) = col.get_node_id(row) {
130                    store
131                        .get_node(node_id)
132                        .and_then(|node| node.get_property(property).cloned())
133                        .unwrap_or(Value::Null)
134                } else if let Some(edge_id) = col.get_edge_id(row) {
135                    store
136                        .get_edge(edge_id)
137                        .and_then(|edge| edge.get_property(property).cloned())
138                        .unwrap_or(Value::Null)
139                } else if let Some(Value::Map(map)) = col.get_value(row) {
140                    let key = PropertyKey::new(property);
141                    map.get(&key).cloned().unwrap_or(Value::Null)
142                } else {
143                    Value::Null
144                }
145            }
146        }
147    }
148}
149
150impl CreateNodeOperator {
151    /// Creates a new node creation operator.
152    ///
153    /// # Arguments
154    /// * `store` - The graph store to modify.
155    /// * `input` - Optional input operator (None for standalone CREATE).
156    /// * `labels` - Labels to assign to created nodes.
157    /// * `properties` - Properties to set on created nodes.
158    /// * `output_schema` - Schema of the output.
159    /// * `output_column` - Column index where the created node ID goes.
160    pub fn new(
161        store: Arc<dyn GraphStoreMut>,
162        input: Option<Box<dyn Operator>>,
163        labels: Vec<String>,
164        properties: Vec<(String, PropertySource)>,
165        output_schema: Vec<LogicalType>,
166        output_column: usize,
167    ) -> Self {
168        Self {
169            store,
170            input,
171            labels,
172            properties,
173            output_schema,
174            output_column,
175            executed: false,
176            viewing_epoch: None,
177            tx_id: None,
178            validator: None,
179        }
180    }
181
182    /// Sets the transaction context for MVCC versioning.
183    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
184        self.viewing_epoch = Some(epoch);
185        self.tx_id = tx_id;
186        self
187    }
188
189    /// Sets the constraint validator for schema enforcement.
190    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
191        self.validator = Some(validator);
192        self
193    }
194}
195
196impl CreateNodeOperator {
197    /// Validates and sets properties on a newly created node.
198    fn validate_and_set_properties(
199        &self,
200        node_id: NodeId,
201        resolved_props: &[(String, Value)],
202    ) -> Result<(), OperatorError> {
203        // Phase 1: Validate each property value
204        if let Some(ref validator) = self.validator {
205            for (name, value) in resolved_props {
206                validator.validate_node_property(&self.labels, name, value)?;
207                validator.check_unique_node_property(&self.labels, name, value)?;
208            }
209            // Phase 2: Validate completeness (NOT NULL checks for missing required properties)
210            validator.validate_node_complete(&self.labels, resolved_props)?;
211        }
212
213        // Phase 3: Write properties to the store
214        for (name, value) in resolved_props {
215            self.store.set_node_property(node_id, name, value.clone());
216        }
217        Ok(())
218    }
219}
220
221impl Operator for CreateNodeOperator {
222    fn next(&mut self) -> OperatorResult {
223        // Get transaction context for versioned creation
224        let epoch = self
225            .viewing_epoch
226            .unwrap_or_else(|| self.store.current_epoch());
227        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
228
229        if let Some(ref mut input) = self.input {
230            // For each input row, create a node
231            if let Some(chunk) = input.next()? {
232                let mut builder =
233                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
234
235                for row in chunk.selected_indices() {
236                    // Resolve all property values first (before creating node)
237                    let resolved_props: Vec<(String, Value)> = self
238                        .properties
239                        .iter()
240                        .map(|(name, source)| {
241                            let value =
242                                source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
243                            (name.clone(), value)
244                        })
245                        .collect();
246
247                    // Create the node with MVCC versioning
248                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
249                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
250
251                    // Validate and set properties
252                    self.validate_and_set_properties(node_id, &resolved_props)?;
253
254                    // Copy input columns to output
255                    for col_idx in 0..chunk.column_count() {
256                        if col_idx < self.output_column
257                            && let (Some(src), Some(dst)) =
258                                (chunk.column(col_idx), builder.column_mut(col_idx))
259                        {
260                            if let Some(val) = src.get_value(row) {
261                                dst.push_value(val);
262                            } else {
263                                dst.push_value(Value::Null);
264                            }
265                        }
266                    }
267
268                    // Add the new node ID
269                    if let Some(dst) = builder.column_mut(self.output_column) {
270                        dst.push_value(Value::Int64(node_id.0 as i64));
271                    }
272
273                    builder.advance_row();
274                }
275
276                return Ok(Some(builder.finish()));
277            }
278            Ok(None)
279        } else {
280            // No input - create a single node
281            if self.executed {
282                return Ok(None);
283            }
284            self.executed = true;
285
286            // Resolve constant properties
287            let resolved_props: Vec<(String, Value)> = self
288                .properties
289                .iter()
290                .filter_map(|(name, source)| {
291                    if let PropertySource::Constant(value) = source {
292                        Some((name.clone(), value.clone()))
293                    } else {
294                        None
295                    }
296                })
297                .collect();
298
299            // Create the node with MVCC versioning
300            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
301            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
302
303            // Validate and set properties
304            self.validate_and_set_properties(node_id, &resolved_props)?;
305
306            // Build output chunk with just the node ID
307            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
308            if let Some(dst) = builder.column_mut(self.output_column) {
309                dst.push_value(Value::Int64(node_id.0 as i64));
310            }
311            builder.advance_row();
312
313            Ok(Some(builder.finish()))
314        }
315    }
316
317    fn reset(&mut self) {
318        if let Some(ref mut input) = self.input {
319            input.reset();
320        }
321        self.executed = false;
322    }
323
324    fn name(&self) -> &'static str {
325        "CreateNode"
326    }
327}
328
329/// Operator that creates new edges.
330pub struct CreateEdgeOperator {
331    /// The graph store to modify.
332    store: Arc<dyn GraphStoreMut>,
333    /// Input operator.
334    input: Box<dyn Operator>,
335    /// Column index for the source node.
336    from_column: usize,
337    /// Column index for the target node.
338    to_column: usize,
339    /// Edge type.
340    edge_type: String,
341    /// Properties to set.
342    properties: Vec<(String, PropertySource)>,
343    /// Output schema.
344    output_schema: Vec<LogicalType>,
345    /// Column index for the created edge variable (if any).
346    output_column: Option<usize>,
347    /// Epoch for MVCC versioning.
348    viewing_epoch: Option<EpochId>,
349    /// Transaction ID for MVCC versioning.
350    tx_id: Option<TxId>,
351    /// Optional constraint validator for schema enforcement.
352    validator: Option<Arc<dyn ConstraintValidator>>,
353}
354
355impl CreateEdgeOperator {
356    /// Creates a new edge creation operator.
357    ///
358    /// Use builder methods to set additional options:
359    /// - [`with_properties`](Self::with_properties) - set edge properties
360    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
361    /// - [`with_tx_context`](Self::with_tx_context) - set transaction context
362    pub fn new(
363        store: Arc<dyn GraphStoreMut>,
364        input: Box<dyn Operator>,
365        from_column: usize,
366        to_column: usize,
367        edge_type: String,
368        output_schema: Vec<LogicalType>,
369    ) -> Self {
370        Self {
371            store,
372            input,
373            from_column,
374            to_column,
375            edge_type,
376            properties: Vec::new(),
377            output_schema,
378            output_column: None,
379            viewing_epoch: None,
380            tx_id: None,
381            validator: None,
382        }
383    }
384
385    /// Sets the properties to assign to created edges.
386    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
387        self.properties = properties;
388        self
389    }
390
391    /// Sets the output column for the created edge ID.
392    pub fn with_output_column(mut self, column: usize) -> Self {
393        self.output_column = Some(column);
394        self
395    }
396
397    /// Sets the transaction context for MVCC versioning.
398    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
399        self.viewing_epoch = Some(epoch);
400        self.tx_id = tx_id;
401        self
402    }
403
404    /// Sets the constraint validator for schema enforcement.
405    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
406        self.validator = Some(validator);
407        self
408    }
409}
410
411impl Operator for CreateEdgeOperator {
412    fn next(&mut self) -> OperatorResult {
413        // Get transaction context for versioned creation
414        let epoch = self
415            .viewing_epoch
416            .unwrap_or_else(|| self.store.current_epoch());
417        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
418
419        if let Some(chunk) = self.input.next()? {
420            let mut builder =
421                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
422
423            for row in chunk.selected_indices() {
424                // Get source and target node IDs
425                let from_id = chunk
426                    .column(self.from_column)
427                    .and_then(|c| c.get_value(row))
428                    .ok_or_else(|| {
429                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
430                    })?;
431
432                let to_id = chunk
433                    .column(self.to_column)
434                    .and_then(|c| c.get_value(row))
435                    .ok_or_else(|| {
436                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
437                    })?;
438
439                // Extract node IDs
440                let from_node_id = match from_id {
441                    Value::Int64(id) => NodeId(id as u64),
442                    _ => {
443                        return Err(OperatorError::TypeMismatch {
444                            expected: "Int64 (node ID)".to_string(),
445                            found: format!("{from_id:?}"),
446                        });
447                    }
448                };
449
450                let to_node_id = match to_id {
451                    Value::Int64(id) => NodeId(id as u64),
452                    _ => {
453                        return Err(OperatorError::TypeMismatch {
454                            expected: "Int64 (node ID)".to_string(),
455                            found: format!("{to_id:?}"),
456                        });
457                    }
458                };
459
460                // Resolve property values
461                let resolved_props: Vec<(String, Value)> = self
462                    .properties
463                    .iter()
464                    .map(|(name, source)| {
465                        let value =
466                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
467                        (name.clone(), value)
468                    })
469                    .collect();
470
471                // Validate constraints before writing
472                if let Some(ref validator) = self.validator {
473                    for (name, value) in &resolved_props {
474                        validator.validate_edge_property(&self.edge_type, name, value)?;
475                    }
476                    validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
477                }
478
479                // Create the edge with MVCC versioning
480                let edge_id = self.store.create_edge_versioned(
481                    from_node_id,
482                    to_node_id,
483                    &self.edge_type,
484                    epoch,
485                    tx,
486                );
487
488                // Set properties
489                for (name, value) in resolved_props {
490                    self.store.set_edge_property(edge_id, &name, value);
491                }
492
493                // Copy input columns
494                for col_idx in 0..chunk.column_count() {
495                    if let (Some(src), Some(dst)) =
496                        (chunk.column(col_idx), builder.column_mut(col_idx))
497                    {
498                        if let Some(val) = src.get_value(row) {
499                            dst.push_value(val);
500                        } else {
501                            dst.push_value(Value::Null);
502                        }
503                    }
504                }
505
506                // Add edge ID if requested
507                if let Some(out_col) = self.output_column
508                    && let Some(dst) = builder.column_mut(out_col)
509                {
510                    dst.push_value(Value::Int64(edge_id.0 as i64));
511                }
512
513                builder.advance_row();
514            }
515
516            return Ok(Some(builder.finish()));
517        }
518        Ok(None)
519    }
520
521    fn reset(&mut self) {
522        self.input.reset();
523    }
524
525    fn name(&self) -> &'static str {
526        "CreateEdge"
527    }
528}
529
530/// Operator that deletes nodes.
531pub struct DeleteNodeOperator {
532    /// The graph store to modify.
533    store: Arc<dyn GraphStoreMut>,
534    /// Input operator.
535    input: Box<dyn Operator>,
536    /// Column index for the node to delete.
537    node_column: usize,
538    /// Output schema.
539    output_schema: Vec<LogicalType>,
540    /// Whether to detach (delete connected edges) before deleting.
541    detach: bool,
542    /// Epoch for MVCC versioning.
543    viewing_epoch: Option<EpochId>,
544    /// Transaction ID for MVCC versioning.
545    tx_id: Option<TxId>,
546}
547
548impl DeleteNodeOperator {
549    /// Creates a new node deletion operator.
550    pub fn new(
551        store: Arc<dyn GraphStoreMut>,
552        input: Box<dyn Operator>,
553        node_column: usize,
554        output_schema: Vec<LogicalType>,
555        detach: bool,
556    ) -> Self {
557        Self {
558            store,
559            input,
560            node_column,
561            output_schema,
562            detach,
563            viewing_epoch: None,
564            tx_id: None,
565        }
566    }
567
568    /// Sets the transaction context for MVCC versioning.
569    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
570        self.viewing_epoch = Some(epoch);
571        self.tx_id = tx_id;
572        self
573    }
574}
575
576impl Operator for DeleteNodeOperator {
577    fn next(&mut self) -> OperatorResult {
578        // Get transaction context for versioned deletion
579        let epoch = self
580            .viewing_epoch
581            .unwrap_or_else(|| self.store.current_epoch());
582        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
583
584        if let Some(chunk) = self.input.next()? {
585            let mut deleted_count = 0;
586
587            for row in chunk.selected_indices() {
588                let node_val = chunk
589                    .column(self.node_column)
590                    .and_then(|c| c.get_value(row))
591                    .ok_or_else(|| {
592                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
593                    })?;
594
595                let node_id = match node_val {
596                    Value::Int64(id) => NodeId(id as u64),
597                    _ => {
598                        return Err(OperatorError::TypeMismatch {
599                            expected: "Int64 (node ID)".to_string(),
600                            found: format!("{node_val:?}"),
601                        });
602                    }
603                };
604
605                if self.detach {
606                    // Delete all connected edges first
607                    self.store.delete_node_edges(node_id);
608                } else {
609                    // NODETACH: check that node has no connected edges
610                    let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
611                    if degree > 0 {
612                        return Err(OperatorError::ConstraintViolation(format!(
613                            "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
614                            degree
615                        )));
616                    }
617                }
618
619                // Delete the node with MVCC versioning
620                if self.store.delete_node_versioned(node_id, epoch, tx) {
621                    deleted_count += 1;
622                }
623            }
624
625            // Return a chunk with the delete count
626            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
627            if let Some(dst) = builder.column_mut(0) {
628                dst.push_value(Value::Int64(deleted_count));
629            }
630            builder.advance_row();
631
632            return Ok(Some(builder.finish()));
633        }
634        Ok(None)
635    }
636
637    fn reset(&mut self) {
638        self.input.reset();
639    }
640
641    fn name(&self) -> &'static str {
642        "DeleteNode"
643    }
644}
645
646/// Operator that deletes edges.
647pub struct DeleteEdgeOperator {
648    /// The graph store to modify.
649    store: Arc<dyn GraphStoreMut>,
650    /// Input operator.
651    input: Box<dyn Operator>,
652    /// Column index for the edge to delete.
653    edge_column: usize,
654    /// Output schema.
655    output_schema: Vec<LogicalType>,
656    /// Epoch for MVCC versioning.
657    viewing_epoch: Option<EpochId>,
658    /// Transaction ID for MVCC versioning.
659    tx_id: Option<TxId>,
660}
661
662impl DeleteEdgeOperator {
663    /// Creates a new edge deletion operator.
664    pub fn new(
665        store: Arc<dyn GraphStoreMut>,
666        input: Box<dyn Operator>,
667        edge_column: usize,
668        output_schema: Vec<LogicalType>,
669    ) -> Self {
670        Self {
671            store,
672            input,
673            edge_column,
674            output_schema,
675            viewing_epoch: None,
676            tx_id: None,
677        }
678    }
679
680    /// Sets the transaction context for MVCC versioning.
681    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
682        self.viewing_epoch = Some(epoch);
683        self.tx_id = tx_id;
684        self
685    }
686}
687
688impl Operator for DeleteEdgeOperator {
689    fn next(&mut self) -> OperatorResult {
690        // Get transaction context for versioned deletion
691        let epoch = self
692            .viewing_epoch
693            .unwrap_or_else(|| self.store.current_epoch());
694        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
695
696        if let Some(chunk) = self.input.next()? {
697            let mut deleted_count = 0;
698
699            for row in chunk.selected_indices() {
700                let edge_val = chunk
701                    .column(self.edge_column)
702                    .and_then(|c| c.get_value(row))
703                    .ok_or_else(|| {
704                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
705                    })?;
706
707                let edge_id = match edge_val {
708                    Value::Int64(id) => EdgeId(id as u64),
709                    _ => {
710                        return Err(OperatorError::TypeMismatch {
711                            expected: "Int64 (edge ID)".to_string(),
712                            found: format!("{edge_val:?}"),
713                        });
714                    }
715                };
716
717                // Delete the edge with MVCC versioning
718                if self.store.delete_edge_versioned(edge_id, epoch, tx) {
719                    deleted_count += 1;
720                }
721            }
722
723            // Return a chunk with the delete count
724            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
725            if let Some(dst) = builder.column_mut(0) {
726                dst.push_value(Value::Int64(deleted_count));
727            }
728            builder.advance_row();
729
730            return Ok(Some(builder.finish()));
731        }
732        Ok(None)
733    }
734
735    fn reset(&mut self) {
736        self.input.reset();
737    }
738
739    fn name(&self) -> &'static str {
740        "DeleteEdge"
741    }
742}
743
744/// Operator that adds labels to nodes.
745pub struct AddLabelOperator {
746    /// The graph store.
747    store: Arc<dyn GraphStoreMut>,
748    /// Child operator providing nodes.
749    input: Box<dyn Operator>,
750    /// Column index containing node IDs.
751    node_column: usize,
752    /// Labels to add.
753    labels: Vec<String>,
754    /// Output schema.
755    output_schema: Vec<LogicalType>,
756}
757
758impl AddLabelOperator {
759    /// Creates a new add label operator.
760    pub fn new(
761        store: Arc<dyn GraphStoreMut>,
762        input: Box<dyn Operator>,
763        node_column: usize,
764        labels: Vec<String>,
765        output_schema: Vec<LogicalType>,
766    ) -> Self {
767        Self {
768            store,
769            input,
770            node_column,
771            labels,
772            output_schema,
773        }
774    }
775}
776
777impl Operator for AddLabelOperator {
778    fn next(&mut self) -> OperatorResult {
779        if let Some(chunk) = self.input.next()? {
780            let mut updated_count = 0;
781
782            for row in chunk.selected_indices() {
783                let node_val = chunk
784                    .column(self.node_column)
785                    .and_then(|c| c.get_value(row))
786                    .ok_or_else(|| {
787                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
788                    })?;
789
790                let node_id = match node_val {
791                    Value::Int64(id) => NodeId(id as u64),
792                    _ => {
793                        return Err(OperatorError::TypeMismatch {
794                            expected: "Int64 (node ID)".to_string(),
795                            found: format!("{node_val:?}"),
796                        });
797                    }
798                };
799
800                // Add all labels
801                for label in &self.labels {
802                    if self.store.add_label(node_id, label) {
803                        updated_count += 1;
804                    }
805                }
806            }
807
808            // Return a chunk with the update count
809            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
810            if let Some(dst) = builder.column_mut(0) {
811                dst.push_value(Value::Int64(updated_count));
812            }
813            builder.advance_row();
814
815            return Ok(Some(builder.finish()));
816        }
817        Ok(None)
818    }
819
820    fn reset(&mut self) {
821        self.input.reset();
822    }
823
824    fn name(&self) -> &'static str {
825        "AddLabel"
826    }
827}
828
829/// Operator that removes labels from nodes.
830pub struct RemoveLabelOperator {
831    /// The graph store.
832    store: Arc<dyn GraphStoreMut>,
833    /// Child operator providing nodes.
834    input: Box<dyn Operator>,
835    /// Column index containing node IDs.
836    node_column: usize,
837    /// Labels to remove.
838    labels: Vec<String>,
839    /// Output schema.
840    output_schema: Vec<LogicalType>,
841}
842
843impl RemoveLabelOperator {
844    /// Creates a new remove label operator.
845    pub fn new(
846        store: Arc<dyn GraphStoreMut>,
847        input: Box<dyn Operator>,
848        node_column: usize,
849        labels: Vec<String>,
850        output_schema: Vec<LogicalType>,
851    ) -> Self {
852        Self {
853            store,
854            input,
855            node_column,
856            labels,
857            output_schema,
858        }
859    }
860}
861
862impl Operator for RemoveLabelOperator {
863    fn next(&mut self) -> OperatorResult {
864        if let Some(chunk) = self.input.next()? {
865            let mut updated_count = 0;
866
867            for row in chunk.selected_indices() {
868                let node_val = chunk
869                    .column(self.node_column)
870                    .and_then(|c| c.get_value(row))
871                    .ok_or_else(|| {
872                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
873                    })?;
874
875                let node_id = match node_val {
876                    Value::Int64(id) => NodeId(id as u64),
877                    _ => {
878                        return Err(OperatorError::TypeMismatch {
879                            expected: "Int64 (node ID)".to_string(),
880                            found: format!("{node_val:?}"),
881                        });
882                    }
883                };
884
885                // Remove all labels
886                for label in &self.labels {
887                    if self.store.remove_label(node_id, label) {
888                        updated_count += 1;
889                    }
890                }
891            }
892
893            // Return a chunk with the update count
894            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
895            if let Some(dst) = builder.column_mut(0) {
896                dst.push_value(Value::Int64(updated_count));
897            }
898            builder.advance_row();
899
900            return Ok(Some(builder.finish()));
901        }
902        Ok(None)
903    }
904
905    fn reset(&mut self) {
906        self.input.reset();
907    }
908
909    fn name(&self) -> &'static str {
910        "RemoveLabel"
911    }
912}
913
/// Operator that sets properties on nodes or edges.
///
/// This operator reads node/edge IDs from a column of each input chunk and
/// sets the specified properties on each entity. A property named `"*"` is
/// treated as a map assignment: the resolved value is expected to be a
/// `Value::Map` whose entries are written as individual properties.
pub struct SetPropertyOperator {
    /// The graph store the properties are written to.
    store: Arc<dyn GraphStoreMut>,
    /// Child operator providing entities.
    input: Box<dyn Operator>,
    /// Column index containing entity IDs (node or edge) as `Value::Int64`.
    entity_column: usize,
    /// Whether the entity is an edge (false = node).
    is_edge: bool,
    /// Properties to set (name -> source); the name `"*"` marks a map assignment.
    properties: Vec<(String, PropertySource)>,
    /// Output schema used to build the chunk returned by `next`.
    output_schema: Vec<LogicalType>,
    /// Whether to replace all properties (true) or merge (false) for map assignments.
    replace: bool,
    /// Optional constraint validator for schema enforcement; `None` disables validation.
    validator: Option<Arc<dyn ConstraintValidator>>,
    /// Entity labels (passed to the validator for node constraint validation).
    labels: Vec<String>,
    /// Edge type (passed to the validator for edge constraint validation).
    edge_type_name: Option<String>,
}
940
941impl SetPropertyOperator {
942    /// Creates a new set property operator for nodes.
943    pub fn new_for_node(
944        store: Arc<dyn GraphStoreMut>,
945        input: Box<dyn Operator>,
946        node_column: usize,
947        properties: Vec<(String, PropertySource)>,
948        output_schema: Vec<LogicalType>,
949    ) -> Self {
950        Self {
951            store,
952            input,
953            entity_column: node_column,
954            is_edge: false,
955            properties,
956            output_schema,
957            replace: false,
958            validator: None,
959            labels: Vec::new(),
960            edge_type_name: None,
961        }
962    }
963
964    /// Creates a new set property operator for edges.
965    pub fn new_for_edge(
966        store: Arc<dyn GraphStoreMut>,
967        input: Box<dyn Operator>,
968        edge_column: usize,
969        properties: Vec<(String, PropertySource)>,
970        output_schema: Vec<LogicalType>,
971    ) -> Self {
972        Self {
973            store,
974            input,
975            entity_column: edge_column,
976            is_edge: true,
977            properties,
978            output_schema,
979            replace: false,
980            validator: None,
981            labels: Vec::new(),
982            edge_type_name: None,
983        }
984    }
985
986    /// Sets whether this operator replaces all properties (for map assignment).
987    pub fn with_replace(mut self, replace: bool) -> Self {
988        self.replace = replace;
989        self
990    }
991
992    /// Sets the constraint validator for schema enforcement.
993    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
994        self.validator = Some(validator);
995        self
996    }
997
998    /// Sets the entity labels (for node constraint validation).
999    pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1000        self.labels = labels;
1001        self
1002    }
1003
1004    /// Sets the edge type name (for edge constraint validation).
1005    pub fn with_edge_type(mut self, edge_type: String) -> Self {
1006        self.edge_type_name = Some(edge_type);
1007        self
1008    }
1009}
1010
1011impl Operator for SetPropertyOperator {
1012    fn next(&mut self) -> OperatorResult {
1013        if let Some(chunk) = self.input.next()? {
1014            let mut builder =
1015                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1016
1017            for row in chunk.selected_indices() {
1018                let entity_val = chunk
1019                    .column(self.entity_column)
1020                    .and_then(|c| c.get_value(row))
1021                    .ok_or_else(|| {
1022                        OperatorError::ColumnNotFound(format!(
1023                            "entity column {}",
1024                            self.entity_column
1025                        ))
1026                    })?;
1027
1028                let entity_id = match entity_val {
1029                    Value::Int64(id) => id as u64,
1030                    _ => {
1031                        return Err(OperatorError::TypeMismatch {
1032                            expected: "Int64 (entity ID)".to_string(),
1033                            found: format!("{entity_val:?}"),
1034                        });
1035                    }
1036                };
1037
1038                // Resolve all property values
1039                let resolved_props: Vec<(String, Value)> = self
1040                    .properties
1041                    .iter()
1042                    .map(|(name, source)| {
1043                        let value =
1044                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1045                        (name.clone(), value)
1046                    })
1047                    .collect();
1048
1049                // Validate constraints before writing
1050                if let Some(ref validator) = self.validator {
1051                    if self.is_edge {
1052                        if let Some(ref et) = self.edge_type_name {
1053                            for (name, value) in &resolved_props {
1054                                validator.validate_edge_property(et, name, value)?;
1055                            }
1056                        }
1057                    } else {
1058                        for (name, value) in &resolved_props {
1059                            validator.validate_node_property(&self.labels, name, value)?;
1060                            validator.check_unique_node_property(&self.labels, name, value)?;
1061                        }
1062                    }
1063                }
1064
1065                // Write all properties
1066                for (prop_name, value) in resolved_props {
1067                    if prop_name == "*" {
1068                        // Map assignment: value should be a Map
1069                        if let Value::Map(map) = value {
1070                            if self.replace {
1071                                // Replace: remove all existing properties first
1072                                if self.is_edge {
1073                                    if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1074                                        let keys: Vec<String> = edge
1075                                            .properties
1076                                            .iter()
1077                                            .map(|(k, _)| k.as_str().to_string())
1078                                            .collect();
1079                                        for key in keys {
1080                                            self.store
1081                                                .remove_edge_property(EdgeId(entity_id), &key);
1082                                        }
1083                                    }
1084                                } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1085                                    let keys: Vec<String> = node
1086                                        .properties
1087                                        .iter()
1088                                        .map(|(k, _)| k.as_str().to_string())
1089                                        .collect();
1090                                    for key in keys {
1091                                        self.store.remove_node_property(NodeId(entity_id), &key);
1092                                    }
1093                                }
1094                            }
1095                            // Set each map entry
1096                            for (key, val) in map.iter() {
1097                                if self.is_edge {
1098                                    self.store.set_edge_property(
1099                                        EdgeId(entity_id),
1100                                        key.as_str(),
1101                                        val.clone(),
1102                                    );
1103                                } else {
1104                                    self.store.set_node_property(
1105                                        NodeId(entity_id),
1106                                        key.as_str(),
1107                                        val.clone(),
1108                                    );
1109                                }
1110                            }
1111                        }
1112                    } else if self.is_edge {
1113                        self.store
1114                            .set_edge_property(EdgeId(entity_id), &prop_name, value);
1115                    } else {
1116                        self.store
1117                            .set_node_property(NodeId(entity_id), &prop_name, value);
1118                    }
1119                }
1120
1121                // Copy input columns to output
1122                for col_idx in 0..chunk.column_count() {
1123                    if let (Some(src), Some(dst)) =
1124                        (chunk.column(col_idx), builder.column_mut(col_idx))
1125                    {
1126                        if let Some(val) = src.get_value(row) {
1127                            dst.push_value(val);
1128                        } else {
1129                            dst.push_value(Value::Null);
1130                        }
1131                    }
1132                }
1133
1134                builder.advance_row();
1135            }
1136
1137            return Ok(Some(builder.finish()));
1138        }
1139        Ok(None)
1140    }
1141
1142    fn reset(&mut self) {
1143        self.input.reset();
1144    }
1145
1146    fn name(&self) -> &'static str {
1147        "SetProperty"
1148    }
1149}
1150
1151#[cfg(test)]
1152mod tests {
1153    use super::*;
1154    use crate::execution::DataChunk;
1155    use crate::execution::chunk::DataChunkBuilder;
1156    use crate::graph::lpg::LpgStore;
1157
1158    fn create_test_store() -> Arc<dyn GraphStoreMut> {
1159        Arc::new(LpgStore::new())
1160    }
1161
1162    #[test]
1163    fn test_create_node_standalone() {
1164        let store = create_test_store();
1165
1166        let mut op = CreateNodeOperator::new(
1167            Arc::clone(&store),
1168            None,
1169            vec!["Person".to_string()],
1170            vec![(
1171                "name".to_string(),
1172                PropertySource::Constant(Value::String("Alice".into())),
1173            )],
1174            vec![LogicalType::Int64],
1175            0,
1176        );
1177
1178        // First call should create a node
1179        let chunk = op.next().unwrap().unwrap();
1180        assert_eq!(chunk.row_count(), 1);
1181
1182        // Second call should return None
1183        assert!(op.next().unwrap().is_none());
1184
1185        // Verify node was created
1186        assert_eq!(store.node_count(), 1);
1187    }
1188
1189    #[test]
1190    fn test_create_edge() {
1191        let store = create_test_store();
1192
1193        // Create two nodes first
1194        let node1 = store.create_node(&["Person"]);
1195        let node2 = store.create_node(&["Person"]);
1196
1197        // Create input chunk with node IDs
1198        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1199        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1200        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1201        builder.advance_row();
1202        let input_chunk = builder.finish();
1203
1204        // Mock input operator
1205        struct MockInput {
1206            chunk: Option<DataChunk>,
1207        }
1208        impl Operator for MockInput {
1209            fn next(&mut self) -> OperatorResult {
1210                Ok(self.chunk.take())
1211            }
1212            fn reset(&mut self) {}
1213            fn name(&self) -> &'static str {
1214                "MockInput"
1215            }
1216        }
1217
1218        let mut op = CreateEdgeOperator::new(
1219            Arc::clone(&store),
1220            Box::new(MockInput {
1221                chunk: Some(input_chunk),
1222            }),
1223            0, // from column
1224            1, // to column
1225            "KNOWS".to_string(),
1226            vec![LogicalType::Int64, LogicalType::Int64],
1227        );
1228
1229        // Execute
1230        let _chunk = op.next().unwrap().unwrap();
1231
1232        // Verify edge was created
1233        assert_eq!(store.edge_count(), 1);
1234    }
1235
1236    #[test]
1237    fn test_delete_node() {
1238        let store = create_test_store();
1239
1240        // Create a node
1241        let node_id = store.create_node(&["Person"]);
1242        assert_eq!(store.node_count(), 1);
1243
1244        // Create input chunk with the node ID
1245        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1246        builder.column_mut(0).unwrap().push_int64(node_id.0 as i64);
1247        builder.advance_row();
1248        let input_chunk = builder.finish();
1249
1250        struct MockInput {
1251            chunk: Option<DataChunk>,
1252        }
1253        impl Operator for MockInput {
1254            fn next(&mut self) -> OperatorResult {
1255                Ok(self.chunk.take())
1256            }
1257            fn reset(&mut self) {}
1258            fn name(&self) -> &'static str {
1259                "MockInput"
1260            }
1261        }
1262
1263        let mut op = DeleteNodeOperator::new(
1264            Arc::clone(&store),
1265            Box::new(MockInput {
1266                chunk: Some(input_chunk),
1267            }),
1268            0,
1269            vec![LogicalType::Int64],
1270            false,
1271        );
1272
1273        // Execute
1274        let chunk = op.next().unwrap().unwrap();
1275
1276        // Verify deletion
1277        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1278        assert_eq!(deleted, 1);
1279        assert_eq!(store.node_count(), 0);
1280    }
1281
1282    // ── Helper: reusable MockInput ───────────────────────────────
1283
1284    struct MockInput {
1285        chunk: Option<DataChunk>,
1286    }
1287
1288    impl MockInput {
1289        fn boxed(chunk: DataChunk) -> Box<Self> {
1290            Box::new(Self { chunk: Some(chunk) })
1291        }
1292    }
1293
1294    impl Operator for MockInput {
1295        fn next(&mut self) -> OperatorResult {
1296            Ok(self.chunk.take())
1297        }
1298        fn reset(&mut self) {}
1299        fn name(&self) -> &'static str {
1300            "MockInput"
1301        }
1302    }
1303
1304    // ── DeleteEdgeOperator ───────────────────────────────────────
1305
1306    #[test]
1307    fn test_delete_edge() {
1308        let store = create_test_store();
1309
1310        let n1 = store.create_node(&["Person"]);
1311        let n2 = store.create_node(&["Person"]);
1312        let eid = store.create_edge(n1, n2, "KNOWS");
1313        assert_eq!(store.edge_count(), 1);
1314
1315        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1316        builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1317        builder.advance_row();
1318
1319        let mut op = DeleteEdgeOperator::new(
1320            Arc::clone(&store),
1321            MockInput::boxed(builder.finish()),
1322            0,
1323            vec![LogicalType::Int64],
1324        );
1325
1326        let chunk = op.next().unwrap().unwrap();
1327        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1328        assert_eq!(deleted, 1);
1329        assert_eq!(store.edge_count(), 0);
1330    }
1331
1332    #[test]
1333    fn test_delete_edge_no_input_returns_none() {
1334        let store = create_test_store();
1335
1336        // Empty chunk: MockInput returns None immediately
1337        struct EmptyInput;
1338        impl Operator for EmptyInput {
1339            fn next(&mut self) -> OperatorResult {
1340                Ok(None)
1341            }
1342            fn reset(&mut self) {}
1343            fn name(&self) -> &'static str {
1344                "EmptyInput"
1345            }
1346        }
1347
1348        let mut op = DeleteEdgeOperator::new(
1349            Arc::clone(&store),
1350            Box::new(EmptyInput),
1351            0,
1352            vec![LogicalType::Int64],
1353        );
1354
1355        assert!(op.next().unwrap().is_none());
1356    }
1357
1358    #[test]
1359    fn test_delete_multiple_edges() {
1360        let store = create_test_store();
1361
1362        let n1 = store.create_node(&["N"]);
1363        let n2 = store.create_node(&["N"]);
1364        let e1 = store.create_edge(n1, n2, "R");
1365        let e2 = store.create_edge(n2, n1, "S");
1366        assert_eq!(store.edge_count(), 2);
1367
1368        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1369        builder.column_mut(0).unwrap().push_int64(e1.0 as i64);
1370        builder.advance_row();
1371        builder.column_mut(0).unwrap().push_int64(e2.0 as i64);
1372        builder.advance_row();
1373
1374        let mut op = DeleteEdgeOperator::new(
1375            Arc::clone(&store),
1376            MockInput::boxed(builder.finish()),
1377            0,
1378            vec![LogicalType::Int64],
1379        );
1380
1381        let chunk = op.next().unwrap().unwrap();
1382        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1383        assert_eq!(deleted, 2);
1384        assert_eq!(store.edge_count(), 0);
1385    }
1386
1387    // ── DeleteNodeOperator with DETACH ───────────────────────────
1388
1389    #[test]
1390    fn test_delete_node_detach() {
1391        let store = create_test_store();
1392
1393        let n1 = store.create_node(&["Person"]);
1394        let n2 = store.create_node(&["Person"]);
1395        store.create_edge(n1, n2, "KNOWS");
1396        store.create_edge(n2, n1, "FOLLOWS");
1397        assert_eq!(store.edge_count(), 2);
1398
1399        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1400        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
1401        builder.advance_row();
1402
1403        let mut op = DeleteNodeOperator::new(
1404            Arc::clone(&store),
1405            MockInput::boxed(builder.finish()),
1406            0,
1407            vec![LogicalType::Int64],
1408            true, // detach = true
1409        );
1410
1411        let chunk = op.next().unwrap().unwrap();
1412        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1413        assert_eq!(deleted, 1);
1414        assert_eq!(store.node_count(), 1);
1415        assert_eq!(store.edge_count(), 0); // edges detached
1416    }
1417
1418    // ── AddLabelOperator ─────────────────────────────────────────
1419
1420    #[test]
1421    fn test_add_label() {
1422        let store = create_test_store();
1423
1424        let node = store.create_node(&["Person"]);
1425
1426        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1427        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1428        builder.advance_row();
1429
1430        let mut op = AddLabelOperator::new(
1431            Arc::clone(&store),
1432            MockInput::boxed(builder.finish()),
1433            0,
1434            vec!["Employee".to_string()],
1435            vec![LogicalType::Int64],
1436        );
1437
1438        let chunk = op.next().unwrap().unwrap();
1439        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1440        assert_eq!(updated, 1);
1441
1442        // Verify label was added
1443        let node_data = store.get_node(node).unwrap();
1444        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1445        assert!(labels.contains(&"Person"));
1446        assert!(labels.contains(&"Employee"));
1447    }
1448
1449    #[test]
1450    fn test_add_multiple_labels() {
1451        let store = create_test_store();
1452
1453        let node = store.create_node(&["Base"]);
1454
1455        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1456        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1457        builder.advance_row();
1458
1459        let mut op = AddLabelOperator::new(
1460            Arc::clone(&store),
1461            MockInput::boxed(builder.finish()),
1462            0,
1463            vec!["LabelA".to_string(), "LabelB".to_string()],
1464            vec![LogicalType::Int64],
1465        );
1466
1467        let chunk = op.next().unwrap().unwrap();
1468        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1469        assert_eq!(updated, 2); // 2 labels added
1470
1471        let node_data = store.get_node(node).unwrap();
1472        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1473        assert!(labels.contains(&"LabelA"));
1474        assert!(labels.contains(&"LabelB"));
1475    }
1476
1477    #[test]
1478    fn test_add_label_no_input_returns_none() {
1479        let store = create_test_store();
1480
1481        struct EmptyInput;
1482        impl Operator for EmptyInput {
1483            fn next(&mut self) -> OperatorResult {
1484                Ok(None)
1485            }
1486            fn reset(&mut self) {}
1487            fn name(&self) -> &'static str {
1488                "EmptyInput"
1489            }
1490        }
1491
1492        let mut op = AddLabelOperator::new(
1493            Arc::clone(&store),
1494            Box::new(EmptyInput),
1495            0,
1496            vec!["Foo".to_string()],
1497            vec![LogicalType::Int64],
1498        );
1499
1500        assert!(op.next().unwrap().is_none());
1501    }
1502
1503    // ── RemoveLabelOperator ──────────────────────────────────────
1504
1505    #[test]
1506    fn test_remove_label() {
1507        let store = create_test_store();
1508
1509        let node = store.create_node(&["Person", "Employee"]);
1510
1511        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1512        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1513        builder.advance_row();
1514
1515        let mut op = RemoveLabelOperator::new(
1516            Arc::clone(&store),
1517            MockInput::boxed(builder.finish()),
1518            0,
1519            vec!["Employee".to_string()],
1520            vec![LogicalType::Int64],
1521        );
1522
1523        let chunk = op.next().unwrap().unwrap();
1524        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1525        assert_eq!(updated, 1);
1526
1527        // Verify label was removed
1528        let node_data = store.get_node(node).unwrap();
1529        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1530        assert!(labels.contains(&"Person"));
1531        assert!(!labels.contains(&"Employee"));
1532    }
1533
1534    #[test]
1535    fn test_remove_nonexistent_label() {
1536        let store = create_test_store();
1537
1538        let node = store.create_node(&["Person"]);
1539
1540        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1541        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1542        builder.advance_row();
1543
1544        let mut op = RemoveLabelOperator::new(
1545            Arc::clone(&store),
1546            MockInput::boxed(builder.finish()),
1547            0,
1548            vec!["NonExistent".to_string()],
1549            vec![LogicalType::Int64],
1550        );
1551
1552        let chunk = op.next().unwrap().unwrap();
1553        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1554        assert_eq!(updated, 0); // nothing removed
1555    }
1556
1557    // ── SetPropertyOperator ──────────────────────────────────────
1558
1559    #[test]
1560    fn test_set_node_property_constant() {
1561        let store = create_test_store();
1562
1563        let node = store.create_node(&["Person"]);
1564
1565        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1566        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1567        builder.advance_row();
1568
1569        let mut op = SetPropertyOperator::new_for_node(
1570            Arc::clone(&store),
1571            MockInput::boxed(builder.finish()),
1572            0,
1573            vec![(
1574                "name".to_string(),
1575                PropertySource::Constant(Value::String("Alice".into())),
1576            )],
1577            vec![LogicalType::Int64],
1578        );
1579
1580        let chunk = op.next().unwrap().unwrap();
1581        assert_eq!(chunk.row_count(), 1);
1582
1583        // Verify property was set
1584        let node_data = store.get_node(node).unwrap();
1585        assert_eq!(
1586            node_data
1587                .properties
1588                .get(&grafeo_common::types::PropertyKey::new("name")),
1589            Some(&Value::String("Alice".into()))
1590        );
1591    }
1592
1593    #[test]
1594    fn test_set_node_property_from_column() {
1595        let store = create_test_store();
1596
1597        let node = store.create_node(&["Person"]);
1598
1599        // Input: column 0 = node ID, column 1 = property value
1600        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1601        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1602        builder
1603            .column_mut(1)
1604            .unwrap()
1605            .push_value(Value::String("Bob".into()));
1606        builder.advance_row();
1607
1608        let mut op = SetPropertyOperator::new_for_node(
1609            Arc::clone(&store),
1610            MockInput::boxed(builder.finish()),
1611            0,
1612            vec![("name".to_string(), PropertySource::Column(1))],
1613            vec![LogicalType::Int64, LogicalType::String],
1614        );
1615
1616        let chunk = op.next().unwrap().unwrap();
1617        assert_eq!(chunk.row_count(), 1);
1618
1619        let node_data = store.get_node(node).unwrap();
1620        assert_eq!(
1621            node_data
1622                .properties
1623                .get(&grafeo_common::types::PropertyKey::new("name")),
1624            Some(&Value::String("Bob".into()))
1625        );
1626    }
1627
1628    #[test]
1629    fn test_set_edge_property() {
1630        let store = create_test_store();
1631
1632        let n1 = store.create_node(&["N"]);
1633        let n2 = store.create_node(&["N"]);
1634        let eid = store.create_edge(n1, n2, "KNOWS");
1635
1636        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1637        builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1638        builder.advance_row();
1639
1640        let mut op = SetPropertyOperator::new_for_edge(
1641            Arc::clone(&store),
1642            MockInput::boxed(builder.finish()),
1643            0,
1644            vec![(
1645                "weight".to_string(),
1646                PropertySource::Constant(Value::Float64(0.75)),
1647            )],
1648            vec![LogicalType::Int64],
1649        );
1650
1651        let chunk = op.next().unwrap().unwrap();
1652        assert_eq!(chunk.row_count(), 1);
1653
1654        let edge_data = store.get_edge(eid).unwrap();
1655        assert_eq!(
1656            edge_data
1657                .properties
1658                .get(&grafeo_common::types::PropertyKey::new("weight")),
1659            Some(&Value::Float64(0.75))
1660        );
1661    }
1662
1663    #[test]
1664    fn test_set_multiple_properties() {
1665        let store = create_test_store();
1666
1667        let node = store.create_node(&["Person"]);
1668
1669        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1670        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1671        builder.advance_row();
1672
1673        let mut op = SetPropertyOperator::new_for_node(
1674            Arc::clone(&store),
1675            MockInput::boxed(builder.finish()),
1676            0,
1677            vec![
1678                (
1679                    "name".to_string(),
1680                    PropertySource::Constant(Value::String("Alice".into())),
1681                ),
1682                (
1683                    "age".to_string(),
1684                    PropertySource::Constant(Value::Int64(30)),
1685                ),
1686            ],
1687            vec![LogicalType::Int64],
1688        );
1689
1690        op.next().unwrap().unwrap();
1691
1692        let node_data = store.get_node(node).unwrap();
1693        assert_eq!(
1694            node_data
1695                .properties
1696                .get(&grafeo_common::types::PropertyKey::new("name")),
1697            Some(&Value::String("Alice".into()))
1698        );
1699        assert_eq!(
1700            node_data
1701                .properties
1702                .get(&grafeo_common::types::PropertyKey::new("age")),
1703            Some(&Value::Int64(30))
1704        );
1705    }
1706
1707    #[test]
1708    fn test_set_property_no_input_returns_none() {
1709        let store = create_test_store();
1710
1711        struct EmptyInput;
1712        impl Operator for EmptyInput {
1713            fn next(&mut self) -> OperatorResult {
1714                Ok(None)
1715            }
1716            fn reset(&mut self) {}
1717            fn name(&self) -> &'static str {
1718                "EmptyInput"
1719            }
1720        }
1721
1722        let mut op = SetPropertyOperator::new_for_node(
1723            Arc::clone(&store),
1724            Box::new(EmptyInput),
1725            0,
1726            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
1727            vec![LogicalType::Int64],
1728        );
1729
1730        assert!(op.next().unwrap().is_none());
1731    }
1732
1733    // ── Operator name() ──────────────────────────────────────────
1734
1735    #[test]
1736    fn test_operator_names() {
1737        let store = create_test_store();
1738
1739        struct EmptyInput;
1740        impl Operator for EmptyInput {
1741            fn next(&mut self) -> OperatorResult {
1742                Ok(None)
1743            }
1744            fn reset(&mut self) {}
1745            fn name(&self) -> &'static str {
1746                "EmptyInput"
1747            }
1748        }
1749
1750        let op = DeleteEdgeOperator::new(
1751            Arc::clone(&store),
1752            Box::new(EmptyInput),
1753            0,
1754            vec![LogicalType::Int64],
1755        );
1756        assert_eq!(op.name(), "DeleteEdge");
1757
1758        let op = AddLabelOperator::new(
1759            Arc::clone(&store),
1760            Box::new(EmptyInput),
1761            0,
1762            vec!["L".to_string()],
1763            vec![LogicalType::Int64],
1764        );
1765        assert_eq!(op.name(), "AddLabel");
1766
1767        let op = RemoveLabelOperator::new(
1768            Arc::clone(&store),
1769            Box::new(EmptyInput),
1770            0,
1771            vec!["L".to_string()],
1772            vec![LogicalType::Int64],
1773        );
1774        assert_eq!(op.name(), "RemoveLabel");
1775
1776        let op = SetPropertyOperator::new_for_node(
1777            Arc::clone(&store),
1778            Box::new(EmptyInput),
1779            0,
1780            vec![],
1781            vec![LogicalType::Int64],
1782        );
1783        assert_eq!(op.name(), "SetProperty");
1784    }
1785}