Skip to main content

grafeo_core/execution/operators/
mutation.rs

1//! Mutation operators for creating and deleting graph elements.
2//!
3//! These operators modify the graph structure:
4//! - `CreateNodeOperator`: Creates new nodes
5//! - `CreateEdgeOperator`: Creates new edges
6//! - `DeleteNodeOperator`: Deletes nodes
7//! - `DeleteEdgeOperator`: Deletes edges
8
9use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::{GraphStore, GraphStoreMut};
16
17/// Operator that creates new nodes.
18///
19/// For each input row, creates a new node with the specified labels
20/// and properties, then outputs the row with the new node.
21pub struct CreateNodeOperator {
22    /// The graph store to modify.
23    store: Arc<dyn GraphStoreMut>,
24    /// Input operator.
25    input: Option<Box<dyn Operator>>,
26    /// Labels for the new nodes.
27    labels: Vec<String>,
28    /// Properties to set (name -> column index or constant value).
29    properties: Vec<(String, PropertySource)>,
30    /// Output schema.
31    output_schema: Vec<LogicalType>,
32    /// Column index for the created node variable.
33    output_column: usize,
34    /// Whether this operator has been executed (for no-input case).
35    executed: bool,
36    /// Epoch for MVCC versioning.
37    viewing_epoch: Option<EpochId>,
38    /// Transaction ID for MVCC versioning.
39    tx_id: Option<TxId>,
40}
41
42/// Source for a property value.
43#[derive(Debug, Clone)]
44pub enum PropertySource {
45    /// Get value from an input column.
46    Column(usize),
47    /// Use a constant value.
48    Constant(Value),
49    /// Access a named property from a map/node/edge in an input column.
50    PropertyAccess {
51        /// The column containing the map, node ID, or edge ID.
52        column: usize,
53        /// The property name to extract.
54        property: String,
55    },
56}
57
58impl PropertySource {
59    /// Resolves a property value from a data chunk row.
60    pub fn resolve(
61        &self,
62        chunk: &crate::execution::chunk::DataChunk,
63        row: usize,
64        store: &dyn GraphStore,
65    ) -> Value {
66        match self {
67            PropertySource::Column(col_idx) => chunk
68                .column(*col_idx)
69                .and_then(|c| c.get_value(row))
70                .unwrap_or(Value::Null),
71            PropertySource::Constant(v) => v.clone(),
72            PropertySource::PropertyAccess { column, property } => {
73                let Some(col) = chunk.column(*column) else {
74                    return Value::Null;
75                };
76                // Try node ID first, then edge ID, then map value
77                if let Some(node_id) = col.get_node_id(row) {
78                    store
79                        .get_node(node_id)
80                        .and_then(|node| node.get_property(property).cloned())
81                        .unwrap_or(Value::Null)
82                } else if let Some(edge_id) = col.get_edge_id(row) {
83                    store
84                        .get_edge(edge_id)
85                        .and_then(|edge| edge.get_property(property).cloned())
86                        .unwrap_or(Value::Null)
87                } else if let Some(Value::Map(map)) = col.get_value(row) {
88                    let key = PropertyKey::new(property);
89                    map.get(&key).cloned().unwrap_or(Value::Null)
90                } else {
91                    Value::Null
92                }
93            }
94        }
95    }
96}
97
98impl CreateNodeOperator {
99    /// Creates a new node creation operator.
100    ///
101    /// # Arguments
102    /// * `store` - The graph store to modify.
103    /// * `input` - Optional input operator (None for standalone CREATE).
104    /// * `labels` - Labels to assign to created nodes.
105    /// * `properties` - Properties to set on created nodes.
106    /// * `output_schema` - Schema of the output.
107    /// * `output_column` - Column index where the created node ID goes.
108    pub fn new(
109        store: Arc<dyn GraphStoreMut>,
110        input: Option<Box<dyn Operator>>,
111        labels: Vec<String>,
112        properties: Vec<(String, PropertySource)>,
113        output_schema: Vec<LogicalType>,
114        output_column: usize,
115    ) -> Self {
116        Self {
117            store,
118            input,
119            labels,
120            properties,
121            output_schema,
122            output_column,
123            executed: false,
124            viewing_epoch: None,
125            tx_id: None,
126        }
127    }
128
129    /// Sets the transaction context for MVCC versioning.
130    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
131        self.viewing_epoch = Some(epoch);
132        self.tx_id = tx_id;
133        self
134    }
135}
136
137impl Operator for CreateNodeOperator {
138    fn next(&mut self) -> OperatorResult {
139        // Get transaction context for versioned creation
140        let epoch = self
141            .viewing_epoch
142            .unwrap_or_else(|| self.store.current_epoch());
143        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
144
145        if let Some(ref mut input) = self.input {
146            // For each input row, create a node
147            if let Some(chunk) = input.next()? {
148                let mut builder =
149                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
150
151                for row in chunk.selected_indices() {
152                    // Create the node with MVCC versioning
153                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
154                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
155
156                    // Set properties
157                    for (prop_name, source) in &self.properties {
158                        let value =
159                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
160                        self.store.set_node_property(node_id, prop_name, value);
161                    }
162
163                    // Copy input columns to output
164                    for col_idx in 0..chunk.column_count() {
165                        if col_idx < self.output_column
166                            && let (Some(src), Some(dst)) =
167                                (chunk.column(col_idx), builder.column_mut(col_idx))
168                        {
169                            if let Some(val) = src.get_value(row) {
170                                dst.push_value(val);
171                            } else {
172                                dst.push_value(Value::Null);
173                            }
174                        }
175                    }
176
177                    // Add the new node ID
178                    if let Some(dst) = builder.column_mut(self.output_column) {
179                        dst.push_value(Value::Int64(node_id.0 as i64));
180                    }
181
182                    builder.advance_row();
183                }
184
185                return Ok(Some(builder.finish()));
186            }
187            Ok(None)
188        } else {
189            // No input - create a single node
190            if self.executed {
191                return Ok(None);
192            }
193            self.executed = true;
194
195            // Create the node with MVCC versioning
196            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
197            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
198
199            // Set properties from constants only
200            for (prop_name, source) in &self.properties {
201                if let PropertySource::Constant(value) = source {
202                    self.store
203                        .set_node_property(node_id, prop_name, value.clone());
204                }
205            }
206
207            // Build output chunk with just the node ID
208            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
209            if let Some(dst) = builder.column_mut(self.output_column) {
210                dst.push_value(Value::Int64(node_id.0 as i64));
211            }
212            builder.advance_row();
213
214            Ok(Some(builder.finish()))
215        }
216    }
217
218    fn reset(&mut self) {
219        if let Some(ref mut input) = self.input {
220            input.reset();
221        }
222        self.executed = false;
223    }
224
225    fn name(&self) -> &'static str {
226        "CreateNode"
227    }
228}
229
230/// Operator that creates new edges.
231pub struct CreateEdgeOperator {
232    /// The graph store to modify.
233    store: Arc<dyn GraphStoreMut>,
234    /// Input operator.
235    input: Box<dyn Operator>,
236    /// Column index for the source node.
237    from_column: usize,
238    /// Column index for the target node.
239    to_column: usize,
240    /// Edge type.
241    edge_type: String,
242    /// Properties to set.
243    properties: Vec<(String, PropertySource)>,
244    /// Output schema.
245    output_schema: Vec<LogicalType>,
246    /// Column index for the created edge variable (if any).
247    output_column: Option<usize>,
248    /// Epoch for MVCC versioning.
249    viewing_epoch: Option<EpochId>,
250    /// Transaction ID for MVCC versioning.
251    tx_id: Option<TxId>,
252}
253
254impl CreateEdgeOperator {
255    /// Creates a new edge creation operator.
256    ///
257    /// Use builder methods to set additional options:
258    /// - [`with_properties`](Self::with_properties) - set edge properties
259    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
260    /// - [`with_tx_context`](Self::with_tx_context) - set transaction context
261    pub fn new(
262        store: Arc<dyn GraphStoreMut>,
263        input: Box<dyn Operator>,
264        from_column: usize,
265        to_column: usize,
266        edge_type: String,
267        output_schema: Vec<LogicalType>,
268    ) -> Self {
269        Self {
270            store,
271            input,
272            from_column,
273            to_column,
274            edge_type,
275            properties: Vec::new(),
276            output_schema,
277            output_column: None,
278            viewing_epoch: None,
279            tx_id: None,
280        }
281    }
282
283    /// Sets the properties to assign to created edges.
284    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
285        self.properties = properties;
286        self
287    }
288
289    /// Sets the output column for the created edge ID.
290    pub fn with_output_column(mut self, column: usize) -> Self {
291        self.output_column = Some(column);
292        self
293    }
294
295    /// Sets the transaction context for MVCC versioning.
296    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
297        self.viewing_epoch = Some(epoch);
298        self.tx_id = tx_id;
299        self
300    }
301}
302
303impl Operator for CreateEdgeOperator {
304    fn next(&mut self) -> OperatorResult {
305        // Get transaction context for versioned creation
306        let epoch = self
307            .viewing_epoch
308            .unwrap_or_else(|| self.store.current_epoch());
309        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
310
311        if let Some(chunk) = self.input.next()? {
312            let mut builder =
313                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
314
315            for row in chunk.selected_indices() {
316                // Get source and target node IDs
317                let from_id = chunk
318                    .column(self.from_column)
319                    .and_then(|c| c.get_value(row))
320                    .ok_or_else(|| {
321                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
322                    })?;
323
324                let to_id = chunk
325                    .column(self.to_column)
326                    .and_then(|c| c.get_value(row))
327                    .ok_or_else(|| {
328                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
329                    })?;
330
331                // Extract node IDs
332                let from_node_id = match from_id {
333                    Value::Int64(id) => NodeId(id as u64),
334                    _ => {
335                        return Err(OperatorError::TypeMismatch {
336                            expected: "Int64 (node ID)".to_string(),
337                            found: format!("{from_id:?}"),
338                        });
339                    }
340                };
341
342                let to_node_id = match to_id {
343                    Value::Int64(id) => NodeId(id as u64),
344                    _ => {
345                        return Err(OperatorError::TypeMismatch {
346                            expected: "Int64 (node ID)".to_string(),
347                            found: format!("{to_id:?}"),
348                        });
349                    }
350                };
351
352                // Create the edge with MVCC versioning
353                let edge_id = self.store.create_edge_versioned(
354                    from_node_id,
355                    to_node_id,
356                    &self.edge_type,
357                    epoch,
358                    tx,
359                );
360
361                // Set properties
362                for (prop_name, source) in &self.properties {
363                    let value = source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
364                    self.store.set_edge_property(edge_id, prop_name, value);
365                }
366
367                // Copy input columns
368                for col_idx in 0..chunk.column_count() {
369                    if let (Some(src), Some(dst)) =
370                        (chunk.column(col_idx), builder.column_mut(col_idx))
371                    {
372                        if let Some(val) = src.get_value(row) {
373                            dst.push_value(val);
374                        } else {
375                            dst.push_value(Value::Null);
376                        }
377                    }
378                }
379
380                // Add edge ID if requested
381                if let Some(out_col) = self.output_column
382                    && let Some(dst) = builder.column_mut(out_col)
383                {
384                    dst.push_value(Value::Int64(edge_id.0 as i64));
385                }
386
387                builder.advance_row();
388            }
389
390            return Ok(Some(builder.finish()));
391        }
392        Ok(None)
393    }
394
395    fn reset(&mut self) {
396        self.input.reset();
397    }
398
399    fn name(&self) -> &'static str {
400        "CreateEdge"
401    }
402}
403
404/// Operator that deletes nodes.
405pub struct DeleteNodeOperator {
406    /// The graph store to modify.
407    store: Arc<dyn GraphStoreMut>,
408    /// Input operator.
409    input: Box<dyn Operator>,
410    /// Column index for the node to delete.
411    node_column: usize,
412    /// Output schema.
413    output_schema: Vec<LogicalType>,
414    /// Whether to detach (delete connected edges) before deleting.
415    detach: bool,
416    /// Epoch for MVCC versioning.
417    viewing_epoch: Option<EpochId>,
418    /// Transaction ID for MVCC versioning.
419    tx_id: Option<TxId>,
420}
421
422impl DeleteNodeOperator {
423    /// Creates a new node deletion operator.
424    pub fn new(
425        store: Arc<dyn GraphStoreMut>,
426        input: Box<dyn Operator>,
427        node_column: usize,
428        output_schema: Vec<LogicalType>,
429        detach: bool,
430    ) -> Self {
431        Self {
432            store,
433            input,
434            node_column,
435            output_schema,
436            detach,
437            viewing_epoch: None,
438            tx_id: None,
439        }
440    }
441
442    /// Sets the transaction context for MVCC versioning.
443    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
444        self.viewing_epoch = Some(epoch);
445        self.tx_id = tx_id;
446        self
447    }
448}
449
450impl Operator for DeleteNodeOperator {
451    fn next(&mut self) -> OperatorResult {
452        // Get transaction context for versioned deletion
453        let epoch = self
454            .viewing_epoch
455            .unwrap_or_else(|| self.store.current_epoch());
456        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
457
458        if let Some(chunk) = self.input.next()? {
459            let mut deleted_count = 0;
460
461            for row in chunk.selected_indices() {
462                let node_val = chunk
463                    .column(self.node_column)
464                    .and_then(|c| c.get_value(row))
465                    .ok_or_else(|| {
466                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
467                    })?;
468
469                let node_id = match node_val {
470                    Value::Int64(id) => NodeId(id as u64),
471                    _ => {
472                        return Err(OperatorError::TypeMismatch {
473                            expected: "Int64 (node ID)".to_string(),
474                            found: format!("{node_val:?}"),
475                        });
476                    }
477                };
478
479                if self.detach {
480                    // Delete all connected edges first
481                    self.store.delete_node_edges(node_id);
482                }
483
484                // Delete the node with MVCC versioning
485                if self.store.delete_node_versioned(node_id, epoch, tx) {
486                    deleted_count += 1;
487                }
488            }
489
490            // Return a chunk with the delete count
491            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
492            if let Some(dst) = builder.column_mut(0) {
493                dst.push_value(Value::Int64(deleted_count));
494            }
495            builder.advance_row();
496
497            return Ok(Some(builder.finish()));
498        }
499        Ok(None)
500    }
501
502    fn reset(&mut self) {
503        self.input.reset();
504    }
505
506    fn name(&self) -> &'static str {
507        "DeleteNode"
508    }
509}
510
511/// Operator that deletes edges.
512pub struct DeleteEdgeOperator {
513    /// The graph store to modify.
514    store: Arc<dyn GraphStoreMut>,
515    /// Input operator.
516    input: Box<dyn Operator>,
517    /// Column index for the edge to delete.
518    edge_column: usize,
519    /// Output schema.
520    output_schema: Vec<LogicalType>,
521    /// Epoch for MVCC versioning.
522    viewing_epoch: Option<EpochId>,
523    /// Transaction ID for MVCC versioning.
524    tx_id: Option<TxId>,
525}
526
527impl DeleteEdgeOperator {
528    /// Creates a new edge deletion operator.
529    pub fn new(
530        store: Arc<dyn GraphStoreMut>,
531        input: Box<dyn Operator>,
532        edge_column: usize,
533        output_schema: Vec<LogicalType>,
534    ) -> Self {
535        Self {
536            store,
537            input,
538            edge_column,
539            output_schema,
540            viewing_epoch: None,
541            tx_id: None,
542        }
543    }
544
545    /// Sets the transaction context for MVCC versioning.
546    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
547        self.viewing_epoch = Some(epoch);
548        self.tx_id = tx_id;
549        self
550    }
551}
552
553impl Operator for DeleteEdgeOperator {
554    fn next(&mut self) -> OperatorResult {
555        // Get transaction context for versioned deletion
556        let epoch = self
557            .viewing_epoch
558            .unwrap_or_else(|| self.store.current_epoch());
559        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
560
561        if let Some(chunk) = self.input.next()? {
562            let mut deleted_count = 0;
563
564            for row in chunk.selected_indices() {
565                let edge_val = chunk
566                    .column(self.edge_column)
567                    .and_then(|c| c.get_value(row))
568                    .ok_or_else(|| {
569                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
570                    })?;
571
572                let edge_id = match edge_val {
573                    Value::Int64(id) => EdgeId(id as u64),
574                    _ => {
575                        return Err(OperatorError::TypeMismatch {
576                            expected: "Int64 (edge ID)".to_string(),
577                            found: format!("{edge_val:?}"),
578                        });
579                    }
580                };
581
582                // Delete the edge with MVCC versioning
583                if self.store.delete_edge_versioned(edge_id, epoch, tx) {
584                    deleted_count += 1;
585                }
586            }
587
588            // Return a chunk with the delete count
589            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
590            if let Some(dst) = builder.column_mut(0) {
591                dst.push_value(Value::Int64(deleted_count));
592            }
593            builder.advance_row();
594
595            return Ok(Some(builder.finish()));
596        }
597        Ok(None)
598    }
599
600    fn reset(&mut self) {
601        self.input.reset();
602    }
603
604    fn name(&self) -> &'static str {
605        "DeleteEdge"
606    }
607}
608
609/// Operator that adds labels to nodes.
610pub struct AddLabelOperator {
611    /// The graph store.
612    store: Arc<dyn GraphStoreMut>,
613    /// Child operator providing nodes.
614    input: Box<dyn Operator>,
615    /// Column index containing node IDs.
616    node_column: usize,
617    /// Labels to add.
618    labels: Vec<String>,
619    /// Output schema.
620    output_schema: Vec<LogicalType>,
621}
622
623impl AddLabelOperator {
624    /// Creates a new add label operator.
625    pub fn new(
626        store: Arc<dyn GraphStoreMut>,
627        input: Box<dyn Operator>,
628        node_column: usize,
629        labels: Vec<String>,
630        output_schema: Vec<LogicalType>,
631    ) -> Self {
632        Self {
633            store,
634            input,
635            node_column,
636            labels,
637            output_schema,
638        }
639    }
640}
641
642impl Operator for AddLabelOperator {
643    fn next(&mut self) -> OperatorResult {
644        if let Some(chunk) = self.input.next()? {
645            let mut updated_count = 0;
646
647            for row in chunk.selected_indices() {
648                let node_val = chunk
649                    .column(self.node_column)
650                    .and_then(|c| c.get_value(row))
651                    .ok_or_else(|| {
652                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
653                    })?;
654
655                let node_id = match node_val {
656                    Value::Int64(id) => NodeId(id as u64),
657                    _ => {
658                        return Err(OperatorError::TypeMismatch {
659                            expected: "Int64 (node ID)".to_string(),
660                            found: format!("{node_val:?}"),
661                        });
662                    }
663                };
664
665                // Add all labels
666                for label in &self.labels {
667                    if self.store.add_label(node_id, label) {
668                        updated_count += 1;
669                    }
670                }
671            }
672
673            // Return a chunk with the update count
674            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
675            if let Some(dst) = builder.column_mut(0) {
676                dst.push_value(Value::Int64(updated_count));
677            }
678            builder.advance_row();
679
680            return Ok(Some(builder.finish()));
681        }
682        Ok(None)
683    }
684
685    fn reset(&mut self) {
686        self.input.reset();
687    }
688
689    fn name(&self) -> &'static str {
690        "AddLabel"
691    }
692}
693
694/// Operator that removes labels from nodes.
695pub struct RemoveLabelOperator {
696    /// The graph store.
697    store: Arc<dyn GraphStoreMut>,
698    /// Child operator providing nodes.
699    input: Box<dyn Operator>,
700    /// Column index containing node IDs.
701    node_column: usize,
702    /// Labels to remove.
703    labels: Vec<String>,
704    /// Output schema.
705    output_schema: Vec<LogicalType>,
706}
707
708impl RemoveLabelOperator {
709    /// Creates a new remove label operator.
710    pub fn new(
711        store: Arc<dyn GraphStoreMut>,
712        input: Box<dyn Operator>,
713        node_column: usize,
714        labels: Vec<String>,
715        output_schema: Vec<LogicalType>,
716    ) -> Self {
717        Self {
718            store,
719            input,
720            node_column,
721            labels,
722            output_schema,
723        }
724    }
725}
726
727impl Operator for RemoveLabelOperator {
728    fn next(&mut self) -> OperatorResult {
729        if let Some(chunk) = self.input.next()? {
730            let mut updated_count = 0;
731
732            for row in chunk.selected_indices() {
733                let node_val = chunk
734                    .column(self.node_column)
735                    .and_then(|c| c.get_value(row))
736                    .ok_or_else(|| {
737                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
738                    })?;
739
740                let node_id = match node_val {
741                    Value::Int64(id) => NodeId(id as u64),
742                    _ => {
743                        return Err(OperatorError::TypeMismatch {
744                            expected: "Int64 (node ID)".to_string(),
745                            found: format!("{node_val:?}"),
746                        });
747                    }
748                };
749
750                // Remove all labels
751                for label in &self.labels {
752                    if self.store.remove_label(node_id, label) {
753                        updated_count += 1;
754                    }
755                }
756            }
757
758            // Return a chunk with the update count
759            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
760            if let Some(dst) = builder.column_mut(0) {
761                dst.push_value(Value::Int64(updated_count));
762            }
763            builder.advance_row();
764
765            return Ok(Some(builder.finish()));
766        }
767        Ok(None)
768    }
769
770    fn reset(&mut self) {
771        self.input.reset();
772    }
773
774    fn name(&self) -> &'static str {
775        "RemoveLabel"
776    }
777}
778
779/// Operator that sets properties on nodes or edges.
780///
781/// This operator reads node/edge IDs from a column and sets the
782/// specified properties on each entity.
783pub struct SetPropertyOperator {
784    /// The graph store.
785    store: Arc<dyn GraphStoreMut>,
786    /// Child operator providing entities.
787    input: Box<dyn Operator>,
788    /// Column index containing entity IDs (node or edge).
789    entity_column: usize,
790    /// Whether the entity is an edge (false = node).
791    is_edge: bool,
792    /// Properties to set (name -> source).
793    properties: Vec<(String, PropertySource)>,
794    /// Output schema.
795    output_schema: Vec<LogicalType>,
796}
797
798impl SetPropertyOperator {
799    /// Creates a new set property operator for nodes.
800    pub fn new_for_node(
801        store: Arc<dyn GraphStoreMut>,
802        input: Box<dyn Operator>,
803        node_column: usize,
804        properties: Vec<(String, PropertySource)>,
805        output_schema: Vec<LogicalType>,
806    ) -> Self {
807        Self {
808            store,
809            input,
810            entity_column: node_column,
811            is_edge: false,
812            properties,
813            output_schema,
814        }
815    }
816
817    /// Creates a new set property operator for edges.
818    pub fn new_for_edge(
819        store: Arc<dyn GraphStoreMut>,
820        input: Box<dyn Operator>,
821        edge_column: usize,
822        properties: Vec<(String, PropertySource)>,
823        output_schema: Vec<LogicalType>,
824    ) -> Self {
825        Self {
826            store,
827            input,
828            entity_column: edge_column,
829            is_edge: true,
830            properties,
831            output_schema,
832        }
833    }
834}
835
836impl Operator for SetPropertyOperator {
837    fn next(&mut self) -> OperatorResult {
838        if let Some(chunk) = self.input.next()? {
839            let mut builder =
840                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
841
842            for row in chunk.selected_indices() {
843                let entity_val = chunk
844                    .column(self.entity_column)
845                    .and_then(|c| c.get_value(row))
846                    .ok_or_else(|| {
847                        OperatorError::ColumnNotFound(format!(
848                            "entity column {}",
849                            self.entity_column
850                        ))
851                    })?;
852
853                let entity_id = match entity_val {
854                    Value::Int64(id) => id as u64,
855                    _ => {
856                        return Err(OperatorError::TypeMismatch {
857                            expected: "Int64 (entity ID)".to_string(),
858                            found: format!("{entity_val:?}"),
859                        });
860                    }
861                };
862
863                // Set all properties
864                for (prop_name, source) in &self.properties {
865                    let value = source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
866
867                    if self.is_edge {
868                        self.store
869                            .set_edge_property(EdgeId(entity_id), prop_name, value);
870                    } else {
871                        self.store
872                            .set_node_property(NodeId(entity_id), prop_name, value);
873                    }
874                }
875
876                // Copy input columns to output
877                for col_idx in 0..chunk.column_count() {
878                    if let (Some(src), Some(dst)) =
879                        (chunk.column(col_idx), builder.column_mut(col_idx))
880                    {
881                        if let Some(val) = src.get_value(row) {
882                            dst.push_value(val);
883                        } else {
884                            dst.push_value(Value::Null);
885                        }
886                    }
887                }
888
889                builder.advance_row();
890            }
891
892            return Ok(Some(builder.finish()));
893        }
894        Ok(None)
895    }
896
897    fn reset(&mut self) {
898        self.input.reset();
899    }
900
901    fn name(&self) -> &'static str {
902        "SetProperty"
903    }
904}
905
906#[cfg(test)]
907mod tests {
908    use super::*;
909    use crate::execution::DataChunk;
910    use crate::execution::chunk::DataChunkBuilder;
911    use crate::graph::lpg::LpgStore;
912
913    fn create_test_store() -> Arc<dyn GraphStoreMut> {
914        Arc::new(LpgStore::new())
915    }
916
917    #[test]
918    fn test_create_node_standalone() {
919        let store = create_test_store();
920
921        let mut op = CreateNodeOperator::new(
922            Arc::clone(&store),
923            None,
924            vec!["Person".to_string()],
925            vec![(
926                "name".to_string(),
927                PropertySource::Constant(Value::String("Alice".into())),
928            )],
929            vec![LogicalType::Int64],
930            0,
931        );
932
933        // First call should create a node
934        let chunk = op.next().unwrap().unwrap();
935        assert_eq!(chunk.row_count(), 1);
936
937        // Second call should return None
938        assert!(op.next().unwrap().is_none());
939
940        // Verify node was created
941        assert_eq!(store.node_count(), 1);
942    }
943
944    #[test]
945    fn test_create_edge() {
946        let store = create_test_store();
947
948        // Create two nodes first
949        let node1 = store.create_node(&["Person"]);
950        let node2 = store.create_node(&["Person"]);
951
952        // Create input chunk with node IDs
953        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
954        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
955        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
956        builder.advance_row();
957        let input_chunk = builder.finish();
958
959        // Mock input operator
960        struct MockInput {
961            chunk: Option<DataChunk>,
962        }
963        impl Operator for MockInput {
964            fn next(&mut self) -> OperatorResult {
965                Ok(self.chunk.take())
966            }
967            fn reset(&mut self) {}
968            fn name(&self) -> &'static str {
969                "MockInput"
970            }
971        }
972
973        let mut op = CreateEdgeOperator::new(
974            Arc::clone(&store),
975            Box::new(MockInput {
976                chunk: Some(input_chunk),
977            }),
978            0, // from column
979            1, // to column
980            "KNOWS".to_string(),
981            vec![LogicalType::Int64, LogicalType::Int64],
982        );
983
984        // Execute
985        let _chunk = op.next().unwrap().unwrap();
986
987        // Verify edge was created
988        assert_eq!(store.edge_count(), 1);
989    }
990
991    #[test]
992    fn test_delete_node() {
993        let store = create_test_store();
994
995        // Create a node
996        let node_id = store.create_node(&["Person"]);
997        assert_eq!(store.node_count(), 1);
998
999        // Create input chunk with the node ID
1000        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1001        builder.column_mut(0).unwrap().push_int64(node_id.0 as i64);
1002        builder.advance_row();
1003        let input_chunk = builder.finish();
1004
1005        struct MockInput {
1006            chunk: Option<DataChunk>,
1007        }
1008        impl Operator for MockInput {
1009            fn next(&mut self) -> OperatorResult {
1010                Ok(self.chunk.take())
1011            }
1012            fn reset(&mut self) {}
1013            fn name(&self) -> &'static str {
1014                "MockInput"
1015            }
1016        }
1017
1018        let mut op = DeleteNodeOperator::new(
1019            Arc::clone(&store),
1020            Box::new(MockInput {
1021                chunk: Some(input_chunk),
1022            }),
1023            0,
1024            vec![LogicalType::Int64],
1025            false,
1026        );
1027
1028        // Execute
1029        let chunk = op.next().unwrap().unwrap();
1030
1031        // Verify deletion
1032        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1033        assert_eq!(deleted, 1);
1034        assert_eq!(store.node_count(), 0);
1035    }
1036
1037    // ── Helper: reusable MockInput ───────────────────────────────
1038
1039    struct MockInput {
1040        chunk: Option<DataChunk>,
1041    }
1042
1043    impl MockInput {
1044        fn boxed(chunk: DataChunk) -> Box<Self> {
1045            Box::new(Self { chunk: Some(chunk) })
1046        }
1047    }
1048
1049    impl Operator for MockInput {
1050        fn next(&mut self) -> OperatorResult {
1051            Ok(self.chunk.take())
1052        }
1053        fn reset(&mut self) {}
1054        fn name(&self) -> &'static str {
1055            "MockInput"
1056        }
1057    }
1058
1059    // ── DeleteEdgeOperator ───────────────────────────────────────
1060
1061    #[test]
1062    fn test_delete_edge() {
1063        let store = create_test_store();
1064
1065        let n1 = store.create_node(&["Person"]);
1066        let n2 = store.create_node(&["Person"]);
1067        let eid = store.create_edge(n1, n2, "KNOWS");
1068        assert_eq!(store.edge_count(), 1);
1069
1070        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1071        builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1072        builder.advance_row();
1073
1074        let mut op = DeleteEdgeOperator::new(
1075            Arc::clone(&store),
1076            MockInput::boxed(builder.finish()),
1077            0,
1078            vec![LogicalType::Int64],
1079        );
1080
1081        let chunk = op.next().unwrap().unwrap();
1082        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1083        assert_eq!(deleted, 1);
1084        assert_eq!(store.edge_count(), 0);
1085    }
1086
1087    #[test]
1088    fn test_delete_edge_no_input_returns_none() {
1089        let store = create_test_store();
1090
1091        // Empty chunk: MockInput returns None immediately
1092        struct EmptyInput;
1093        impl Operator for EmptyInput {
1094            fn next(&mut self) -> OperatorResult {
1095                Ok(None)
1096            }
1097            fn reset(&mut self) {}
1098            fn name(&self) -> &'static str {
1099                "EmptyInput"
1100            }
1101        }
1102
1103        let mut op = DeleteEdgeOperator::new(
1104            Arc::clone(&store),
1105            Box::new(EmptyInput),
1106            0,
1107            vec![LogicalType::Int64],
1108        );
1109
1110        assert!(op.next().unwrap().is_none());
1111    }
1112
1113    #[test]
1114    fn test_delete_multiple_edges() {
1115        let store = create_test_store();
1116
1117        let n1 = store.create_node(&["N"]);
1118        let n2 = store.create_node(&["N"]);
1119        let e1 = store.create_edge(n1, n2, "R");
1120        let e2 = store.create_edge(n2, n1, "S");
1121        assert_eq!(store.edge_count(), 2);
1122
1123        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1124        builder.column_mut(0).unwrap().push_int64(e1.0 as i64);
1125        builder.advance_row();
1126        builder.column_mut(0).unwrap().push_int64(e2.0 as i64);
1127        builder.advance_row();
1128
1129        let mut op = DeleteEdgeOperator::new(
1130            Arc::clone(&store),
1131            MockInput::boxed(builder.finish()),
1132            0,
1133            vec![LogicalType::Int64],
1134        );
1135
1136        let chunk = op.next().unwrap().unwrap();
1137        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1138        assert_eq!(deleted, 2);
1139        assert_eq!(store.edge_count(), 0);
1140    }
1141
1142    // ── DeleteNodeOperator with DETACH ───────────────────────────
1143
1144    #[test]
1145    fn test_delete_node_detach() {
1146        let store = create_test_store();
1147
1148        let n1 = store.create_node(&["Person"]);
1149        let n2 = store.create_node(&["Person"]);
1150        store.create_edge(n1, n2, "KNOWS");
1151        store.create_edge(n2, n1, "FOLLOWS");
1152        assert_eq!(store.edge_count(), 2);
1153
1154        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1155        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
1156        builder.advance_row();
1157
1158        let mut op = DeleteNodeOperator::new(
1159            Arc::clone(&store),
1160            MockInput::boxed(builder.finish()),
1161            0,
1162            vec![LogicalType::Int64],
1163            true, // detach = true
1164        );
1165
1166        let chunk = op.next().unwrap().unwrap();
1167        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1168        assert_eq!(deleted, 1);
1169        assert_eq!(store.node_count(), 1);
1170        assert_eq!(store.edge_count(), 0); // edges detached
1171    }
1172
1173    // ── AddLabelOperator ─────────────────────────────────────────
1174
1175    #[test]
1176    fn test_add_label() {
1177        let store = create_test_store();
1178
1179        let node = store.create_node(&["Person"]);
1180
1181        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1182        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1183        builder.advance_row();
1184
1185        let mut op = AddLabelOperator::new(
1186            Arc::clone(&store),
1187            MockInput::boxed(builder.finish()),
1188            0,
1189            vec!["Employee".to_string()],
1190            vec![LogicalType::Int64],
1191        );
1192
1193        let chunk = op.next().unwrap().unwrap();
1194        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1195        assert_eq!(updated, 1);
1196
1197        // Verify label was added
1198        let node_data = store.get_node(node).unwrap();
1199        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1200        assert!(labels.contains(&"Person"));
1201        assert!(labels.contains(&"Employee"));
1202    }
1203
1204    #[test]
1205    fn test_add_multiple_labels() {
1206        let store = create_test_store();
1207
1208        let node = store.create_node(&["Base"]);
1209
1210        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1211        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1212        builder.advance_row();
1213
1214        let mut op = AddLabelOperator::new(
1215            Arc::clone(&store),
1216            MockInput::boxed(builder.finish()),
1217            0,
1218            vec!["LabelA".to_string(), "LabelB".to_string()],
1219            vec![LogicalType::Int64],
1220        );
1221
1222        let chunk = op.next().unwrap().unwrap();
1223        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1224        assert_eq!(updated, 2); // 2 labels added
1225
1226        let node_data = store.get_node(node).unwrap();
1227        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1228        assert!(labels.contains(&"LabelA"));
1229        assert!(labels.contains(&"LabelB"));
1230    }
1231
1232    #[test]
1233    fn test_add_label_no_input_returns_none() {
1234        let store = create_test_store();
1235
1236        struct EmptyInput;
1237        impl Operator for EmptyInput {
1238            fn next(&mut self) -> OperatorResult {
1239                Ok(None)
1240            }
1241            fn reset(&mut self) {}
1242            fn name(&self) -> &'static str {
1243                "EmptyInput"
1244            }
1245        }
1246
1247        let mut op = AddLabelOperator::new(
1248            Arc::clone(&store),
1249            Box::new(EmptyInput),
1250            0,
1251            vec!["Foo".to_string()],
1252            vec![LogicalType::Int64],
1253        );
1254
1255        assert!(op.next().unwrap().is_none());
1256    }
1257
1258    // ── RemoveLabelOperator ──────────────────────────────────────
1259
1260    #[test]
1261    fn test_remove_label() {
1262        let store = create_test_store();
1263
1264        let node = store.create_node(&["Person", "Employee"]);
1265
1266        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1267        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1268        builder.advance_row();
1269
1270        let mut op = RemoveLabelOperator::new(
1271            Arc::clone(&store),
1272            MockInput::boxed(builder.finish()),
1273            0,
1274            vec!["Employee".to_string()],
1275            vec![LogicalType::Int64],
1276        );
1277
1278        let chunk = op.next().unwrap().unwrap();
1279        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1280        assert_eq!(updated, 1);
1281
1282        // Verify label was removed
1283        let node_data = store.get_node(node).unwrap();
1284        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1285        assert!(labels.contains(&"Person"));
1286        assert!(!labels.contains(&"Employee"));
1287    }
1288
1289    #[test]
1290    fn test_remove_nonexistent_label() {
1291        let store = create_test_store();
1292
1293        let node = store.create_node(&["Person"]);
1294
1295        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1296        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1297        builder.advance_row();
1298
1299        let mut op = RemoveLabelOperator::new(
1300            Arc::clone(&store),
1301            MockInput::boxed(builder.finish()),
1302            0,
1303            vec!["NonExistent".to_string()],
1304            vec![LogicalType::Int64],
1305        );
1306
1307        let chunk = op.next().unwrap().unwrap();
1308        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1309        assert_eq!(updated, 0); // nothing removed
1310    }
1311
1312    // ── SetPropertyOperator ──────────────────────────────────────
1313
1314    #[test]
1315    fn test_set_node_property_constant() {
1316        let store = create_test_store();
1317
1318        let node = store.create_node(&["Person"]);
1319
1320        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1321        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1322        builder.advance_row();
1323
1324        let mut op = SetPropertyOperator::new_for_node(
1325            Arc::clone(&store),
1326            MockInput::boxed(builder.finish()),
1327            0,
1328            vec![(
1329                "name".to_string(),
1330                PropertySource::Constant(Value::String("Alice".into())),
1331            )],
1332            vec![LogicalType::Int64],
1333        );
1334
1335        let chunk = op.next().unwrap().unwrap();
1336        assert_eq!(chunk.row_count(), 1);
1337
1338        // Verify property was set
1339        let node_data = store.get_node(node).unwrap();
1340        assert_eq!(
1341            node_data
1342                .properties
1343                .get(&grafeo_common::types::PropertyKey::new("name")),
1344            Some(&Value::String("Alice".into()))
1345        );
1346    }
1347
1348    #[test]
1349    fn test_set_node_property_from_column() {
1350        let store = create_test_store();
1351
1352        let node = store.create_node(&["Person"]);
1353
1354        // Input: column 0 = node ID, column 1 = property value
1355        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1356        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1357        builder
1358            .column_mut(1)
1359            .unwrap()
1360            .push_value(Value::String("Bob".into()));
1361        builder.advance_row();
1362
1363        let mut op = SetPropertyOperator::new_for_node(
1364            Arc::clone(&store),
1365            MockInput::boxed(builder.finish()),
1366            0,
1367            vec![("name".to_string(), PropertySource::Column(1))],
1368            vec![LogicalType::Int64, LogicalType::String],
1369        );
1370
1371        let chunk = op.next().unwrap().unwrap();
1372        assert_eq!(chunk.row_count(), 1);
1373
1374        let node_data = store.get_node(node).unwrap();
1375        assert_eq!(
1376            node_data
1377                .properties
1378                .get(&grafeo_common::types::PropertyKey::new("name")),
1379            Some(&Value::String("Bob".into()))
1380        );
1381    }
1382
1383    #[test]
1384    fn test_set_edge_property() {
1385        let store = create_test_store();
1386
1387        let n1 = store.create_node(&["N"]);
1388        let n2 = store.create_node(&["N"]);
1389        let eid = store.create_edge(n1, n2, "KNOWS");
1390
1391        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1392        builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1393        builder.advance_row();
1394
1395        let mut op = SetPropertyOperator::new_for_edge(
1396            Arc::clone(&store),
1397            MockInput::boxed(builder.finish()),
1398            0,
1399            vec![(
1400                "weight".to_string(),
1401                PropertySource::Constant(Value::Float64(0.75)),
1402            )],
1403            vec![LogicalType::Int64],
1404        );
1405
1406        let chunk = op.next().unwrap().unwrap();
1407        assert_eq!(chunk.row_count(), 1);
1408
1409        let edge_data = store.get_edge(eid).unwrap();
1410        assert_eq!(
1411            edge_data
1412                .properties
1413                .get(&grafeo_common::types::PropertyKey::new("weight")),
1414            Some(&Value::Float64(0.75))
1415        );
1416    }
1417
1418    #[test]
1419    fn test_set_multiple_properties() {
1420        let store = create_test_store();
1421
1422        let node = store.create_node(&["Person"]);
1423
1424        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1425        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1426        builder.advance_row();
1427
1428        let mut op = SetPropertyOperator::new_for_node(
1429            Arc::clone(&store),
1430            MockInput::boxed(builder.finish()),
1431            0,
1432            vec![
1433                (
1434                    "name".to_string(),
1435                    PropertySource::Constant(Value::String("Alice".into())),
1436                ),
1437                (
1438                    "age".to_string(),
1439                    PropertySource::Constant(Value::Int64(30)),
1440                ),
1441            ],
1442            vec![LogicalType::Int64],
1443        );
1444
1445        op.next().unwrap().unwrap();
1446
1447        let node_data = store.get_node(node).unwrap();
1448        assert_eq!(
1449            node_data
1450                .properties
1451                .get(&grafeo_common::types::PropertyKey::new("name")),
1452            Some(&Value::String("Alice".into()))
1453        );
1454        assert_eq!(
1455            node_data
1456                .properties
1457                .get(&grafeo_common::types::PropertyKey::new("age")),
1458            Some(&Value::Int64(30))
1459        );
1460    }
1461
1462    #[test]
1463    fn test_set_property_no_input_returns_none() {
1464        let store = create_test_store();
1465
1466        struct EmptyInput;
1467        impl Operator for EmptyInput {
1468            fn next(&mut self) -> OperatorResult {
1469                Ok(None)
1470            }
1471            fn reset(&mut self) {}
1472            fn name(&self) -> &'static str {
1473                "EmptyInput"
1474            }
1475        }
1476
1477        let mut op = SetPropertyOperator::new_for_node(
1478            Arc::clone(&store),
1479            Box::new(EmptyInput),
1480            0,
1481            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
1482            vec![LogicalType::Int64],
1483        );
1484
1485        assert!(op.next().unwrap().is_none());
1486    }
1487
1488    // ── Operator name() ──────────────────────────────────────────
1489
1490    #[test]
1491    fn test_operator_names() {
1492        let store = create_test_store();
1493
1494        struct EmptyInput;
1495        impl Operator for EmptyInput {
1496            fn next(&mut self) -> OperatorResult {
1497                Ok(None)
1498            }
1499            fn reset(&mut self) {}
1500            fn name(&self) -> &'static str {
1501                "EmptyInput"
1502            }
1503        }
1504
1505        let op = DeleteEdgeOperator::new(
1506            Arc::clone(&store),
1507            Box::new(EmptyInput),
1508            0,
1509            vec![LogicalType::Int64],
1510        );
1511        assert_eq!(op.name(), "DeleteEdge");
1512
1513        let op = AddLabelOperator::new(
1514            Arc::clone(&store),
1515            Box::new(EmptyInput),
1516            0,
1517            vec!["L".to_string()],
1518            vec![LogicalType::Int64],
1519        );
1520        assert_eq!(op.name(), "AddLabel");
1521
1522        let op = RemoveLabelOperator::new(
1523            Arc::clone(&store),
1524            Box::new(EmptyInput),
1525            0,
1526            vec!["L".to_string()],
1527            vec![LogicalType::Int64],
1528        );
1529        assert_eq!(op.name(), "RemoveLabel");
1530
1531        let op = SetPropertyOperator::new_for_node(
1532            Arc::clone(&store),
1533            Box::new(EmptyInput),
1534            0,
1535            vec![],
1536            vec![LogicalType::Int64],
1537        );
1538        assert_eq!(op.name(), "SetProperty");
1539    }
1540}