Skip to main content

grafeo_core/execution/operators/
mutation.rs

1//! Mutation operators for creating and deleting graph elements.
2//!
3//! These operators modify the graph structure:
4//! - `CreateNodeOperator`: Creates new nodes
5//! - `CreateEdgeOperator`: Creates new edges
6//! - `DeleteNodeOperator`: Deletes nodes
7//! - `DeleteEdgeOperator`: Deletes edges
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{
13    EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
14};
15
16use super::filter::FilterExpression;
17use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
18use crate::execution::chunk::DataChunkBuilder;
19use crate::graph::{GraphStore, GraphStoreMut};
20
21/// Trait for validating schema constraints during mutation operations.
22///
23/// Implementors check type definitions, NOT NULL, and UNIQUE constraints
24/// before data is written to the store.
25pub trait ConstraintValidator: Send + Sync {
26    /// Validates a single property value for a node with the given labels.
27    ///
28    /// Checks type compatibility and NOT NULL constraints.
29    ///
30    /// # Errors
31    ///
32    /// Returns `Err` if the value type is incompatible or a NOT NULL constraint is violated.
33    fn validate_node_property(
34        &self,
35        labels: &[String],
36        key: &str,
37        value: &Value,
38    ) -> Result<(), OperatorError>;
39
40    /// Validates that all required properties are present after creating a node.
41    ///
42    /// Checks NOT NULL constraints for properties that were not explicitly set.
43    ///
44    /// # Errors
45    ///
46    /// Returns `Err` if a required (NOT NULL) property is missing.
47    fn validate_node_complete(
48        &self,
49        labels: &[String],
50        properties: &[(String, Value)],
51    ) -> Result<(), OperatorError>;
52
53    /// Checks UNIQUE constraint for a node property value.
54    ///
55    /// # Errors
56    ///
57    /// Returns `Err` if a node with the same label already has this value.
58    fn check_unique_node_property(
59        &self,
60        labels: &[String],
61        key: &str,
62        value: &Value,
63    ) -> Result<(), OperatorError>;
64
65    /// Validates a single property value for an edge of the given type.
66    ///
67    /// # Errors
68    ///
69    /// Returns `Err` if the value type is incompatible with the edge schema.
70    fn validate_edge_property(
71        &self,
72        edge_type: &str,
73        key: &str,
74        value: &Value,
75    ) -> Result<(), OperatorError>;
76
77    /// Validates that all required properties are present after creating an edge.
78    ///
79    /// # Errors
80    ///
81    /// Returns `Err` if a required property is missing from the edge.
82    fn validate_edge_complete(
83        &self,
84        edge_type: &str,
85        properties: &[(String, Value)],
86    ) -> Result<(), OperatorError>;
87
88    /// Validates that the node labels are allowed by the bound graph type.
89    ///
90    /// # Errors
91    ///
92    /// Returns `Err` if any label is not defined in the graph type.
93    fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
94        let _ = labels;
95        Ok(())
96    }
97
98    /// Validates that the edge type is allowed by the bound graph type.
99    ///
100    /// # Errors
101    ///
102    /// Returns `Err` if the edge type is not defined in the graph type.
103    fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
104        let _ = edge_type;
105        Ok(())
106    }
107
108    /// Validates that edge endpoints have the correct node type labels.
109    ///
110    /// # Errors
111    ///
112    /// Returns `Err` if the source or target node labels do not match the edge type definition.
113    fn validate_edge_endpoints(
114        &self,
115        edge_type: &str,
116        source_labels: &[String],
117        target_labels: &[String],
118    ) -> Result<(), OperatorError> {
119        let _ = (edge_type, source_labels, target_labels);
120        Ok(())
121    }
122
123    /// Injects default values for properties that are defined in a type but
124    /// not explicitly provided.
125    fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
126        let _ = (labels, properties);
127    }
128}
129
130/// Operator that creates new nodes.
131///
132/// For each input row, creates a new node with the specified labels
133/// and properties, then outputs the row with the new node.
134pub struct CreateNodeOperator {
135    /// The graph store to modify.
136    store: Arc<dyn GraphStoreMut>,
137    /// Input operator.
138    input: Option<Box<dyn Operator>>,
139    /// Labels for the new nodes.
140    labels: Vec<String>,
141    /// Properties to set (name -> column index or constant value).
142    properties: Vec<(String, PropertySource)>,
143    /// Output schema.
144    output_schema: Vec<LogicalType>,
145    /// Column index for the created node variable.
146    output_column: usize,
147    /// Whether this operator has been executed (for no-input case).
148    executed: bool,
149    /// Epoch for MVCC versioning.
150    viewing_epoch: Option<EpochId>,
151    /// Transaction ID for MVCC versioning.
152    transaction_id: Option<TransactionId>,
153    /// Optional constraint validator for schema enforcement.
154    validator: Option<Arc<dyn ConstraintValidator>>,
155    /// Optional write tracker for conflict detection.
156    write_tracker: Option<SharedWriteTracker>,
157}
158
159/// Source for a property value.
160#[derive(Debug, Clone)]
161#[non_exhaustive]
162pub enum PropertySource {
163    /// Get value from an input column.
164    Column(usize),
165    /// Use a constant value.
166    Constant(Value),
167    /// Access a named property from a map/node/edge in an input column.
168    PropertyAccess {
169        /// The column containing the map, node ID, or edge ID.
170        column: usize,
171        /// The property name to extract.
172        property: String,
173    },
174    /// A runtime expression that may reference a variable not present in the
175    /// caller's input chunk. Used by `MERGE ... ON CREATE/MATCH SET` so that
176    /// expressions like `coalesce(c.description, 'fallback')` can reference
177    /// the MERGE variable, which only exists after the merge resolves.
178    ///
179    /// Generic operators do not know how to evaluate this variant: the default
180    /// `resolve` implementation returns `Value::Null`. Operators that produce
181    /// an augmented row (e.g., MERGE) must intercept this variant before
182    /// calling `resolve`.
183    Expression {
184        /// The expression to evaluate against an augmented row.
185        expr: Box<FilterExpression>,
186        /// Variable-name to column-index map for the augmented row layout.
187        variable_columns: HashMap<String, usize>,
188    },
189}
190
191impl PropertySource {
192    /// Resolves a property value from a data chunk row.
193    ///
194    /// Returns `Value::Null` for [`PropertySource::Expression`]: those sources
195    /// require an operator-specific augmented row and must be intercepted by
196    /// the producing operator (currently the MERGE node and edge operators).
197    pub fn resolve(
198        &self,
199        chunk: &crate::execution::chunk::DataChunk,
200        row: usize,
201        store: &dyn GraphStore,
202    ) -> Value {
203        match self {
204            PropertySource::Column(col_idx) => chunk
205                .column(*col_idx)
206                .and_then(|c| c.get_value(row))
207                .unwrap_or(Value::Null),
208            PropertySource::Constant(v) => v.clone(),
209            PropertySource::PropertyAccess { column, property } => {
210                let Some(col) = chunk.column(*column) else {
211                    return Value::Null;
212                };
213                // Try node ID first, then edge ID, then map value
214                if let Some(node_id) = col.get_node_id(row) {
215                    store
216                        .get_node(node_id)
217                        .and_then(|node| node.get_property(property).cloned())
218                        .unwrap_or(Value::Null)
219                } else if let Some(edge_id) = col.get_edge_id(row) {
220                    store
221                        .get_edge(edge_id)
222                        .and_then(|edge| edge.get_property(property).cloned())
223                        .unwrap_or(Value::Null)
224                } else if let Some(Value::Map(map)) = col.get_value(row) {
225                    let key = PropertyKey::new(property);
226                    map.get(&key).cloned().unwrap_or(Value::Null)
227                } else {
228                    Value::Null
229                }
230            }
231            // Expression sources require an augmented row built by the producer.
232            // Reaching this branch means an operator forgot to intercept it.
233            PropertySource::Expression { .. } => Value::Null,
234        }
235    }
236}
237
238impl CreateNodeOperator {
239    /// Creates a new node creation operator.
240    ///
241    /// # Arguments
242    /// * `store` - The graph store to modify.
243    /// * `input` - Optional input operator (None for standalone CREATE).
244    /// * `labels` - Labels to assign to created nodes.
245    /// * `properties` - Properties to set on created nodes.
246    /// * `output_schema` - Schema of the output.
247    /// * `output_column` - Column index where the created node ID goes.
248    pub fn new(
249        store: Arc<dyn GraphStoreMut>,
250        input: Option<Box<dyn Operator>>,
251        labels: Vec<String>,
252        properties: Vec<(String, PropertySource)>,
253        output_schema: Vec<LogicalType>,
254        output_column: usize,
255    ) -> Self {
256        Self {
257            store,
258            input,
259            labels,
260            properties,
261            output_schema,
262            output_column,
263            executed: false,
264            viewing_epoch: None,
265            transaction_id: None,
266            validator: None,
267            write_tracker: None,
268        }
269    }
270
271    /// Sets the transaction context for MVCC versioning.
272    pub fn with_transaction_context(
273        mut self,
274        epoch: EpochId,
275        transaction_id: Option<TransactionId>,
276    ) -> Self {
277        self.viewing_epoch = Some(epoch);
278        self.transaction_id = transaction_id;
279        self
280    }
281
282    /// Sets the constraint validator for schema enforcement.
283    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
284        self.validator = Some(validator);
285        self
286    }
287
288    /// Sets the write tracker for conflict detection.
289    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
290        self.write_tracker = Some(tracker);
291        self
292    }
293}
294
295impl CreateNodeOperator {
296    /// Validates and sets properties on a newly created node.
297    fn validate_and_set_properties(
298        &self,
299        node_id: NodeId,
300        resolved_props: &mut Vec<(String, Value)>,
301    ) -> Result<(), OperatorError> {
302        // Phase 0: Validate that node labels are allowed by the bound graph type
303        if let Some(ref validator) = self.validator {
304            validator.validate_node_labels_allowed(&self.labels)?;
305        }
306
307        // Phase 0.5: Inject defaults for properties not explicitly provided
308        if let Some(ref validator) = self.validator {
309            validator.inject_defaults(&self.labels, resolved_props);
310        }
311
312        // Phase 1: Validate each property value
313        if let Some(ref validator) = self.validator {
314            for (name, value) in resolved_props.iter() {
315                validator.validate_node_property(&self.labels, name, value)?;
316                validator.check_unique_node_property(&self.labels, name, value)?;
317            }
318            // Phase 2: Validate completeness (NOT NULL checks for missing required properties)
319            validator.validate_node_complete(&self.labels, resolved_props)?;
320        }
321
322        // Phase 3: Write properties to the store
323        if let Some(tid) = self.transaction_id {
324            for (name, value) in resolved_props.iter() {
325                self.store
326                    .set_node_property_versioned(node_id, name, value.clone(), tid);
327            }
328        } else {
329            for (name, value) in resolved_props.iter() {
330                self.store.set_node_property(node_id, name, value.clone());
331            }
332        }
333        Ok(())
334    }
335}
336
337impl Operator for CreateNodeOperator {
338    fn next(&mut self) -> OperatorResult {
339        // Get transaction context for versioned creation
340        let epoch = self
341            .viewing_epoch
342            .unwrap_or_else(|| self.store.current_epoch());
343        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
344
345        if let Some(ref mut input) = self.input {
346            // For each input row, create a node
347            if let Some(chunk) = input.next()? {
348                let mut builder =
349                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
350
351                for row in chunk.selected_indices() {
352                    // Resolve all property values first (before creating node)
353                    let mut resolved_props: Vec<(String, Value)> = self
354                        .properties
355                        .iter()
356                        .map(|(name, source)| {
357                            let value =
358                                source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
359                            (name.clone(), value)
360                        })
361                        .collect();
362
363                    // Create the node with MVCC versioning
364                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
365                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
366
367                    // Record write for conflict detection
368                    if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
369                        tracker.record_node_write(tid, node_id)?;
370                    }
371
372                    // Validate and set properties
373                    self.validate_and_set_properties(node_id, &mut resolved_props)?;
374
375                    // Copy input columns to output
376                    for col_idx in 0..chunk.column_count() {
377                        if col_idx < self.output_column
378                            && let (Some(src), Some(dst)) =
379                                (chunk.column(col_idx), builder.column_mut(col_idx))
380                        {
381                            if let Some(val) = src.get_value(row) {
382                                dst.push_value(val);
383                            } else {
384                                dst.push_value(Value::Null);
385                            }
386                        }
387                    }
388
389                    // Add the new node ID
390                    if let Some(dst) = builder.column_mut(self.output_column) {
391                        // reason: entity IDs stored as i64, standard encoding
392                        #[allow(clippy::cast_possible_wrap)]
393                        // reason: entity IDs stored as i64, standard encoding
394                        #[allow(clippy::cast_possible_wrap)]
395                        dst.push_value(Value::Int64(node_id.0 as i64));
396                    }
397
398                    builder.advance_row();
399                }
400
401                return Ok(Some(builder.finish()));
402            }
403            Ok(None)
404        } else {
405            // No input - create a single node
406            if self.executed {
407                return Ok(None);
408            }
409            self.executed = true;
410
411            // Resolve constant properties
412            let mut resolved_props: Vec<(String, Value)> = self
413                .properties
414                .iter()
415                .filter_map(|(name, source)| {
416                    if let PropertySource::Constant(value) = source {
417                        Some((name.clone(), value.clone()))
418                    } else {
419                        None
420                    }
421                })
422                .collect();
423
424            // Create the node with MVCC versioning
425            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
426            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
427
428            // Record write for conflict detection
429            if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
430                tracker.record_node_write(tid, node_id)?;
431            }
432
433            // Validate and set properties
434            self.validate_and_set_properties(node_id, &mut resolved_props)?;
435
436            // Build output chunk with just the node ID
437            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
438            if let Some(dst) = builder.column_mut(self.output_column) {
439                // reason: entity IDs stored as i64, standard encoding
440                #[allow(clippy::cast_possible_wrap)]
441                dst.push_value(Value::Int64(node_id.0 as i64));
442            }
443            builder.advance_row();
444
445            Ok(Some(builder.finish()))
446        }
447    }
448
449    fn reset(&mut self) {
450        if let Some(ref mut input) = self.input {
451            input.reset();
452        }
453        self.executed = false;
454    }
455
456    fn name(&self) -> &'static str {
457        "CreateNode"
458    }
459
460    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
461        self
462    }
463}
464
465/// Operator that creates new edges.
466pub struct CreateEdgeOperator {
467    /// The graph store to modify.
468    store: Arc<dyn GraphStoreMut>,
469    /// Input operator.
470    input: Box<dyn Operator>,
471    /// Column index for the source node.
472    from_column: usize,
473    /// Column index for the target node.
474    to_column: usize,
475    /// Edge type.
476    edge_type: String,
477    /// Properties to set.
478    properties: Vec<(String, PropertySource)>,
479    /// Output schema.
480    output_schema: Vec<LogicalType>,
481    /// Column index for the created edge variable (if any).
482    output_column: Option<usize>,
483    /// Epoch for MVCC versioning.
484    viewing_epoch: Option<EpochId>,
485    /// Transaction ID for MVCC versioning.
486    transaction_id: Option<TransactionId>,
487    /// Optional constraint validator for schema enforcement.
488    validator: Option<Arc<dyn ConstraintValidator>>,
489    /// Optional write tracker for conflict detection.
490    write_tracker: Option<SharedWriteTracker>,
491}
492
493impl CreateEdgeOperator {
494    /// Creates a new edge creation operator.
495    ///
496    /// Use builder methods to set additional options:
497    /// - [`with_properties`](Self::with_properties) - set edge properties
498    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
499    /// - [`with_transaction_context`](Self::with_transaction_context) - set transaction context
500    pub fn new(
501        store: Arc<dyn GraphStoreMut>,
502        input: Box<dyn Operator>,
503        from_column: usize,
504        to_column: usize,
505        edge_type: String,
506        output_schema: Vec<LogicalType>,
507    ) -> Self {
508        Self {
509            store,
510            input,
511            from_column,
512            to_column,
513            edge_type,
514            properties: Vec::new(),
515            output_schema,
516            output_column: None,
517            viewing_epoch: None,
518            transaction_id: None,
519            validator: None,
520            write_tracker: None,
521        }
522    }
523
524    /// Sets the properties to assign to created edges.
525    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
526        self.properties = properties;
527        self
528    }
529
530    /// Sets the output column for the created edge ID.
531    pub fn with_output_column(mut self, column: usize) -> Self {
532        self.output_column = Some(column);
533        self
534    }
535
536    /// Sets the transaction context for MVCC versioning.
537    pub fn with_transaction_context(
538        mut self,
539        epoch: EpochId,
540        transaction_id: Option<TransactionId>,
541    ) -> Self {
542        self.viewing_epoch = Some(epoch);
543        self.transaction_id = transaction_id;
544        self
545    }
546
547    /// Sets the constraint validator for schema enforcement.
548    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
549        self.validator = Some(validator);
550        self
551    }
552
553    /// Sets the write tracker for conflict detection.
554    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
555        self.write_tracker = Some(tracker);
556        self
557    }
558}
559
560impl Operator for CreateEdgeOperator {
561    fn next(&mut self) -> OperatorResult {
562        // Get transaction context for versioned creation
563        let epoch = self
564            .viewing_epoch
565            .unwrap_or_else(|| self.store.current_epoch());
566        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
567
568        if let Some(chunk) = self.input.next()? {
569            let mut builder =
570                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
571
572            for row in chunk.selected_indices() {
573                // Get source and target node IDs
574                let from_id = chunk
575                    .column(self.from_column)
576                    .and_then(|c| c.get_value(row))
577                    .ok_or_else(|| {
578                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
579                    })?;
580
581                let to_id = chunk
582                    .column(self.to_column)
583                    .and_then(|c| c.get_value(row))
584                    .ok_or_else(|| {
585                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
586                    })?;
587
588                // Extract node IDs
589                let from_node_id = match from_id {
590                    // reason: ID encoding: i64 <-> u64 round-trip
591                    #[allow(clippy::cast_sign_loss)]
592                    Value::Int64(id) => NodeId(id as u64),
593                    _ => {
594                        return Err(OperatorError::TypeMismatch {
595                            expected: "Int64 (node ID)".to_string(),
596                            found: format!("{from_id:?}"),
597                        });
598                    }
599                };
600
601                let to_node_id = match to_id {
602                    // reason: ID encoding: i64 <-> u64 round-trip
603                    #[allow(clippy::cast_sign_loss)]
604                    Value::Int64(id) => NodeId(id as u64),
605                    _ => {
606                        return Err(OperatorError::TypeMismatch {
607                            expected: "Int64 (node ID)".to_string(),
608                            found: format!("{to_id:?}"),
609                        });
610                    }
611                };
612
613                // Validate graph type and edge endpoint constraints
614                if let Some(ref validator) = self.validator {
615                    validator.validate_edge_type_allowed(&self.edge_type)?;
616
617                    // Look up source and target node labels for endpoint validation
618                    let source_labels: Vec<String> = self
619                        .store
620                        .get_node(from_node_id)
621                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
622                        .unwrap_or_default();
623                    let target_labels: Vec<String> = self
624                        .store
625                        .get_node(to_node_id)
626                        .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
627                        .unwrap_or_default();
628                    validator.validate_edge_endpoints(
629                        &self.edge_type,
630                        &source_labels,
631                        &target_labels,
632                    )?;
633                }
634
635                // Resolve property values
636                let resolved_props: Vec<(String, Value)> = self
637                    .properties
638                    .iter()
639                    .map(|(name, source)| {
640                        let value =
641                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
642                        (name.clone(), value)
643                    })
644                    .collect();
645
646                // Validate constraints before writing
647                if let Some(ref validator) = self.validator {
648                    for (name, value) in &resolved_props {
649                        validator.validate_edge_property(&self.edge_type, name, value)?;
650                    }
651                    validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
652                }
653
654                // Create the edge with MVCC versioning
655                let edge_id = self.store.create_edge_versioned(
656                    from_node_id,
657                    to_node_id,
658                    &self.edge_type,
659                    epoch,
660                    tx,
661                );
662
663                // Record write for conflict detection
664                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
665                    tracker.record_edge_write(tid, edge_id)?;
666                }
667
668                // Set properties
669                if let Some(tid) = self.transaction_id {
670                    for (name, value) in resolved_props {
671                        self.store
672                            .set_edge_property_versioned(edge_id, &name, value, tid);
673                    }
674                } else {
675                    for (name, value) in resolved_props {
676                        self.store.set_edge_property(edge_id, &name, value);
677                    }
678                }
679
680                // Copy input columns
681                for col_idx in 0..chunk.column_count() {
682                    if let (Some(src), Some(dst)) =
683                        (chunk.column(col_idx), builder.column_mut(col_idx))
684                    {
685                        if let Some(val) = src.get_value(row) {
686                            dst.push_value(val);
687                        } else {
688                            dst.push_value(Value::Null);
689                        }
690                    }
691                }
692
693                // Add edge ID if requested
694                if let Some(out_col) = self.output_column
695                    && let Some(dst) = builder.column_mut(out_col)
696                {
697                    // reason: entity IDs stored as i64, standard encoding
698                    #[allow(clippy::cast_possible_wrap)]
699                    dst.push_value(Value::Int64(edge_id.0 as i64));
700                }
701
702                builder.advance_row();
703            }
704
705            return Ok(Some(builder.finish()));
706        }
707        Ok(None)
708    }
709
710    fn reset(&mut self) {
711        self.input.reset();
712    }
713
714    fn name(&self) -> &'static str {
715        "CreateEdge"
716    }
717
718    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
719        self
720    }
721}
722
723/// Operator that deletes nodes.
724pub struct DeleteNodeOperator {
725    /// The graph store to modify.
726    store: Arc<dyn GraphStoreMut>,
727    /// Input operator.
728    input: Box<dyn Operator>,
729    /// Column index for the node to delete.
730    node_column: usize,
731    /// Output schema.
732    output_schema: Vec<LogicalType>,
733    /// Whether to detach (delete connected edges) before deleting.
734    detach: bool,
735    /// Epoch for MVCC versioning.
736    viewing_epoch: Option<EpochId>,
737    /// Transaction ID for MVCC versioning.
738    transaction_id: Option<TransactionId>,
739    /// Optional write tracker for conflict detection.
740    write_tracker: Option<SharedWriteTracker>,
741}
742
743impl DeleteNodeOperator {
744    /// Creates a new node deletion operator.
745    pub fn new(
746        store: Arc<dyn GraphStoreMut>,
747        input: Box<dyn Operator>,
748        node_column: usize,
749        output_schema: Vec<LogicalType>,
750        detach: bool,
751    ) -> Self {
752        Self {
753            store,
754            input,
755            node_column,
756            output_schema,
757            detach,
758            viewing_epoch: None,
759            transaction_id: None,
760            write_tracker: None,
761        }
762    }
763
764    /// Sets the transaction context for MVCC versioning.
765    pub fn with_transaction_context(
766        mut self,
767        epoch: EpochId,
768        transaction_id: Option<TransactionId>,
769    ) -> Self {
770        self.viewing_epoch = Some(epoch);
771        self.transaction_id = transaction_id;
772        self
773    }
774
775    /// Sets the write tracker for conflict detection.
776    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
777        self.write_tracker = Some(tracker);
778        self
779    }
780}
781
782impl Operator for DeleteNodeOperator {
783    fn next(&mut self) -> OperatorResult {
784        // Get transaction context for versioned deletion
785        let epoch = self
786            .viewing_epoch
787            .unwrap_or_else(|| self.store.current_epoch());
788        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
789
790        if let Some(chunk) = self.input.next()? {
791            let mut builder =
792                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
793
794            for row in chunk.selected_indices() {
795                let node_val = chunk
796                    .column(self.node_column)
797                    .and_then(|c| c.get_value(row))
798                    .ok_or_else(|| {
799                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
800                    })?;
801
802                let node_id = match node_val {
803                    // reason: ID encoding: i64 <-> u64 round-trip
804                    #[allow(clippy::cast_sign_loss)]
805                    Value::Int64(id) => NodeId(id as u64),
806                    _ => {
807                        return Err(OperatorError::TypeMismatch {
808                            expected: "Int64 (node ID)".to_string(),
809                            found: format!("{node_val:?}"),
810                        });
811                    }
812                };
813
814                if self.detach {
815                    // Delete all connected edges first, using versioned deletion
816                    // so rollback can restore them
817                    let outgoing = self
818                        .store
819                        .edges_from(node_id, crate::graph::Direction::Outgoing);
820                    let incoming = self
821                        .store
822                        .edges_from(node_id, crate::graph::Direction::Incoming);
823                    for (_, edge_id) in outgoing.into_iter().chain(incoming) {
824                        self.store.delete_edge_versioned(edge_id, epoch, tx);
825                        if let (Some(tracker), Some(tid)) =
826                            (&self.write_tracker, self.transaction_id)
827                        {
828                            tracker.record_edge_write(tid, edge_id)?;
829                        }
830                    }
831                } else {
832                    // NODETACH: check that node has no connected edges
833                    let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
834                    if degree > 0 {
835                        return Err(OperatorError::ConstraintViolation(format!(
836                            "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
837                            degree
838                        )));
839                    }
840                }
841
842                // Delete the node with MVCC versioning
843                self.store.delete_node_versioned(node_id, epoch, tx);
844
845                // Record write for conflict detection
846                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
847                    tracker.record_node_write(tid, node_id)?;
848                }
849
850                // Pass through all input columns so downstream RETURN can
851                // reference the variable (e.g., count(n) after DELETE n).
852                for col_idx in 0..chunk.column_count() {
853                    if let (Some(src), Some(dst)) =
854                        (chunk.column(col_idx), builder.column_mut(col_idx))
855                    {
856                        if let Some(val) = src.get_value(row) {
857                            dst.push_value(val);
858                        } else {
859                            dst.push_value(Value::Null);
860                        }
861                    }
862                }
863                builder.advance_row();
864            }
865
866            return Ok(Some(builder.finish()));
867        }
868        Ok(None)
869    }
870
871    fn reset(&mut self) {
872        self.input.reset();
873    }
874
875    fn name(&self) -> &'static str {
876        "DeleteNode"
877    }
878
879    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
880        self
881    }
882}
883
884/// Operator that deletes edges.
885pub struct DeleteEdgeOperator {
886    /// The graph store to modify.
887    store: Arc<dyn GraphStoreMut>,
888    /// Input operator.
889    input: Box<dyn Operator>,
890    /// Column index for the edge to delete.
891    edge_column: usize,
892    /// Output schema.
893    output_schema: Vec<LogicalType>,
894    /// Epoch for MVCC versioning.
895    viewing_epoch: Option<EpochId>,
896    /// Transaction ID for MVCC versioning.
897    transaction_id: Option<TransactionId>,
898    /// Optional write tracker for conflict detection.
899    write_tracker: Option<SharedWriteTracker>,
900}
901
902impl DeleteEdgeOperator {
903    /// Creates a new edge deletion operator.
904    pub fn new(
905        store: Arc<dyn GraphStoreMut>,
906        input: Box<dyn Operator>,
907        edge_column: usize,
908        output_schema: Vec<LogicalType>,
909    ) -> Self {
910        Self {
911            store,
912            input,
913            edge_column,
914            output_schema,
915            viewing_epoch: None,
916            transaction_id: None,
917            write_tracker: None,
918        }
919    }
920
921    /// Sets the transaction context for MVCC versioning.
922    pub fn with_transaction_context(
923        mut self,
924        epoch: EpochId,
925        transaction_id: Option<TransactionId>,
926    ) -> Self {
927        self.viewing_epoch = Some(epoch);
928        self.transaction_id = transaction_id;
929        self
930    }
931
932    /// Sets the write tracker for conflict detection.
933    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
934        self.write_tracker = Some(tracker);
935        self
936    }
937}
938
939impl Operator for DeleteEdgeOperator {
940    fn next(&mut self) -> OperatorResult {
941        // Get transaction context for versioned deletion
942        let epoch = self
943            .viewing_epoch
944            .unwrap_or_else(|| self.store.current_epoch());
945        let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
946
947        if let Some(chunk) = self.input.next()? {
948            let mut builder =
949                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
950
951            for row in chunk.selected_indices() {
952                let edge_val = chunk
953                    .column(self.edge_column)
954                    .and_then(|c| c.get_value(row))
955                    .ok_or_else(|| {
956                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
957                    })?;
958
959                let edge_id = match edge_val {
960                    // reason: ID encoding: i64 <-> u64 round-trip
961                    #[allow(clippy::cast_sign_loss)]
962                    Value::Int64(id) => EdgeId(id as u64),
963                    _ => {
964                        return Err(OperatorError::TypeMismatch {
965                            expected: "Int64 (edge ID)".to_string(),
966                            found: format!("{edge_val:?}"),
967                        });
968                    }
969                };
970
971                // Delete the edge with MVCC versioning
972                self.store.delete_edge_versioned(edge_id, epoch, tx);
973
974                // Record write for conflict detection
975                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
976                    tracker.record_edge_write(tid, edge_id)?;
977                }
978
979                // Pass through all input columns
980                for col_idx in 0..chunk.column_count() {
981                    if let (Some(src), Some(dst)) =
982                        (chunk.column(col_idx), builder.column_mut(col_idx))
983                    {
984                        if let Some(val) = src.get_value(row) {
985                            dst.push_value(val);
986                        } else {
987                            dst.push_value(Value::Null);
988                        }
989                    }
990                }
991                builder.advance_row();
992            }
993
994            return Ok(Some(builder.finish()));
995        }
996        Ok(None)
997    }
998
999    fn reset(&mut self) {
1000        self.input.reset();
1001    }
1002
1003    fn name(&self) -> &'static str {
1004        "DeleteEdge"
1005    }
1006
1007    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1008        self
1009    }
1010}
1011
1012/// Operator that adds labels to nodes.
1013pub struct AddLabelOperator {
1014    /// The graph store.
1015    store: Arc<dyn GraphStoreMut>,
1016    /// Child operator providing nodes.
1017    input: Box<dyn Operator>,
1018    /// Column index containing node IDs.
1019    node_column: usize,
1020    /// Labels to add.
1021    labels: Vec<String>,
1022    /// Output schema.
1023    output_schema: Vec<LogicalType>,
1024    /// Column index for the update count (last column).
1025    count_column: usize,
1026    /// Epoch for MVCC versioning.
1027    viewing_epoch: Option<EpochId>,
1028    /// Transaction ID for undo log tracking.
1029    transaction_id: Option<TransactionId>,
1030    /// Optional write tracker for conflict detection.
1031    write_tracker: Option<SharedWriteTracker>,
1032}
1033
1034impl AddLabelOperator {
1035    /// Creates a new add label operator.
1036    pub fn new(
1037        store: Arc<dyn GraphStoreMut>,
1038        input: Box<dyn Operator>,
1039        node_column: usize,
1040        labels: Vec<String>,
1041        output_schema: Vec<LogicalType>,
1042    ) -> Self {
1043        let count_column = output_schema.len() - 1;
1044        Self {
1045            store,
1046            input,
1047            node_column,
1048            labels,
1049            count_column,
1050            output_schema,
1051            viewing_epoch: None,
1052            transaction_id: None,
1053            write_tracker: None,
1054        }
1055    }
1056
1057    /// Sets the transaction context for versioned label mutations.
1058    pub fn with_transaction_context(
1059        mut self,
1060        epoch: EpochId,
1061        transaction_id: Option<TransactionId>,
1062    ) -> Self {
1063        self.viewing_epoch = Some(epoch);
1064        self.transaction_id = transaction_id;
1065        self
1066    }
1067
1068    /// Sets the write tracker for conflict detection.
1069    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1070        self.write_tracker = Some(tracker);
1071        self
1072    }
1073}
1074
1075impl Operator for AddLabelOperator {
1076    fn next(&mut self) -> OperatorResult {
1077        if let Some(chunk) = self.input.next()? {
1078            let mut builder =
1079                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1080
1081            for row in chunk.selected_indices() {
1082                let node_val = chunk
1083                    .column(self.node_column)
1084                    .and_then(|c| c.get_value(row))
1085                    .ok_or_else(|| {
1086                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1087                    })?;
1088
1089                let node_id = match node_val {
1090                    // reason: ID encoding: i64 <-> u64 round-trip
1091                    #[allow(clippy::cast_sign_loss)]
1092                    Value::Int64(id) => NodeId(id as u64),
1093                    _ => {
1094                        return Err(OperatorError::TypeMismatch {
1095                            expected: "Int64 (node ID)".to_string(),
1096                            found: format!("{node_val:?}"),
1097                        });
1098                    }
1099                };
1100
1101                // Record write for conflict detection
1102                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1103                    tracker.record_node_write(tid, node_id)?;
1104                }
1105
1106                // Add all labels
1107                let mut row_count: i64 = 0;
1108                for label in &self.labels {
1109                    let added = if let Some(tid) = self.transaction_id {
1110                        self.store.add_label_versioned(node_id, label, tid)
1111                    } else {
1112                        self.store.add_label(node_id, label)
1113                    };
1114                    if added {
1115                        row_count += 1;
1116                    }
1117                }
1118
1119                // Copy input columns to output (pass-through)
1120                for col_idx in 0..chunk.column_count() {
1121                    if let (Some(src), Some(dst)) =
1122                        (chunk.column(col_idx), builder.column_mut(col_idx))
1123                    {
1124                        if let Some(val) = src.get_value(row) {
1125                            dst.push_value(val);
1126                        } else {
1127                            dst.push_value(Value::Null);
1128                        }
1129                    }
1130                }
1131                // Append the update count column
1132                if let Some(dst) = builder.column_mut(self.count_column) {
1133                    dst.push_value(Value::Int64(row_count));
1134                }
1135
1136                builder.advance_row();
1137            }
1138
1139            return Ok(Some(builder.finish()));
1140        }
1141        Ok(None)
1142    }
1143
1144    fn reset(&mut self) {
1145        self.input.reset();
1146    }
1147
1148    fn name(&self) -> &'static str {
1149        "AddLabel"
1150    }
1151
1152    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1153        self
1154    }
1155}
1156
1157/// Operator that removes labels from nodes.
1158pub struct RemoveLabelOperator {
1159    /// The graph store.
1160    store: Arc<dyn GraphStoreMut>,
1161    /// Child operator providing nodes.
1162    input: Box<dyn Operator>,
1163    /// Column index containing node IDs.
1164    node_column: usize,
1165    /// Labels to remove.
1166    labels: Vec<String>,
1167    /// Output schema.
1168    output_schema: Vec<LogicalType>,
1169    /// Column index for the update count (last column).
1170    count_column: usize,
1171    /// Epoch for MVCC versioning.
1172    viewing_epoch: Option<EpochId>,
1173    /// Transaction ID for undo log tracking.
1174    transaction_id: Option<TransactionId>,
1175    /// Optional write tracker for conflict detection.
1176    write_tracker: Option<SharedWriteTracker>,
1177}
1178
1179impl RemoveLabelOperator {
1180    /// Creates a new remove label operator.
1181    pub fn new(
1182        store: Arc<dyn GraphStoreMut>,
1183        input: Box<dyn Operator>,
1184        node_column: usize,
1185        labels: Vec<String>,
1186        output_schema: Vec<LogicalType>,
1187    ) -> Self {
1188        let count_column = output_schema.len() - 1;
1189        Self {
1190            store,
1191            input,
1192            node_column,
1193            labels,
1194            count_column,
1195            output_schema,
1196            viewing_epoch: None,
1197            transaction_id: None,
1198            write_tracker: None,
1199        }
1200    }
1201
1202    /// Sets the transaction context for versioned label mutations.
1203    pub fn with_transaction_context(
1204        mut self,
1205        epoch: EpochId,
1206        transaction_id: Option<TransactionId>,
1207    ) -> Self {
1208        self.viewing_epoch = Some(epoch);
1209        self.transaction_id = transaction_id;
1210        self
1211    }
1212
1213    /// Sets the write tracker for conflict detection.
1214    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1215        self.write_tracker = Some(tracker);
1216        self
1217    }
1218}
1219
1220impl Operator for RemoveLabelOperator {
1221    fn next(&mut self) -> OperatorResult {
1222        if let Some(chunk) = self.input.next()? {
1223            let mut builder =
1224                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1225
1226            for row in chunk.selected_indices() {
1227                let node_val = chunk
1228                    .column(self.node_column)
1229                    .and_then(|c| c.get_value(row))
1230                    .ok_or_else(|| {
1231                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1232                    })?;
1233
1234                let node_id = match node_val {
1235                    // reason: ID encoding: i64 <-> u64 round-trip
1236                    #[allow(clippy::cast_sign_loss)]
1237                    Value::Int64(id) => NodeId(id as u64),
1238                    _ => {
1239                        return Err(OperatorError::TypeMismatch {
1240                            expected: "Int64 (node ID)".to_string(),
1241                            found: format!("{node_val:?}"),
1242                        });
1243                    }
1244                };
1245
1246                // Record write for conflict detection
1247                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1248                    tracker.record_node_write(tid, node_id)?;
1249                }
1250
1251                // Remove all labels
1252                let mut row_count: i64 = 0;
1253                for label in &self.labels {
1254                    let removed = if let Some(tid) = self.transaction_id {
1255                        self.store.remove_label_versioned(node_id, label, tid)
1256                    } else {
1257                        self.store.remove_label(node_id, label)
1258                    };
1259                    if removed {
1260                        row_count += 1;
1261                    }
1262                }
1263
1264                // Copy input columns to output (pass-through)
1265                for col_idx in 0..chunk.column_count() {
1266                    if let (Some(src), Some(dst)) =
1267                        (chunk.column(col_idx), builder.column_mut(col_idx))
1268                    {
1269                        if let Some(val) = src.get_value(row) {
1270                            dst.push_value(val);
1271                        } else {
1272                            dst.push_value(Value::Null);
1273                        }
1274                    }
1275                }
1276                // Append the update count column
1277                if let Some(dst) = builder.column_mut(self.count_column) {
1278                    dst.push_value(Value::Int64(row_count));
1279                }
1280
1281                builder.advance_row();
1282            }
1283
1284            return Ok(Some(builder.finish()));
1285        }
1286        Ok(None)
1287    }
1288
1289    fn reset(&mut self) {
1290        self.input.reset();
1291    }
1292
1293    fn name(&self) -> &'static str {
1294        "RemoveLabel"
1295    }
1296
1297    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1298        self
1299    }
1300}
1301
1302/// Operator that sets properties on nodes or edges.
1303///
1304/// This operator reads node/edge IDs from a column and sets the
1305/// specified properties on each entity.
1306pub struct SetPropertyOperator {
1307    /// The graph store.
1308    store: Arc<dyn GraphStoreMut>,
1309    /// Child operator providing entities.
1310    input: Box<dyn Operator>,
1311    /// Column index containing entity IDs (node or edge).
1312    entity_column: usize,
1313    /// Whether the entity is an edge (false = node).
1314    is_edge: bool,
1315    /// Properties to set (name -> source).
1316    properties: Vec<(String, PropertySource)>,
1317    /// Output schema.
1318    output_schema: Vec<LogicalType>,
1319    /// Whether to replace all properties (true) or merge (false) for map assignments.
1320    replace: bool,
1321    /// Optional constraint validator for schema enforcement.
1322    validator: Option<Arc<dyn ConstraintValidator>>,
1323    /// Entity labels (for node constraint validation).
1324    labels: Vec<String>,
1325    /// Edge type (for edge constraint validation).
1326    edge_type_name: Option<String>,
1327    /// Epoch for MVCC versioning.
1328    viewing_epoch: Option<EpochId>,
1329    /// Transaction ID for undo log tracking.
1330    transaction_id: Option<TransactionId>,
1331    /// Optional write tracker for conflict detection.
1332    write_tracker: Option<SharedWriteTracker>,
1333}
1334
1335impl SetPropertyOperator {
1336    /// Creates a new set property operator for nodes.
1337    pub fn new_for_node(
1338        store: Arc<dyn GraphStoreMut>,
1339        input: Box<dyn Operator>,
1340        node_column: usize,
1341        properties: Vec<(String, PropertySource)>,
1342        output_schema: Vec<LogicalType>,
1343    ) -> Self {
1344        Self {
1345            store,
1346            input,
1347            entity_column: node_column,
1348            is_edge: false,
1349            properties,
1350            output_schema,
1351            replace: false,
1352            validator: None,
1353            labels: Vec::new(),
1354            edge_type_name: None,
1355            viewing_epoch: None,
1356            transaction_id: None,
1357            write_tracker: None,
1358        }
1359    }
1360
1361    /// Creates a new set property operator for edges.
1362    pub fn new_for_edge(
1363        store: Arc<dyn GraphStoreMut>,
1364        input: Box<dyn Operator>,
1365        edge_column: usize,
1366        properties: Vec<(String, PropertySource)>,
1367        output_schema: Vec<LogicalType>,
1368    ) -> Self {
1369        Self {
1370            store,
1371            input,
1372            entity_column: edge_column,
1373            is_edge: true,
1374            properties,
1375            output_schema,
1376            replace: false,
1377            validator: None,
1378            labels: Vec::new(),
1379            edge_type_name: None,
1380            viewing_epoch: None,
1381            transaction_id: None,
1382            write_tracker: None,
1383        }
1384    }
1385
1386    /// Sets whether this operator replaces all properties (for map assignment).
1387    pub fn with_replace(mut self, replace: bool) -> Self {
1388        self.replace = replace;
1389        self
1390    }
1391
1392    /// Sets the constraint validator for schema enforcement.
1393    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1394        self.validator = Some(validator);
1395        self
1396    }
1397
1398    /// Sets the entity labels (for node constraint validation).
1399    pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1400        self.labels = labels;
1401        self
1402    }
1403
1404    /// Sets the edge type name (for edge constraint validation).
1405    pub fn with_edge_type(mut self, edge_type: String) -> Self {
1406        self.edge_type_name = Some(edge_type);
1407        self
1408    }
1409
1410    /// Sets the transaction context for versioned property mutations.
1411    ///
1412    /// When a transaction ID is provided, property changes are recorded in
1413    /// an undo log so they can be restored on rollback.
1414    pub fn with_transaction_context(
1415        mut self,
1416        epoch: EpochId,
1417        transaction_id: Option<TransactionId>,
1418    ) -> Self {
1419        self.viewing_epoch = Some(epoch);
1420        self.transaction_id = transaction_id;
1421        self
1422    }
1423
1424    /// Sets the write tracker for conflict detection.
1425    pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1426        self.write_tracker = Some(tracker);
1427        self
1428    }
1429}
1430
1431impl Operator for SetPropertyOperator {
1432    fn next(&mut self) -> OperatorResult {
1433        if let Some(chunk) = self.input.next()? {
1434            let mut builder =
1435                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1436
1437            for row in chunk.selected_indices() {
1438                let entity_val = chunk
1439                    .column(self.entity_column)
1440                    .and_then(|c| c.get_value(row))
1441                    .ok_or_else(|| {
1442                        OperatorError::ColumnNotFound(format!(
1443                            "entity column {}",
1444                            self.entity_column
1445                        ))
1446                    })?;
1447
1448                let entity_id = match entity_val {
1449                    // reason: ID encoding: i64 <-> u64 round-trip
1450                    #[allow(clippy::cast_sign_loss)]
1451                    Value::Int64(id) => id as u64,
1452                    _ => {
1453                        return Err(OperatorError::TypeMismatch {
1454                            expected: "Int64 (entity ID)".to_string(),
1455                            found: format!("{entity_val:?}"),
1456                        });
1457                    }
1458                };
1459
1460                // Record write for conflict detection
1461                if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1462                    if self.is_edge {
1463                        tracker.record_edge_write(tid, EdgeId(entity_id))?;
1464                    } else {
1465                        tracker.record_node_write(tid, NodeId(entity_id))?;
1466                    }
1467                }
1468
1469                // Resolve all property values
1470                let resolved_props: Vec<(String, Value)> = self
1471                    .properties
1472                    .iter()
1473                    .map(|(name, source)| {
1474                        let value =
1475                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1476                        (name.clone(), value)
1477                    })
1478                    .collect();
1479
1480                // Validate constraints before writing
1481                if let Some(ref validator) = self.validator {
1482                    if self.is_edge {
1483                        if let Some(ref et) = self.edge_type_name {
1484                            for (name, value) in &resolved_props {
1485                                validator.validate_edge_property(et, name, value)?;
1486                            }
1487                        }
1488                    } else {
1489                        for (name, value) in &resolved_props {
1490                            validator.validate_node_property(&self.labels, name, value)?;
1491                            validator.check_unique_node_property(&self.labels, name, value)?;
1492                        }
1493                    }
1494                }
1495
1496                // Write all properties (use versioned methods when inside a transaction)
1497                let tx_id = self.transaction_id;
1498                for (prop_name, value) in resolved_props {
1499                    if prop_name == "*" {
1500                        // Map assignment: value should be a Map
1501                        if let Value::Map(map) = value {
1502                            if self.replace {
1503                                // Replace: remove all existing properties first
1504                                if self.is_edge {
1505                                    if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1506                                        let keys: Vec<String> = edge
1507                                            .properties
1508                                            .iter()
1509                                            .map(|(k, _)| k.as_str().to_string())
1510                                            .collect();
1511                                        for key in keys {
1512                                            if let Some(tid) = tx_id {
1513                                                self.store.remove_edge_property_versioned(
1514                                                    EdgeId(entity_id),
1515                                                    &key,
1516                                                    tid,
1517                                                );
1518                                            } else {
1519                                                self.store
1520                                                    .remove_edge_property(EdgeId(entity_id), &key);
1521                                            }
1522                                        }
1523                                    }
1524                                } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1525                                    let keys: Vec<String> = node
1526                                        .properties
1527                                        .iter()
1528                                        .map(|(k, _)| k.as_str().to_string())
1529                                        .collect();
1530                                    for key in keys {
1531                                        if let Some(tid) = tx_id {
1532                                            self.store.remove_node_property_versioned(
1533                                                NodeId(entity_id),
1534                                                &key,
1535                                                tid,
1536                                            );
1537                                        } else {
1538                                            self.store
1539                                                .remove_node_property(NodeId(entity_id), &key);
1540                                        }
1541                                    }
1542                                }
1543                            }
1544                            // Set each map entry (null values remove the property)
1545                            for (key, val) in map.iter() {
1546                                if val.is_null() {
1547                                    // Null in SET += removes the property (Cypher/GQL semantics)
1548                                    if self.is_edge {
1549                                        if let Some(tid) = tx_id {
1550                                            self.store.remove_edge_property_versioned(
1551                                                EdgeId(entity_id),
1552                                                key.as_str(),
1553                                                tid,
1554                                            );
1555                                        } else {
1556                                            self.store.remove_edge_property(
1557                                                EdgeId(entity_id),
1558                                                key.as_str(),
1559                                            );
1560                                        }
1561                                    } else if let Some(tid) = tx_id {
1562                                        self.store.remove_node_property_versioned(
1563                                            NodeId(entity_id),
1564                                            key.as_str(),
1565                                            tid,
1566                                        );
1567                                    } else {
1568                                        self.store
1569                                            .remove_node_property(NodeId(entity_id), key.as_str());
1570                                    }
1571                                } else if self.is_edge {
1572                                    if let Some(tid) = tx_id {
1573                                        self.store.set_edge_property_versioned(
1574                                            EdgeId(entity_id),
1575                                            key.as_str(),
1576                                            val.clone(),
1577                                            tid,
1578                                        );
1579                                    } else {
1580                                        self.store.set_edge_property(
1581                                            EdgeId(entity_id),
1582                                            key.as_str(),
1583                                            val.clone(),
1584                                        );
1585                                    }
1586                                } else if let Some(tid) = tx_id {
1587                                    self.store.set_node_property_versioned(
1588                                        NodeId(entity_id),
1589                                        key.as_str(),
1590                                        val.clone(),
1591                                        tid,
1592                                    );
1593                                } else {
1594                                    self.store.set_node_property(
1595                                        NodeId(entity_id),
1596                                        key.as_str(),
1597                                        val.clone(),
1598                                    );
1599                                }
1600                            }
1601                        }
1602                    } else if self.is_edge {
1603                        if let Some(tid) = tx_id {
1604                            self.store.set_edge_property_versioned(
1605                                EdgeId(entity_id),
1606                                &prop_name,
1607                                value,
1608                                tid,
1609                            );
1610                        } else {
1611                            self.store
1612                                .set_edge_property(EdgeId(entity_id), &prop_name, value);
1613                        }
1614                    } else if let Some(tid) = tx_id {
1615                        self.store.set_node_property_versioned(
1616                            NodeId(entity_id),
1617                            &prop_name,
1618                            value,
1619                            tid,
1620                        );
1621                    } else {
1622                        self.store
1623                            .set_node_property(NodeId(entity_id), &prop_name, value);
1624                    }
1625                }
1626
1627                // Copy input columns to output
1628                for col_idx in 0..chunk.column_count() {
1629                    if let (Some(src), Some(dst)) =
1630                        (chunk.column(col_idx), builder.column_mut(col_idx))
1631                    {
1632                        if let Some(val) = src.get_value(row) {
1633                            dst.push_value(val);
1634                        } else {
1635                            dst.push_value(Value::Null);
1636                        }
1637                    }
1638                }
1639
1640                builder.advance_row();
1641            }
1642
1643            return Ok(Some(builder.finish()));
1644        }
1645        Ok(None)
1646    }
1647
1648    fn reset(&mut self) {
1649        self.input.reset();
1650    }
1651
1652    fn name(&self) -> &'static str {
1653        "SetProperty"
1654    }
1655
1656    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1657        self
1658    }
1659}
1660
1661#[cfg(all(test, feature = "lpg"))]
1662mod tests {
1663    use super::*;
1664    use crate::execution::DataChunk;
1665    use crate::execution::chunk::DataChunkBuilder;
1666    use crate::graph::lpg::LpgStore;
1667
1668    // ── Helpers ────────────────────────────────────────────────────
1669
1670    fn create_test_store() -> Arc<dyn GraphStoreMut> {
1671        Arc::new(LpgStore::new().unwrap())
1672    }
1673
1674    struct MockInput {
1675        chunk: Option<DataChunk>,
1676    }
1677
1678    impl MockInput {
1679        fn boxed(chunk: DataChunk) -> Box<Self> {
1680            Box::new(Self { chunk: Some(chunk) })
1681        }
1682    }
1683
1684    impl Operator for MockInput {
1685        fn next(&mut self) -> OperatorResult {
1686            Ok(self.chunk.take())
1687        }
1688        fn reset(&mut self) {}
1689        fn name(&self) -> &'static str {
1690            "MockInput"
1691        }
1692
1693        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1694            self
1695        }
1696    }
1697
1698    struct EmptyInput;
1699    impl Operator for EmptyInput {
1700        fn next(&mut self) -> OperatorResult {
1701            Ok(None)
1702        }
1703        fn reset(&mut self) {}
1704        fn name(&self) -> &'static str {
1705            "EmptyInput"
1706        }
1707
1708        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1709            self
1710        }
1711    }
1712
1713    // reason: test IDs are small sequential counters
1714    #[allow(clippy::cast_possible_wrap)]
1715    fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1716        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1717        for id in ids {
1718            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1719            builder.advance_row();
1720        }
1721        builder.finish()
1722    }
1723
1724    // reason: test IDs are small sequential counters
1725    #[allow(clippy::cast_possible_wrap)]
1726    fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1727        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1728        for id in ids {
1729            builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1730            builder.advance_row();
1731        }
1732        builder.finish()
1733    }
1734
1735    // ── CreateNodeOperator ──────────────────────────────────────
1736
1737    #[test]
1738    fn test_create_node_standalone() {
1739        let store = create_test_store();
1740
1741        let mut op = CreateNodeOperator::new(
1742            Arc::clone(&store),
1743            None,
1744            vec!["Person".to_string()],
1745            vec![(
1746                "name".to_string(),
1747                PropertySource::Constant(Value::String("Alix".into())),
1748            )],
1749            vec![LogicalType::Int64],
1750            0,
1751        );
1752
1753        let chunk = op.next().unwrap().unwrap();
1754        assert_eq!(chunk.row_count(), 1);
1755
1756        // Second call should return None (standalone executes once)
1757        assert!(op.next().unwrap().is_none());
1758
1759        assert_eq!(store.node_count(), 1);
1760    }
1761
1762    #[test]
1763    // reason: test IDs are small sequential counters
1764    #[allow(clippy::cast_possible_wrap)]
1765    fn test_create_edge() {
1766        let store = create_test_store();
1767
1768        let node1 = store.create_node(&["Person"]);
1769        let node2 = store.create_node(&["Person"]);
1770
1771        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1772        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1773        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1774        builder.advance_row();
1775
1776        let mut op = CreateEdgeOperator::new(
1777            Arc::clone(&store),
1778            MockInput::boxed(builder.finish()),
1779            0,
1780            1,
1781            "KNOWS".to_string(),
1782            vec![LogicalType::Int64, LogicalType::Int64],
1783        );
1784
1785        let _chunk = op.next().unwrap().unwrap();
1786        assert_eq!(store.edge_count(), 1);
1787    }
1788
1789    #[test]
1790    fn test_delete_node() {
1791        let store = create_test_store();
1792
1793        let node_id = store.create_node(&["Person"]);
1794        assert_eq!(store.node_count(), 1);
1795
1796        let mut op = DeleteNodeOperator::new(
1797            Arc::clone(&store),
1798            MockInput::boxed(node_id_chunk(&[node_id])),
1799            0,
1800            vec![LogicalType::Node],
1801            false,
1802        );
1803
1804        let chunk = op.next().unwrap().unwrap();
1805        // Pass-through: output row contains the original node ID
1806        assert_eq!(chunk.row_count(), 1);
1807        assert_eq!(store.node_count(), 0);
1808    }
1809
1810    // ── DeleteEdgeOperator ───────────────────────────────────────
1811
1812    #[test]
1813    fn test_delete_edge() {
1814        let store = create_test_store();
1815
1816        let n1 = store.create_node(&["Person"]);
1817        let n2 = store.create_node(&["Person"]);
1818        let eid = store.create_edge(n1, n2, "KNOWS");
1819        assert_eq!(store.edge_count(), 1);
1820
1821        let mut op = DeleteEdgeOperator::new(
1822            Arc::clone(&store),
1823            MockInput::boxed(edge_id_chunk(&[eid])),
1824            0,
1825            vec![LogicalType::Node],
1826        );
1827
1828        let chunk = op.next().unwrap().unwrap();
1829        assert_eq!(chunk.row_count(), 1);
1830        assert_eq!(store.edge_count(), 0);
1831    }
1832
1833    #[test]
1834    fn test_delete_edge_no_input_returns_none() {
1835        let store = create_test_store();
1836
1837        let mut op = DeleteEdgeOperator::new(
1838            Arc::clone(&store),
1839            Box::new(EmptyInput),
1840            0,
1841            vec![LogicalType::Int64],
1842        );
1843
1844        assert!(op.next().unwrap().is_none());
1845    }
1846
1847    #[test]
1848    fn test_delete_multiple_edges() {
1849        let store = create_test_store();
1850
1851        let n1 = store.create_node(&["N"]);
1852        let n2 = store.create_node(&["N"]);
1853        let e1 = store.create_edge(n1, n2, "R");
1854        let e2 = store.create_edge(n2, n1, "S");
1855        assert_eq!(store.edge_count(), 2);
1856
1857        let mut op = DeleteEdgeOperator::new(
1858            Arc::clone(&store),
1859            MockInput::boxed(edge_id_chunk(&[e1, e2])),
1860            0,
1861            vec![LogicalType::Node],
1862        );
1863
1864        let chunk = op.next().unwrap().unwrap();
1865        assert_eq!(chunk.row_count(), 2);
1866        assert_eq!(store.edge_count(), 0);
1867    }
1868
1869    // ── DeleteNodeOperator with DETACH ───────────────────────────
1870
1871    #[test]
1872    fn test_delete_node_detach() {
1873        let store = create_test_store();
1874
1875        let n1 = store.create_node(&["Person"]);
1876        let n2 = store.create_node(&["Person"]);
1877        store.create_edge(n1, n2, "KNOWS");
1878        store.create_edge(n2, n1, "FOLLOWS");
1879        assert_eq!(store.edge_count(), 2);
1880
1881        let mut op = DeleteNodeOperator::new(
1882            Arc::clone(&store),
1883            MockInput::boxed(node_id_chunk(&[n1])),
1884            0,
1885            vec![LogicalType::Node],
1886            true, // detach = true
1887        );
1888
1889        let chunk = op.next().unwrap().unwrap();
1890        assert_eq!(chunk.row_count(), 1);
1891        assert_eq!(store.node_count(), 1);
1892        assert_eq!(store.edge_count(), 0); // edges detached
1893    }
1894
1895    // ── AddLabelOperator ─────────────────────────────────────────
1896
1897    #[test]
1898    fn test_add_label() {
1899        let store = create_test_store();
1900
1901        let node = store.create_node(&["Person"]);
1902
1903        let mut op = AddLabelOperator::new(
1904            Arc::clone(&store),
1905            MockInput::boxed(node_id_chunk(&[node])),
1906            0,
1907            vec!["Employee".to_string()],
1908            vec![LogicalType::Int64, LogicalType::Int64],
1909        );
1910
1911        let chunk = op.next().unwrap().unwrap();
1912        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1913        assert_eq!(updated, 1);
1914
1915        // Verify label was added
1916        let node_data = store.get_node(node).unwrap();
1917        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1918        assert!(labels.contains(&"Person"));
1919        assert!(labels.contains(&"Employee"));
1920    }
1921
1922    #[test]
1923    fn test_add_multiple_labels() {
1924        let store = create_test_store();
1925
1926        let node = store.create_node(&["Base"]);
1927
1928        let mut op = AddLabelOperator::new(
1929            Arc::clone(&store),
1930            MockInput::boxed(node_id_chunk(&[node])),
1931            0,
1932            vec!["LabelA".to_string(), "LabelB".to_string()],
1933            vec![LogicalType::Int64, LogicalType::Int64],
1934        );
1935
1936        let chunk = op.next().unwrap().unwrap();
1937        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1938        assert_eq!(updated, 2); // 2 labels added
1939
1940        let node_data = store.get_node(node).unwrap();
1941        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1942        assert!(labels.contains(&"LabelA"));
1943        assert!(labels.contains(&"LabelB"));
1944    }
1945
1946    #[test]
1947    fn test_add_label_no_input_returns_none() {
1948        let store = create_test_store();
1949
1950        let mut op = AddLabelOperator::new(
1951            Arc::clone(&store),
1952            Box::new(EmptyInput),
1953            0,
1954            vec!["Foo".to_string()],
1955            vec![LogicalType::Int64, LogicalType::Int64],
1956        );
1957
1958        assert!(op.next().unwrap().is_none());
1959    }
1960
1961    // ── RemoveLabelOperator ──────────────────────────────────────
1962
1963    #[test]
1964    fn test_remove_label() {
1965        let store = create_test_store();
1966
1967        let node = store.create_node(&["Person", "Employee"]);
1968
1969        let mut op = RemoveLabelOperator::new(
1970            Arc::clone(&store),
1971            MockInput::boxed(node_id_chunk(&[node])),
1972            0,
1973            vec!["Employee".to_string()],
1974            vec![LogicalType::Int64, LogicalType::Int64],
1975        );
1976
1977        let chunk = op.next().unwrap().unwrap();
1978        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1979        assert_eq!(updated, 1);
1980
1981        // Verify label was removed
1982        let node_data = store.get_node(node).unwrap();
1983        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1984        assert!(labels.contains(&"Person"));
1985        assert!(!labels.contains(&"Employee"));
1986    }
1987
1988    #[test]
1989    fn test_remove_nonexistent_label() {
1990        let store = create_test_store();
1991
1992        let node = store.create_node(&["Person"]);
1993
1994        let mut op = RemoveLabelOperator::new(
1995            Arc::clone(&store),
1996            MockInput::boxed(node_id_chunk(&[node])),
1997            0,
1998            vec!["NonExistent".to_string()],
1999            vec![LogicalType::Int64, LogicalType::Int64],
2000        );
2001
2002        let chunk = op.next().unwrap().unwrap();
2003        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
2004        assert_eq!(updated, 0); // nothing removed
2005    }
2006
2007    // ── SetPropertyOperator ──────────────────────────────────────
2008
2009    #[test]
2010    fn test_set_node_property_constant() {
2011        let store = create_test_store();
2012
2013        let node = store.create_node(&["Person"]);
2014
2015        let mut op = SetPropertyOperator::new_for_node(
2016            Arc::clone(&store),
2017            MockInput::boxed(node_id_chunk(&[node])),
2018            0,
2019            vec![(
2020                "name".to_string(),
2021                PropertySource::Constant(Value::String("Alix".into())),
2022            )],
2023            vec![LogicalType::Int64],
2024        );
2025
2026        let chunk = op.next().unwrap().unwrap();
2027        assert_eq!(chunk.row_count(), 1);
2028
2029        // Verify property was set
2030        let node_data = store.get_node(node).unwrap();
2031        assert_eq!(
2032            node_data
2033                .properties
2034                .get(&grafeo_common::types::PropertyKey::new("name")),
2035            Some(&Value::String("Alix".into()))
2036        );
2037    }
2038
2039    #[test]
2040    // reason: test IDs are small sequential counters
2041    #[allow(clippy::cast_possible_wrap)]
2042    fn test_set_node_property_from_column() {
2043        let store = create_test_store();
2044
2045        let node = store.create_node(&["Person"]);
2046
2047        // Input: column 0 = node ID, column 1 = property value
2048        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
2049        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
2050        builder
2051            .column_mut(1)
2052            .unwrap()
2053            .push_value(Value::String("Gus".into()));
2054        builder.advance_row();
2055
2056        let mut op = SetPropertyOperator::new_for_node(
2057            Arc::clone(&store),
2058            MockInput::boxed(builder.finish()),
2059            0,
2060            vec![("name".to_string(), PropertySource::Column(1))],
2061            vec![LogicalType::Int64, LogicalType::String],
2062        );
2063
2064        let chunk = op.next().unwrap().unwrap();
2065        assert_eq!(chunk.row_count(), 1);
2066
2067        let node_data = store.get_node(node).unwrap();
2068        assert_eq!(
2069            node_data
2070                .properties
2071                .get(&grafeo_common::types::PropertyKey::new("name")),
2072            Some(&Value::String("Gus".into()))
2073        );
2074    }
2075
2076    #[test]
2077    fn test_set_edge_property() {
2078        let store = create_test_store();
2079
2080        let n1 = store.create_node(&["N"]);
2081        let n2 = store.create_node(&["N"]);
2082        let eid = store.create_edge(n1, n2, "KNOWS");
2083
2084        let mut op = SetPropertyOperator::new_for_edge(
2085            Arc::clone(&store),
2086            MockInput::boxed(edge_id_chunk(&[eid])),
2087            0,
2088            vec![(
2089                "weight".to_string(),
2090                PropertySource::Constant(Value::Float64(0.75)),
2091            )],
2092            vec![LogicalType::Int64],
2093        );
2094
2095        let chunk = op.next().unwrap().unwrap();
2096        assert_eq!(chunk.row_count(), 1);
2097
2098        let edge_data = store.get_edge(eid).unwrap();
2099        assert_eq!(
2100            edge_data
2101                .properties
2102                .get(&grafeo_common::types::PropertyKey::new("weight")),
2103            Some(&Value::Float64(0.75))
2104        );
2105    }
2106
2107    #[test]
2108    fn test_set_multiple_properties() {
2109        let store = create_test_store();
2110
2111        let node = store.create_node(&["Person"]);
2112
2113        let mut op = SetPropertyOperator::new_for_node(
2114            Arc::clone(&store),
2115            MockInput::boxed(node_id_chunk(&[node])),
2116            0,
2117            vec![
2118                (
2119                    "name".to_string(),
2120                    PropertySource::Constant(Value::String("Alix".into())),
2121                ),
2122                (
2123                    "age".to_string(),
2124                    PropertySource::Constant(Value::Int64(30)),
2125                ),
2126            ],
2127            vec![LogicalType::Int64],
2128        );
2129
2130        op.next().unwrap().unwrap();
2131
2132        let node_data = store.get_node(node).unwrap();
2133        assert_eq!(
2134            node_data
2135                .properties
2136                .get(&grafeo_common::types::PropertyKey::new("name")),
2137            Some(&Value::String("Alix".into()))
2138        );
2139        assert_eq!(
2140            node_data
2141                .properties
2142                .get(&grafeo_common::types::PropertyKey::new("age")),
2143            Some(&Value::Int64(30))
2144        );
2145    }
2146
2147    #[test]
2148    fn test_set_property_no_input_returns_none() {
2149        let store = create_test_store();
2150
2151        let mut op = SetPropertyOperator::new_for_node(
2152            Arc::clone(&store),
2153            Box::new(EmptyInput),
2154            0,
2155            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
2156            vec![LogicalType::Int64],
2157        );
2158
2159        assert!(op.next().unwrap().is_none());
2160    }
2161
2162    // ── Error paths ──────────────────────────────────────────────
2163
2164    #[test]
2165    fn test_delete_node_without_detach_errors_when_edges_exist() {
2166        let store = create_test_store();
2167
2168        let n1 = store.create_node(&["Person"]);
2169        let n2 = store.create_node(&["Person"]);
2170        store.create_edge(n1, n2, "KNOWS");
2171
2172        let mut op = DeleteNodeOperator::new(
2173            Arc::clone(&store),
2174            MockInput::boxed(node_id_chunk(&[n1])),
2175            0,
2176            vec![LogicalType::Int64],
2177            false, // no detach
2178        );
2179
2180        let err = op.next().unwrap_err();
2181        match err {
2182            OperatorError::ConstraintViolation(msg) => {
2183                assert!(msg.contains("connected edge"), "unexpected message: {msg}");
2184            }
2185            other => panic!("expected ConstraintViolation, got {other:?}"),
2186        }
2187        // Node should still exist
2188        assert_eq!(store.node_count(), 2);
2189    }
2190
2191    // ── CreateNodeOperator with input ───────────────────────────
2192
2193    #[test]
2194    fn test_create_node_with_input_operator() {
2195        let store = create_test_store();
2196
2197        // Seed node to provide input rows
2198        let existing = store.create_node(&["Seed"]);
2199
2200        let mut op = CreateNodeOperator::new(
2201            Arc::clone(&store),
2202            Some(MockInput::boxed(node_id_chunk(&[existing]))),
2203            vec!["Created".to_string()],
2204            vec![(
2205                "source".to_string(),
2206                PropertySource::Constant(Value::String("from_input".into())),
2207            )],
2208            vec![LogicalType::Int64, LogicalType::Int64], // input col + output col
2209            1,                                            // output column for new node ID
2210        );
2211
2212        let chunk = op.next().unwrap().unwrap();
2213        assert_eq!(chunk.row_count(), 1);
2214
2215        // Should have created one new node (2 total: Seed + Created)
2216        assert_eq!(store.node_count(), 2);
2217
2218        // Exhausted
2219        assert!(op.next().unwrap().is_none());
2220    }
2221
2222    // ── CreateEdgeOperator with properties and output column ────
2223
2224    #[test]
2225    // reason: test IDs are small sequential counters
2226    #[allow(clippy::cast_possible_wrap, clippy::cast_sign_loss)]
2227    fn test_create_edge_with_properties_and_output_column() {
2228        let store = create_test_store();
2229
2230        let n1 = store.create_node(&["Person"]);
2231        let n2 = store.create_node(&["Person"]);
2232
2233        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
2234        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
2235        builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
2236        builder.advance_row();
2237
2238        let mut op = CreateEdgeOperator::new(
2239            Arc::clone(&store),
2240            MockInput::boxed(builder.finish()),
2241            0,
2242            1,
2243            "KNOWS".to_string(),
2244            vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
2245        )
2246        .with_properties(vec![(
2247            "since".to_string(),
2248            PropertySource::Constant(Value::Int64(2024)),
2249        )])
2250        .with_output_column(2);
2251
2252        let chunk = op.next().unwrap().unwrap();
2253        assert_eq!(chunk.row_count(), 1);
2254        assert_eq!(store.edge_count(), 1);
2255
2256        // Verify the output chunk contains the edge ID in column 2
2257        let edge_id_raw = chunk
2258            .column(2)
2259            .and_then(|c| c.get_int64(0))
2260            .expect("edge ID should be in output column 2");
2261        let edge_id = EdgeId(edge_id_raw as u64);
2262
2263        // Verify the edge has the property
2264        let edge = store.get_edge(edge_id).expect("edge should exist");
2265        assert_eq!(
2266            edge.properties
2267                .get(&grafeo_common::types::PropertyKey::new("since")),
2268            Some(&Value::Int64(2024))
2269        );
2270    }
2271
2272    // ── SetPropertyOperator with map replacement ────────────────
2273
2274    #[test]
2275    fn test_set_property_map_replace() {
2276        use std::collections::BTreeMap;
2277
2278        let store = create_test_store();
2279
2280        let node = store.create_node(&["Person"]);
2281        store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
2282
2283        let mut map = BTreeMap::new();
2284        map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
2285
2286        let mut op = SetPropertyOperator::new_for_node(
2287            Arc::clone(&store),
2288            MockInput::boxed(node_id_chunk(&[node])),
2289            0,
2290            vec![(
2291                "*".to_string(),
2292                PropertySource::Constant(Value::Map(Arc::new(map))),
2293            )],
2294            vec![LogicalType::Int64],
2295        )
2296        .with_replace(true);
2297
2298        op.next().unwrap().unwrap();
2299
2300        let node_data = store.get_node(node).unwrap();
2301        // Old property should be gone
2302        assert!(
2303            node_data
2304                .properties
2305                .get(&PropertyKey::new("old_prop"))
2306                .is_none()
2307        );
2308        // New property should exist
2309        assert_eq!(
2310            node_data.properties.get(&PropertyKey::new("new_key")),
2311            Some(&Value::String("new_val".into()))
2312        );
2313    }
2314
2315    // ── SetPropertyOperator with map merge (no replace) ─────────
2316
2317    #[test]
2318    fn test_set_property_map_merge() {
2319        use std::collections::BTreeMap;
2320
2321        let store = create_test_store();
2322
2323        let node = store.create_node(&["Person"]);
2324        store.set_node_property(node, "existing", Value::Int64(42));
2325
2326        let mut map = BTreeMap::new();
2327        map.insert(PropertyKey::new("added"), Value::String("hello".into()));
2328
2329        let mut op = SetPropertyOperator::new_for_node(
2330            Arc::clone(&store),
2331            MockInput::boxed(node_id_chunk(&[node])),
2332            0,
2333            vec![(
2334                "*".to_string(),
2335                PropertySource::Constant(Value::Map(Arc::new(map))),
2336            )],
2337            vec![LogicalType::Int64],
2338        ); // replace defaults to false
2339
2340        op.next().unwrap().unwrap();
2341
2342        let node_data = store.get_node(node).unwrap();
2343        // Existing property should still be there
2344        assert_eq!(
2345            node_data.properties.get(&PropertyKey::new("existing")),
2346            Some(&Value::Int64(42))
2347        );
2348        // New property should also exist
2349        assert_eq!(
2350            node_data.properties.get(&PropertyKey::new("added")),
2351            Some(&Value::String("hello".into()))
2352        );
2353    }
2354
2355    // ── PropertySource::PropertyAccess ──────────────────────────
2356
2357    #[test]
2358    // reason: test IDs are small sequential counters
2359    #[allow(clippy::cast_possible_wrap)]
2360    fn test_property_source_property_access() {
2361        let store = create_test_store();
2362
2363        let source_node = store.create_node(&["Source"]);
2364        store.set_node_property(source_node, "name", Value::String("Alix".into()));
2365
2366        let target_node = store.create_node(&["Target"]);
2367
2368        // Build chunk: col 0 = source node ID (Node type for PropertyAccess), col 1 = target node ID
2369        let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
2370        builder.column_mut(0).unwrap().push_node_id(source_node);
2371        builder
2372            .column_mut(1)
2373            .unwrap()
2374            .push_int64(target_node.0 as i64);
2375        builder.advance_row();
2376
2377        let mut op = SetPropertyOperator::new_for_node(
2378            Arc::clone(&store),
2379            MockInput::boxed(builder.finish()),
2380            1, // entity column = target node
2381            vec![(
2382                "copied_name".to_string(),
2383                PropertySource::PropertyAccess {
2384                    column: 0,
2385                    property: "name".to_string(),
2386                },
2387            )],
2388            vec![LogicalType::Node, LogicalType::Int64],
2389        );
2390
2391        op.next().unwrap().unwrap();
2392
2393        let target_data = store.get_node(target_node).unwrap();
2394        assert_eq!(
2395            target_data.properties.get(&PropertyKey::new("copied_name")),
2396            Some(&Value::String("Alix".into()))
2397        );
2398    }
2399
2400    // ── ConstraintValidator integration ─────────────────────────
2401
2402    #[test]
2403    fn test_create_node_with_constraint_validator() {
2404        let store = create_test_store();
2405
2406        struct RejectAgeValidator;
2407        impl ConstraintValidator for RejectAgeValidator {
2408            fn validate_node_property(
2409                &self,
2410                _labels: &[String],
2411                key: &str,
2412                _value: &Value,
2413            ) -> Result<(), OperatorError> {
2414                if key == "forbidden" {
2415                    return Err(OperatorError::ConstraintViolation(
2416                        "property 'forbidden' is not allowed".to_string(),
2417                    ));
2418                }
2419                Ok(())
2420            }
2421            fn validate_node_complete(
2422                &self,
2423                _labels: &[String],
2424                _properties: &[(String, Value)],
2425            ) -> Result<(), OperatorError> {
2426                Ok(())
2427            }
2428            fn check_unique_node_property(
2429                &self,
2430                _labels: &[String],
2431                _key: &str,
2432                _value: &Value,
2433            ) -> Result<(), OperatorError> {
2434                Ok(())
2435            }
2436            fn validate_edge_property(
2437                &self,
2438                _edge_type: &str,
2439                _key: &str,
2440                _value: &Value,
2441            ) -> Result<(), OperatorError> {
2442                Ok(())
2443            }
2444            fn validate_edge_complete(
2445                &self,
2446                _edge_type: &str,
2447                _properties: &[(String, Value)],
2448            ) -> Result<(), OperatorError> {
2449                Ok(())
2450            }
2451        }
2452
2453        // Valid property should succeed
2454        let mut op = CreateNodeOperator::new(
2455            Arc::clone(&store),
2456            None,
2457            vec!["Thing".to_string()],
2458            vec![(
2459                "name".to_string(),
2460                PropertySource::Constant(Value::String("ok".into())),
2461            )],
2462            vec![LogicalType::Int64],
2463            0,
2464        )
2465        .with_validator(Arc::new(RejectAgeValidator));
2466
2467        assert!(op.next().is_ok());
2468        assert_eq!(store.node_count(), 1);
2469
2470        // Forbidden property should fail
2471        let mut op = CreateNodeOperator::new(
2472            Arc::clone(&store),
2473            None,
2474            vec!["Thing".to_string()],
2475            vec![(
2476                "forbidden".to_string(),
2477                PropertySource::Constant(Value::Int64(1)),
2478            )],
2479            vec![LogicalType::Int64],
2480            0,
2481        )
2482        .with_validator(Arc::new(RejectAgeValidator));
2483
2484        let err = op.next().unwrap_err();
2485        assert!(matches!(err, OperatorError::ConstraintViolation(_)));
2486        // Node count should still be 2 (the node is created before validation, but the error
2487        // propagates - this tests the validation logic fires)
2488    }
2489
2490    // ── Reset behavior ──────────────────────────────────────────
2491
2492    #[test]
2493    fn test_create_node_reset_allows_re_execution() {
2494        let store = create_test_store();
2495
2496        let mut op = CreateNodeOperator::new(
2497            Arc::clone(&store),
2498            None,
2499            vec!["Person".to_string()],
2500            vec![],
2501            vec![LogicalType::Int64],
2502            0,
2503        );
2504
2505        // First execution
2506        assert!(op.next().unwrap().is_some());
2507        assert!(op.next().unwrap().is_none());
2508
2509        // Reset and re-execute
2510        op.reset();
2511        assert!(op.next().unwrap().is_some());
2512
2513        assert_eq!(store.node_count(), 2);
2514    }
2515
2516    // ── Operator name() ──────────────────────────────────────────
2517
2518    #[test]
2519    fn test_operator_names() {
2520        let store = create_test_store();
2521
2522        let op = CreateNodeOperator::new(
2523            Arc::clone(&store),
2524            None,
2525            vec![],
2526            vec![],
2527            vec![LogicalType::Int64],
2528            0,
2529        );
2530        assert_eq!(op.name(), "CreateNode");
2531
2532        let op = CreateEdgeOperator::new(
2533            Arc::clone(&store),
2534            Box::new(EmptyInput),
2535            0,
2536            1,
2537            "R".to_string(),
2538            vec![LogicalType::Int64],
2539        );
2540        assert_eq!(op.name(), "CreateEdge");
2541
2542        let op = DeleteNodeOperator::new(
2543            Arc::clone(&store),
2544            Box::new(EmptyInput),
2545            0,
2546            vec![LogicalType::Int64],
2547            false,
2548        );
2549        assert_eq!(op.name(), "DeleteNode");
2550
2551        let op = DeleteEdgeOperator::new(
2552            Arc::clone(&store),
2553            Box::new(EmptyInput),
2554            0,
2555            vec![LogicalType::Int64],
2556        );
2557        assert_eq!(op.name(), "DeleteEdge");
2558
2559        let op = AddLabelOperator::new(
2560            Arc::clone(&store),
2561            Box::new(EmptyInput),
2562            0,
2563            vec!["L".to_string()],
2564            vec![LogicalType::Int64],
2565        );
2566        assert_eq!(op.name(), "AddLabel");
2567
2568        let op = RemoveLabelOperator::new(
2569            Arc::clone(&store),
2570            Box::new(EmptyInput),
2571            0,
2572            vec!["L".to_string()],
2573            vec![LogicalType::Int64],
2574        );
2575        assert_eq!(op.name(), "RemoveLabel");
2576
2577        let op = SetPropertyOperator::new_for_node(
2578            Arc::clone(&store),
2579            Box::new(EmptyInput),
2580            0,
2581            vec![],
2582            vec![LogicalType::Int64],
2583        );
2584        assert_eq!(op.name(), "SetProperty");
2585    }
2586
2587    // ── into_any() coverage ─────────────────────────────────────
2588
2589    #[test]
2590    fn test_create_node_into_any() {
2591        let store = create_test_store();
2592        let op = CreateNodeOperator::new(
2593            Arc::clone(&store),
2594            None,
2595            vec!["Person".to_string()],
2596            vec![],
2597            vec![LogicalType::Int64],
2598            0,
2599        );
2600        let any = Box::new(op).into_any();
2601        assert!(any.downcast::<CreateNodeOperator>().is_ok());
2602    }
2603
2604    #[test]
2605    fn test_create_edge_into_any() {
2606        let store = create_test_store();
2607        let op = CreateEdgeOperator::new(
2608            Arc::clone(&store),
2609            Box::new(EmptyInput),
2610            0,
2611            1,
2612            "KNOWS".to_string(),
2613            vec![LogicalType::Int64],
2614        );
2615        let any = Box::new(op).into_any();
2616        assert!(any.downcast::<CreateEdgeOperator>().is_ok());
2617    }
2618
2619    #[test]
2620    fn test_delete_node_into_any() {
2621        let store = create_test_store();
2622        let op = DeleteNodeOperator::new(
2623            Arc::clone(&store),
2624            Box::new(EmptyInput),
2625            0,
2626            vec![LogicalType::Int64],
2627            false,
2628        );
2629        let any = Box::new(op).into_any();
2630        assert!(any.downcast::<DeleteNodeOperator>().is_ok());
2631    }
2632
2633    #[test]
2634    fn test_delete_edge_into_any() {
2635        let store = create_test_store();
2636        let op = DeleteEdgeOperator::new(
2637            Arc::clone(&store),
2638            Box::new(EmptyInput),
2639            0,
2640            vec![LogicalType::Int64],
2641        );
2642        let any = Box::new(op).into_any();
2643        assert!(any.downcast::<DeleteEdgeOperator>().is_ok());
2644    }
2645
2646    #[test]
2647    fn test_add_label_into_any() {
2648        let store = create_test_store();
2649        let op = AddLabelOperator::new(
2650            Arc::clone(&store),
2651            Box::new(EmptyInput),
2652            0,
2653            vec!["Label".to_string()],
2654            vec![LogicalType::Int64],
2655        );
2656        let any = Box::new(op).into_any();
2657        assert!(any.downcast::<AddLabelOperator>().is_ok());
2658    }
2659
2660    #[test]
2661    fn test_remove_label_into_any() {
2662        let store = create_test_store();
2663        let op = RemoveLabelOperator::new(
2664            Arc::clone(&store),
2665            Box::new(EmptyInput),
2666            0,
2667            vec!["Label".to_string()],
2668            vec![LogicalType::Int64],
2669        );
2670        let any = Box::new(op).into_any();
2671        assert!(any.downcast::<RemoveLabelOperator>().is_ok());
2672    }
2673
2674    #[test]
2675    fn test_set_property_into_any() {
2676        let store = create_test_store();
2677        let op = SetPropertyOperator::new_for_node(
2678            Arc::clone(&store),
2679            Box::new(EmptyInput),
2680            0,
2681            vec![],
2682            vec![LogicalType::Int64],
2683        );
2684        let any = Box::new(op).into_any();
2685        assert!(any.downcast::<SetPropertyOperator>().is_ok());
2686    }
2687
2688    // ── ConstraintValidator default methods ──────────────────────
2689
2690    /// A minimal validator that implements only the required methods,
2691    /// relying on defaults for the optional ones.
2692    struct MinimalValidator;
2693
2694    impl ConstraintValidator for MinimalValidator {
2695        fn validate_node_property(
2696            &self,
2697            _labels: &[String],
2698            _key: &str,
2699            _value: &Value,
2700        ) -> Result<(), OperatorError> {
2701            Ok(())
2702        }
2703        fn validate_node_complete(
2704            &self,
2705            _labels: &[String],
2706            _properties: &[(String, Value)],
2707        ) -> Result<(), OperatorError> {
2708            Ok(())
2709        }
2710        fn check_unique_node_property(
2711            &self,
2712            _labels: &[String],
2713            _key: &str,
2714            _value: &Value,
2715        ) -> Result<(), OperatorError> {
2716            Ok(())
2717        }
2718        fn validate_edge_property(
2719            &self,
2720            _edge_type: &str,
2721            _key: &str,
2722            _value: &Value,
2723        ) -> Result<(), OperatorError> {
2724            Ok(())
2725        }
2726        fn validate_edge_complete(
2727            &self,
2728            _edge_type: &str,
2729            _properties: &[(String, Value)],
2730        ) -> Result<(), OperatorError> {
2731            Ok(())
2732        }
2733    }
2734
2735    #[test]
2736    fn test_constraint_validator_default_node_labels_allowed() {
2737        let v = MinimalValidator;
2738        assert!(
2739            v.validate_node_labels_allowed(&["Person".to_string(), "Actor".to_string()])
2740                .is_ok()
2741        );
2742    }
2743
2744    #[test]
2745    fn test_constraint_validator_default_edge_type_allowed() {
2746        let v = MinimalValidator;
2747        assert!(v.validate_edge_type_allowed("KNOWS").is_ok());
2748    }
2749
2750    #[test]
2751    fn test_constraint_validator_default_edge_endpoints() {
2752        let v = MinimalValidator;
2753        assert!(
2754            v.validate_edge_endpoints("KNOWS", &["Person".to_string()], &["Person".to_string()],)
2755                .is_ok()
2756        );
2757    }
2758
2759    #[test]
2760    fn test_constraint_validator_default_inject_defaults() {
2761        let v = MinimalValidator;
2762        let mut props = vec![("name".to_string(), Value::String("Alix".into()))];
2763        v.inject_defaults(&["Person".to_string()], &mut props);
2764        // Default impl is a no-op
2765        assert_eq!(props.len(), 1);
2766    }
2767
2768    // ── PropertySource tests ────────────────────────────────────
2769
2770    #[test]
2771    fn test_property_source_column() {
2772        let store = LpgStore::new().unwrap();
2773        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
2774        builder.column_mut(0).unwrap().push_int64(42);
2775        builder.advance_row();
2776        let chunk = builder.finish();
2777
2778        let src = PropertySource::Column(0);
2779        assert_eq!(src.resolve(&chunk, 0, &store), Value::Int64(42));
2780    }
2781
2782    #[test]
2783    fn test_property_source_constant() {
2784        let store = LpgStore::new().unwrap();
2785        let chunk = DataChunk::empty();
2786
2787        let src = PropertySource::Constant(Value::String("hello".into()));
2788        assert_eq!(
2789            src.resolve(&chunk, 0, &store),
2790            Value::String("hello".into()),
2791        );
2792    }
2793
2794    #[test]
2795    fn test_property_source_column_out_of_bounds() {
2796        let store = LpgStore::new().unwrap();
2797        let chunk = DataChunk::empty();
2798
2799        let src = PropertySource::Column(99);
2800        assert_eq!(src.resolve(&chunk, 0, &store), Value::Null);
2801    }
2802
2803    #[test]
2804    fn test_property_source_property_access_from_map() {
2805        let store = LpgStore::new().unwrap();
2806        let mut map = std::collections::BTreeMap::new();
2807        map.insert(PropertyKey::new("age"), Value::Int64(30));
2808
2809        let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
2810        builder
2811            .column_mut(0)
2812            .unwrap()
2813            .push_value(Value::Map(Arc::new(map)));
2814        builder.advance_row();
2815        let chunk = builder.finish();
2816
2817        let src = PropertySource::PropertyAccess {
2818            column: 0,
2819            property: "age".to_string(),
2820        };
2821        assert_eq!(src.resolve(&chunk, 0, &store), Value::Int64(30));
2822    }
2823
2824    #[test]
2825    fn test_property_source_property_access_missing_column() {
2826        let store = LpgStore::new().unwrap();
2827        let chunk = DataChunk::empty();
2828
2829        let src = PropertySource::PropertyAccess {
2830            column: 99,
2831            property: "name".to_string(),
2832        };
2833        assert_eq!(src.resolve(&chunk, 0, &store), Value::Null);
2834    }
2835}