Skip to main content

grafeo_core/execution/operators/
mutation.rs

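//! Mutation operators for creating, deleting, and updating graph elements.
//!
//! These operators modify the graph structure:
//! - `CreateNodeOperator`: Creates new nodes
//! - `CreateEdgeOperator`: Creates new edges
//! - `DeleteNodeOperator`: Deletes nodes
//! - `DeleteEdgeOperator`: Deletes edges
//! - `AddLabelOperator`: Adds labels to nodes
//! - `RemoveLabelOperator`: Removes labels from nodes
//! - `SetPropertyOperator`: Sets properties on nodes or edges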
8
9use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::{GraphStore, GraphStoreMut};
16
17/// Trait for validating schema constraints during mutation operations.
18///
19/// Implementors check type definitions, NOT NULL, and UNIQUE constraints
20/// before data is written to the store.
21pub trait ConstraintValidator: Send + Sync {
22    /// Validates a single property value for a node with the given labels.
23    ///
24    /// Checks type compatibility and NOT NULL constraints.
25    fn validate_node_property(
26        &self,
27        labels: &[String],
28        key: &str,
29        value: &Value,
30    ) -> Result<(), OperatorError>;
31
32    /// Validates that all required properties are present after creating a node.
33    ///
34    /// Checks NOT NULL constraints for properties that were not explicitly set.
35    fn validate_node_complete(
36        &self,
37        labels: &[String],
38        properties: &[(String, Value)],
39    ) -> Result<(), OperatorError>;
40
41    /// Checks UNIQUE constraint for a node property value.
42    ///
43    /// Returns an error if a node with the same label already has this value.
44    fn check_unique_node_property(
45        &self,
46        labels: &[String],
47        key: &str,
48        value: &Value,
49    ) -> Result<(), OperatorError>;
50
51    /// Validates a single property value for an edge of the given type.
52    fn validate_edge_property(
53        &self,
54        edge_type: &str,
55        key: &str,
56        value: &Value,
57    ) -> Result<(), OperatorError>;
58
59    /// Validates that all required properties are present after creating an edge.
60    fn validate_edge_complete(
61        &self,
62        edge_type: &str,
63        properties: &[(String, Value)],
64    ) -> Result<(), OperatorError>;
65}
66
67/// Operator that creates new nodes.
68///
69/// For each input row, creates a new node with the specified labels
70/// and properties, then outputs the row with the new node.
71pub struct CreateNodeOperator {
72    /// The graph store to modify.
73    store: Arc<dyn GraphStoreMut>,
74    /// Input operator.
75    input: Option<Box<dyn Operator>>,
76    /// Labels for the new nodes.
77    labels: Vec<String>,
78    /// Properties to set (name -> column index or constant value).
79    properties: Vec<(String, PropertySource)>,
80    /// Output schema.
81    output_schema: Vec<LogicalType>,
82    /// Column index for the created node variable.
83    output_column: usize,
84    /// Whether this operator has been executed (for no-input case).
85    executed: bool,
86    /// Epoch for MVCC versioning.
87    viewing_epoch: Option<EpochId>,
88    /// Transaction ID for MVCC versioning.
89    tx_id: Option<TxId>,
90    /// Optional constraint validator for schema enforcement.
91    validator: Option<Arc<dyn ConstraintValidator>>,
92}
93
94/// Source for a property value.
95#[derive(Debug, Clone)]
96pub enum PropertySource {
97    /// Get value from an input column.
98    Column(usize),
99    /// Use a constant value.
100    Constant(Value),
101    /// Access a named property from a map/node/edge in an input column.
102    PropertyAccess {
103        /// The column containing the map, node ID, or edge ID.
104        column: usize,
105        /// The property name to extract.
106        property: String,
107    },
108}
109
110impl PropertySource {
111    /// Resolves a property value from a data chunk row.
112    pub fn resolve(
113        &self,
114        chunk: &crate::execution::chunk::DataChunk,
115        row: usize,
116        store: &dyn GraphStore,
117    ) -> Value {
118        match self {
119            PropertySource::Column(col_idx) => chunk
120                .column(*col_idx)
121                .and_then(|c| c.get_value(row))
122                .unwrap_or(Value::Null),
123            PropertySource::Constant(v) => v.clone(),
124            PropertySource::PropertyAccess { column, property } => {
125                let Some(col) = chunk.column(*column) else {
126                    return Value::Null;
127                };
128                // Try node ID first, then edge ID, then map value
129                if let Some(node_id) = col.get_node_id(row) {
130                    store
131                        .get_node(node_id)
132                        .and_then(|node| node.get_property(property).cloned())
133                        .unwrap_or(Value::Null)
134                } else if let Some(edge_id) = col.get_edge_id(row) {
135                    store
136                        .get_edge(edge_id)
137                        .and_then(|edge| edge.get_property(property).cloned())
138                        .unwrap_or(Value::Null)
139                } else if let Some(Value::Map(map)) = col.get_value(row) {
140                    let key = PropertyKey::new(property);
141                    map.get(&key).cloned().unwrap_or(Value::Null)
142                } else {
143                    Value::Null
144                }
145            }
146        }
147    }
148}
149
150impl CreateNodeOperator {
151    /// Creates a new node creation operator.
152    ///
153    /// # Arguments
154    /// * `store` - The graph store to modify.
155    /// * `input` - Optional input operator (None for standalone CREATE).
156    /// * `labels` - Labels to assign to created nodes.
157    /// * `properties` - Properties to set on created nodes.
158    /// * `output_schema` - Schema of the output.
159    /// * `output_column` - Column index where the created node ID goes.
160    pub fn new(
161        store: Arc<dyn GraphStoreMut>,
162        input: Option<Box<dyn Operator>>,
163        labels: Vec<String>,
164        properties: Vec<(String, PropertySource)>,
165        output_schema: Vec<LogicalType>,
166        output_column: usize,
167    ) -> Self {
168        Self {
169            store,
170            input,
171            labels,
172            properties,
173            output_schema,
174            output_column,
175            executed: false,
176            viewing_epoch: None,
177            tx_id: None,
178            validator: None,
179        }
180    }
181
182    /// Sets the transaction context for MVCC versioning.
183    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
184        self.viewing_epoch = Some(epoch);
185        self.tx_id = tx_id;
186        self
187    }
188
189    /// Sets the constraint validator for schema enforcement.
190    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
191        self.validator = Some(validator);
192        self
193    }
194}
195
196impl CreateNodeOperator {
197    /// Validates and sets properties on a newly created node.
198    fn validate_and_set_properties(
199        &self,
200        node_id: NodeId,
201        resolved_props: &[(String, Value)],
202    ) -> Result<(), OperatorError> {
203        // Phase 1: Validate each property value
204        if let Some(ref validator) = self.validator {
205            for (name, value) in resolved_props {
206                validator.validate_node_property(&self.labels, name, value)?;
207                validator.check_unique_node_property(&self.labels, name, value)?;
208            }
209            // Phase 2: Validate completeness (NOT NULL checks for missing required properties)
210            validator.validate_node_complete(&self.labels, resolved_props)?;
211        }
212
213        // Phase 3: Write properties to the store
214        for (name, value) in resolved_props {
215            self.store.set_node_property(node_id, name, value.clone());
216        }
217        Ok(())
218    }
219}
220
221impl Operator for CreateNodeOperator {
222    fn next(&mut self) -> OperatorResult {
223        // Get transaction context for versioned creation
224        let epoch = self
225            .viewing_epoch
226            .unwrap_or_else(|| self.store.current_epoch());
227        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
228
229        if let Some(ref mut input) = self.input {
230            // For each input row, create a node
231            if let Some(chunk) = input.next()? {
232                let mut builder =
233                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
234
235                for row in chunk.selected_indices() {
236                    // Resolve all property values first (before creating node)
237                    let resolved_props: Vec<(String, Value)> = self
238                        .properties
239                        .iter()
240                        .map(|(name, source)| {
241                            let value =
242                                source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
243                            (name.clone(), value)
244                        })
245                        .collect();
246
247                    // Create the node with MVCC versioning
248                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
249                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
250
251                    // Validate and set properties
252                    self.validate_and_set_properties(node_id, &resolved_props)?;
253
254                    // Copy input columns to output
255                    for col_idx in 0..chunk.column_count() {
256                        if col_idx < self.output_column
257                            && let (Some(src), Some(dst)) =
258                                (chunk.column(col_idx), builder.column_mut(col_idx))
259                        {
260                            if let Some(val) = src.get_value(row) {
261                                dst.push_value(val);
262                            } else {
263                                dst.push_value(Value::Null);
264                            }
265                        }
266                    }
267
268                    // Add the new node ID
269                    if let Some(dst) = builder.column_mut(self.output_column) {
270                        dst.push_value(Value::Int64(node_id.0 as i64));
271                    }
272
273                    builder.advance_row();
274                }
275
276                return Ok(Some(builder.finish()));
277            }
278            Ok(None)
279        } else {
280            // No input - create a single node
281            if self.executed {
282                return Ok(None);
283            }
284            self.executed = true;
285
286            // Resolve constant properties
287            let resolved_props: Vec<(String, Value)> = self
288                .properties
289                .iter()
290                .filter_map(|(name, source)| {
291                    if let PropertySource::Constant(value) = source {
292                        Some((name.clone(), value.clone()))
293                    } else {
294                        None
295                    }
296                })
297                .collect();
298
299            // Create the node with MVCC versioning
300            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
301            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
302
303            // Validate and set properties
304            self.validate_and_set_properties(node_id, &resolved_props)?;
305
306            // Build output chunk with just the node ID
307            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
308            if let Some(dst) = builder.column_mut(self.output_column) {
309                dst.push_value(Value::Int64(node_id.0 as i64));
310            }
311            builder.advance_row();
312
313            Ok(Some(builder.finish()))
314        }
315    }
316
317    fn reset(&mut self) {
318        if let Some(ref mut input) = self.input {
319            input.reset();
320        }
321        self.executed = false;
322    }
323
324    fn name(&self) -> &'static str {
325        "CreateNode"
326    }
327}
328
329/// Operator that creates new edges.
330pub struct CreateEdgeOperator {
331    /// The graph store to modify.
332    store: Arc<dyn GraphStoreMut>,
333    /// Input operator.
334    input: Box<dyn Operator>,
335    /// Column index for the source node.
336    from_column: usize,
337    /// Column index for the target node.
338    to_column: usize,
339    /// Edge type.
340    edge_type: String,
341    /// Properties to set.
342    properties: Vec<(String, PropertySource)>,
343    /// Output schema.
344    output_schema: Vec<LogicalType>,
345    /// Column index for the created edge variable (if any).
346    output_column: Option<usize>,
347    /// Epoch for MVCC versioning.
348    viewing_epoch: Option<EpochId>,
349    /// Transaction ID for MVCC versioning.
350    tx_id: Option<TxId>,
351    /// Optional constraint validator for schema enforcement.
352    validator: Option<Arc<dyn ConstraintValidator>>,
353}
354
355impl CreateEdgeOperator {
356    /// Creates a new edge creation operator.
357    ///
358    /// Use builder methods to set additional options:
359    /// - [`with_properties`](Self::with_properties) - set edge properties
360    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
361    /// - [`with_tx_context`](Self::with_tx_context) - set transaction context
362    pub fn new(
363        store: Arc<dyn GraphStoreMut>,
364        input: Box<dyn Operator>,
365        from_column: usize,
366        to_column: usize,
367        edge_type: String,
368        output_schema: Vec<LogicalType>,
369    ) -> Self {
370        Self {
371            store,
372            input,
373            from_column,
374            to_column,
375            edge_type,
376            properties: Vec::new(),
377            output_schema,
378            output_column: None,
379            viewing_epoch: None,
380            tx_id: None,
381            validator: None,
382        }
383    }
384
385    /// Sets the properties to assign to created edges.
386    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
387        self.properties = properties;
388        self
389    }
390
391    /// Sets the output column for the created edge ID.
392    pub fn with_output_column(mut self, column: usize) -> Self {
393        self.output_column = Some(column);
394        self
395    }
396
397    /// Sets the transaction context for MVCC versioning.
398    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
399        self.viewing_epoch = Some(epoch);
400        self.tx_id = tx_id;
401        self
402    }
403
404    /// Sets the constraint validator for schema enforcement.
405    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
406        self.validator = Some(validator);
407        self
408    }
409}
410
411impl Operator for CreateEdgeOperator {
412    fn next(&mut self) -> OperatorResult {
413        // Get transaction context for versioned creation
414        let epoch = self
415            .viewing_epoch
416            .unwrap_or_else(|| self.store.current_epoch());
417        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
418
419        if let Some(chunk) = self.input.next()? {
420            let mut builder =
421                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
422
423            for row in chunk.selected_indices() {
424                // Get source and target node IDs
425                let from_id = chunk
426                    .column(self.from_column)
427                    .and_then(|c| c.get_value(row))
428                    .ok_or_else(|| {
429                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
430                    })?;
431
432                let to_id = chunk
433                    .column(self.to_column)
434                    .and_then(|c| c.get_value(row))
435                    .ok_or_else(|| {
436                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
437                    })?;
438
439                // Extract node IDs
440                let from_node_id = match from_id {
441                    Value::Int64(id) => NodeId(id as u64),
442                    _ => {
443                        return Err(OperatorError::TypeMismatch {
444                            expected: "Int64 (node ID)".to_string(),
445                            found: format!("{from_id:?}"),
446                        });
447                    }
448                };
449
450                let to_node_id = match to_id {
451                    Value::Int64(id) => NodeId(id as u64),
452                    _ => {
453                        return Err(OperatorError::TypeMismatch {
454                            expected: "Int64 (node ID)".to_string(),
455                            found: format!("{to_id:?}"),
456                        });
457                    }
458                };
459
460                // Resolve property values
461                let resolved_props: Vec<(String, Value)> = self
462                    .properties
463                    .iter()
464                    .map(|(name, source)| {
465                        let value =
466                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
467                        (name.clone(), value)
468                    })
469                    .collect();
470
471                // Validate constraints before writing
472                if let Some(ref validator) = self.validator {
473                    for (name, value) in &resolved_props {
474                        validator.validate_edge_property(&self.edge_type, name, value)?;
475                    }
476                    validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
477                }
478
479                // Create the edge with MVCC versioning
480                let edge_id = self.store.create_edge_versioned(
481                    from_node_id,
482                    to_node_id,
483                    &self.edge_type,
484                    epoch,
485                    tx,
486                );
487
488                // Set properties
489                for (name, value) in resolved_props {
490                    self.store.set_edge_property(edge_id, &name, value);
491                }
492
493                // Copy input columns
494                for col_idx in 0..chunk.column_count() {
495                    if let (Some(src), Some(dst)) =
496                        (chunk.column(col_idx), builder.column_mut(col_idx))
497                    {
498                        if let Some(val) = src.get_value(row) {
499                            dst.push_value(val);
500                        } else {
501                            dst.push_value(Value::Null);
502                        }
503                    }
504                }
505
506                // Add edge ID if requested
507                if let Some(out_col) = self.output_column
508                    && let Some(dst) = builder.column_mut(out_col)
509                {
510                    dst.push_value(Value::Int64(edge_id.0 as i64));
511                }
512
513                builder.advance_row();
514            }
515
516            return Ok(Some(builder.finish()));
517        }
518        Ok(None)
519    }
520
521    fn reset(&mut self) {
522        self.input.reset();
523    }
524
525    fn name(&self) -> &'static str {
526        "CreateEdge"
527    }
528}
529
530/// Operator that deletes nodes.
531pub struct DeleteNodeOperator {
532    /// The graph store to modify.
533    store: Arc<dyn GraphStoreMut>,
534    /// Input operator.
535    input: Box<dyn Operator>,
536    /// Column index for the node to delete.
537    node_column: usize,
538    /// Output schema.
539    output_schema: Vec<LogicalType>,
540    /// Whether to detach (delete connected edges) before deleting.
541    detach: bool,
542    /// Epoch for MVCC versioning.
543    viewing_epoch: Option<EpochId>,
544    /// Transaction ID for MVCC versioning.
545    tx_id: Option<TxId>,
546}
547
548impl DeleteNodeOperator {
549    /// Creates a new node deletion operator.
550    pub fn new(
551        store: Arc<dyn GraphStoreMut>,
552        input: Box<dyn Operator>,
553        node_column: usize,
554        output_schema: Vec<LogicalType>,
555        detach: bool,
556    ) -> Self {
557        Self {
558            store,
559            input,
560            node_column,
561            output_schema,
562            detach,
563            viewing_epoch: None,
564            tx_id: None,
565        }
566    }
567
568    /// Sets the transaction context for MVCC versioning.
569    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
570        self.viewing_epoch = Some(epoch);
571        self.tx_id = tx_id;
572        self
573    }
574}
575
576impl Operator for DeleteNodeOperator {
577    fn next(&mut self) -> OperatorResult {
578        // Get transaction context for versioned deletion
579        let epoch = self
580            .viewing_epoch
581            .unwrap_or_else(|| self.store.current_epoch());
582        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
583
584        if let Some(chunk) = self.input.next()? {
585            let mut deleted_count = 0;
586
587            for row in chunk.selected_indices() {
588                let node_val = chunk
589                    .column(self.node_column)
590                    .and_then(|c| c.get_value(row))
591                    .ok_or_else(|| {
592                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
593                    })?;
594
595                let node_id = match node_val {
596                    Value::Int64(id) => NodeId(id as u64),
597                    _ => {
598                        return Err(OperatorError::TypeMismatch {
599                            expected: "Int64 (node ID)".to_string(),
600                            found: format!("{node_val:?}"),
601                        });
602                    }
603                };
604
605                if self.detach {
606                    // Delete all connected edges first
607                    self.store.delete_node_edges(node_id);
608                } else {
609                    // NODETACH: check that node has no connected edges
610                    let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
611                    if degree > 0 {
612                        return Err(OperatorError::ConstraintViolation(format!(
613                            "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
614                            degree
615                        )));
616                    }
617                }
618
619                // Delete the node with MVCC versioning
620                if self.store.delete_node_versioned(node_id, epoch, tx) {
621                    deleted_count += 1;
622                }
623            }
624
625            // Return a chunk with the delete count
626            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
627            if let Some(dst) = builder.column_mut(0) {
628                dst.push_value(Value::Int64(deleted_count));
629            }
630            builder.advance_row();
631
632            return Ok(Some(builder.finish()));
633        }
634        Ok(None)
635    }
636
637    fn reset(&mut self) {
638        self.input.reset();
639    }
640
641    fn name(&self) -> &'static str {
642        "DeleteNode"
643    }
644}
645
646/// Operator that deletes edges.
647pub struct DeleteEdgeOperator {
648    /// The graph store to modify.
649    store: Arc<dyn GraphStoreMut>,
650    /// Input operator.
651    input: Box<dyn Operator>,
652    /// Column index for the edge to delete.
653    edge_column: usize,
654    /// Output schema.
655    output_schema: Vec<LogicalType>,
656    /// Epoch for MVCC versioning.
657    viewing_epoch: Option<EpochId>,
658    /// Transaction ID for MVCC versioning.
659    tx_id: Option<TxId>,
660}
661
662impl DeleteEdgeOperator {
663    /// Creates a new edge deletion operator.
664    pub fn new(
665        store: Arc<dyn GraphStoreMut>,
666        input: Box<dyn Operator>,
667        edge_column: usize,
668        output_schema: Vec<LogicalType>,
669    ) -> Self {
670        Self {
671            store,
672            input,
673            edge_column,
674            output_schema,
675            viewing_epoch: None,
676            tx_id: None,
677        }
678    }
679
680    /// Sets the transaction context for MVCC versioning.
681    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
682        self.viewing_epoch = Some(epoch);
683        self.tx_id = tx_id;
684        self
685    }
686}
687
688impl Operator for DeleteEdgeOperator {
689    fn next(&mut self) -> OperatorResult {
690        // Get transaction context for versioned deletion
691        let epoch = self
692            .viewing_epoch
693            .unwrap_or_else(|| self.store.current_epoch());
694        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
695
696        if let Some(chunk) = self.input.next()? {
697            let mut deleted_count = 0;
698
699            for row in chunk.selected_indices() {
700                let edge_val = chunk
701                    .column(self.edge_column)
702                    .and_then(|c| c.get_value(row))
703                    .ok_or_else(|| {
704                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
705                    })?;
706
707                let edge_id = match edge_val {
708                    Value::Int64(id) => EdgeId(id as u64),
709                    _ => {
710                        return Err(OperatorError::TypeMismatch {
711                            expected: "Int64 (edge ID)".to_string(),
712                            found: format!("{edge_val:?}"),
713                        });
714                    }
715                };
716
717                // Delete the edge with MVCC versioning
718                if self.store.delete_edge_versioned(edge_id, epoch, tx) {
719                    deleted_count += 1;
720                }
721            }
722
723            // Return a chunk with the delete count
724            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
725            if let Some(dst) = builder.column_mut(0) {
726                dst.push_value(Value::Int64(deleted_count));
727            }
728            builder.advance_row();
729
730            return Ok(Some(builder.finish()));
731        }
732        Ok(None)
733    }
734
735    fn reset(&mut self) {
736        self.input.reset();
737    }
738
739    fn name(&self) -> &'static str {
740        "DeleteEdge"
741    }
742}
743
744/// Operator that adds labels to nodes.
745pub struct AddLabelOperator {
746    /// The graph store.
747    store: Arc<dyn GraphStoreMut>,
748    /// Child operator providing nodes.
749    input: Box<dyn Operator>,
750    /// Column index containing node IDs.
751    node_column: usize,
752    /// Labels to add.
753    labels: Vec<String>,
754    /// Output schema.
755    output_schema: Vec<LogicalType>,
756}
757
758impl AddLabelOperator {
759    /// Creates a new add label operator.
760    pub fn new(
761        store: Arc<dyn GraphStoreMut>,
762        input: Box<dyn Operator>,
763        node_column: usize,
764        labels: Vec<String>,
765        output_schema: Vec<LogicalType>,
766    ) -> Self {
767        Self {
768            store,
769            input,
770            node_column,
771            labels,
772            output_schema,
773        }
774    }
775}
776
777impl Operator for AddLabelOperator {
778    fn next(&mut self) -> OperatorResult {
779        if let Some(chunk) = self.input.next()? {
780            let mut updated_count = 0;
781
782            for row in chunk.selected_indices() {
783                let node_val = chunk
784                    .column(self.node_column)
785                    .and_then(|c| c.get_value(row))
786                    .ok_or_else(|| {
787                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
788                    })?;
789
790                let node_id = match node_val {
791                    Value::Int64(id) => NodeId(id as u64),
792                    _ => {
793                        return Err(OperatorError::TypeMismatch {
794                            expected: "Int64 (node ID)".to_string(),
795                            found: format!("{node_val:?}"),
796                        });
797                    }
798                };
799
800                // Add all labels
801                for label in &self.labels {
802                    if self.store.add_label(node_id, label) {
803                        updated_count += 1;
804                    }
805                }
806            }
807
808            // Return a chunk with the update count
809            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
810            if let Some(dst) = builder.column_mut(0) {
811                dst.push_value(Value::Int64(updated_count));
812            }
813            builder.advance_row();
814
815            return Ok(Some(builder.finish()));
816        }
817        Ok(None)
818    }
819
820    fn reset(&mut self) {
821        self.input.reset();
822    }
823
824    fn name(&self) -> &'static str {
825        "AddLabel"
826    }
827}
828
829/// Operator that removes labels from nodes.
830pub struct RemoveLabelOperator {
831    /// The graph store.
832    store: Arc<dyn GraphStoreMut>,
833    /// Child operator providing nodes.
834    input: Box<dyn Operator>,
835    /// Column index containing node IDs.
836    node_column: usize,
837    /// Labels to remove.
838    labels: Vec<String>,
839    /// Output schema.
840    output_schema: Vec<LogicalType>,
841}
842
843impl RemoveLabelOperator {
844    /// Creates a new remove label operator.
845    pub fn new(
846        store: Arc<dyn GraphStoreMut>,
847        input: Box<dyn Operator>,
848        node_column: usize,
849        labels: Vec<String>,
850        output_schema: Vec<LogicalType>,
851    ) -> Self {
852        Self {
853            store,
854            input,
855            node_column,
856            labels,
857            output_schema,
858        }
859    }
860}
861
862impl Operator for RemoveLabelOperator {
863    fn next(&mut self) -> OperatorResult {
864        if let Some(chunk) = self.input.next()? {
865            let mut updated_count = 0;
866
867            for row in chunk.selected_indices() {
868                let node_val = chunk
869                    .column(self.node_column)
870                    .and_then(|c| c.get_value(row))
871                    .ok_or_else(|| {
872                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
873                    })?;
874
875                let node_id = match node_val {
876                    Value::Int64(id) => NodeId(id as u64),
877                    _ => {
878                        return Err(OperatorError::TypeMismatch {
879                            expected: "Int64 (node ID)".to_string(),
880                            found: format!("{node_val:?}"),
881                        });
882                    }
883                };
884
885                // Remove all labels
886                for label in &self.labels {
887                    if self.store.remove_label(node_id, label) {
888                        updated_count += 1;
889                    }
890                }
891            }
892
893            // Return a chunk with the update count
894            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
895            if let Some(dst) = builder.column_mut(0) {
896                dst.push_value(Value::Int64(updated_count));
897            }
898            builder.advance_row();
899
900            return Ok(Some(builder.finish()));
901        }
902        Ok(None)
903    }
904
905    fn reset(&mut self) {
906        self.input.reset();
907    }
908
909    fn name(&self) -> &'static str {
910        "RemoveLabel"
911    }
912}
913
914/// Operator that sets properties on nodes or edges.
915///
916/// This operator reads node/edge IDs from a column and sets the
917/// specified properties on each entity.
918pub struct SetPropertyOperator {
919    /// The graph store.
920    store: Arc<dyn GraphStoreMut>,
921    /// Child operator providing entities.
922    input: Box<dyn Operator>,
923    /// Column index containing entity IDs (node or edge).
924    entity_column: usize,
925    /// Whether the entity is an edge (false = node).
926    is_edge: bool,
927    /// Properties to set (name -> source).
928    properties: Vec<(String, PropertySource)>,
929    /// Output schema.
930    output_schema: Vec<LogicalType>,
931    /// Whether to replace all properties (true) or merge (false) for map assignments.
932    replace: bool,
933    /// Optional constraint validator for schema enforcement.
934    validator: Option<Arc<dyn ConstraintValidator>>,
935    /// Entity labels (for node constraint validation).
936    labels: Vec<String>,
937    /// Edge type (for edge constraint validation).
938    edge_type_name: Option<String>,
939}
940
941impl SetPropertyOperator {
942    /// Creates a new set property operator for nodes.
943    pub fn new_for_node(
944        store: Arc<dyn GraphStoreMut>,
945        input: Box<dyn Operator>,
946        node_column: usize,
947        properties: Vec<(String, PropertySource)>,
948        output_schema: Vec<LogicalType>,
949    ) -> Self {
950        Self {
951            store,
952            input,
953            entity_column: node_column,
954            is_edge: false,
955            properties,
956            output_schema,
957            replace: false,
958            validator: None,
959            labels: Vec::new(),
960            edge_type_name: None,
961        }
962    }
963
964    /// Creates a new set property operator for edges.
965    pub fn new_for_edge(
966        store: Arc<dyn GraphStoreMut>,
967        input: Box<dyn Operator>,
968        edge_column: usize,
969        properties: Vec<(String, PropertySource)>,
970        output_schema: Vec<LogicalType>,
971    ) -> Self {
972        Self {
973            store,
974            input,
975            entity_column: edge_column,
976            is_edge: true,
977            properties,
978            output_schema,
979            replace: false,
980            validator: None,
981            labels: Vec::new(),
982            edge_type_name: None,
983        }
984    }
985
986    /// Sets whether this operator replaces all properties (for map assignment).
987    pub fn with_replace(mut self, replace: bool) -> Self {
988        self.replace = replace;
989        self
990    }
991
992    /// Sets the constraint validator for schema enforcement.
993    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
994        self.validator = Some(validator);
995        self
996    }
997
998    /// Sets the entity labels (for node constraint validation).
999    pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1000        self.labels = labels;
1001        self
1002    }
1003
1004    /// Sets the edge type name (for edge constraint validation).
1005    pub fn with_edge_type(mut self, edge_type: String) -> Self {
1006        self.edge_type_name = Some(edge_type);
1007        self
1008    }
1009}
1010
1011impl Operator for SetPropertyOperator {
1012    fn next(&mut self) -> OperatorResult {
1013        if let Some(chunk) = self.input.next()? {
1014            let mut builder =
1015                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1016
1017            for row in chunk.selected_indices() {
1018                let entity_val = chunk
1019                    .column(self.entity_column)
1020                    .and_then(|c| c.get_value(row))
1021                    .ok_or_else(|| {
1022                        OperatorError::ColumnNotFound(format!(
1023                            "entity column {}",
1024                            self.entity_column
1025                        ))
1026                    })?;
1027
1028                let entity_id = match entity_val {
1029                    Value::Int64(id) => id as u64,
1030                    _ => {
1031                        return Err(OperatorError::TypeMismatch {
1032                            expected: "Int64 (entity ID)".to_string(),
1033                            found: format!("{entity_val:?}"),
1034                        });
1035                    }
1036                };
1037
1038                // Resolve all property values
1039                let resolved_props: Vec<(String, Value)> = self
1040                    .properties
1041                    .iter()
1042                    .map(|(name, source)| {
1043                        let value =
1044                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1045                        (name.clone(), value)
1046                    })
1047                    .collect();
1048
1049                // Validate constraints before writing
1050                if let Some(ref validator) = self.validator {
1051                    if self.is_edge {
1052                        if let Some(ref et) = self.edge_type_name {
1053                            for (name, value) in &resolved_props {
1054                                validator.validate_edge_property(et, name, value)?;
1055                            }
1056                        }
1057                    } else {
1058                        for (name, value) in &resolved_props {
1059                            validator.validate_node_property(&self.labels, name, value)?;
1060                            validator.check_unique_node_property(&self.labels, name, value)?;
1061                        }
1062                    }
1063                }
1064
1065                // Write all properties
1066                for (prop_name, value) in resolved_props {
1067                    if prop_name == "*" {
1068                        // Map assignment: value should be a Map
1069                        if let Value::Map(map) = value {
1070                            if self.replace {
1071                                // Replace: remove all existing properties first
1072                                if self.is_edge {
1073                                    if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1074                                        let keys: Vec<String> = edge
1075                                            .properties
1076                                            .iter()
1077                                            .map(|(k, _)| k.as_str().to_string())
1078                                            .collect();
1079                                        for key in keys {
1080                                            self.store
1081                                                .remove_edge_property(EdgeId(entity_id), &key);
1082                                        }
1083                                    }
1084                                } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1085                                    let keys: Vec<String> = node
1086                                        .properties
1087                                        .iter()
1088                                        .map(|(k, _)| k.as_str().to_string())
1089                                        .collect();
1090                                    for key in keys {
1091                                        self.store.remove_node_property(NodeId(entity_id), &key);
1092                                    }
1093                                }
1094                            }
1095                            // Set each map entry
1096                            for (key, val) in map.iter() {
1097                                if self.is_edge {
1098                                    self.store.set_edge_property(
1099                                        EdgeId(entity_id),
1100                                        key.as_str(),
1101                                        val.clone(),
1102                                    );
1103                                } else {
1104                                    self.store.set_node_property(
1105                                        NodeId(entity_id),
1106                                        key.as_str(),
1107                                        val.clone(),
1108                                    );
1109                                }
1110                            }
1111                        }
1112                    } else if self.is_edge {
1113                        self.store
1114                            .set_edge_property(EdgeId(entity_id), &prop_name, value);
1115                    } else {
1116                        self.store
1117                            .set_node_property(NodeId(entity_id), &prop_name, value);
1118                    }
1119                }
1120
1121                // Copy input columns to output
1122                for col_idx in 0..chunk.column_count() {
1123                    if let (Some(src), Some(dst)) =
1124                        (chunk.column(col_idx), builder.column_mut(col_idx))
1125                    {
1126                        if let Some(val) = src.get_value(row) {
1127                            dst.push_value(val);
1128                        } else {
1129                            dst.push_value(Value::Null);
1130                        }
1131                    }
1132                }
1133
1134                builder.advance_row();
1135            }
1136
1137            return Ok(Some(builder.finish()));
1138        }
1139        Ok(None)
1140    }
1141
1142    fn reset(&mut self) {
1143        self.input.reset();
1144    }
1145
1146    fn name(&self) -> &'static str {
1147        "SetProperty"
1148    }
1149}
1150
1151#[cfg(test)]
1152mod tests {
1153    use super::*;
1154    use crate::execution::DataChunk;
1155    use crate::execution::chunk::DataChunkBuilder;
1156    use crate::graph::lpg::LpgStore;
1157
1158    // ── Helpers ────────────────────────────────────────────────────
1159
1160    fn create_test_store() -> Arc<dyn GraphStoreMut> {
1161        Arc::new(LpgStore::new().unwrap())
1162    }
1163
1164    struct MockInput {
1165        chunk: Option<DataChunk>,
1166    }
1167
1168    impl MockInput {
1169        fn boxed(chunk: DataChunk) -> Box<Self> {
1170            Box::new(Self { chunk: Some(chunk) })
1171        }
1172    }
1173
1174    impl Operator for MockInput {
1175        fn next(&mut self) -> OperatorResult {
1176            Ok(self.chunk.take())
1177        }
1178        fn reset(&mut self) {}
1179        fn name(&self) -> &'static str {
1180            "MockInput"
1181        }
1182    }
1183
1184    struct EmptyInput;
1185    impl Operator for EmptyInput {
1186        fn next(&mut self) -> OperatorResult {
1187            Ok(None)
1188        }
1189        fn reset(&mut self) {}
1190        fn name(&self) -> &'static str {
1191            "EmptyInput"
1192        }
1193    }
1194
1195    fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1196        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1197        for id in ids {
1198            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1199            builder.advance_row();
1200        }
1201        builder.finish()
1202    }
1203
1204    fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1205        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1206        for id in ids {
1207            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1208            builder.advance_row();
1209        }
1210        builder.finish()
1211    }
1212
1213    // ── CreateNodeOperator ──────────────────────────────────────
1214
1215    #[test]
1216    fn test_create_node_standalone() {
1217        let store = create_test_store();
1218
1219        let mut op = CreateNodeOperator::new(
1220            Arc::clone(&store),
1221            None,
1222            vec!["Person".to_string()],
1223            vec![(
1224                "name".to_string(),
1225                PropertySource::Constant(Value::String("Alix".into())),
1226            )],
1227            vec![LogicalType::Int64],
1228            0,
1229        );
1230
1231        let chunk = op.next().unwrap().unwrap();
1232        assert_eq!(chunk.row_count(), 1);
1233
1234        // Second call should return None (standalone executes once)
1235        assert!(op.next().unwrap().is_none());
1236
1237        assert_eq!(store.node_count(), 1);
1238    }
1239
1240    #[test]
1241    fn test_create_edge() {
1242        let store = create_test_store();
1243
1244        let node1 = store.create_node(&["Person"]);
1245        let node2 = store.create_node(&["Person"]);
1246
1247        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1248        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1249        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1250        builder.advance_row();
1251
1252        let mut op = CreateEdgeOperator::new(
1253            Arc::clone(&store),
1254            MockInput::boxed(builder.finish()),
1255            0,
1256            1,
1257            "KNOWS".to_string(),
1258            vec![LogicalType::Int64, LogicalType::Int64],
1259        );
1260
1261        let _chunk = op.next().unwrap().unwrap();
1262        assert_eq!(store.edge_count(), 1);
1263    }
1264
1265    #[test]
1266    fn test_delete_node() {
1267        let store = create_test_store();
1268
1269        let node_id = store.create_node(&["Person"]);
1270        assert_eq!(store.node_count(), 1);
1271
1272        let mut op = DeleteNodeOperator::new(
1273            Arc::clone(&store),
1274            MockInput::boxed(node_id_chunk(&[node_id])),
1275            0,
1276            vec![LogicalType::Int64],
1277            false,
1278        );
1279
1280        let chunk = op.next().unwrap().unwrap();
1281        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1282        assert_eq!(deleted, 1);
1283        assert_eq!(store.node_count(), 0);
1284    }
1285
1286    // ── DeleteEdgeOperator ───────────────────────────────────────
1287
1288    #[test]
1289    fn test_delete_edge() {
1290        let store = create_test_store();
1291
1292        let n1 = store.create_node(&["Person"]);
1293        let n2 = store.create_node(&["Person"]);
1294        let eid = store.create_edge(n1, n2, "KNOWS");
1295        assert_eq!(store.edge_count(), 1);
1296
1297        let mut op = DeleteEdgeOperator::new(
1298            Arc::clone(&store),
1299            MockInput::boxed(edge_id_chunk(&[eid])),
1300            0,
1301            vec![LogicalType::Int64],
1302        );
1303
1304        let chunk = op.next().unwrap().unwrap();
1305        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1306        assert_eq!(deleted, 1);
1307        assert_eq!(store.edge_count(), 0);
1308    }
1309
1310    #[test]
1311    fn test_delete_edge_no_input_returns_none() {
1312        let store = create_test_store();
1313
1314        let mut op = DeleteEdgeOperator::new(
1315            Arc::clone(&store),
1316            Box::new(EmptyInput),
1317            0,
1318            vec![LogicalType::Int64],
1319        );
1320
1321        assert!(op.next().unwrap().is_none());
1322    }
1323
1324    #[test]
1325    fn test_delete_multiple_edges() {
1326        let store = create_test_store();
1327
1328        let n1 = store.create_node(&["N"]);
1329        let n2 = store.create_node(&["N"]);
1330        let e1 = store.create_edge(n1, n2, "R");
1331        let e2 = store.create_edge(n2, n1, "S");
1332        assert_eq!(store.edge_count(), 2);
1333
1334        let mut op = DeleteEdgeOperator::new(
1335            Arc::clone(&store),
1336            MockInput::boxed(edge_id_chunk(&[e1, e2])),
1337            0,
1338            vec![LogicalType::Int64],
1339        );
1340
1341        let chunk = op.next().unwrap().unwrap();
1342        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1343        assert_eq!(deleted, 2);
1344        assert_eq!(store.edge_count(), 0);
1345    }
1346
1347    // ── DeleteNodeOperator with DETACH ───────────────────────────
1348
1349    #[test]
1350    fn test_delete_node_detach() {
1351        let store = create_test_store();
1352
1353        let n1 = store.create_node(&["Person"]);
1354        let n2 = store.create_node(&["Person"]);
1355        store.create_edge(n1, n2, "KNOWS");
1356        store.create_edge(n2, n1, "FOLLOWS");
1357        assert_eq!(store.edge_count(), 2);
1358
1359        let mut op = DeleteNodeOperator::new(
1360            Arc::clone(&store),
1361            MockInput::boxed(node_id_chunk(&[n1])),
1362            0,
1363            vec![LogicalType::Int64],
1364            true, // detach = true
1365        );
1366
1367        let chunk = op.next().unwrap().unwrap();
1368        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1369        assert_eq!(deleted, 1);
1370        assert_eq!(store.node_count(), 1);
1371        assert_eq!(store.edge_count(), 0); // edges detached
1372    }
1373
1374    // ── AddLabelOperator ─────────────────────────────────────────
1375
1376    #[test]
1377    fn test_add_label() {
1378        let store = create_test_store();
1379
1380        let node = store.create_node(&["Person"]);
1381
1382        let mut op = AddLabelOperator::new(
1383            Arc::clone(&store),
1384            MockInput::boxed(node_id_chunk(&[node])),
1385            0,
1386            vec!["Employee".to_string()],
1387            vec![LogicalType::Int64],
1388        );
1389
1390        let chunk = op.next().unwrap().unwrap();
1391        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1392        assert_eq!(updated, 1);
1393
1394        // Verify label was added
1395        let node_data = store.get_node(node).unwrap();
1396        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1397        assert!(labels.contains(&"Person"));
1398        assert!(labels.contains(&"Employee"));
1399    }
1400
1401    #[test]
1402    fn test_add_multiple_labels() {
1403        let store = create_test_store();
1404
1405        let node = store.create_node(&["Base"]);
1406
1407        let mut op = AddLabelOperator::new(
1408            Arc::clone(&store),
1409            MockInput::boxed(node_id_chunk(&[node])),
1410            0,
1411            vec!["LabelA".to_string(), "LabelB".to_string()],
1412            vec![LogicalType::Int64],
1413        );
1414
1415        let chunk = op.next().unwrap().unwrap();
1416        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1417        assert_eq!(updated, 2); // 2 labels added
1418
1419        let node_data = store.get_node(node).unwrap();
1420        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1421        assert!(labels.contains(&"LabelA"));
1422        assert!(labels.contains(&"LabelB"));
1423    }
1424
1425    #[test]
1426    fn test_add_label_no_input_returns_none() {
1427        let store = create_test_store();
1428
1429        let mut op = AddLabelOperator::new(
1430            Arc::clone(&store),
1431            Box::new(EmptyInput),
1432            0,
1433            vec!["Foo".to_string()],
1434            vec![LogicalType::Int64],
1435        );
1436
1437        assert!(op.next().unwrap().is_none());
1438    }
1439
1440    // ── RemoveLabelOperator ──────────────────────────────────────
1441
1442    #[test]
1443    fn test_remove_label() {
1444        let store = create_test_store();
1445
1446        let node = store.create_node(&["Person", "Employee"]);
1447
1448        let mut op = RemoveLabelOperator::new(
1449            Arc::clone(&store),
1450            MockInput::boxed(node_id_chunk(&[node])),
1451            0,
1452            vec!["Employee".to_string()],
1453            vec![LogicalType::Int64],
1454        );
1455
1456        let chunk = op.next().unwrap().unwrap();
1457        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1458        assert_eq!(updated, 1);
1459
1460        // Verify label was removed
1461        let node_data = store.get_node(node).unwrap();
1462        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1463        assert!(labels.contains(&"Person"));
1464        assert!(!labels.contains(&"Employee"));
1465    }
1466
1467    #[test]
1468    fn test_remove_nonexistent_label() {
1469        let store = create_test_store();
1470
1471        let node = store.create_node(&["Person"]);
1472
1473        let mut op = RemoveLabelOperator::new(
1474            Arc::clone(&store),
1475            MockInput::boxed(node_id_chunk(&[node])),
1476            0,
1477            vec!["NonExistent".to_string()],
1478            vec![LogicalType::Int64],
1479        );
1480
1481        let chunk = op.next().unwrap().unwrap();
1482        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1483        assert_eq!(updated, 0); // nothing removed
1484    }
1485
1486    // ── SetPropertyOperator ──────────────────────────────────────
1487
1488    #[test]
1489    fn test_set_node_property_constant() {
1490        let store = create_test_store();
1491
1492        let node = store.create_node(&["Person"]);
1493
1494        let mut op = SetPropertyOperator::new_for_node(
1495            Arc::clone(&store),
1496            MockInput::boxed(node_id_chunk(&[node])),
1497            0,
1498            vec![(
1499                "name".to_string(),
1500                PropertySource::Constant(Value::String("Alix".into())),
1501            )],
1502            vec![LogicalType::Int64],
1503        );
1504
1505        let chunk = op.next().unwrap().unwrap();
1506        assert_eq!(chunk.row_count(), 1);
1507
1508        // Verify property was set
1509        let node_data = store.get_node(node).unwrap();
1510        assert_eq!(
1511            node_data
1512                .properties
1513                .get(&grafeo_common::types::PropertyKey::new("name")),
1514            Some(&Value::String("Alix".into()))
1515        );
1516    }
1517
1518    #[test]
1519    fn test_set_node_property_from_column() {
1520        let store = create_test_store();
1521
1522        let node = store.create_node(&["Person"]);
1523
1524        // Input: column 0 = node ID, column 1 = property value
1525        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1526        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1527        builder
1528            .column_mut(1)
1529            .unwrap()
1530            .push_value(Value::String("Gus".into()));
1531        builder.advance_row();
1532
1533        let mut op = SetPropertyOperator::new_for_node(
1534            Arc::clone(&store),
1535            MockInput::boxed(builder.finish()),
1536            0,
1537            vec![("name".to_string(), PropertySource::Column(1))],
1538            vec![LogicalType::Int64, LogicalType::String],
1539        );
1540
1541        let chunk = op.next().unwrap().unwrap();
1542        assert_eq!(chunk.row_count(), 1);
1543
1544        let node_data = store.get_node(node).unwrap();
1545        assert_eq!(
1546            node_data
1547                .properties
1548                .get(&grafeo_common::types::PropertyKey::new("name")),
1549            Some(&Value::String("Gus".into()))
1550        );
1551    }
1552
1553    #[test]
1554    fn test_set_edge_property() {
1555        let store = create_test_store();
1556
1557        let n1 = store.create_node(&["N"]);
1558        let n2 = store.create_node(&["N"]);
1559        let eid = store.create_edge(n1, n2, "KNOWS");
1560
1561        let mut op = SetPropertyOperator::new_for_edge(
1562            Arc::clone(&store),
1563            MockInput::boxed(edge_id_chunk(&[eid])),
1564            0,
1565            vec![(
1566                "weight".to_string(),
1567                PropertySource::Constant(Value::Float64(0.75)),
1568            )],
1569            vec![LogicalType::Int64],
1570        );
1571
1572        let chunk = op.next().unwrap().unwrap();
1573        assert_eq!(chunk.row_count(), 1);
1574
1575        let edge_data = store.get_edge(eid).unwrap();
1576        assert_eq!(
1577            edge_data
1578                .properties
1579                .get(&grafeo_common::types::PropertyKey::new("weight")),
1580            Some(&Value::Float64(0.75))
1581        );
1582    }
1583
1584    #[test]
1585    fn test_set_multiple_properties() {
1586        let store = create_test_store();
1587
1588        let node = store.create_node(&["Person"]);
1589
1590        let mut op = SetPropertyOperator::new_for_node(
1591            Arc::clone(&store),
1592            MockInput::boxed(node_id_chunk(&[node])),
1593            0,
1594            vec![
1595                (
1596                    "name".to_string(),
1597                    PropertySource::Constant(Value::String("Alix".into())),
1598                ),
1599                (
1600                    "age".to_string(),
1601                    PropertySource::Constant(Value::Int64(30)),
1602                ),
1603            ],
1604            vec![LogicalType::Int64],
1605        );
1606
1607        op.next().unwrap().unwrap();
1608
1609        let node_data = store.get_node(node).unwrap();
1610        assert_eq!(
1611            node_data
1612                .properties
1613                .get(&grafeo_common::types::PropertyKey::new("name")),
1614            Some(&Value::String("Alix".into()))
1615        );
1616        assert_eq!(
1617            node_data
1618                .properties
1619                .get(&grafeo_common::types::PropertyKey::new("age")),
1620            Some(&Value::Int64(30))
1621        );
1622    }
1623
1624    #[test]
1625    fn test_set_property_no_input_returns_none() {
1626        let store = create_test_store();
1627
1628        let mut op = SetPropertyOperator::new_for_node(
1629            Arc::clone(&store),
1630            Box::new(EmptyInput),
1631            0,
1632            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
1633            vec![LogicalType::Int64],
1634        );
1635
1636        assert!(op.next().unwrap().is_none());
1637    }
1638
1639    // ── Error paths ──────────────────────────────────────────────
1640
1641    #[test]
1642    fn test_delete_node_without_detach_errors_when_edges_exist() {
1643        let store = create_test_store();
1644
1645        let n1 = store.create_node(&["Person"]);
1646        let n2 = store.create_node(&["Person"]);
1647        store.create_edge(n1, n2, "KNOWS");
1648
1649        let mut op = DeleteNodeOperator::new(
1650            Arc::clone(&store),
1651            MockInput::boxed(node_id_chunk(&[n1])),
1652            0,
1653            vec![LogicalType::Int64],
1654            false, // no detach
1655        );
1656
1657        let err = op.next().unwrap_err();
1658        match err {
1659            OperatorError::ConstraintViolation(msg) => {
1660                assert!(msg.contains("connected edge"), "unexpected message: {msg}");
1661            }
1662            other => panic!("expected ConstraintViolation, got {other:?}"),
1663        }
1664        // Node should still exist
1665        assert_eq!(store.node_count(), 2);
1666    }
1667
1668    // ── CreateNodeOperator with input ───────────────────────────
1669
1670    #[test]
1671    fn test_create_node_with_input_operator() {
1672        let store = create_test_store();
1673
1674        // Seed node to provide input rows
1675        let existing = store.create_node(&["Seed"]);
1676
1677        let mut op = CreateNodeOperator::new(
1678            Arc::clone(&store),
1679            Some(MockInput::boxed(node_id_chunk(&[existing]))),
1680            vec!["Created".to_string()],
1681            vec![(
1682                "source".to_string(),
1683                PropertySource::Constant(Value::String("from_input".into())),
1684            )],
1685            vec![LogicalType::Int64, LogicalType::Int64], // input col + output col
1686            1,                                            // output column for new node ID
1687        );
1688
1689        let chunk = op.next().unwrap().unwrap();
1690        assert_eq!(chunk.row_count(), 1);
1691
1692        // Should have created one new node (2 total: Seed + Created)
1693        assert_eq!(store.node_count(), 2);
1694
1695        // Exhausted
1696        assert!(op.next().unwrap().is_none());
1697    }
1698
1699    // ── CreateEdgeOperator with properties and output column ────
1700
1701    #[test]
1702    fn test_create_edge_with_properties_and_output_column() {
1703        let store = create_test_store();
1704
1705        let n1 = store.create_node(&["Person"]);
1706        let n2 = store.create_node(&["Person"]);
1707
1708        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1709        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
1710        builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
1711        builder.advance_row();
1712
1713        let mut op = CreateEdgeOperator::new(
1714            Arc::clone(&store),
1715            MockInput::boxed(builder.finish()),
1716            0,
1717            1,
1718            "KNOWS".to_string(),
1719            vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
1720        )
1721        .with_properties(vec![(
1722            "since".to_string(),
1723            PropertySource::Constant(Value::Int64(2024)),
1724        )])
1725        .with_output_column(2);
1726
1727        let chunk = op.next().unwrap().unwrap();
1728        assert_eq!(chunk.row_count(), 1);
1729        assert_eq!(store.edge_count(), 1);
1730
1731        // Verify the output chunk contains the edge ID in column 2
1732        let edge_id_raw = chunk
1733            .column(2)
1734            .and_then(|c| c.get_int64(0))
1735            .expect("edge ID should be in output column 2");
1736        let edge_id = EdgeId(edge_id_raw as u64);
1737
1738        // Verify the edge has the property
1739        let edge = store.get_edge(edge_id).expect("edge should exist");
1740        assert_eq!(
1741            edge.properties
1742                .get(&grafeo_common::types::PropertyKey::new("since")),
1743            Some(&Value::Int64(2024))
1744        );
1745    }
1746
1747    // ── SetPropertyOperator with map replacement ────────────────
1748
1749    #[test]
1750    fn test_set_property_map_replace() {
1751        use std::collections::BTreeMap;
1752
1753        let store = create_test_store();
1754
1755        let node = store.create_node(&["Person"]);
1756        store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
1757
1758        let mut map = BTreeMap::new();
1759        map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
1760
1761        let mut op = SetPropertyOperator::new_for_node(
1762            Arc::clone(&store),
1763            MockInput::boxed(node_id_chunk(&[node])),
1764            0,
1765            vec![(
1766                "*".to_string(),
1767                PropertySource::Constant(Value::Map(Arc::new(map))),
1768            )],
1769            vec![LogicalType::Int64],
1770        )
1771        .with_replace(true);
1772
1773        op.next().unwrap().unwrap();
1774
1775        let node_data = store.get_node(node).unwrap();
1776        // Old property should be gone
1777        assert!(
1778            node_data
1779                .properties
1780                .get(&PropertyKey::new("old_prop"))
1781                .is_none()
1782        );
1783        // New property should exist
1784        assert_eq!(
1785            node_data.properties.get(&PropertyKey::new("new_key")),
1786            Some(&Value::String("new_val".into()))
1787        );
1788    }
1789
1790    // ── SetPropertyOperator with map merge (no replace) ─────────
1791
1792    #[test]
1793    fn test_set_property_map_merge() {
1794        use std::collections::BTreeMap;
1795
1796        let store = create_test_store();
1797
1798        let node = store.create_node(&["Person"]);
1799        store.set_node_property(node, "existing", Value::Int64(42));
1800
1801        let mut map = BTreeMap::new();
1802        map.insert(PropertyKey::new("added"), Value::String("hello".into()));
1803
1804        let mut op = SetPropertyOperator::new_for_node(
1805            Arc::clone(&store),
1806            MockInput::boxed(node_id_chunk(&[node])),
1807            0,
1808            vec![(
1809                "*".to_string(),
1810                PropertySource::Constant(Value::Map(Arc::new(map))),
1811            )],
1812            vec![LogicalType::Int64],
1813        ); // replace defaults to false
1814
1815        op.next().unwrap().unwrap();
1816
1817        let node_data = store.get_node(node).unwrap();
1818        // Existing property should still be there
1819        assert_eq!(
1820            node_data.properties.get(&PropertyKey::new("existing")),
1821            Some(&Value::Int64(42))
1822        );
1823        // New property should also exist
1824        assert_eq!(
1825            node_data.properties.get(&PropertyKey::new("added")),
1826            Some(&Value::String("hello".into()))
1827        );
1828    }
1829
1830    // ── PropertySource::PropertyAccess ──────────────────────────
1831
1832    #[test]
1833    fn test_property_source_property_access() {
1834        let store = create_test_store();
1835
1836        let source_node = store.create_node(&["Source"]);
1837        store.set_node_property(source_node, "name", Value::String("Alix".into()));
1838
1839        let target_node = store.create_node(&["Target"]);
1840
1841        // Build chunk: col 0 = source node ID (Node type for PropertyAccess), col 1 = target node ID
1842        let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
1843        builder.column_mut(0).unwrap().push_node_id(source_node);
1844        builder
1845            .column_mut(1)
1846            .unwrap()
1847            .push_int64(target_node.0 as i64);
1848        builder.advance_row();
1849
1850        let mut op = SetPropertyOperator::new_for_node(
1851            Arc::clone(&store),
1852            MockInput::boxed(builder.finish()),
1853            1, // entity column = target node
1854            vec![(
1855                "copied_name".to_string(),
1856                PropertySource::PropertyAccess {
1857                    column: 0,
1858                    property: "name".to_string(),
1859                },
1860            )],
1861            vec![LogicalType::Node, LogicalType::Int64],
1862        );
1863
1864        op.next().unwrap().unwrap();
1865
1866        let target_data = store.get_node(target_node).unwrap();
1867        assert_eq!(
1868            target_data.properties.get(&PropertyKey::new("copied_name")),
1869            Some(&Value::String("Alix".into()))
1870        );
1871    }
1872
1873    // ── ConstraintValidator integration ─────────────────────────
1874
1875    #[test]
1876    fn test_create_node_with_constraint_validator() {
1877        let store = create_test_store();
1878
1879        struct RejectAgeValidator;
1880        impl ConstraintValidator for RejectAgeValidator {
1881            fn validate_node_property(
1882                &self,
1883                _labels: &[String],
1884                key: &str,
1885                _value: &Value,
1886            ) -> Result<(), OperatorError> {
1887                if key == "forbidden" {
1888                    return Err(OperatorError::ConstraintViolation(
1889                        "property 'forbidden' is not allowed".to_string(),
1890                    ));
1891                }
1892                Ok(())
1893            }
1894            fn validate_node_complete(
1895                &self,
1896                _labels: &[String],
1897                _properties: &[(String, Value)],
1898            ) -> Result<(), OperatorError> {
1899                Ok(())
1900            }
1901            fn check_unique_node_property(
1902                &self,
1903                _labels: &[String],
1904                _key: &str,
1905                _value: &Value,
1906            ) -> Result<(), OperatorError> {
1907                Ok(())
1908            }
1909            fn validate_edge_property(
1910                &self,
1911                _edge_type: &str,
1912                _key: &str,
1913                _value: &Value,
1914            ) -> Result<(), OperatorError> {
1915                Ok(())
1916            }
1917            fn validate_edge_complete(
1918                &self,
1919                _edge_type: &str,
1920                _properties: &[(String, Value)],
1921            ) -> Result<(), OperatorError> {
1922                Ok(())
1923            }
1924        }
1925
1926        // Valid property should succeed
1927        let mut op = CreateNodeOperator::new(
1928            Arc::clone(&store),
1929            None,
1930            vec!["Thing".to_string()],
1931            vec![(
1932                "name".to_string(),
1933                PropertySource::Constant(Value::String("ok".into())),
1934            )],
1935            vec![LogicalType::Int64],
1936            0,
1937        )
1938        .with_validator(Arc::new(RejectAgeValidator));
1939
1940        assert!(op.next().is_ok());
1941        assert_eq!(store.node_count(), 1);
1942
1943        // Forbidden property should fail
1944        let mut op = CreateNodeOperator::new(
1945            Arc::clone(&store),
1946            None,
1947            vec!["Thing".to_string()],
1948            vec![(
1949                "forbidden".to_string(),
1950                PropertySource::Constant(Value::Int64(1)),
1951            )],
1952            vec![LogicalType::Int64],
1953            0,
1954        )
1955        .with_validator(Arc::new(RejectAgeValidator));
1956
1957        let err = op.next().unwrap_err();
1958        assert!(matches!(err, OperatorError::ConstraintViolation(_)));
1959        // Node count should still be 2 (the node is created before validation, but the error
1960        // propagates - this tests the validation logic fires)
1961    }
1962
1963    // ── Reset behavior ──────────────────────────────────────────
1964
1965    #[test]
1966    fn test_create_node_reset_allows_re_execution() {
1967        let store = create_test_store();
1968
1969        let mut op = CreateNodeOperator::new(
1970            Arc::clone(&store),
1971            None,
1972            vec!["Person".to_string()],
1973            vec![],
1974            vec![LogicalType::Int64],
1975            0,
1976        );
1977
1978        // First execution
1979        assert!(op.next().unwrap().is_some());
1980        assert!(op.next().unwrap().is_none());
1981
1982        // Reset and re-execute
1983        op.reset();
1984        assert!(op.next().unwrap().is_some());
1985
1986        assert_eq!(store.node_count(), 2);
1987    }
1988
1989    // ── Operator name() ──────────────────────────────────────────
1990
1991    #[test]
1992    fn test_operator_names() {
1993        let store = create_test_store();
1994
1995        let op = CreateNodeOperator::new(
1996            Arc::clone(&store),
1997            None,
1998            vec![],
1999            vec![],
2000            vec![LogicalType::Int64],
2001            0,
2002        );
2003        assert_eq!(op.name(), "CreateNode");
2004
2005        let op = CreateEdgeOperator::new(
2006            Arc::clone(&store),
2007            Box::new(EmptyInput),
2008            0,
2009            1,
2010            "R".to_string(),
2011            vec![LogicalType::Int64],
2012        );
2013        assert_eq!(op.name(), "CreateEdge");
2014
2015        let op = DeleteNodeOperator::new(
2016            Arc::clone(&store),
2017            Box::new(EmptyInput),
2018            0,
2019            vec![LogicalType::Int64],
2020            false,
2021        );
2022        assert_eq!(op.name(), "DeleteNode");
2023
2024        let op = DeleteEdgeOperator::new(
2025            Arc::clone(&store),
2026            Box::new(EmptyInput),
2027            0,
2028            vec![LogicalType::Int64],
2029        );
2030        assert_eq!(op.name(), "DeleteEdge");
2031
2032        let op = AddLabelOperator::new(
2033            Arc::clone(&store),
2034            Box::new(EmptyInput),
2035            0,
2036            vec!["L".to_string()],
2037            vec![LogicalType::Int64],
2038        );
2039        assert_eq!(op.name(), "AddLabel");
2040
2041        let op = RemoveLabelOperator::new(
2042            Arc::clone(&store),
2043            Box::new(EmptyInput),
2044            0,
2045            vec!["L".to_string()],
2046            vec![LogicalType::Int64],
2047        );
2048        assert_eq!(op.name(), "RemoveLabel");
2049
2050        let op = SetPropertyOperator::new_for_node(
2051            Arc::clone(&store),
2052            Box::new(EmptyInput),
2053            0,
2054            vec![],
2055            vec![LogicalType::Int64],
2056        );
2057        assert_eq!(op.name(), "SetProperty");
2058    }
2059}