Skip to main content

grafeo_core/execution/operators/
mutation.rs

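//! Mutation operators for creating, deleting, and updating graph elements.
//!
//! These operators modify the graph:
//! - `CreateNodeOperator`: Creates new nodes
//! - `CreateEdgeOperator`: Creates new edges
//! - `DeleteNodeOperator`: Deletes nodes
//! - `DeleteEdgeOperator`: Deletes edges
//! - `AddLabelOperator`: Adds labels to nodes
//! - `RemoveLabelOperator`: Removes labels from nodes
//! - `SetPropertyOperator`: Sets properties on nodes or edges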
8
9use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::lpg::LpgStore;
16
17/// Operator that creates new nodes.
18///
19/// For each input row, creates a new node with the specified labels
20/// and properties, then outputs the row with the new node.
21pub struct CreateNodeOperator {
22    /// The graph store to modify.
23    store: Arc<LpgStore>,
24    /// Input operator.
25    input: Option<Box<dyn Operator>>,
26    /// Labels for the new nodes.
27    labels: Vec<String>,
28    /// Properties to set (name -> column index or constant value).
29    properties: Vec<(String, PropertySource)>,
30    /// Output schema.
31    output_schema: Vec<LogicalType>,
32    /// Column index for the created node variable.
33    output_column: usize,
34    /// Whether this operator has been executed (for no-input case).
35    executed: bool,
36    /// Epoch for MVCC versioning.
37    viewing_epoch: Option<EpochId>,
38    /// Transaction ID for MVCC versioning.
39    tx_id: Option<TxId>,
40}
41
42/// Source for a property value.
43#[derive(Debug, Clone)]
44pub enum PropertySource {
45    /// Get value from an input column.
46    Column(usize),
47    /// Use a constant value.
48    Constant(Value),
49}
50
51impl CreateNodeOperator {
52    /// Creates a new node creation operator.
53    ///
54    /// # Arguments
55    /// * `store` - The graph store to modify.
56    /// * `input` - Optional input operator (None for standalone CREATE).
57    /// * `labels` - Labels to assign to created nodes.
58    /// * `properties` - Properties to set on created nodes.
59    /// * `output_schema` - Schema of the output.
60    /// * `output_column` - Column index where the created node ID goes.
61    pub fn new(
62        store: Arc<LpgStore>,
63        input: Option<Box<dyn Operator>>,
64        labels: Vec<String>,
65        properties: Vec<(String, PropertySource)>,
66        output_schema: Vec<LogicalType>,
67        output_column: usize,
68    ) -> Self {
69        Self {
70            store,
71            input,
72            labels,
73            properties,
74            output_schema,
75            output_column,
76            executed: false,
77            viewing_epoch: None,
78            tx_id: None,
79        }
80    }
81
82    /// Sets the transaction context for MVCC versioning.
83    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
84        self.viewing_epoch = Some(epoch);
85        self.tx_id = tx_id;
86        self
87    }
88}
89
90impl Operator for CreateNodeOperator {
91    fn next(&mut self) -> OperatorResult {
92        // Get transaction context for versioned creation
93        let epoch = self
94            .viewing_epoch
95            .unwrap_or_else(|| self.store.current_epoch());
96        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
97
98        if let Some(ref mut input) = self.input {
99            // For each input row, create a node
100            if let Some(chunk) = input.next()? {
101                let mut builder =
102                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
103
104                for row in chunk.selected_indices() {
105                    // Create the node with MVCC versioning
106                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
107                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
108
109                    // Set properties
110                    for (prop_name, source) in &self.properties {
111                        let value = match source {
112                            PropertySource::Column(col_idx) => chunk
113                                .column(*col_idx)
114                                .and_then(|c| c.get_value(row))
115                                .unwrap_or(Value::Null),
116                            PropertySource::Constant(v) => v.clone(),
117                        };
118                        self.store.set_node_property(node_id, prop_name, value);
119                    }
120
121                    // Copy input columns to output
122                    for col_idx in 0..chunk.column_count() {
123                        if col_idx < self.output_column
124                            && let (Some(src), Some(dst)) =
125                                (chunk.column(col_idx), builder.column_mut(col_idx))
126                        {
127                            if let Some(val) = src.get_value(row) {
128                                dst.push_value(val);
129                            } else {
130                                dst.push_value(Value::Null);
131                            }
132                        }
133                    }
134
135                    // Add the new node ID
136                    if let Some(dst) = builder.column_mut(self.output_column) {
137                        dst.push_value(Value::Int64(node_id.0 as i64));
138                    }
139
140                    builder.advance_row();
141                }
142
143                return Ok(Some(builder.finish()));
144            }
145            Ok(None)
146        } else {
147            // No input - create a single node
148            if self.executed {
149                return Ok(None);
150            }
151            self.executed = true;
152
153            // Create the node with MVCC versioning
154            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
155            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
156
157            // Set properties from constants only
158            for (prop_name, source) in &self.properties {
159                if let PropertySource::Constant(value) = source {
160                    self.store
161                        .set_node_property(node_id, prop_name, value.clone());
162                }
163            }
164
165            // Build output chunk with just the node ID
166            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
167            if let Some(dst) = builder.column_mut(self.output_column) {
168                dst.push_value(Value::Int64(node_id.0 as i64));
169            }
170            builder.advance_row();
171
172            Ok(Some(builder.finish()))
173        }
174    }
175
176    fn reset(&mut self) {
177        if let Some(ref mut input) = self.input {
178            input.reset();
179        }
180        self.executed = false;
181    }
182
183    fn name(&self) -> &'static str {
184        "CreateNode"
185    }
186}
187
188/// Operator that creates new edges.
189pub struct CreateEdgeOperator {
190    /// The graph store to modify.
191    store: Arc<LpgStore>,
192    /// Input operator.
193    input: Box<dyn Operator>,
194    /// Column index for the source node.
195    from_column: usize,
196    /// Column index for the target node.
197    to_column: usize,
198    /// Edge type.
199    edge_type: String,
200    /// Properties to set.
201    properties: Vec<(String, PropertySource)>,
202    /// Output schema.
203    output_schema: Vec<LogicalType>,
204    /// Column index for the created edge variable (if any).
205    output_column: Option<usize>,
206    /// Epoch for MVCC versioning.
207    viewing_epoch: Option<EpochId>,
208    /// Transaction ID for MVCC versioning.
209    tx_id: Option<TxId>,
210}
211
212impl CreateEdgeOperator {
213    /// Creates a new edge creation operator.
214    ///
215    /// Use builder methods to set additional options:
216    /// - [`with_properties`](Self::with_properties) - set edge properties
217    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
218    /// - [`with_tx_context`](Self::with_tx_context) - set transaction context
219    pub fn new(
220        store: Arc<LpgStore>,
221        input: Box<dyn Operator>,
222        from_column: usize,
223        to_column: usize,
224        edge_type: String,
225        output_schema: Vec<LogicalType>,
226    ) -> Self {
227        Self {
228            store,
229            input,
230            from_column,
231            to_column,
232            edge_type,
233            properties: Vec::new(),
234            output_schema,
235            output_column: None,
236            viewing_epoch: None,
237            tx_id: None,
238        }
239    }
240
241    /// Sets the properties to assign to created edges.
242    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
243        self.properties = properties;
244        self
245    }
246
247    /// Sets the output column for the created edge ID.
248    pub fn with_output_column(mut self, column: usize) -> Self {
249        self.output_column = Some(column);
250        self
251    }
252
253    /// Sets the transaction context for MVCC versioning.
254    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
255        self.viewing_epoch = Some(epoch);
256        self.tx_id = tx_id;
257        self
258    }
259}
260
261impl Operator for CreateEdgeOperator {
262    fn next(&mut self) -> OperatorResult {
263        // Get transaction context for versioned creation
264        let epoch = self
265            .viewing_epoch
266            .unwrap_or_else(|| self.store.current_epoch());
267        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
268
269        if let Some(chunk) = self.input.next()? {
270            let mut builder =
271                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
272
273            for row in chunk.selected_indices() {
274                // Get source and target node IDs
275                let from_id = chunk
276                    .column(self.from_column)
277                    .and_then(|c| c.get_value(row))
278                    .ok_or_else(|| {
279                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
280                    })?;
281
282                let to_id = chunk
283                    .column(self.to_column)
284                    .and_then(|c| c.get_value(row))
285                    .ok_or_else(|| {
286                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
287                    })?;
288
289                // Extract node IDs
290                let from_node_id = match from_id {
291                    Value::Int64(id) => NodeId(id as u64),
292                    _ => {
293                        return Err(OperatorError::TypeMismatch {
294                            expected: "Int64 (node ID)".to_string(),
295                            found: format!("{from_id:?}"),
296                        });
297                    }
298                };
299
300                let to_node_id = match to_id {
301                    Value::Int64(id) => NodeId(id as u64),
302                    _ => {
303                        return Err(OperatorError::TypeMismatch {
304                            expected: "Int64 (node ID)".to_string(),
305                            found: format!("{to_id:?}"),
306                        });
307                    }
308                };
309
310                // Create the edge with MVCC versioning
311                let edge_id = self.store.create_edge_versioned(
312                    from_node_id,
313                    to_node_id,
314                    &self.edge_type,
315                    epoch,
316                    tx,
317                );
318
319                // Set properties
320                for (prop_name, source) in &self.properties {
321                    let value = match source {
322                        PropertySource::Column(col_idx) => chunk
323                            .column(*col_idx)
324                            .and_then(|c| c.get_value(row))
325                            .unwrap_or(Value::Null),
326                        PropertySource::Constant(v) => v.clone(),
327                    };
328                    self.store.set_edge_property(edge_id, prop_name, value);
329                }
330
331                // Copy input columns
332                for col_idx in 0..chunk.column_count() {
333                    if let (Some(src), Some(dst)) =
334                        (chunk.column(col_idx), builder.column_mut(col_idx))
335                    {
336                        if let Some(val) = src.get_value(row) {
337                            dst.push_value(val);
338                        } else {
339                            dst.push_value(Value::Null);
340                        }
341                    }
342                }
343
344                // Add edge ID if requested
345                if let Some(out_col) = self.output_column
346                    && let Some(dst) = builder.column_mut(out_col)
347                {
348                    dst.push_value(Value::Int64(edge_id.0 as i64));
349                }
350
351                builder.advance_row();
352            }
353
354            return Ok(Some(builder.finish()));
355        }
356        Ok(None)
357    }
358
359    fn reset(&mut self) {
360        self.input.reset();
361    }
362
363    fn name(&self) -> &'static str {
364        "CreateEdge"
365    }
366}
367
368/// Operator that deletes nodes.
369pub struct DeleteNodeOperator {
370    /// The graph store to modify.
371    store: Arc<LpgStore>,
372    /// Input operator.
373    input: Box<dyn Operator>,
374    /// Column index for the node to delete.
375    node_column: usize,
376    /// Output schema.
377    output_schema: Vec<LogicalType>,
378    /// Whether to detach (delete connected edges) before deleting.
379    detach: bool,
380    /// Epoch for MVCC versioning.
381    viewing_epoch: Option<EpochId>,
382}
383
384impl DeleteNodeOperator {
385    /// Creates a new node deletion operator.
386    pub fn new(
387        store: Arc<LpgStore>,
388        input: Box<dyn Operator>,
389        node_column: usize,
390        output_schema: Vec<LogicalType>,
391        detach: bool,
392    ) -> Self {
393        Self {
394            store,
395            input,
396            node_column,
397            output_schema,
398            detach,
399            viewing_epoch: None,
400        }
401    }
402
403    /// Sets the transaction context for MVCC versioning.
404    pub fn with_tx_context(mut self, epoch: EpochId, _tx_id: Option<TxId>) -> Self {
405        self.viewing_epoch = Some(epoch);
406        self
407    }
408}
409
410impl Operator for DeleteNodeOperator {
411    fn next(&mut self) -> OperatorResult {
412        // Get transaction context for versioned deletion
413        let epoch = self
414            .viewing_epoch
415            .unwrap_or_else(|| self.store.current_epoch());
416
417        if let Some(chunk) = self.input.next()? {
418            let mut deleted_count = 0;
419
420            for row in chunk.selected_indices() {
421                let node_val = chunk
422                    .column(self.node_column)
423                    .and_then(|c| c.get_value(row))
424                    .ok_or_else(|| {
425                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
426                    })?;
427
428                let node_id = match node_val {
429                    Value::Int64(id) => NodeId(id as u64),
430                    _ => {
431                        return Err(OperatorError::TypeMismatch {
432                            expected: "Int64 (node ID)".to_string(),
433                            found: format!("{node_val:?}"),
434                        });
435                    }
436                };
437
438                if self.detach {
439                    // Delete all connected edges first
440                    // Note: Edge deletion will use epoch internally
441                    self.store.delete_node_edges(node_id);
442                }
443
444                // Delete the node with MVCC versioning
445                if self.store.delete_node_at_epoch(node_id, epoch) {
446                    deleted_count += 1;
447                }
448            }
449
450            // Return a chunk with the delete count
451            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
452            if let Some(dst) = builder.column_mut(0) {
453                dst.push_value(Value::Int64(deleted_count));
454            }
455            builder.advance_row();
456
457            return Ok(Some(builder.finish()));
458        }
459        Ok(None)
460    }
461
462    fn reset(&mut self) {
463        self.input.reset();
464    }
465
466    fn name(&self) -> &'static str {
467        "DeleteNode"
468    }
469}
470
471/// Operator that deletes edges.
472pub struct DeleteEdgeOperator {
473    /// The graph store to modify.
474    store: Arc<LpgStore>,
475    /// Input operator.
476    input: Box<dyn Operator>,
477    /// Column index for the edge to delete.
478    edge_column: usize,
479    /// Output schema.
480    output_schema: Vec<LogicalType>,
481    /// Epoch for MVCC versioning.
482    viewing_epoch: Option<EpochId>,
483}
484
485impl DeleteEdgeOperator {
486    /// Creates a new edge deletion operator.
487    pub fn new(
488        store: Arc<LpgStore>,
489        input: Box<dyn Operator>,
490        edge_column: usize,
491        output_schema: Vec<LogicalType>,
492    ) -> Self {
493        Self {
494            store,
495            input,
496            edge_column,
497            output_schema,
498            viewing_epoch: None,
499        }
500    }
501
502    /// Sets the transaction context for MVCC versioning.
503    pub fn with_tx_context(mut self, epoch: EpochId, _tx_id: Option<TxId>) -> Self {
504        self.viewing_epoch = Some(epoch);
505        self
506    }
507}
508
509impl Operator for DeleteEdgeOperator {
510    fn next(&mut self) -> OperatorResult {
511        // Get transaction context for versioned deletion
512        let epoch = self
513            .viewing_epoch
514            .unwrap_or_else(|| self.store.current_epoch());
515
516        if let Some(chunk) = self.input.next()? {
517            let mut deleted_count = 0;
518
519            for row in chunk.selected_indices() {
520                let edge_val = chunk
521                    .column(self.edge_column)
522                    .and_then(|c| c.get_value(row))
523                    .ok_or_else(|| {
524                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
525                    })?;
526
527                let edge_id = match edge_val {
528                    Value::Int64(id) => EdgeId(id as u64),
529                    _ => {
530                        return Err(OperatorError::TypeMismatch {
531                            expected: "Int64 (edge ID)".to_string(),
532                            found: format!("{edge_val:?}"),
533                        });
534                    }
535                };
536
537                // Delete the edge with MVCC versioning
538                if self.store.delete_edge_at_epoch(edge_id, epoch) {
539                    deleted_count += 1;
540                }
541            }
542
543            // Return a chunk with the delete count
544            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
545            if let Some(dst) = builder.column_mut(0) {
546                dst.push_value(Value::Int64(deleted_count));
547            }
548            builder.advance_row();
549
550            return Ok(Some(builder.finish()));
551        }
552        Ok(None)
553    }
554
555    fn reset(&mut self) {
556        self.input.reset();
557    }
558
559    fn name(&self) -> &'static str {
560        "DeleteEdge"
561    }
562}
563
564/// Operator that adds labels to nodes.
565pub struct AddLabelOperator {
566    /// The graph store.
567    store: Arc<LpgStore>,
568    /// Child operator providing nodes.
569    input: Box<dyn Operator>,
570    /// Column index containing node IDs.
571    node_column: usize,
572    /// Labels to add.
573    labels: Vec<String>,
574    /// Output schema.
575    output_schema: Vec<LogicalType>,
576}
577
578impl AddLabelOperator {
579    /// Creates a new add label operator.
580    pub fn new(
581        store: Arc<LpgStore>,
582        input: Box<dyn Operator>,
583        node_column: usize,
584        labels: Vec<String>,
585        output_schema: Vec<LogicalType>,
586    ) -> Self {
587        Self {
588            store,
589            input,
590            node_column,
591            labels,
592            output_schema,
593        }
594    }
595}
596
597impl Operator for AddLabelOperator {
598    fn next(&mut self) -> OperatorResult {
599        if let Some(chunk) = self.input.next()? {
600            let mut updated_count = 0;
601
602            for row in chunk.selected_indices() {
603                let node_val = chunk
604                    .column(self.node_column)
605                    .and_then(|c| c.get_value(row))
606                    .ok_or_else(|| {
607                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
608                    })?;
609
610                let node_id = match node_val {
611                    Value::Int64(id) => NodeId(id as u64),
612                    _ => {
613                        return Err(OperatorError::TypeMismatch {
614                            expected: "Int64 (node ID)".to_string(),
615                            found: format!("{node_val:?}"),
616                        });
617                    }
618                };
619
620                // Add all labels
621                for label in &self.labels {
622                    if self.store.add_label(node_id, label) {
623                        updated_count += 1;
624                    }
625                }
626            }
627
628            // Return a chunk with the update count
629            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
630            if let Some(dst) = builder.column_mut(0) {
631                dst.push_value(Value::Int64(updated_count));
632            }
633            builder.advance_row();
634
635            return Ok(Some(builder.finish()));
636        }
637        Ok(None)
638    }
639
640    fn reset(&mut self) {
641        self.input.reset();
642    }
643
644    fn name(&self) -> &'static str {
645        "AddLabel"
646    }
647}
648
649/// Operator that removes labels from nodes.
650pub struct RemoveLabelOperator {
651    /// The graph store.
652    store: Arc<LpgStore>,
653    /// Child operator providing nodes.
654    input: Box<dyn Operator>,
655    /// Column index containing node IDs.
656    node_column: usize,
657    /// Labels to remove.
658    labels: Vec<String>,
659    /// Output schema.
660    output_schema: Vec<LogicalType>,
661}
662
663impl RemoveLabelOperator {
664    /// Creates a new remove label operator.
665    pub fn new(
666        store: Arc<LpgStore>,
667        input: Box<dyn Operator>,
668        node_column: usize,
669        labels: Vec<String>,
670        output_schema: Vec<LogicalType>,
671    ) -> Self {
672        Self {
673            store,
674            input,
675            node_column,
676            labels,
677            output_schema,
678        }
679    }
680}
681
682impl Operator for RemoveLabelOperator {
683    fn next(&mut self) -> OperatorResult {
684        if let Some(chunk) = self.input.next()? {
685            let mut updated_count = 0;
686
687            for row in chunk.selected_indices() {
688                let node_val = chunk
689                    .column(self.node_column)
690                    .and_then(|c| c.get_value(row))
691                    .ok_or_else(|| {
692                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
693                    })?;
694
695                let node_id = match node_val {
696                    Value::Int64(id) => NodeId(id as u64),
697                    _ => {
698                        return Err(OperatorError::TypeMismatch {
699                            expected: "Int64 (node ID)".to_string(),
700                            found: format!("{node_val:?}"),
701                        });
702                    }
703                };
704
705                // Remove all labels
706                for label in &self.labels {
707                    if self.store.remove_label(node_id, label) {
708                        updated_count += 1;
709                    }
710                }
711            }
712
713            // Return a chunk with the update count
714            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
715            if let Some(dst) = builder.column_mut(0) {
716                dst.push_value(Value::Int64(updated_count));
717            }
718            builder.advance_row();
719
720            return Ok(Some(builder.finish()));
721        }
722        Ok(None)
723    }
724
725    fn reset(&mut self) {
726        self.input.reset();
727    }
728
729    fn name(&self) -> &'static str {
730        "RemoveLabel"
731    }
732}
733
734/// Operator that sets properties on nodes or edges.
735///
736/// This operator reads node/edge IDs from a column and sets the
737/// specified properties on each entity.
738pub struct SetPropertyOperator {
739    /// The graph store.
740    store: Arc<LpgStore>,
741    /// Child operator providing entities.
742    input: Box<dyn Operator>,
743    /// Column index containing entity IDs (node or edge).
744    entity_column: usize,
745    /// Whether the entity is an edge (false = node).
746    is_edge: bool,
747    /// Properties to set (name -> source).
748    properties: Vec<(String, PropertySource)>,
749    /// Output schema.
750    output_schema: Vec<LogicalType>,
751}
752
753impl SetPropertyOperator {
754    /// Creates a new set property operator for nodes.
755    pub fn new_for_node(
756        store: Arc<LpgStore>,
757        input: Box<dyn Operator>,
758        node_column: usize,
759        properties: Vec<(String, PropertySource)>,
760        output_schema: Vec<LogicalType>,
761    ) -> Self {
762        Self {
763            store,
764            input,
765            entity_column: node_column,
766            is_edge: false,
767            properties,
768            output_schema,
769        }
770    }
771
772    /// Creates a new set property operator for edges.
773    pub fn new_for_edge(
774        store: Arc<LpgStore>,
775        input: Box<dyn Operator>,
776        edge_column: usize,
777        properties: Vec<(String, PropertySource)>,
778        output_schema: Vec<LogicalType>,
779    ) -> Self {
780        Self {
781            store,
782            input,
783            entity_column: edge_column,
784            is_edge: true,
785            properties,
786            output_schema,
787        }
788    }
789}
790
791impl Operator for SetPropertyOperator {
792    fn next(&mut self) -> OperatorResult {
793        if let Some(chunk) = self.input.next()? {
794            let mut builder =
795                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
796
797            for row in chunk.selected_indices() {
798                let entity_val = chunk
799                    .column(self.entity_column)
800                    .and_then(|c| c.get_value(row))
801                    .ok_or_else(|| {
802                        OperatorError::ColumnNotFound(format!(
803                            "entity column {}",
804                            self.entity_column
805                        ))
806                    })?;
807
808                let entity_id = match entity_val {
809                    Value::Int64(id) => id as u64,
810                    _ => {
811                        return Err(OperatorError::TypeMismatch {
812                            expected: "Int64 (entity ID)".to_string(),
813                            found: format!("{entity_val:?}"),
814                        });
815                    }
816                };
817
818                // Set all properties
819                for (prop_name, source) in &self.properties {
820                    let value = match source {
821                        PropertySource::Column(col_idx) => chunk
822                            .column(*col_idx)
823                            .and_then(|c| c.get_value(row))
824                            .unwrap_or(Value::Null),
825                        PropertySource::Constant(v) => v.clone(),
826                    };
827
828                    if self.is_edge {
829                        self.store
830                            .set_edge_property(EdgeId(entity_id), prop_name, value);
831                    } else {
832                        self.store
833                            .set_node_property(NodeId(entity_id), prop_name, value);
834                    }
835                }
836
837                // Copy input columns to output
838                for col_idx in 0..chunk.column_count() {
839                    if let (Some(src), Some(dst)) =
840                        (chunk.column(col_idx), builder.column_mut(col_idx))
841                    {
842                        if let Some(val) = src.get_value(row) {
843                            dst.push_value(val);
844                        } else {
845                            dst.push_value(Value::Null);
846                        }
847                    }
848                }
849
850                builder.advance_row();
851            }
852
853            return Ok(Some(builder.finish()));
854        }
855        Ok(None)
856    }
857
858    fn reset(&mut self) {
859        self.input.reset();
860    }
861
862    fn name(&self) -> &'static str {
863        "SetProperty"
864    }
865}
866
867#[cfg(test)]
868mod tests {
869    use super::*;
870    use crate::execution::DataChunk;
871    use crate::execution::chunk::DataChunkBuilder;
872
873    fn create_test_store() -> Arc<LpgStore> {
874        Arc::new(LpgStore::new())
875    }
876
877    #[test]
878    fn test_create_node_standalone() {
879        let store = create_test_store();
880
881        let mut op = CreateNodeOperator::new(
882            Arc::clone(&store),
883            None,
884            vec!["Person".to_string()],
885            vec![(
886                "name".to_string(),
887                PropertySource::Constant(Value::String("Alice".into())),
888            )],
889            vec![LogicalType::Int64],
890            0,
891        );
892
893        // First call should create a node
894        let chunk = op.next().unwrap().unwrap();
895        assert_eq!(chunk.row_count(), 1);
896
897        // Second call should return None
898        assert!(op.next().unwrap().is_none());
899
900        // Verify node was created
901        assert_eq!(store.node_count(), 1);
902    }
903
904    #[test]
905    fn test_create_edge() {
906        let store = create_test_store();
907
908        // Create two nodes first
909        let node1 = store.create_node(&["Person"]);
910        let node2 = store.create_node(&["Person"]);
911
912        // Create input chunk with node IDs
913        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
914        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
915        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
916        builder.advance_row();
917        let input_chunk = builder.finish();
918
919        // Mock input operator
920        struct MockInput {
921            chunk: Option<DataChunk>,
922        }
923        impl Operator for MockInput {
924            fn next(&mut self) -> OperatorResult {
925                Ok(self.chunk.take())
926            }
927            fn reset(&mut self) {}
928            fn name(&self) -> &'static str {
929                "MockInput"
930            }
931        }
932
933        let mut op = CreateEdgeOperator::new(
934            Arc::clone(&store),
935            Box::new(MockInput {
936                chunk: Some(input_chunk),
937            }),
938            0, // from column
939            1, // to column
940            "KNOWS".to_string(),
941            vec![LogicalType::Int64, LogicalType::Int64],
942        );
943
944        // Execute
945        let _chunk = op.next().unwrap().unwrap();
946
947        // Verify edge was created
948        assert_eq!(store.edge_count(), 1);
949    }
950
951    #[test]
952    fn test_delete_node() {
953        let store = create_test_store();
954
955        // Create a node
956        let node_id = store.create_node(&["Person"]);
957        assert_eq!(store.node_count(), 1);
958
959        // Create input chunk with the node ID
960        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
961        builder.column_mut(0).unwrap().push_int64(node_id.0 as i64);
962        builder.advance_row();
963        let input_chunk = builder.finish();
964
965        struct MockInput {
966            chunk: Option<DataChunk>,
967        }
968        impl Operator for MockInput {
969            fn next(&mut self) -> OperatorResult {
970                Ok(self.chunk.take())
971            }
972            fn reset(&mut self) {}
973            fn name(&self) -> &'static str {
974                "MockInput"
975            }
976        }
977
978        let mut op = DeleteNodeOperator::new(
979            Arc::clone(&store),
980            Box::new(MockInput {
981                chunk: Some(input_chunk),
982            }),
983            0,
984            vec![LogicalType::Int64],
985            false,
986        );
987
988        // Execute
989        let chunk = op.next().unwrap().unwrap();
990
991        // Verify deletion
992        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
993        assert_eq!(deleted, 1);
994        assert_eq!(store.node_count(), 0);
995    }
996
997    // ── Helper: reusable MockInput ───────────────────────────────
998
999    struct MockInput {
1000        chunk: Option<DataChunk>,
1001    }
1002
1003    impl MockInput {
1004        fn boxed(chunk: DataChunk) -> Box<Self> {
1005            Box::new(Self { chunk: Some(chunk) })
1006        }
1007    }
1008
1009    impl Operator for MockInput {
1010        fn next(&mut self) -> OperatorResult {
1011            Ok(self.chunk.take())
1012        }
1013        fn reset(&mut self) {}
1014        fn name(&self) -> &'static str {
1015            "MockInput"
1016        }
1017    }
1018
1019    // ── DeleteEdgeOperator ───────────────────────────────────────
1020
1021    #[test]
1022    fn test_delete_edge() {
1023        let store = create_test_store();
1024
1025        let n1 = store.create_node(&["Person"]);
1026        let n2 = store.create_node(&["Person"]);
1027        let eid = store.create_edge(n1, n2, "KNOWS");
1028        assert_eq!(store.edge_count(), 1);
1029
1030        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1031        builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1032        builder.advance_row();
1033
1034        let mut op = DeleteEdgeOperator::new(
1035            Arc::clone(&store),
1036            MockInput::boxed(builder.finish()),
1037            0,
1038            vec![LogicalType::Int64],
1039        );
1040
1041        let chunk = op.next().unwrap().unwrap();
1042        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1043        assert_eq!(deleted, 1);
1044        assert_eq!(store.edge_count(), 0);
1045    }
1046
1047    #[test]
1048    fn test_delete_edge_no_input_returns_none() {
1049        let store = create_test_store();
1050
1051        // Empty chunk: MockInput returns None immediately
1052        struct EmptyInput;
1053        impl Operator for EmptyInput {
1054            fn next(&mut self) -> OperatorResult {
1055                Ok(None)
1056            }
1057            fn reset(&mut self) {}
1058            fn name(&self) -> &'static str {
1059                "EmptyInput"
1060            }
1061        }
1062
1063        let mut op = DeleteEdgeOperator::new(
1064            Arc::clone(&store),
1065            Box::new(EmptyInput),
1066            0,
1067            vec![LogicalType::Int64],
1068        );
1069
1070        assert!(op.next().unwrap().is_none());
1071    }
1072
1073    #[test]
1074    fn test_delete_multiple_edges() {
1075        let store = create_test_store();
1076
1077        let n1 = store.create_node(&["N"]);
1078        let n2 = store.create_node(&["N"]);
1079        let e1 = store.create_edge(n1, n2, "R");
1080        let e2 = store.create_edge(n2, n1, "S");
1081        assert_eq!(store.edge_count(), 2);
1082
1083        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1084        builder.column_mut(0).unwrap().push_int64(e1.0 as i64);
1085        builder.advance_row();
1086        builder.column_mut(0).unwrap().push_int64(e2.0 as i64);
1087        builder.advance_row();
1088
1089        let mut op = DeleteEdgeOperator::new(
1090            Arc::clone(&store),
1091            MockInput::boxed(builder.finish()),
1092            0,
1093            vec![LogicalType::Int64],
1094        );
1095
1096        let chunk = op.next().unwrap().unwrap();
1097        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1098        assert_eq!(deleted, 2);
1099        assert_eq!(store.edge_count(), 0);
1100    }
1101
1102    // ── DeleteNodeOperator with DETACH ───────────────────────────
1103
1104    #[test]
1105    fn test_delete_node_detach() {
1106        let store = create_test_store();
1107
1108        let n1 = store.create_node(&["Person"]);
1109        let n2 = store.create_node(&["Person"]);
1110        store.create_edge(n1, n2, "KNOWS");
1111        store.create_edge(n2, n1, "FOLLOWS");
1112        assert_eq!(store.edge_count(), 2);
1113
1114        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1115        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
1116        builder.advance_row();
1117
1118        let mut op = DeleteNodeOperator::new(
1119            Arc::clone(&store),
1120            MockInput::boxed(builder.finish()),
1121            0,
1122            vec![LogicalType::Int64],
1123            true, // detach = true
1124        );
1125
1126        let chunk = op.next().unwrap().unwrap();
1127        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1128        assert_eq!(deleted, 1);
1129        assert_eq!(store.node_count(), 1);
1130        assert_eq!(store.edge_count(), 0); // edges detached
1131    }
1132
1133    // ── AddLabelOperator ─────────────────────────────────────────
1134
1135    #[test]
1136    fn test_add_label() {
1137        let store = create_test_store();
1138
1139        let node = store.create_node(&["Person"]);
1140
1141        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1142        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1143        builder.advance_row();
1144
1145        let mut op = AddLabelOperator::new(
1146            Arc::clone(&store),
1147            MockInput::boxed(builder.finish()),
1148            0,
1149            vec!["Employee".to_string()],
1150            vec![LogicalType::Int64],
1151        );
1152
1153        let chunk = op.next().unwrap().unwrap();
1154        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1155        assert_eq!(updated, 1);
1156
1157        // Verify label was added
1158        let node_data = store.get_node(node).unwrap();
1159        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1160        assert!(labels.contains(&"Person"));
1161        assert!(labels.contains(&"Employee"));
1162    }
1163
1164    #[test]
1165    fn test_add_multiple_labels() {
1166        let store = create_test_store();
1167
1168        let node = store.create_node(&["Base"]);
1169
1170        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1171        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1172        builder.advance_row();
1173
1174        let mut op = AddLabelOperator::new(
1175            Arc::clone(&store),
1176            MockInput::boxed(builder.finish()),
1177            0,
1178            vec!["LabelA".to_string(), "LabelB".to_string()],
1179            vec![LogicalType::Int64],
1180        );
1181
1182        let chunk = op.next().unwrap().unwrap();
1183        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1184        assert_eq!(updated, 2); // 2 labels added
1185
1186        let node_data = store.get_node(node).unwrap();
1187        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1188        assert!(labels.contains(&"LabelA"));
1189        assert!(labels.contains(&"LabelB"));
1190    }
1191
1192    #[test]
1193    fn test_add_label_no_input_returns_none() {
1194        let store = create_test_store();
1195
1196        struct EmptyInput;
1197        impl Operator for EmptyInput {
1198            fn next(&mut self) -> OperatorResult {
1199                Ok(None)
1200            }
1201            fn reset(&mut self) {}
1202            fn name(&self) -> &'static str {
1203                "EmptyInput"
1204            }
1205        }
1206
1207        let mut op = AddLabelOperator::new(
1208            Arc::clone(&store),
1209            Box::new(EmptyInput),
1210            0,
1211            vec!["Foo".to_string()],
1212            vec![LogicalType::Int64],
1213        );
1214
1215        assert!(op.next().unwrap().is_none());
1216    }
1217
1218    // ── RemoveLabelOperator ──────────────────────────────────────
1219
1220    #[test]
1221    fn test_remove_label() {
1222        let store = create_test_store();
1223
1224        let node = store.create_node(&["Person", "Employee"]);
1225
1226        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1227        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1228        builder.advance_row();
1229
1230        let mut op = RemoveLabelOperator::new(
1231            Arc::clone(&store),
1232            MockInput::boxed(builder.finish()),
1233            0,
1234            vec!["Employee".to_string()],
1235            vec![LogicalType::Int64],
1236        );
1237
1238        let chunk = op.next().unwrap().unwrap();
1239        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1240        assert_eq!(updated, 1);
1241
1242        // Verify label was removed
1243        let node_data = store.get_node(node).unwrap();
1244        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1245        assert!(labels.contains(&"Person"));
1246        assert!(!labels.contains(&"Employee"));
1247    }
1248
1249    #[test]
1250    fn test_remove_nonexistent_label() {
1251        let store = create_test_store();
1252
1253        let node = store.create_node(&["Person"]);
1254
1255        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1256        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1257        builder.advance_row();
1258
1259        let mut op = RemoveLabelOperator::new(
1260            Arc::clone(&store),
1261            MockInput::boxed(builder.finish()),
1262            0,
1263            vec!["NonExistent".to_string()],
1264            vec![LogicalType::Int64],
1265        );
1266
1267        let chunk = op.next().unwrap().unwrap();
1268        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1269        assert_eq!(updated, 0); // nothing removed
1270    }
1271
1272    // ── SetPropertyOperator ──────────────────────────────────────
1273
1274    #[test]
1275    fn test_set_node_property_constant() {
1276        let store = create_test_store();
1277
1278        let node = store.create_node(&["Person"]);
1279
1280        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1281        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1282        builder.advance_row();
1283
1284        let mut op = SetPropertyOperator::new_for_node(
1285            Arc::clone(&store),
1286            MockInput::boxed(builder.finish()),
1287            0,
1288            vec![(
1289                "name".to_string(),
1290                PropertySource::Constant(Value::String("Alice".into())),
1291            )],
1292            vec![LogicalType::Int64],
1293        );
1294
1295        let chunk = op.next().unwrap().unwrap();
1296        assert_eq!(chunk.row_count(), 1);
1297
1298        // Verify property was set
1299        let node_data = store.get_node(node).unwrap();
1300        assert_eq!(
1301            node_data
1302                .properties
1303                .get(&grafeo_common::types::PropertyKey::new("name")),
1304            Some(&Value::String("Alice".into()))
1305        );
1306    }
1307
1308    #[test]
1309    fn test_set_node_property_from_column() {
1310        let store = create_test_store();
1311
1312        let node = store.create_node(&["Person"]);
1313
1314        // Input: column 0 = node ID, column 1 = property value
1315        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1316        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1317        builder
1318            .column_mut(1)
1319            .unwrap()
1320            .push_value(Value::String("Bob".into()));
1321        builder.advance_row();
1322
1323        let mut op = SetPropertyOperator::new_for_node(
1324            Arc::clone(&store),
1325            MockInput::boxed(builder.finish()),
1326            0,
1327            vec![("name".to_string(), PropertySource::Column(1))],
1328            vec![LogicalType::Int64, LogicalType::String],
1329        );
1330
1331        let chunk = op.next().unwrap().unwrap();
1332        assert_eq!(chunk.row_count(), 1);
1333
1334        let node_data = store.get_node(node).unwrap();
1335        assert_eq!(
1336            node_data
1337                .properties
1338                .get(&grafeo_common::types::PropertyKey::new("name")),
1339            Some(&Value::String("Bob".into()))
1340        );
1341    }
1342
1343    #[test]
1344    fn test_set_edge_property() {
1345        let store = create_test_store();
1346
1347        let n1 = store.create_node(&["N"]);
1348        let n2 = store.create_node(&["N"]);
1349        let eid = store.create_edge(n1, n2, "KNOWS");
1350
1351        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1352        builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1353        builder.advance_row();
1354
1355        let mut op = SetPropertyOperator::new_for_edge(
1356            Arc::clone(&store),
1357            MockInput::boxed(builder.finish()),
1358            0,
1359            vec![(
1360                "weight".to_string(),
1361                PropertySource::Constant(Value::Float64(0.75)),
1362            )],
1363            vec![LogicalType::Int64],
1364        );
1365
1366        let chunk = op.next().unwrap().unwrap();
1367        assert_eq!(chunk.row_count(), 1);
1368
1369        let edge_data = store.get_edge(eid).unwrap();
1370        assert_eq!(
1371            edge_data
1372                .properties
1373                .get(&grafeo_common::types::PropertyKey::new("weight")),
1374            Some(&Value::Float64(0.75))
1375        );
1376    }
1377
1378    #[test]
1379    fn test_set_multiple_properties() {
1380        let store = create_test_store();
1381
1382        let node = store.create_node(&["Person"]);
1383
1384        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1385        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1386        builder.advance_row();
1387
1388        let mut op = SetPropertyOperator::new_for_node(
1389            Arc::clone(&store),
1390            MockInput::boxed(builder.finish()),
1391            0,
1392            vec![
1393                (
1394                    "name".to_string(),
1395                    PropertySource::Constant(Value::String("Alice".into())),
1396                ),
1397                (
1398                    "age".to_string(),
1399                    PropertySource::Constant(Value::Int64(30)),
1400                ),
1401            ],
1402            vec![LogicalType::Int64],
1403        );
1404
1405        op.next().unwrap().unwrap();
1406
1407        let node_data = store.get_node(node).unwrap();
1408        assert_eq!(
1409            node_data
1410                .properties
1411                .get(&grafeo_common::types::PropertyKey::new("name")),
1412            Some(&Value::String("Alice".into()))
1413        );
1414        assert_eq!(
1415            node_data
1416                .properties
1417                .get(&grafeo_common::types::PropertyKey::new("age")),
1418            Some(&Value::Int64(30))
1419        );
1420    }
1421
1422    #[test]
1423    fn test_set_property_no_input_returns_none() {
1424        let store = create_test_store();
1425
1426        struct EmptyInput;
1427        impl Operator for EmptyInput {
1428            fn next(&mut self) -> OperatorResult {
1429                Ok(None)
1430            }
1431            fn reset(&mut self) {}
1432            fn name(&self) -> &'static str {
1433                "EmptyInput"
1434            }
1435        }
1436
1437        let mut op = SetPropertyOperator::new_for_node(
1438            Arc::clone(&store),
1439            Box::new(EmptyInput),
1440            0,
1441            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
1442            vec![LogicalType::Int64],
1443        );
1444
1445        assert!(op.next().unwrap().is_none());
1446    }
1447
1448    // ── Operator name() ──────────────────────────────────────────
1449
1450    #[test]
1451    fn test_operator_names() {
1452        let store = create_test_store();
1453
1454        struct EmptyInput;
1455        impl Operator for EmptyInput {
1456            fn next(&mut self) -> OperatorResult {
1457                Ok(None)
1458            }
1459            fn reset(&mut self) {}
1460            fn name(&self) -> &'static str {
1461                "EmptyInput"
1462            }
1463        }
1464
1465        let op = DeleteEdgeOperator::new(
1466            Arc::clone(&store),
1467            Box::new(EmptyInput),
1468            0,
1469            vec![LogicalType::Int64],
1470        );
1471        assert_eq!(op.name(), "DeleteEdge");
1472
1473        let op = AddLabelOperator::new(
1474            Arc::clone(&store),
1475            Box::new(EmptyInput),
1476            0,
1477            vec!["L".to_string()],
1478            vec![LogicalType::Int64],
1479        );
1480        assert_eq!(op.name(), "AddLabel");
1481
1482        let op = RemoveLabelOperator::new(
1483            Arc::clone(&store),
1484            Box::new(EmptyInput),
1485            0,
1486            vec!["L".to_string()],
1487            vec![LogicalType::Int64],
1488        );
1489        assert_eq!(op.name(), "RemoveLabel");
1490
1491        let op = SetPropertyOperator::new_for_node(
1492            Arc::clone(&store),
1493            Box::new(EmptyInput),
1494            0,
1495            vec![],
1496            vec![LogicalType::Int64],
1497        );
1498        assert_eq!(op.name(), "SetProperty");
1499    }
1500}