Skip to main content

grafeo_core/execution/operators/
mutation.rs

1//! Mutation operators for creating and deleting graph elements.
2//!
3//! These operators modify the graph structure:
4//! - `CreateNodeOperator`: Creates new nodes
5//! - `CreateEdgeOperator`: Creates new edges
6//! - `DeleteNodeOperator`: Deletes nodes
7//! - `DeleteEdgeOperator`: Deletes edges
8
9use std::sync::Arc;
10
11use grafeo_common::types::{
12    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19/// Trait for validating schema constraints during mutation operations.
20///
21/// Implementors check type definitions, NOT NULL, and UNIQUE constraints
22/// before data is written to the store.
23pub trait ConstraintValidator: Send + Sync {
24    /// Validates a single property value for a node with the given labels.
25    ///
26    /// Checks type compatibility and NOT NULL constraints.
27    ///
28    /// # Errors
29    ///
30    /// Returns `Err` if the value type is incompatible or a NOT NULL constraint is violated.
31    fn validate_node_property(
32        &self,
33        labels: &[String],
34        key: &str,
35        value: &Value,
36    ) -> Result<(), OperatorError>;
37
38    /// Validates that all required properties are present after creating a node.
39    ///
40    /// Checks NOT NULL constraints for properties that were not explicitly set.
41    ///
42    /// # Errors
43    ///
44    /// Returns `Err` if a required (NOT NULL) property is missing.
45    fn validate_node_complete(
46        &self,
47        labels: &[String],
48        properties: &[(String, Value)],
49    ) -> Result<(), OperatorError>;
50
51    /// Checks UNIQUE constraint for a node property value.
52    ///
53    /// # Errors
54    ///
55    /// Returns `Err` if a node with the same label already has this value.
56    fn check_unique_node_property(
57        &self,
58        labels: &[String],
59        key: &str,
60        value: &Value,
61    ) -> Result<(), OperatorError>;
62
63    /// Validates a single property value for an edge of the given type.
64    ///
65    /// # Errors
66    ///
67    /// Returns `Err` if the value type is incompatible with the edge schema.
68    fn validate_edge_property(
69        &self,
70        edge_type: &str,
71        key: &str,
72        value: &Value,
73    ) -> Result<(), OperatorError>;
74
75    /// Validates that all required properties are present after creating an edge.
76    ///
77    /// # Errors
78    ///
79    /// Returns `Err` if a required property is missing from the edge.
80    fn validate_edge_complete(
81        &self,
82        edge_type: &str,
83        properties: &[(String, Value)],
84    ) -> Result<(), OperatorError>;
85
86    /// Validates that the node labels are allowed by the bound graph type.
87    ///
88    /// # Errors
89    ///
90    /// Returns `Err` if any label is not defined in the graph type.
91    fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
92        let _ = labels;
93        Ok(())
94    }
95
96    /// Validates that the edge type is allowed by the bound graph type.
97    ///
98    /// # Errors
99    ///
100    /// Returns `Err` if the edge type is not defined in the graph type.
101    fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
102        let _ = edge_type;
103        Ok(())
104    }
105
106    /// Validates that edge endpoints have the correct node type labels.
107    ///
108    /// # Errors
109    ///
110    /// Returns `Err` if the source or target node labels do not match the edge type definition.
111    fn validate_edge_endpoints(
112        &self,
113        edge_type: &str,
114        source_labels: &[String],
115        target_labels: &[String],
116    ) -> Result<(), OperatorError> {
117        let _ = (edge_type, source_labels, target_labels);
118        Ok(())
119    }
120
121    /// Injects default values for properties that are defined in a type but
122    /// not explicitly provided.
123    fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
124        let _ = (labels, properties);
125    }
126}
127
128/// Operator that creates new nodes.
129///
130/// For each input row, creates a new node with the specified labels
131/// and properties, then outputs the row with the new node.
132pub struct CreateNodeOperator {
133    /// The graph store to modify.
134    store: Arc<dyn GraphStoreMut>,
135    /// Input operator.
136    input: Option<Box<dyn Operator>>,
137    /// Labels for the new nodes.
138    labels: Vec<String>,
139    /// Properties to set (name -> column index or constant value).
140    properties: Vec<(String, PropertySource)>,
141    /// Output schema.
142    output_schema: Vec<LogicalType>,
143    /// Column index for the created node variable.
144    output_column: usize,
145    /// Whether this operator has been executed (for no-input case).
146    executed: bool,
147    /// Epoch for MVCC versioning.
148    viewing_epoch: Option<EpochId>,
149    /// Transaction ID for MVCC versioning.
150    transaction_id: Option<TransactionId>,
151    /// Optional constraint validator for schema enforcement.
152    validator: Option<Arc<dyn ConstraintValidator>>,
153    /// Optional write tracker for conflict detection.
154    write_tracker: Option<SharedWriteTracker>,
155}
156
157/// Source for a property value.
158#[derive(Debug, Clone)]
159#[non_exhaustive]
160pub enum PropertySource {
161    /// Get value from an input column.
162    Column(usize),
163    /// Use a constant value.
164    Constant(Value),
165    /// Access a named property from a map/node/edge in an input column.
166    PropertyAccess {
167        /// The column containing the map, node ID, or edge ID.
168        column: usize,
169        /// The property name to extract.
170        property: String,
171    },
172}
173
174impl PropertySource {
175    /// Resolves a property value from a data chunk row.
176    pub fn resolve(
177        &self,
178        chunk: &crate::execution::chunk::DataChunk,
179        row: usize,
180        store: &dyn GraphStore,
181    ) -> Value {
182        match self {
183            PropertySource::Column(col_idx) => chunk
184                .column(*col_idx)
185                .and_then(|c| c.get_value(row))
186                .unwrap_or(Value::Null),
187            PropertySource::Constant(v) => v.clone(),
188            PropertySource::PropertyAccess { column, property } => {
189                let Some(col) = chunk.column(*column) else {
190                    return Value::Null;
191                };
192                // Try node ID first, then edge ID, then map value
193                if let Some(node_id) = col.get_node_id(row) {
194                    store
195                        .get_node(node_id)
196                        .and_then(|node| node.get_property(property).cloned())
197                        .unwrap_or(Value::Null)
198                } else if let Some(edge_id) = col.get_edge_id(row) {
199                    store
200                        .get_edge(edge_id)
201                        .and_then(|edge| edge.get_property(property).cloned())
202                        .unwrap_or(Value::Null)
203                } else if let Some(Value::Map(map)) = col.get_value(row) {
204                    let key = PropertyKey::new(property);
205                    map.get(&key).cloned().unwrap_or(Value::Null)
206                } else {
207                    Value::Null
208                }
209            }
210        }
211    }
212}
213
214impl CreateNodeOperator {
215    /// Creates a new node creation operator.
216    ///
217    /// # Arguments
218    /// * `store` - The graph store to modify.
219    /// * `input` - Optional input operator (None for standalone CREATE).
220    /// * `labels` - Labels to assign to created nodes.
221    /// * `properties` - Properties to set on created nodes.
222    /// * `output_schema` - Schema of the output.
223    /// * `output_column` - Column index where the created node ID goes.
224    pub fn new(
225        store: Arc<dyn GraphStoreMut>,
226        input: Option<Box<dyn Operator>>,
227        labels: Vec<String>,
228        properties: Vec<(String, PropertySource)>,
229        output_schema: Vec<LogicalType>,
230        output_column: usize,
231    ) -> Self {
232        Self {
233            store,
234            input,
235            labels,
236            properties,
237            output_schema,
238            output_column,
239            executed: false,
240            viewing_epoch: None,
241            transaction_id: None,
242            validator: None,
243            write_tracker: None,
244        }
245    }
246
247    /// Sets the transaction context for MVCC versioning.
248    pub fn with_transaction_context(
249        mut self,
250        epoch: EpochId,
251        transaction_id: Option<TransactionId>,
252    ) -> Self {
253        self.viewing_epoch = Some(epoch);
254        self.transaction_id = transaction_id;
255        self
256    }
257
258    /// Sets the constraint validator for schema enforcement.
259    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
260        self.validator = Some(validator);
261        self
262    }
263
264    /// Sets the write tracker for conflict detection.
265    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
266        self.write_tracker = Some(tracker);
267        self
268    }
269}
270
271impl CreateNodeOperator {
272    /// Validates and sets properties on a newly created node.
273    fn validate_and_set_properties(
274        &self,
275        node_id: NodeId,
276        resolved_props: &mut Vec<(String, Value)>,
277    ) -> Result<(), OperatorError> {
278        // Phase 0: Validate that node labels are allowed by the bound graph type
279        if let Some(ref validator) = self.validator {
280            validator.validate_node_labels_allowed(&self.labels)?;
281        }
282
283        // Phase 0.5: Inject defaults for properties not explicitly provided
284        if let Some(ref validator) = self.validator {
285            validator.inject_defaults(&self.labels, resolved_props);
286        }
287
288        // Phase 1: Validate each property value
289        if let Some(ref validator) = self.validator {
290            for (name, value) in resolved_props.iter() {
291                validator.validate_node_property(&self.labels, name, value)?;
292                validator.check_unique_node_property(&self.labels, name, value)?;
293            }
294            // Phase 2: Validate completeness (NOT NULL checks for missing required properties)
295            validator.validate_node_complete(&self.labels, resolved_props)?;
296        }
297
298        // Phase 3: Write properties to the store
299        if let Some(tid) = self.transaction_id {
300            for (name, value) in resolved_props.iter() {
301                self.store
302                    .set_node_property_versioned(node_id, name, value.clone(), tid);
303            }
304        } else {
305            for (name, value) in resolved_props.iter() {
306                self.store.set_node_property(node_id, name, value.clone());
307            }
308        }
309        Ok(())
310    }
311}
312
313impl Operator for CreateNodeOperator {
314    fn next(&mut self) -> OperatorResult {
315        // Get transaction context for versioned creation
316        let epoch = self
317            .viewing_epoch
318            .unwrap_or_else(|| self.store.current_epoch());
319        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
320
321        if let Some(ref mut input) = self.input {
322            // For each input row, create a node
323            if let Some(chunk) = input.next()? {
324                let mut builder =
325                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
326
327                for row in chunk.selected_indices() {
328                    // Resolve all property values first (before creating node)
329                    let mut resolved_props: Vec<(String, Value)> = self
330                        .properties
331                        .iter()
332                        .map(|(name, source)| {
333                            let value =
334                                source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
335                            (name.clone(), value)
336                        })
337                        .collect();
338
339                    // Create the node with MVCC versioning
340                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
341                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
342
343                    // Record write for conflict detection
344                    if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
345                        tracker.record_node_write(tid, node_id)?;
346                    }
347
348                    // Validate and set properties
349                    self.validate_and_set_properties(node_id, &mut resolved_props)?;
350
351                    // Copy input columns to output
352                    for col_idx in 0..chunk.column_count() {
353                        if col_idx < self.output_column
354                            && let (Some(src), Some(dst)) =
355                                (chunk.column(col_idx), builder.column_mut(col_idx))
356                        {
357                            if let Some(val) = src.get_value(row) {
358                                dst.push_value(val);
359                            } else {
360                                dst.push_value(Value::Null);
361                            }
362                        }
363                    }
364
365                    // Add the new node ID
366                    if let Some(dst) = builder.column_mut(self.output_column) {
367                        // reason: entity IDs stored as i64, standard encoding
368                        #[allow(clippy::cast_possible_wrap)]
369                        // reason: entity IDs stored as i64, standard encoding
370                        #[allow(clippy::cast_possible_wrap)]
371                        dst.push_value(Value::Int64(node_id.0 as i64));
372                    }
373
374                    builder.advance_row();
375                }
376
377                return Ok(Some(builder.finish()));
378            }
379            Ok(None)
380        } else {
381            // No input - create a single node
382            if self.executed {
383                return Ok(None);
384            }
385            self.executed = true;
386
387            // Resolve constant properties
388            let mut resolved_props: Vec<(String, Value)> = self
389                .properties
390                .iter()
391                .filter_map(|(name, source)| {
392                    if let PropertySource::Constant(value) = source {
393                        Some((name.clone(), value.clone()))
394                    } else {
395                        None
396                    }
397                })
398                .collect();
399
400            // Create the node with MVCC versioning
401            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
402            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
403
404            // Record write for conflict detection
405            if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
406                tracker.record_node_write(tid, node_id)?;
407            }
408
409            // Validate and set properties
410            self.validate_and_set_properties(node_id, &mut resolved_props)?;
411
412            // Build output chunk with just the node ID
413            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
414            if let Some(dst) = builder.column_mut(self.output_column) {
415                // reason: entity IDs stored as i64, standard encoding
416                #[allow(clippy::cast_possible_wrap)]
417                dst.push_value(Value::Int64(node_id.0 as i64));
418            }
419            builder.advance_row();
420
421            Ok(Some(builder.finish()))
422        }
423    }
424
425    fn reset(&mut self) {
426        if let Some(ref mut input) = self.input {
427            input.reset();
428        }
429        self.executed = false;
430    }
431
432    fn name(&self) -> &'static str {
433        "CreateNode"
434    }
435
436    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
437        self
438    }
439}
440
441/// Operator that creates new edges.
442pub struct CreateEdgeOperator {
443    /// The graph store to modify.
444    store: Arc<dyn GraphStoreMut>,
445    /// Input operator.
446    input: Box<dyn Operator>,
447    /// Column index for the source node.
448    from_column: usize,
449    /// Column index for the target node.
450    to_column: usize,
451    /// Edge type.
452    edge_type: String,
453    /// Properties to set.
454    properties: Vec<(String, PropertySource)>,
455    /// Output schema.
456    output_schema: Vec<LogicalType>,
457    /// Column index for the created edge variable (if any).
458    output_column: Option<usize>,
459    /// Epoch for MVCC versioning.
460    viewing_epoch: Option<EpochId>,
461    /// Transaction ID for MVCC versioning.
462    transaction_id: Option<TransactionId>,
463    /// Optional constraint validator for schema enforcement.
464    validator: Option<Arc<dyn ConstraintValidator>>,
465    /// Optional write tracker for conflict detection.
466    write_tracker: Option<SharedWriteTracker>,
467}
468
469impl CreateEdgeOperator {
470    /// Creates a new edge creation operator.
471    ///
472    /// Use builder methods to set additional options:
473    /// - [`with_properties`](Self::with_properties) - set edge properties
474    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
475    /// - [`with_transaction_context`](Self::with_transaction_context) - set transaction context
476    pub fn new(
477        store: Arc<dyn GraphStoreMut>,
478        input: Box<dyn Operator>,
479        from_column: usize,
480        to_column: usize,
481        edge_type: String,
482        output_schema: Vec<LogicalType>,
483    ) -> Self {
484        Self {
485            store,
486            input,
487            from_column,
488            to_column,
489            edge_type,
490            properties: Vec::new(),
491            output_schema,
492            output_column: None,
493            viewing_epoch: None,
494            transaction_id: None,
495            validator: None,
496            write_tracker: None,
497        }
498    }
499
500    /// Sets the properties to assign to created edges.
501    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
502        self.properties = properties;
503        self
504    }
505
506    /// Sets the output column for the created edge ID.
507    pub fn with_output_column(mut self, column: usize) -> Self {
508        self.output_column = Some(column);
509        self
510    }
511
512    /// Sets the transaction context for MVCC versioning.
513    pub fn with_transaction_context(
514        mut self,
515        epoch: EpochId,
516        transaction_id: Option<TransactionId>,
517    ) -> Self {
518        self.viewing_epoch = Some(epoch);
519        self.transaction_id = transaction_id;
520        self
521    }
522
523    /// Sets the constraint validator for schema enforcement.
524    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
525        self.validator = Some(validator);
526        self
527    }
528
529    /// Sets the write tracker for conflict detection.
530    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
531        self.write_tracker = Some(tracker);
532        self
533    }
534}
535
536impl Operator for CreateEdgeOperator {
537    fn next(&mut self) -> OperatorResult {
538        // Get transaction context for versioned creation
539        let epoch = self
540            .viewing_epoch
541            .unwrap_or_else(|| self.store.current_epoch());
542        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
543
544        if let Some(chunk) = self.input.next()? {
545            let mut builder =
546                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
547
548            for row in chunk.selected_indices() {
549                // Get source and target node IDs
550                let from_id = chunk
551                    .column(self.from_column)
552                    .and_then(|c| c.get_value(row))
553                    .ok_or_else(|| {
554                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
555                    })?;
556
557                let to_id = chunk
558                    .column(self.to_column)
559                    .and_then(|c| c.get_value(row))
560                    .ok_or_else(|| {
561                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
562                    })?;
563
564                // Extract node IDs
565                let from_node_id = match from_id {
566                    // reason: ID encoding: i64 <-> u64 round-trip
567                    #[allow(clippy::cast_sign_loss)]
568                    Value::Int64(id) => NodeId(id as u64),
569                    _ => {
570                        return Err(OperatorError::TypeMismatch {
571                            expected: "Int64 (node ID)".to_string(),
572                            found: format!("{from_id:?}"),
573                        });
574                    }
575                };
576
577                let to_node_id = match to_id {
578                    // reason: ID encoding: i64 <-> u64 round-trip
579                    #[allow(clippy::cast_sign_loss)]
580                    Value::Int64(id) => NodeId(id as u64),
581                    _ => {
582                        return Err(OperatorError::TypeMismatch {
583                            expected: "Int64 (node ID)".to_string(),
584                            found: format!("{to_id:?}"),
585                        });
586                    }
587                };
588
589                // Validate graph type and edge endpoint constraints
590                if let Some(ref validator) = self.validator {
591                    validator.validate_edge_type_allowed(&self.edge_type)?;
592
593                    // Look up source and target node labels for endpoint validation
594                    let source_labels: Vec<String> = self
595                        .store
596                        .get_node(from_node_id)
597                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
598                        .unwrap_or_default();
599                    let target_labels: Vec<String> = self
600                        .store
601                        .get_node(to_node_id)
602                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
603                        .unwrap_or_default();
604                    validator.validate_edge_endpoints(
605                        &self.edge_type,
606                        &source_labels,
607                        &target_labels,
608                    )?;
609                }
610
611                // Resolve property values
612                let resolved_props: Vec<(String, Value)> = self
613                    .properties
614                    .iter()
615                    .map(|(name, source)| {
616                        let value =
617                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
618                        (name.clone(), value)
619                    })
620                    .collect();
621
622                // Validate constraints before writing
623                if let Some(ref validator) = self.validator {
624                    for (name, value) in &resolved_props {
625                        validator.validate_edge_property(&self.edge_type, name, value)?;
626                    }
627                    validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
628                }
629
630                // Create the edge with MVCC versioning
631                let edge_id = self.store.create_edge_versioned(
632                    from_node_id,
633                    to_node_id,
634                    &self.edge_type,
635                    epoch,
636                    tx,
637                );
638
639                // Record write for conflict detection
640                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
641                    tracker.record_edge_write(tid, edge_id)?;
642                }
643
644                // Set properties
645                if let Some(tid) = self.transaction_id {
646                    for (name, value) in resolved_props {
647                        self.store
648                            .set_edge_property_versioned(edge_id, &name, value, tid);
649                    }
650                } else {
651                    for (name, value) in resolved_props {
652                        self.store.set_edge_property(edge_id, &name, value);
653                    }
654                }
655
656                // Copy input columns
657                for col_idx in 0..chunk.column_count() {
658                    if let (Some(src), Some(dst)) =
659                        (chunk.column(col_idx), builder.column_mut(col_idx))
660                    {
661                        if let Some(val) = src.get_value(row) {
662                            dst.push_value(val);
663                        } else {
664                            dst.push_value(Value::Null);
665                        }
666                    }
667                }
668
669                // Add edge ID if requested
670                if let Some(out_col) = self.output_column
671                    && let Some(dst) = builder.column_mut(out_col)
672                {
673                    // reason: entity IDs stored as i64, standard encoding
674                    #[allow(clippy::cast_possible_wrap)]
675                    dst.push_value(Value::Int64(edge_id.0 as i64));
676                }
677
678                builder.advance_row();
679            }
680
681            return Ok(Some(builder.finish()));
682        }
683        Ok(None)
684    }
685
686    fn reset(&mut self) {
687        self.input.reset();
688    }
689
690    fn name(&self) -> &'static str {
691        "CreateEdge"
692    }
693
694    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
695        self
696    }
697}
698
699/// Operator that deletes nodes.
700pub struct DeleteNodeOperator {
701    /// The graph store to modify.
702    store: Arc<dyn GraphStoreMut>,
703    /// Input operator.
704    input: Box<dyn Operator>,
705    /// Column index for the node to delete.
706    node_column: usize,
707    /// Output schema.
708    output_schema: Vec<LogicalType>,
709    /// Whether to detach (delete connected edges) before deleting.
710    detach: bool,
711    /// Epoch for MVCC versioning.
712    viewing_epoch: Option<EpochId>,
713    /// Transaction ID for MVCC versioning.
714    transaction_id: Option<TransactionId>,
715    /// Optional write tracker for conflict detection.
716    write_tracker: Option<SharedWriteTracker>,
717}
718
719impl DeleteNodeOperator {
720    /// Creates a new node deletion operator.
721    pub fn new(
722        store: Arc<dyn GraphStoreMut>,
723        input: Box<dyn Operator>,
724        node_column: usize,
725        output_schema: Vec<LogicalType>,
726        detach: bool,
727    ) -> Self {
728        Self {
729            store,
730            input,
731            node_column,
732            output_schema,
733            detach,
734            viewing_epoch: None,
735            transaction_id: None,
736            write_tracker: None,
737        }
738    }
739
740    /// Sets the transaction context for MVCC versioning.
741    pub fn with_transaction_context(
742        mut self,
743        epoch: EpochId,
744        transaction_id: Option<TransactionId>,
745    ) -> Self {
746        self.viewing_epoch = Some(epoch);
747        self.transaction_id = transaction_id;
748        self
749    }
750
751    /// Sets the write tracker for conflict detection.
752    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
753        self.write_tracker = Some(tracker);
754        self
755    }
756}
757
758impl Operator for DeleteNodeOperator {
759    fn next(&mut self) -> OperatorResult {
760        // Get transaction context for versioned deletion
761        let epoch = self
762            .viewing_epoch
763            .unwrap_or_else(|| self.store.current_epoch());
764        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
765
766        if let Some(chunk) = self.input.next()? {
767            let mut builder =
768                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
769
770            for row in chunk.selected_indices() {
771                let node_val = chunk
772                    .column(self.node_column)
773                    .and_then(|c| c.get_value(row))
774                    .ok_or_else(|| {
775                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
776                    })?;
777
778                let node_id = match node_val {
779                    // reason: ID encoding: i64 <-> u64 round-trip
780                    #[allow(clippy::cast_sign_loss)]
781                    Value::Int64(id) => NodeId(id as u64),
782                    _ => {
783                        return Err(OperatorError::TypeMismatch {
784                            expected: "Int64 (node ID)".to_string(),
785                            found: format!("{node_val:?}"),
786                        });
787                    }
788                };
789
790                if self.detach {
791                    // Delete all connected edges first, using versioned deletion
792                    // so rollback can restore them
793                    let outgoing = self
794                        .store
795                        .edges_from(node_id, crate::graph::Direction::Outgoing);
796                    let incoming = self
797                        .store
798                        .edges_from(node_id, crate::graph::Direction::Incoming);
799                    for (_, edge_id) in outgoing.into_iter().chain(incoming) {
800                        self.store.delete_edge_versioned(edge_id, epoch, tx);
801                        if let (Some(tracker), Some(tid)) =
802                            (&self.write_tracker, self.transaction_id)
803                        {
804                            tracker.record_edge_write(tid, edge_id)?;
805                        }
806                    }
807                } else {
808                    // NODETACH: check that node has no connected edges
809                    let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
810                    if degree > 0 {
811                        return Err(OperatorError::ConstraintViolation(format!(
812                            "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
813                            degree
814                        )));
815                    }
816                }
817
818                // Delete the node with MVCC versioning
819                self.store.delete_node_versioned(node_id, epoch, tx);
820
821                // Record write for conflict detection
822                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
823                    tracker.record_node_write(tid, node_id)?;
824                }
825
826                // Pass through all input columns so downstream RETURN can
827                // reference the variable (e.g., count(n) after DELETE n).
828                for col_idx in 0..chunk.column_count() {
829                    if let (Some(src), Some(dst)) =
830                        (chunk.column(col_idx), builder.column_mut(col_idx))
831                    {
832                        if let Some(val) = src.get_value(row) {
833                            dst.push_value(val);
834                        } else {
835                            dst.push_value(Value::Null);
836                        }
837                    }
838                }
839                builder.advance_row();
840            }
841
842            return Ok(Some(builder.finish()));
843        }
844        Ok(None)
845    }
846
847    fn reset(&mut self) {
848        self.input.reset();
849    }
850
851    fn name(&self) -> &'static str {
852        "DeleteNode"
853    }
854
855    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
856        self
857    }
858}
859
860/// Operator that deletes edges.
861pub struct DeleteEdgeOperator {
862    /// The graph store to modify.
863    store: Arc<dyn GraphStoreMut>,
864    /// Input operator.
865    input: Box<dyn Operator>,
866    /// Column index for the edge to delete.
867    edge_column: usize,
868    /// Output schema.
869    output_schema: Vec<LogicalType>,
870    /// Epoch for MVCC versioning.
871    viewing_epoch: Option<EpochId>,
872    /// Transaction ID for MVCC versioning.
873    transaction_id: Option<TransactionId>,
874    /// Optional write tracker for conflict detection.
875    write_tracker: Option<SharedWriteTracker>,
876}
877
878impl DeleteEdgeOperator {
879    /// Creates a new edge deletion operator.
880    pub fn new(
881        store: Arc<dyn GraphStoreMut>,
882        input: Box<dyn Operator>,
883        edge_column: usize,
884        output_schema: Vec<LogicalType>,
885    ) -> Self {
886        Self {
887            store,
888            input,
889            edge_column,
890            output_schema,
891            viewing_epoch: None,
892            transaction_id: None,
893            write_tracker: None,
894        }
895    }
896
897    /// Sets the transaction context for MVCC versioning.
898    pub fn with_transaction_context(
899        mut self,
900        epoch: EpochId,
901        transaction_id: Option<TransactionId>,
902    ) -> Self {
903        self.viewing_epoch = Some(epoch);
904        self.transaction_id = transaction_id;
905        self
906    }
907
908    /// Sets the write tracker for conflict detection.
909    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
910        self.write_tracker = Some(tracker);
911        self
912    }
913}
914
915impl Operator for DeleteEdgeOperator {
916    fn next(&mut self) -> OperatorResult {
917        // Get transaction context for versioned deletion
918        let epoch = self
919            .viewing_epoch
920            .unwrap_or_else(|| self.store.current_epoch());
921        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
922
923        if let Some(chunk) = self.input.next()? {
924            let mut builder =
925                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
926
927            for row in chunk.selected_indices() {
928                let edge_val = chunk
929                    .column(self.edge_column)
930                    .and_then(|c| c.get_value(row))
931                    .ok_or_else(|| {
932                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
933                    })?;
934
935                let edge_id = match edge_val {
936                    // reason: ID encoding: i64 <-> u64 round-trip
937                    #[allow(clippy::cast_sign_loss)]
938                    Value::Int64(id) => EdgeId(id as u64),
939                    _ => {
940                        return Err(OperatorError::TypeMismatch {
941                            expected: "Int64 (edge ID)".to_string(),
942                            found: format!("{edge_val:?}"),
943                        });
944                    }
945                };
946
947                // Delete the edge with MVCC versioning
948                self.store.delete_edge_versioned(edge_id, epoch, tx);
949
950                // Record write for conflict detection
951                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
952                    tracker.record_edge_write(tid, edge_id)?;
953                }
954
955                // Pass through all input columns
956                for col_idx in 0..chunk.column_count() {
957                    if let (Some(src), Some(dst)) =
958                        (chunk.column(col_idx), builder.column_mut(col_idx))
959                    {
960                        if let Some(val) = src.get_value(row) {
961                            dst.push_value(val);
962                        } else {
963                            dst.push_value(Value::Null);
964                        }
965                    }
966                }
967                builder.advance_row();
968            }
969
970            return Ok(Some(builder.finish()));
971        }
972        Ok(None)
973    }
974
975    fn reset(&mut self) {
976        self.input.reset();
977    }
978
979    fn name(&self) -> &'static str {
980        "DeleteEdge"
981    }
982
983    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
984        self
985    }
986}
987
988/// Operator that adds labels to nodes.
989pub struct AddLabelOperator {
990    /// The graph store.
991    store: Arc<dyn GraphStoreMut>,
992    /// Child operator providing nodes.
993    input: Box<dyn Operator>,
994    /// Column index containing node IDs.
995    node_column: usize,
996    /// Labels to add.
997    labels: Vec<String>,
998    /// Output schema.
999    output_schema: Vec<LogicalType>,
1000    /// Column index for the update count (last column).
1001    count_column: usize,
1002    /// Epoch for MVCC versioning.
1003    viewing_epoch: Option<EpochId>,
1004    /// Transaction ID for undo log tracking.
1005    transaction_id: Option<TransactionId>,
1006    /// Optional write tracker for conflict detection.
1007    write_tracker: Option<SharedWriteTracker>,
1008}
1009
1010impl AddLabelOperator {
1011    /// Creates a new add label operator.
1012    pub fn new(
1013        store: Arc<dyn GraphStoreMut>,
1014        input: Box<dyn Operator>,
1015        node_column: usize,
1016        labels: Vec<String>,
1017        output_schema: Vec<LogicalType>,
1018    ) -> Self {
1019        let count_column = output_schema.len() - 1;
1020        Self {
1021            store,
1022            input,
1023            node_column,
1024            labels,
1025            count_column,
1026            output_schema,
1027            viewing_epoch: None,
1028            transaction_id: None,
1029            write_tracker: None,
1030        }
1031    }
1032
1033    /// Sets the transaction context for versioned label mutations.
1034    pub fn with_transaction_context(
1035        mut self,
1036        epoch: EpochId,
1037        transaction_id: Option<TransactionId>,
1038    ) -> Self {
1039        self.viewing_epoch = Some(epoch);
1040        self.transaction_id = transaction_id;
1041        self
1042    }
1043
1044    /// Sets the write tracker for conflict detection.
1045    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1046        self.write_tracker = Some(tracker);
1047        self
1048    }
1049}
1050
1051impl Operator for AddLabelOperator {
1052    fn next(&mut self) -> OperatorResult {
1053        if let Some(chunk) = self.input.next()? {
1054            let mut builder =
1055                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1056
1057            for row in chunk.selected_indices() {
1058                let node_val = chunk
1059                    .column(self.node_column)
1060                    .and_then(|c| c.get_value(row))
1061                    .ok_or_else(|| {
1062                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1063                    })?;
1064
1065                let node_id = match node_val {
1066                    // reason: ID encoding: i64 <-> u64 round-trip
1067                    #[allow(clippy::cast_sign_loss)]
1068                    Value::Int64(id) => NodeId(id as u64),
1069                    _ => {
1070                        return Err(OperatorError::TypeMismatch {
1071                            expected: "Int64 (node ID)".to_string(),
1072                            found: format!("{node_val:?}"),
1073                        });
1074                    }
1075                };
1076
1077                // Record write for conflict detection
1078                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1079                    tracker.record_node_write(tid, node_id)?;
1080                }
1081
1082                // Add all labels
1083                let mut row_count: i64 = 0;
1084                for label in &self.labels {
1085                    let added = if let Some(tid) = self.transaction_id {
1086                        self.store.add_label_versioned(node_id, label, tid)
1087                    } else {
1088                        self.store.add_label(node_id, label)
1089                    };
1090                    if added {
1091                        row_count += 1;
1092                    }
1093                }
1094
1095                // Copy input columns to output (pass-through)
1096                for col_idx in 0..chunk.column_count() {
1097                    if let (Some(src), Some(dst)) =
1098                        (chunk.column(col_idx), builder.column_mut(col_idx))
1099                    {
1100                        if let Some(val) = src.get_value(row) {
1101                            dst.push_value(val);
1102                        } else {
1103                            dst.push_value(Value::Null);
1104                        }
1105                    }
1106                }
1107                // Append the update count column
1108                if let Some(dst) = builder.column_mut(self.count_column) {
1109                    dst.push_value(Value::Int64(row_count));
1110                }
1111
1112                builder.advance_row();
1113            }
1114
1115            return Ok(Some(builder.finish()));
1116        }
1117        Ok(None)
1118    }
1119
1120    fn reset(&mut self) {
1121        self.input.reset();
1122    }
1123
1124    fn name(&self) -> &'static str {
1125        "AddLabel"
1126    }
1127
1128    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1129        self
1130    }
1131}
1132
1133/// Operator that removes labels from nodes.
1134pub struct RemoveLabelOperator {
1135    /// The graph store.
1136    store: Arc<dyn GraphStoreMut>,
1137    /// Child operator providing nodes.
1138    input: Box<dyn Operator>,
1139    /// Column index containing node IDs.
1140    node_column: usize,
1141    /// Labels to remove.
1142    labels: Vec<String>,
1143    /// Output schema.
1144    output_schema: Vec<LogicalType>,
1145    /// Column index for the update count (last column).
1146    count_column: usize,
1147    /// Epoch for MVCC versioning.
1148    viewing_epoch: Option<EpochId>,
1149    /// Transaction ID for undo log tracking.
1150    transaction_id: Option<TransactionId>,
1151    /// Optional write tracker for conflict detection.
1152    write_tracker: Option<SharedWriteTracker>,
1153}
1154
1155impl RemoveLabelOperator {
1156    /// Creates a new remove label operator.
1157    pub fn new(
1158        store: Arc<dyn GraphStoreMut>,
1159        input: Box<dyn Operator>,
1160        node_column: usize,
1161        labels: Vec<String>,
1162        output_schema: Vec<LogicalType>,
1163    ) -> Self {
1164        let count_column = output_schema.len() - 1;
1165        Self {
1166            store,
1167            input,
1168            node_column,
1169            labels,
1170            count_column,
1171            output_schema,
1172            viewing_epoch: None,
1173            transaction_id: None,
1174            write_tracker: None,
1175        }
1176    }
1177
1178    /// Sets the transaction context for versioned label mutations.
1179    pub fn with_transaction_context(
1180        mut self,
1181        epoch: EpochId,
1182        transaction_id: Option<TransactionId>,
1183    ) -> Self {
1184        self.viewing_epoch = Some(epoch);
1185        self.transaction_id = transaction_id;
1186        self
1187    }
1188
1189    /// Sets the write tracker for conflict detection.
1190    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1191        self.write_tracker = Some(tracker);
1192        self
1193    }
1194}
1195
1196impl Operator for RemoveLabelOperator {
1197    fn next(&mut self) -> OperatorResult {
1198        if let Some(chunk) = self.input.next()? {
1199            let mut builder =
1200                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1201
1202            for row in chunk.selected_indices() {
1203                let node_val = chunk
1204                    .column(self.node_column)
1205                    .and_then(|c| c.get_value(row))
1206                    .ok_or_else(|| {
1207                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1208                    })?;
1209
1210                let node_id = match node_val {
1211                    // reason: ID encoding: i64 <-> u64 round-trip
1212                    #[allow(clippy::cast_sign_loss)]
1213                    Value::Int64(id) => NodeId(id as u64),
1214                    _ => {
1215                        return Err(OperatorError::TypeMismatch {
1216                            expected: "Int64 (node ID)".to_string(),
1217                            found: format!("{node_val:?}"),
1218                        });
1219                    }
1220                };
1221
1222                // Record write for conflict detection
1223                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1224                    tracker.record_node_write(tid, node_id)?;
1225                }
1226
1227                // Remove all labels
1228                let mut row_count: i64 = 0;
1229                for label in &self.labels {
1230                    let removed = if let Some(tid) = self.transaction_id {
1231                        self.store.remove_label_versioned(node_id, label, tid)
1232                    } else {
1233                        self.store.remove_label(node_id, label)
1234                    };
1235                    if removed {
1236                        row_count += 1;
1237                    }
1238                }
1239
1240                // Copy input columns to output (pass-through)
1241                for col_idx in 0..chunk.column_count() {
1242                    if let (Some(src), Some(dst)) =
1243                        (chunk.column(col_idx), builder.column_mut(col_idx))
1244                    {
1245                        if let Some(val) = src.get_value(row) {
1246                            dst.push_value(val);
1247                        } else {
1248                            dst.push_value(Value::Null);
1249                        }
1250                    }
1251                }
1252                // Append the update count column
1253                if let Some(dst) = builder.column_mut(self.count_column) {
1254                    dst.push_value(Value::Int64(row_count));
1255                }
1256
1257                builder.advance_row();
1258            }
1259
1260            return Ok(Some(builder.finish()));
1261        }
1262        Ok(None)
1263    }
1264
1265    fn reset(&mut self) {
1266        self.input.reset();
1267    }
1268
1269    fn name(&self) -> &'static str {
1270        "RemoveLabel"
1271    }
1272
1273    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1274        self
1275    }
1276}
1277
1278/// Operator that sets properties on nodes or edges.
1279///
1280/// This operator reads node/edge IDs from a column and sets the
1281/// specified properties on each entity.
1282pub struct SetPropertyOperator {
1283    /// The graph store.
1284    store: Arc<dyn GraphStoreMut>,
1285    /// Child operator providing entities.
1286    input: Box<dyn Operator>,
1287    /// Column index containing entity IDs (node or edge).
1288    entity_column: usize,
1289    /// Whether the entity is an edge (false = node).
1290    is_edge: bool,
1291    /// Properties to set (name -> source).
1292    properties: Vec<(String, PropertySource)>,
1293    /// Output schema.
1294    output_schema: Vec<LogicalType>,
1295    /// Whether to replace all properties (true) or merge (false) for map assignments.
1296    replace: bool,
1297    /// Optional constraint validator for schema enforcement.
1298    validator: Option<Arc<dyn ConstraintValidator>>,
1299    /// Entity labels (for node constraint validation).
1300    labels: Vec<String>,
1301    /// Edge type (for edge constraint validation).
1302    edge_type_name: Option<String>,
1303    /// Epoch for MVCC versioning.
1304    viewing_epoch: Option<EpochId>,
1305    /// Transaction ID for undo log tracking.
1306    transaction_id: Option<TransactionId>,
1307    /// Optional write tracker for conflict detection.
1308    write_tracker: Option<SharedWriteTracker>,
1309}
1310
1311impl SetPropertyOperator {
1312    /// Creates a new set property operator for nodes.
1313    pub fn new_for_node(
1314        store: Arc<dyn GraphStoreMut>,
1315        input: Box<dyn Operator>,
1316        node_column: usize,
1317        properties: Vec<(String, PropertySource)>,
1318        output_schema: Vec<LogicalType>,
1319    ) -> Self {
1320        Self {
1321            store,
1322            input,
1323            entity_column: node_column,
1324            is_edge: false,
1325            properties,
1326            output_schema,
1327            replace: false,
1328            validator: None,
1329            labels: Vec::new(),
1330            edge_type_name: None,
1331            viewing_epoch: None,
1332            transaction_id: None,
1333            write_tracker: None,
1334        }
1335    }
1336
1337    /// Creates a new set property operator for edges.
1338    pub fn new_for_edge(
1339        store: Arc<dyn GraphStoreMut>,
1340        input: Box<dyn Operator>,
1341        edge_column: usize,
1342        properties: Vec<(String, PropertySource)>,
1343        output_schema: Vec<LogicalType>,
1344    ) -> Self {
1345        Self {
1346            store,
1347            input,
1348            entity_column: edge_column,
1349            is_edge: true,
1350            properties,
1351            output_schema,
1352            replace: false,
1353            validator: None,
1354            labels: Vec::new(),
1355            edge_type_name: None,
1356            viewing_epoch: None,
1357            transaction_id: None,
1358            write_tracker: None,
1359        }
1360    }
1361
1362    /// Sets whether this operator replaces all properties (for map assignment).
1363    pub fn with_replace(mut self, replace: bool) -> Self {
1364        self.replace = replace;
1365        self
1366    }
1367
1368    /// Sets the constraint validator for schema enforcement.
1369    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1370        self.validator = Some(validator);
1371        self
1372    }
1373
1374    /// Sets the entity labels (for node constraint validation).
1375    pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1376        self.labels = labels;
1377        self
1378    }
1379
1380    /// Sets the edge type name (for edge constraint validation).
1381    pub fn with_edge_type(mut self, edge_type: String) -> Self {
1382        self.edge_type_name = Some(edge_type);
1383        self
1384    }
1385
1386    /// Sets the transaction context for versioned property mutations.
1387    ///
1388    /// When a transaction ID is provided, property changes are recorded in
1389    /// an undo log so they can be restored on rollback.
1390    pub fn with_transaction_context(
1391        mut self,
1392        epoch: EpochId,
1393        transaction_id: Option<TransactionId>,
1394    ) -> Self {
1395        self.viewing_epoch = Some(epoch);
1396        self.transaction_id = transaction_id;
1397        self
1398    }
1399
1400    /// Sets the write tracker for conflict detection.
1401    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1402        self.write_tracker = Some(tracker);
1403        self
1404    }
1405}
1406
1407impl Operator for SetPropertyOperator {
1408    fn next(&mut self) -> OperatorResult {
1409        if let Some(chunk) = self.input.next()? {
1410            let mut builder =
1411                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1412
1413            for row in chunk.selected_indices() {
1414                let entity_val = chunk
1415                    .column(self.entity_column)
1416                    .and_then(|c| c.get_value(row))
1417                    .ok_or_else(|| {
1418                        OperatorError::ColumnNotFound(format!(
1419                            "entity column {}",
1420                            self.entity_column
1421                        ))
1422                    })?;
1423
1424                let entity_id = match entity_val {
1425                    // reason: ID encoding: i64 <-> u64 round-trip
1426                    #[allow(clippy::cast_sign_loss)]
1427                    Value::Int64(id) => id as u64,
1428                    _ => {
1429                        return Err(OperatorError::TypeMismatch {
1430                            expected: "Int64 (entity ID)".to_string(),
1431                            found: format!("{entity_val:?}"),
1432                        });
1433                    }
1434                };
1435
1436                // Record write for conflict detection
1437                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1438                    if self.is_edge {
1439                        tracker.record_edge_write(tid, EdgeId(entity_id))?;
1440                    } else {
1441                        tracker.record_node_write(tid, NodeId(entity_id))?;
1442                    }
1443                }
1444
1445                // Resolve all property values
1446                let resolved_props: Vec<(String, Value)> = self
1447                    .properties
1448                    .iter()
1449                    .map(|(name, source)| {
1450                        let value =
1451                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1452                        (name.clone(), value)
1453                    })
1454                    .collect();
1455
1456                // Validate constraints before writing
1457                if let Some(ref validator) = self.validator {
1458                    if self.is_edge {
1459                        if let Some(ref et) = self.edge_type_name {
1460                            for (name, value) in &resolved_props {
1461                                validator.validate_edge_property(et, name, value)?;
1462                            }
1463                        }
1464                    } else {
1465                        for (name, value) in &resolved_props {
1466                            validator.validate_node_property(&self.labels, name, value)?;
1467                            validator.check_unique_node_property(&self.labels, name, value)?;
1468                        }
1469                    }
1470                }
1471
1472                // Write all properties (use versioned methods when inside a transaction)
1473                let tx_id = self.transaction_id;
1474                for (prop_name, value) in resolved_props {
1475                    if prop_name == "*" {
1476                        // Map assignment: value should be a Map
1477                        if let Value::Map(map) = value {
1478                            if self.replace {
1479                                // Replace: remove all existing properties first
1480                                if self.is_edge {
1481                                    if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1482                                        let keys: Vec<String> = edge
1483                                            .properties
1484                                            .iter()
1485                                            .map(|(k, _)| k.as_str().to_string())
1486                                            .collect();
1487                                        for key in keys {
1488                                            if let Some(tid) = tx_id {
1489                                                self.store.remove_edge_property_versioned(
1490                                                    EdgeId(entity_id),
1491                                                    &key,
1492                                                    tid,
1493                                                );
1494                                            } else {
1495                                                self.store
1496                                                    .remove_edge_property(EdgeId(entity_id), &key);
1497                                            }
1498                                        }
1499                                    }
1500                                } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1501                                    let keys: Vec<String> = node
1502                                        .properties
1503                                        .iter()
1504                                        .map(|(k, _)| k.as_str().to_string())
1505                                        .collect();
1506                                    for key in keys {
1507                                        if let Some(tid) = tx_id {
1508                                            self.store.remove_node_property_versioned(
1509                                                NodeId(entity_id),
1510                                                &key,
1511                                                tid,
1512                                            );
1513                                        } else {
1514                                            self.store
1515                                                .remove_node_property(NodeId(entity_id), &key);
1516                                        }
1517                                    }
1518                                }
1519                            }
1520                            // Set each map entry (null values remove the property)
1521                            for (key, val) in map.iter() {
1522                                if val.is_null() {
1523                                    // Null in SET += removes the property (Cypher/GQL semantics)
1524                                    if self.is_edge {
1525                                        if let Some(tid) = tx_id {
1526                                            self.store.remove_edge_property_versioned(
1527                                                EdgeId(entity_id),
1528                                                key.as_str(),
1529                                                tid,
1530                                            );
1531                                        } else {
1532                                            self.store.remove_edge_property(
1533                                                EdgeId(entity_id),
1534                                                key.as_str(),
1535                                            );
1536                                        }
1537                                    } else if let Some(tid) = tx_id {
1538                                        self.store.remove_node_property_versioned(
1539                                            NodeId(entity_id),
1540                                            key.as_str(),
1541                                            tid,
1542                                        );
1543                                    } else {
1544                                        self.store
1545                                            .remove_node_property(NodeId(entity_id), key.as_str());
1546                                    }
1547                                } else if self.is_edge {
1548                                    if let Some(tid) = tx_id {
1549                                        self.store.set_edge_property_versioned(
1550                                            EdgeId(entity_id),
1551                                            key.as_str(),
1552                                            val.clone(),
1553                                            tid,
1554                                        );
1555                                    } else {
1556                                        self.store.set_edge_property(
1557                                            EdgeId(entity_id),
1558                                            key.as_str(),
1559                                            val.clone(),
1560                                        );
1561                                    }
1562                                } else if let Some(tid) = tx_id {
1563                                    self.store.set_node_property_versioned(
1564                                        NodeId(entity_id),
1565                                        key.as_str(),
1566                                        val.clone(),
1567                                        tid,
1568                                    );
1569                                } else {
1570                                    self.store.set_node_property(
1571                                        NodeId(entity_id),
1572                                        key.as_str(),
1573                                        val.clone(),
1574                                    );
1575                                }
1576                            }
1577                        }
1578                    } else if self.is_edge {
1579                        if let Some(tid) = tx_id {
1580                            self.store.set_edge_property_versioned(
1581                                EdgeId(entity_id),
1582                                &prop_name,
1583                                value,
1584                                tid,
1585                            );
1586                        } else {
1587                            self.store
1588                                .set_edge_property(EdgeId(entity_id), &prop_name, value);
1589                        }
1590                    } else if let Some(tid) = tx_id {
1591                        self.store.set_node_property_versioned(
1592                            NodeId(entity_id),
1593                            &prop_name,
1594                            value,
1595                            tid,
1596                        );
1597                    } else {
1598                        self.store
1599                            .set_node_property(NodeId(entity_id), &prop_name, value);
1600                    }
1601                }
1602
1603                // Copy input columns to output
1604                for col_idx in 0..chunk.column_count() {
1605                    if let (Some(src), Some(dst)) =
1606                        (chunk.column(col_idx), builder.column_mut(col_idx))
1607                    {
1608                        if let Some(val) = src.get_value(row) {
1609                            dst.push_value(val);
1610                        } else {
1611                            dst.push_value(Value::Null);
1612                        }
1613                    }
1614                }
1615
1616                builder.advance_row();
1617            }
1618
1619            return Ok(Some(builder.finish()));
1620        }
1621        Ok(None)
1622    }
1623
1624    fn reset(&mut self) {
1625        self.input.reset();
1626    }
1627
1628    fn name(&self) -> &'static str {
1629        "SetProperty"
1630    }
1631
1632    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1633        self
1634    }
1635}
1636
1637#[cfg(all(test, feature = "lpg"))]
1638mod tests {
1639    use super::*;
1640    use crate::execution::DataChunk;
1641    use crate::execution::chunk::DataChunkBuilder;
1642    use crate::graph::lpg::LpgStore;
1643
1644    // ── Helpers ────────────────────────────────────────────────────
1645
1646    fn create_test_store() -> Arc<dyn GraphStoreMut> {
1647        Arc::new(LpgStore::new().unwrap())
1648    }
1649
1650    struct MockInput {
1651        chunk: Option<DataChunk>,
1652    }
1653
1654    impl MockInput {
1655        fn boxed(chunk: DataChunk) -> Box<Self> {
1656            Box::new(Self { chunk: Some(chunk) })
1657        }
1658    }
1659
1660    impl Operator for MockInput {
1661        fn next(&mut self) -> OperatorResult {
1662            Ok(self.chunk.take())
1663        }
1664        fn reset(&mut self) {}
1665        fn name(&self) -> &'static str {
1666            "MockInput"
1667        }
1668
1669        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1670            self
1671        }
1672    }
1673
1674    struct EmptyInput;
1675    impl Operator for EmptyInput {
1676        fn next(&mut self) -> OperatorResult {
1677            Ok(None)
1678        }
1679        fn reset(&mut self) {}
1680        fn name(&self) -> &'static str {
1681            "EmptyInput"
1682        }
1683
1684        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1685            self
1686        }
1687    }
1688
1689    // reason: test IDs are small sequential counters
1690    #[allow(clippy::cast_possible_wrap)]
1691    fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1692        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1693        for id in ids {
1694            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1695            builder.advance_row();
1696        }
1697        builder.finish()
1698    }
1699
1700    // reason: test IDs are small sequential counters
1701    #[allow(clippy::cast_possible_wrap)]
1702    fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1703        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1704        for id in ids {
1705            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1706            builder.advance_row();
1707        }
1708        builder.finish()
1709    }
1710
1711    // ── CreateNodeOperator ──────────────────────────────────────
1712
1713    #[test]
1714    fn test_create_node_standalone() {
1715        let store = create_test_store();
1716
1717        let mut op = CreateNodeOperator::new(
1718            Arc::clone(&store),
1719            None,
1720            vec!["Person".to_string()],
1721            vec![(
1722                "name".to_string(),
1723                PropertySource::Constant(Value::String("Alix".into())),
1724            )],
1725            vec![LogicalType::Int64],
1726            0,
1727        );
1728
1729        let chunk = op.next().unwrap().unwrap();
1730        assert_eq!(chunk.row_count(), 1);
1731
1732        // Second call should return None (standalone executes once)
1733        assert!(op.next().unwrap().is_none());
1734
1735        assert_eq!(store.node_count(), 1);
1736    }
1737
1738    #[test]
1739    // reason: test IDs are small sequential counters
1740    #[allow(clippy::cast_possible_wrap)]
1741    fn test_create_edge() {
1742        let store = create_test_store();
1743
1744        let node1 = store.create_node(&["Person"]);
1745        let node2 = store.create_node(&["Person"]);
1746
1747        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1748        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1749        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1750        builder.advance_row();
1751
1752        let mut op = CreateEdgeOperator::new(
1753            Arc::clone(&store),
1754            MockInput::boxed(builder.finish()),
1755            0,
1756            1,
1757            "KNOWS".to_string(),
1758            vec![LogicalType::Int64, LogicalType::Int64],
1759        );
1760
1761        let _chunk = op.next().unwrap().unwrap();
1762        assert_eq!(store.edge_count(), 1);
1763    }
1764
1765    #[test]
1766    fn test_delete_node() {
1767        let store = create_test_store();
1768
1769        let node_id = store.create_node(&["Person"]);
1770        assert_eq!(store.node_count(), 1);
1771
1772        let mut op = DeleteNodeOperator::new(
1773            Arc::clone(&store),
1774            MockInput::boxed(node_id_chunk(&[node_id])),
1775            0,
1776            vec![LogicalType::Node],
1777            false,
1778        );
1779
1780        let chunk = op.next().unwrap().unwrap();
1781        // Pass-through: output row contains the original node ID
1782        assert_eq!(chunk.row_count(), 1);
1783        assert_eq!(store.node_count(), 0);
1784    }
1785
1786    // ── DeleteEdgeOperator ───────────────────────────────────────
1787
1788    #[test]
1789    fn test_delete_edge() {
1790        let store = create_test_store();
1791
1792        let n1 = store.create_node(&["Person"]);
1793        let n2 = store.create_node(&["Person"]);
1794        let eid = store.create_edge(n1, n2, "KNOWS");
1795        assert_eq!(store.edge_count(), 1);
1796
1797        let mut op = DeleteEdgeOperator::new(
1798            Arc::clone(&store),
1799            MockInput::boxed(edge_id_chunk(&[eid])),
1800            0,
1801            vec![LogicalType::Node],
1802        );
1803
1804        let chunk = op.next().unwrap().unwrap();
1805        assert_eq!(chunk.row_count(), 1);
1806        assert_eq!(store.edge_count(), 0);
1807    }
1808
1809    #[test]
1810    fn test_delete_edge_no_input_returns_none() {
1811        let store = create_test_store();
1812
1813        let mut op = DeleteEdgeOperator::new(
1814            Arc::clone(&store),
1815            Box::new(EmptyInput),
1816            0,
1817            vec![LogicalType::Int64],
1818        );
1819
1820        assert!(op.next().unwrap().is_none());
1821    }
1822
1823    #[test]
1824    fn test_delete_multiple_edges() {
1825        let store = create_test_store();
1826
1827        let n1 = store.create_node(&["N"]);
1828        let n2 = store.create_node(&["N"]);
1829        let e1 = store.create_edge(n1, n2, "R");
1830        let e2 = store.create_edge(n2, n1, "S");
1831        assert_eq!(store.edge_count(), 2);
1832
1833        let mut op = DeleteEdgeOperator::new(
1834            Arc::clone(&store),
1835            MockInput::boxed(edge_id_chunk(&[e1, e2])),
1836            0,
1837            vec![LogicalType::Node],
1838        );
1839
1840        let chunk = op.next().unwrap().unwrap();
1841        assert_eq!(chunk.row_count(), 2);
1842        assert_eq!(store.edge_count(), 0);
1843    }
1844
1845    // ── DeleteNodeOperator with DETACH ───────────────────────────
1846
1847    #[test]
1848    fn test_delete_node_detach() {
1849        let store = create_test_store();
1850
1851        let n1 = store.create_node(&["Person"]);
1852        let n2 = store.create_node(&["Person"]);
1853        store.create_edge(n1, n2, "KNOWS");
1854        store.create_edge(n2, n1, "FOLLOWS");
1855        assert_eq!(store.edge_count(), 2);
1856
1857        let mut op = DeleteNodeOperator::new(
1858            Arc::clone(&store),
1859            MockInput::boxed(node_id_chunk(&[n1])),
1860            0,
1861            vec![LogicalType::Node],
1862            true, // detach = true
1863        );
1864
1865        let chunk = op.next().unwrap().unwrap();
1866        assert_eq!(chunk.row_count(), 1);
1867        assert_eq!(store.node_count(), 1);
1868        assert_eq!(store.edge_count(), 0); // edges detached
1869    }
1870
1871    // ── AddLabelOperator ─────────────────────────────────────────
1872
1873    #[test]
1874    fn test_add_label() {
1875        let store = create_test_store();
1876
1877        let node = store.create_node(&["Person"]);
1878
1879        let mut op = AddLabelOperator::new(
1880            Arc::clone(&store),
1881            MockInput::boxed(node_id_chunk(&[node])),
1882            0,
1883            vec!["Employee".to_string()],
1884            vec![LogicalType::Int64, LogicalType::Int64],
1885        );
1886
1887        let chunk = op.next().unwrap().unwrap();
1888        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1889        assert_eq!(updated, 1);
1890
1891        // Verify label was added
1892        let node_data = store.get_node(node).unwrap();
1893        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1894        assert!(labels.contains(&"Person"));
1895        assert!(labels.contains(&"Employee"));
1896    }
1897
1898    #[test]
1899    fn test_add_multiple_labels() {
1900        let store = create_test_store();
1901
1902        let node = store.create_node(&["Base"]);
1903
1904        let mut op = AddLabelOperator::new(
1905            Arc::clone(&store),
1906            MockInput::boxed(node_id_chunk(&[node])),
1907            0,
1908            vec!["LabelA".to_string(), "LabelB".to_string()],
1909            vec![LogicalType::Int64, LogicalType::Int64],
1910        );
1911
1912        let chunk = op.next().unwrap().unwrap();
1913        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1914        assert_eq!(updated, 2); // 2 labels added
1915
1916        let node_data = store.get_node(node).unwrap();
1917        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1918        assert!(labels.contains(&"LabelA"));
1919        assert!(labels.contains(&"LabelB"));
1920    }
1921
1922    #[test]
1923    fn test_add_label_no_input_returns_none() {
1924        let store = create_test_store();
1925
1926        let mut op = AddLabelOperator::new(
1927            Arc::clone(&store),
1928            Box::new(EmptyInput),
1929            0,
1930            vec!["Foo".to_string()],
1931            vec![LogicalType::Int64, LogicalType::Int64],
1932        );
1933
1934        assert!(op.next().unwrap().is_none());
1935    }
1936
1937    // ── RemoveLabelOperator ──────────────────────────────────────
1938
1939    #[test]
1940    fn test_remove_label() {
1941        let store = create_test_store();
1942
1943        let node = store.create_node(&["Person", "Employee"]);
1944
1945        let mut op = RemoveLabelOperator::new(
1946            Arc::clone(&store),
1947            MockInput::boxed(node_id_chunk(&[node])),
1948            0,
1949            vec!["Employee".to_string()],
1950            vec![LogicalType::Int64, LogicalType::Int64],
1951        );
1952
1953        let chunk = op.next().unwrap().unwrap();
1954        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1955        assert_eq!(updated, 1);
1956
1957        // Verify label was removed
1958        let node_data = store.get_node(node).unwrap();
1959        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1960        assert!(labels.contains(&"Person"));
1961        assert!(!labels.contains(&"Employee"));
1962    }
1963
1964    #[test]
1965    fn test_remove_nonexistent_label() {
1966        let store = create_test_store();
1967
1968        let node = store.create_node(&["Person"]);
1969
1970        let mut op = RemoveLabelOperator::new(
1971            Arc::clone(&store),
1972            MockInput::boxed(node_id_chunk(&[node])),
1973            0,
1974            vec!["NonExistent".to_string()],
1975            vec![LogicalType::Int64, LogicalType::Int64],
1976        );
1977
1978        let chunk = op.next().unwrap().unwrap();
1979        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1980        assert_eq!(updated, 0); // nothing removed
1981    }
1982
1983    // ── SetPropertyOperator ──────────────────────────────────────
1984
1985    #[test]
1986    fn test_set_node_property_constant() {
1987        let store = create_test_store();
1988
1989        let node = store.create_node(&["Person"]);
1990
1991        let mut op = SetPropertyOperator::new_for_node(
1992            Arc::clone(&store),
1993            MockInput::boxed(node_id_chunk(&[node])),
1994            0,
1995            vec![(
1996                "name".to_string(),
1997                PropertySource::Constant(Value::String("Alix".into())),
1998            )],
1999            vec![LogicalType::Int64],
2000        );
2001
2002        let chunk = op.next().unwrap().unwrap();
2003        assert_eq!(chunk.row_count(), 1);
2004
2005        // Verify property was set
2006        let node_data = store.get_node(node).unwrap();
2007        assert_eq!(
2008            node_data
2009                .properties
2010                .get(&grafeo_common::types::PropertyKey::new("name")),
2011            Some(&Value::String("Alix".into()))
2012        );
2013    }
2014
2015    #[test]
2016    // reason: test IDs are small sequential counters
2017    #[allow(clippy::cast_possible_wrap)]
2018    fn test_set_node_property_from_column() {
2019        let store = create_test_store();
2020
2021        let node = store.create_node(&["Person"]);
2022
2023        // Input: column 0 = node ID, column 1 = property value
2024        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
2025        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
2026        builder
2027            .column_mut(1)
2028            .unwrap()
2029            .push_value(Value::String("Gus".into()));
2030        builder.advance_row();
2031
2032        let mut op = SetPropertyOperator::new_for_node(
2033            Arc::clone(&store),
2034            MockInput::boxed(builder.finish()),
2035            0,
2036            vec![("name".to_string(), PropertySource::Column(1))],
2037            vec![LogicalType::Int64, LogicalType::String],
2038        );
2039
2040        let chunk = op.next().unwrap().unwrap();
2041        assert_eq!(chunk.row_count(), 1);
2042
2043        let node_data = store.get_node(node).unwrap();
2044        assert_eq!(
2045            node_data
2046                .properties
2047                .get(&grafeo_common::types::PropertyKey::new("name")),
2048            Some(&Value::String("Gus".into()))
2049        );
2050    }
2051
2052    #[test]
2053    fn test_set_edge_property() {
2054        let store = create_test_store();
2055
2056        let n1 = store.create_node(&["N"]);
2057        let n2 = store.create_node(&["N"]);
2058        let eid = store.create_edge(n1, n2, "KNOWS");
2059
2060        let mut op = SetPropertyOperator::new_for_edge(
2061            Arc::clone(&store),
2062            MockInput::boxed(edge_id_chunk(&[eid])),
2063            0,
2064            vec![(
2065                "weight".to_string(),
2066                PropertySource::Constant(Value::Float64(0.75)),
2067            )],
2068            vec![LogicalType::Int64],
2069        );
2070
2071        let chunk = op.next().unwrap().unwrap();
2072        assert_eq!(chunk.row_count(), 1);
2073
2074        let edge_data = store.get_edge(eid).unwrap();
2075        assert_eq!(
2076            edge_data
2077                .properties
2078                .get(&grafeo_common::types::PropertyKey::new("weight")),
2079            Some(&Value::Float64(0.75))
2080        );
2081    }
2082
2083    #[test]
2084    fn test_set_multiple_properties() {
2085        let store = create_test_store();
2086
2087        let node = store.create_node(&["Person"]);
2088
2089        let mut op = SetPropertyOperator::new_for_node(
2090            Arc::clone(&store),
2091            MockInput::boxed(node_id_chunk(&[node])),
2092            0,
2093            vec![
2094                (
2095                    "name".to_string(),
2096                    PropertySource::Constant(Value::String("Alix".into())),
2097                ),
2098                (
2099                    "age".to_string(),
2100                    PropertySource::Constant(Value::Int64(30)),
2101                ),
2102            ],
2103            vec![LogicalType::Int64],
2104        );
2105
2106        op.next().unwrap().unwrap();
2107
2108        let node_data = store.get_node(node).unwrap();
2109        assert_eq!(
2110            node_data
2111                .properties
2112                .get(&grafeo_common::types::PropertyKey::new("name")),
2113            Some(&Value::String("Alix".into()))
2114        );
2115        assert_eq!(
2116            node_data
2117                .properties
2118                .get(&grafeo_common::types::PropertyKey::new("age")),
2119            Some(&Value::Int64(30))
2120        );
2121    }
2122
2123    #[test]
2124    fn test_set_property_no_input_returns_none() {
2125        let store = create_test_store();
2126
2127        let mut op = SetPropertyOperator::new_for_node(
2128            Arc::clone(&store),
2129            Box::new(EmptyInput),
2130            0,
2131            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
2132            vec![LogicalType::Int64],
2133        );
2134
2135        assert!(op.next().unwrap().is_none());
2136    }
2137
2138    // ── Error paths ──────────────────────────────────────────────
2139
2140    #[test]
2141    fn test_delete_node_without_detach_errors_when_edges_exist() {
2142        let store = create_test_store();
2143
2144        let n1 = store.create_node(&["Person"]);
2145        let n2 = store.create_node(&["Person"]);
2146        store.create_edge(n1, n2, "KNOWS");
2147
2148        let mut op = DeleteNodeOperator::new(
2149            Arc::clone(&store),
2150            MockInput::boxed(node_id_chunk(&[n1])),
2151            0,
2152            vec![LogicalType::Int64],
2153            false, // no detach
2154        );
2155
2156        let err = op.next().unwrap_err();
2157        match err {
2158            OperatorError::ConstraintViolation(msg) => {
2159                assert!(msg.contains("connected edge"), "unexpected message: {msg}");
2160            }
2161            other => panic!("expected ConstraintViolation, got {other:?}"),
2162        }
2163        // Node should still exist
2164        assert_eq!(store.node_count(), 2);
2165    }
2166
2167    // ── CreateNodeOperator with input ───────────────────────────
2168
2169    #[test]
2170    fn test_create_node_with_input_operator() {
2171        let store = create_test_store();
2172
2173        // Seed node to provide input rows
2174        let existing = store.create_node(&["Seed"]);
2175
2176        let mut op = CreateNodeOperator::new(
2177            Arc::clone(&store),
2178            Some(MockInput::boxed(node_id_chunk(&[existing]))),
2179            vec!["Created".to_string()],
2180            vec![(
2181                "source".to_string(),
2182                PropertySource::Constant(Value::String("from_input".into())),
2183            )],
2184            vec![LogicalType::Int64, LogicalType::Int64], // input col + output col
2185            1,                                            // output column for new node ID
2186        );
2187
2188        let chunk = op.next().unwrap().unwrap();
2189        assert_eq!(chunk.row_count(), 1);
2190
2191        // Should have created one new node (2 total: Seed + Created)
2192        assert_eq!(store.node_count(), 2);
2193
2194        // Exhausted
2195        assert!(op.next().unwrap().is_none());
2196    }
2197
2198    // ── CreateEdgeOperator with properties and output column ────
2199
2200    #[test]
2201    // reason: test IDs are small sequential counters
2202    #[allow(clippy::cast_possible_wrap, clippy::cast_sign_loss)]
2203    fn test_create_edge_with_properties_and_output_column() {
2204        let store = create_test_store();
2205
2206        let n1 = store.create_node(&["Person"]);
2207        let n2 = store.create_node(&["Person"]);
2208
2209        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
2210        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
2211        builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
2212        builder.advance_row();
2213
2214        let mut op = CreateEdgeOperator::new(
2215            Arc::clone(&store),
2216            MockInput::boxed(builder.finish()),
2217            0,
2218            1,
2219            "KNOWS".to_string(),
2220            vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
2221        )
2222        .with_properties(vec![(
2223            "since".to_string(),
2224            PropertySource::Constant(Value::Int64(2024)),
2225        )])
2226        .with_output_column(2);
2227
2228        let chunk = op.next().unwrap().unwrap();
2229        assert_eq!(chunk.row_count(), 1);
2230        assert_eq!(store.edge_count(), 1);
2231
2232        // Verify the output chunk contains the edge ID in column 2
2233        let edge_id_raw = chunk
2234            .column(2)
2235            .and_then(|c| c.get_int64(0))
2236            .expect("edge ID should be in output column 2");
2237        let edge_id = EdgeId(edge_id_raw as u64);
2238
2239        // Verify the edge has the property
2240        let edge = store.get_edge(edge_id).expect("edge should exist");
2241        assert_eq!(
2242            edge.properties
2243                .get(&grafeo_common::types::PropertyKey::new("since")),
2244            Some(&Value::Int64(2024))
2245        );
2246    }
2247
2248    // ── SetPropertyOperator with map replacement ────────────────
2249
2250    #[test]
2251    fn test_set_property_map_replace() {
2252        use std::collections::BTreeMap;
2253
2254        let store = create_test_store();
2255
2256        let node = store.create_node(&["Person"]);
2257        store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
2258
2259        let mut map = BTreeMap::new();
2260        map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
2261
2262        let mut op = SetPropertyOperator::new_for_node(
2263            Arc::clone(&store),
2264            MockInput::boxed(node_id_chunk(&[node])),
2265            0,
2266            vec![(
2267                "*".to_string(),
2268                PropertySource::Constant(Value::Map(Arc::new(map))),
2269            )],
2270            vec![LogicalType::Int64],
2271        )
2272        .with_replace(true);
2273
2274        op.next().unwrap().unwrap();
2275
2276        let node_data = store.get_node(node).unwrap();
2277        // Old property should be gone
2278        assert!(
2279            node_data
2280                .properties
2281                .get(&PropertyKey::new("old_prop"))
2282                .is_none()
2283        );
2284        // New property should exist
2285        assert_eq!(
2286            node_data.properties.get(&PropertyKey::new("new_key")),
2287            Some(&Value::String("new_val".into()))
2288        );
2289    }
2290
2291    // ── SetPropertyOperator with map merge (no replace) ─────────
2292
2293    #[test]
2294    fn test_set_property_map_merge() {
2295        use std::collections::BTreeMap;
2296
2297        let store = create_test_store();
2298
2299        let node = store.create_node(&["Person"]);
2300        store.set_node_property(node, "existing", Value::Int64(42));
2301
2302        let mut map = BTreeMap::new();
2303        map.insert(PropertyKey::new("added"), Value::String("hello".into()));
2304
2305        let mut op = SetPropertyOperator::new_for_node(
2306            Arc::clone(&store),
2307            MockInput::boxed(node_id_chunk(&[node])),
2308            0,
2309            vec![(
2310                "*".to_string(),
2311                PropertySource::Constant(Value::Map(Arc::new(map))),
2312            )],
2313            vec![LogicalType::Int64],
2314        ); // replace defaults to false
2315
2316        op.next().unwrap().unwrap();
2317
2318        let node_data = store.get_node(node).unwrap();
2319        // Existing property should still be there
2320        assert_eq!(
2321            node_data.properties.get(&PropertyKey::new("existing")),
2322            Some(&Value::Int64(42))
2323        );
2324        // New property should also exist
2325        assert_eq!(
2326            node_data.properties.get(&PropertyKey::new("added")),
2327            Some(&Value::String("hello".into()))
2328        );
2329    }
2330
2331    // ── PropertySource::PropertyAccess ──────────────────────────
2332
2333    #[test]
2334    // reason: test IDs are small sequential counters
2335    #[allow(clippy::cast_possible_wrap)]
2336    fn test_property_source_property_access() {
2337        let store = create_test_store();
2338
2339        let source_node = store.create_node(&["Source"]);
2340        store.set_node_property(source_node, "name", Value::String("Alix".into()));
2341
2342        let target_node = store.create_node(&["Target"]);
2343
2344        // Build chunk: col 0 = source node ID (Node type for PropertyAccess), col 1 = target node ID
2345        let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
2346        builder.column_mut(0).unwrap().push_node_id(source_node);
2347        builder
2348            .column_mut(1)
2349            .unwrap()
2350            .push_int64(target_node.0 as i64);
2351        builder.advance_row();
2352
2353        let mut op = SetPropertyOperator::new_for_node(
2354            Arc::clone(&store),
2355            MockInput::boxed(builder.finish()),
2356            1, // entity column = target node
2357            vec![(
2358                "copied_name".to_string(),
2359                PropertySource::PropertyAccess {
2360                    column: 0,
2361                    property: "name".to_string(),
2362                },
2363            )],
2364            vec![LogicalType::Node, LogicalType::Int64],
2365        );
2366
2367        op.next().unwrap().unwrap();
2368
2369        let target_data = store.get_node(target_node).unwrap();
2370        assert_eq!(
2371            target_data.properties.get(&PropertyKey::new("copied_name")),
2372            Some(&Value::String("Alix".into()))
2373        );
2374    }
2375
2376    // ── ConstraintValidator integration ─────────────────────────
2377
2378    #[test]
2379    fn test_create_node_with_constraint_validator() {
2380        let store = create_test_store();
2381
2382        struct RejectAgeValidator;
2383        impl ConstraintValidator for RejectAgeValidator {
2384            fn validate_node_property(
2385                &self,
2386                _labels: &[String],
2387                key: &str,
2388                _value: &Value,
2389            ) -> Result<(), OperatorError> {
2390                if key == "forbidden" {
2391                    return Err(OperatorError::ConstraintViolation(
2392                        "property 'forbidden' is not allowed".to_string(),
2393                    ));
2394                }
2395                Ok(())
2396            }
2397            fn validate_node_complete(
2398                &self,
2399                _labels: &[String],
2400                _properties: &[(String, Value)],
2401            ) -> Result<(), OperatorError> {
2402                Ok(())
2403            }
2404            fn check_unique_node_property(
2405                &self,
2406                _labels: &[String],
2407                _key: &str,
2408                _value: &Value,
2409            ) -> Result<(), OperatorError> {
2410                Ok(())
2411            }
2412            fn validate_edge_property(
2413                &self,
2414                _edge_type: &str,
2415                _key: &str,
2416                _value: &Value,
2417            ) -> Result<(), OperatorError> {
2418                Ok(())
2419            }
2420            fn validate_edge_complete(
2421                &self,
2422                _edge_type: &str,
2423                _properties: &[(String, Value)],
2424            ) -> Result<(), OperatorError> {
2425                Ok(())
2426            }
2427        }
2428
2429        // Valid property should succeed
2430        let mut op = CreateNodeOperator::new(
2431            Arc::clone(&store),
2432            None,
2433            vec!["Thing".to_string()],
2434            vec![(
2435                "name".to_string(),
2436                PropertySource::Constant(Value::String("ok".into())),
2437            )],
2438            vec![LogicalType::Int64],
2439            0,
2440        )
2441        .with_validator(Arc::new(RejectAgeValidator));
2442
2443        assert!(op.next().is_ok());
2444        assert_eq!(store.node_count(), 1);
2445
2446        // Forbidden property should fail
2447        let mut op = CreateNodeOperator::new(
2448            Arc::clone(&store),
2449            None,
2450            vec!["Thing".to_string()],
2451            vec![(
2452                "forbidden".to_string(),
2453                PropertySource::Constant(Value::Int64(1)),
2454            )],
2455            vec![LogicalType::Int64],
2456            0,
2457        )
2458        .with_validator(Arc::new(RejectAgeValidator));
2459
2460        let err = op.next().unwrap_err();
2461        assert!(matches!(err, OperatorError::ConstraintViolation(_)));
2462        // Node count should still be 2 (the node is created before validation, but the error
2463        // propagates - this tests the validation logic fires)
2464    }
2465
2466    // ── Reset behavior ──────────────────────────────────────────
2467
2468    #[test]
2469    fn test_create_node_reset_allows_re_execution() {
2470        let store = create_test_store();
2471
2472        let mut op = CreateNodeOperator::new(
2473            Arc::clone(&store),
2474            None,
2475            vec!["Person".to_string()],
2476            vec![],
2477            vec![LogicalType::Int64],
2478            0,
2479        );
2480
2481        // First execution
2482        assert!(op.next().unwrap().is_some());
2483        assert!(op.next().unwrap().is_none());
2484
2485        // Reset and re-execute
2486        op.reset();
2487        assert!(op.next().unwrap().is_some());
2488
2489        assert_eq!(store.node_count(), 2);
2490    }
2491
2492    // ── Operator name() ──────────────────────────────────────────
2493
2494    #[test]
2495    fn test_operator_names() {
2496        let store = create_test_store();
2497
2498        let op = CreateNodeOperator::new(
2499            Arc::clone(&store),
2500            None,
2501            vec![],
2502            vec![],
2503            vec![LogicalType::Int64],
2504            0,
2505        );
2506        assert_eq!(op.name(), "CreateNode");
2507
2508        let op = CreateEdgeOperator::new(
2509            Arc::clone(&store),
2510            Box::new(EmptyInput),
2511            0,
2512            1,
2513            "R".to_string(),
2514            vec![LogicalType::Int64],
2515        );
2516        assert_eq!(op.name(), "CreateEdge");
2517
2518        let op = DeleteNodeOperator::new(
2519            Arc::clone(&store),
2520            Box::new(EmptyInput),
2521            0,
2522            vec![LogicalType::Int64],
2523            false,
2524        );
2525        assert_eq!(op.name(), "DeleteNode");
2526
2527        let op = DeleteEdgeOperator::new(
2528            Arc::clone(&store),
2529            Box::new(EmptyInput),
2530            0,
2531            vec![LogicalType::Int64],
2532        );
2533        assert_eq!(op.name(), "DeleteEdge");
2534
2535        let op = AddLabelOperator::new(
2536            Arc::clone(&store),
2537            Box::new(EmptyInput),
2538            0,
2539            vec!["L".to_string()],
2540            vec![LogicalType::Int64],
2541        );
2542        assert_eq!(op.name(), "AddLabel");
2543
2544        let op = RemoveLabelOperator::new(
2545            Arc::clone(&store),
2546            Box::new(EmptyInput),
2547            0,
2548            vec!["L".to_string()],
2549            vec![LogicalType::Int64],
2550        );
2551        assert_eq!(op.name(), "RemoveLabel");
2552
2553        let op = SetPropertyOperator::new_for_node(
2554            Arc::clone(&store),
2555            Box::new(EmptyInput),
2556            0,
2557            vec![],
2558            vec![LogicalType::Int64],
2559        );
2560        assert_eq!(op.name(), "SetProperty");
2561    }
2562
2563    // ── into_any() coverage ─────────────────────────────────────
2564
2565    #[test]
2566    fn test_create_node_into_any() {
2567        let store = create_test_store();
2568        let op = CreateNodeOperator::new(
2569            Arc::clone(&store),
2570            None,
2571            vec!["Person".to_string()],
2572            vec![],
2573            vec![LogicalType::Int64],
2574            0,
2575        );
2576        let any = Box::new(op).into_any();
2577        assert!(any.downcast::<CreateNodeOperator>().is_ok());
2578    }
2579
2580    #[test]
2581    fn test_create_edge_into_any() {
2582        let store = create_test_store();
2583        let op = CreateEdgeOperator::new(
2584            Arc::clone(&store),
2585            Box::new(EmptyInput),
2586            0,
2587            1,
2588            "KNOWS".to_string(),
2589            vec![LogicalType::Int64],
2590        );
2591        let any = Box::new(op).into_any();
2592        assert!(any.downcast::<CreateEdgeOperator>().is_ok());
2593    }
2594
2595    #[test]
2596    fn test_delete_node_into_any() {
2597        let store = create_test_store();
2598        let op = DeleteNodeOperator::new(
2599            Arc::clone(&store),
2600            Box::new(EmptyInput),
2601            0,
2602            vec![LogicalType::Int64],
2603            false,
2604        );
2605        let any = Box::new(op).into_any();
2606        assert!(any.downcast::<DeleteNodeOperator>().is_ok());
2607    }
2608
2609    #[test]
2610    fn test_delete_edge_into_any() {
2611        let store = create_test_store();
2612        let op = DeleteEdgeOperator::new(
2613            Arc::clone(&store),
2614            Box::new(EmptyInput),
2615            0,
2616            vec![LogicalType::Int64],
2617        );
2618        let any = Box::new(op).into_any();
2619        assert!(any.downcast::<DeleteEdgeOperator>().is_ok());
2620    }
2621
2622    #[test]
2623    fn test_add_label_into_any() {
2624        let store = create_test_store();
2625        let op = AddLabelOperator::new(
2626            Arc::clone(&store),
2627            Box::new(EmptyInput),
2628            0,
2629            vec!["Label".to_string()],
2630            vec![LogicalType::Int64],
2631        );
2632        let any = Box::new(op).into_any();
2633        assert!(any.downcast::<AddLabelOperator>().is_ok());
2634    }
2635
2636    #[test]
2637    fn test_remove_label_into_any() {
2638        let store = create_test_store();
2639        let op = RemoveLabelOperator::new(
2640            Arc::clone(&store),
2641            Box::new(EmptyInput),
2642            0,
2643            vec!["Label".to_string()],
2644            vec![LogicalType::Int64],
2645        );
2646        let any = Box::new(op).into_any();
2647        assert!(any.downcast::<RemoveLabelOperator>().is_ok());
2648    }
2649
2650    #[test]
2651    fn test_set_property_into_any() {
2652        let store = create_test_store();
2653        let op = SetPropertyOperator::new_for_node(
2654            Arc::clone(&store),
2655            Box::new(EmptyInput),
2656            0,
2657            vec![],
2658            vec![LogicalType::Int64],
2659        );
2660        let any = Box::new(op).into_any();
2661        assert!(any.downcast::<SetPropertyOperator>().is_ok());
2662    }
2663
2664    // ── ConstraintValidator default methods ──────────────────────
2665
2666    /// A minimal validator that implements only the required methods,
2667    /// relying on defaults for the optional ones.
2668    struct MinimalValidator;
2669
2670    impl ConstraintValidator for MinimalValidator {
2671        fn validate_node_property(
2672            &self,
2673            _labels: &[String],
2674            _key: &str,
2675            _value: &Value,
2676        ) -> Result<(), OperatorError> {
2677            Ok(())
2678        }
2679        fn validate_node_complete(
2680            &self,
2681            _labels: &[String],
2682            _properties: &[(String, Value)],
2683        ) -> Result<(), OperatorError> {
2684            Ok(())
2685        }
2686        fn check_unique_node_property(
2687            &self,
2688            _labels: &[String],
2689            _key: &str,
2690            _value: &Value,
2691        ) -> Result<(), OperatorError> {
2692            Ok(())
2693        }
2694        fn validate_edge_property(
2695            &self,
2696            _edge_type: &str,
2697            _key: &str,
2698            _value: &Value,
2699        ) -> Result<(), OperatorError> {
2700            Ok(())
2701        }
2702        fn validate_edge_complete(
2703            &self,
2704            _edge_type: &str,
2705            _properties: &[(String, Value)],
2706        ) -> Result<(), OperatorError> {
2707            Ok(())
2708        }
2709    }
2710
2711    #[test]
2712    fn test_constraint_validator_default_node_labels_allowed() {
2713        let v = MinimalValidator;
2714        assert!(
2715            v.validate_node_labels_allowed(&["Person".to_string(), "Actor".to_string()])
2716                .is_ok()
2717        );
2718    }
2719
2720    #[test]
2721    fn test_constraint_validator_default_edge_type_allowed() {
2722        let v = MinimalValidator;
2723        assert!(v.validate_edge_type_allowed("KNOWS").is_ok());
2724    }
2725
2726    #[test]
2727    fn test_constraint_validator_default_edge_endpoints() {
2728        let v = MinimalValidator;
2729        assert!(
2730            v.validate_edge_endpoints("KNOWS", &["Person".to_string()], &["Person".to_string()],)
2731                .is_ok()
2732        );
2733    }
2734
2735    #[test]
2736    fn test_constraint_validator_default_inject_defaults() {
2737        let v = MinimalValidator;
2738        let mut props = vec![("name".to_string(), Value::String("Alix".into()))];
2739        v.inject_defaults(&["Person".to_string()], &mut props);
2740        // Default impl is a no-op
2741        assert_eq!(props.len(), 1);
2742    }
2743
2744    // ── PropertySource tests ────────────────────────────────────
2745
2746    #[test]
2747    fn test_property_source_column() {
2748        let store = LpgStore::new().unwrap();
2749        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
2750        builder.column_mut(0).unwrap().push_int64(42);
2751        builder.advance_row();
2752        let chunk = builder.finish();
2753
2754        let src = PropertySource::Column(0);
2755        assert_eq!(src.resolve(&chunk, 0, &store), Value::Int64(42));
2756    }
2757
2758    #[test]
2759    fn test_property_source_constant() {
2760        let store = LpgStore::new().unwrap();
2761        let chunk = DataChunk::empty();
2762
2763        let src = PropertySource::Constant(Value::String("hello".into()));
2764        assert_eq!(
2765            src.resolve(&chunk, 0, &store),
2766            Value::String("hello".into()),
2767        );
2768    }
2769
2770    #[test]
2771    fn test_property_source_column_out_of_bounds() {
2772        let store = LpgStore::new().unwrap();
2773        let chunk = DataChunk::empty();
2774
2775        let src = PropertySource::Column(99);
2776        assert_eq!(src.resolve(&chunk, 0, &store), Value::Null);
2777    }
2778
2779    #[test]
2780    fn test_property_source_property_access_from_map() {
2781        let store = LpgStore::new().unwrap();
2782        let mut map = std::collections::BTreeMap::new();
2783        map.insert(PropertyKey::new("age"), Value::Int64(30));
2784
2785        let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
2786        builder
2787            .column_mut(0)
2788            .unwrap()
2789            .push_value(Value::Map(Arc::new(map)));
2790        builder.advance_row();
2791        let chunk = builder.finish();
2792
2793        let src = PropertySource::PropertyAccess {
2794            column: 0,
2795            property: "age".to_string(),
2796        };
2797        assert_eq!(src.resolve(&chunk, 0, &store), Value::Int64(30));
2798    }
2799
2800    #[test]
2801    fn test_property_source_property_access_missing_column() {
2802        let store = LpgStore::new().unwrap();
2803        let chunk = DataChunk::empty();
2804
2805        let src = PropertySource::PropertyAccess {
2806            column: 99,
2807            property: "name".to_string(),
2808        };
2809        assert_eq!(src.resolve(&chunk, 0, &store), Value::Null);
2810    }
2811}