Skip to main content

grafeo_core/execution/operators/
mutation.rs

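//! Mutation operators for creating, deleting, and updating graph elements.
//!
//! These operators modify the graph structure or its contents:
//! - `CreateNodeOperator`: Creates new nodes
//! - `CreateEdgeOperator`: Creates new edges
//! - `DeleteNodeOperator`: Deletes nodes
//! - `DeleteEdgeOperator`: Deletes edges
//! - `AddLabelOperator`: Adds labels to nodes
//! - `RemoveLabelOperator`: Removes labels from nodes
//! - `SetPropertyOperator`: Sets properties on nodes or edges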
8
9use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::lpg::LpgStore;
16
/// Operator that creates new nodes.
///
/// For each input row, creates a new node with the specified labels
/// and properties, then outputs the row with the new node.
///
/// When no input operator is supplied, a single node is created on the
/// first call to `next`; later calls yield `None` until `reset` is called.
pub struct CreateNodeOperator {
    /// The graph store to modify.
    store: Arc<LpgStore>,
    /// Input operator (`None` for a standalone create that emits one node).
    input: Option<Box<dyn Operator>>,
    /// Labels for the new nodes.
    labels: Vec<String>,
    /// Properties to set (name -> column index or constant value).
    /// Without an input operator only `PropertySource::Constant` entries are applied.
    properties: Vec<(String, PropertySource)>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Column index for the created node variable; the new node ID is
    /// written there as a `Value::Int64`.
    output_column: usize,
    /// Whether this operator has been executed (for no-input case).
    executed: bool,
    /// Epoch for MVCC versioning; when `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning; when `None`, `TxId::SYSTEM` is used.
    tx_id: Option<TxId>,
}
41
/// Source for a property value.
///
/// The mutation operators in this module resolve each property through one
/// of these variants at the moment they write it to the store.
#[derive(Debug, Clone)]
pub enum PropertySource {
    /// Get value from an input column (the payload is the column index).
    /// The operators here substitute `Value::Null` when the column or the
    /// row's value is absent.
    Column(usize),
    /// Use a constant value (cloned for every entity it is applied to).
    Constant(Value),
}
50
51impl CreateNodeOperator {
52    /// Creates a new node creation operator.
53    ///
54    /// # Arguments
55    /// * `store` - The graph store to modify.
56    /// * `input` - Optional input operator (None for standalone CREATE).
57    /// * `labels` - Labels to assign to created nodes.
58    /// * `properties` - Properties to set on created nodes.
59    /// * `output_schema` - Schema of the output.
60    /// * `output_column` - Column index where the created node ID goes.
61    pub fn new(
62        store: Arc<LpgStore>,
63        input: Option<Box<dyn Operator>>,
64        labels: Vec<String>,
65        properties: Vec<(String, PropertySource)>,
66        output_schema: Vec<LogicalType>,
67        output_column: usize,
68    ) -> Self {
69        Self {
70            store,
71            input,
72            labels,
73            properties,
74            output_schema,
75            output_column,
76            executed: false,
77            viewing_epoch: None,
78            tx_id: None,
79        }
80    }
81
82    /// Sets the transaction context for MVCC versioning.
83    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
84        self.viewing_epoch = Some(epoch);
85        self.tx_id = tx_id;
86        self
87    }
88}
89
90impl Operator for CreateNodeOperator {
91    fn next(&mut self) -> OperatorResult {
92        // Get transaction context for versioned creation
93        let epoch = self
94            .viewing_epoch
95            .unwrap_or_else(|| self.store.current_epoch());
96        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
97
98        if let Some(ref mut input) = self.input {
99            // For each input row, create a node
100            if let Some(chunk) = input.next()? {
101                let mut builder =
102                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
103
104                for row in chunk.selected_indices() {
105                    // Create the node with MVCC versioning
106                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
107                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
108
109                    // Set properties
110                    for (prop_name, source) in &self.properties {
111                        let value = match source {
112                            PropertySource::Column(col_idx) => chunk
113                                .column(*col_idx)
114                                .and_then(|c| c.get_value(row))
115                                .unwrap_or(Value::Null),
116                            PropertySource::Constant(v) => v.clone(),
117                        };
118                        self.store.set_node_property(node_id, prop_name, value);
119                    }
120
121                    // Copy input columns to output
122                    for col_idx in 0..chunk.column_count() {
123                        if col_idx < self.output_column {
124                            if let (Some(src), Some(dst)) =
125                                (chunk.column(col_idx), builder.column_mut(col_idx))
126                            {
127                                if let Some(val) = src.get_value(row) {
128                                    dst.push_value(val);
129                                } else {
130                                    dst.push_value(Value::Null);
131                                }
132                            }
133                        }
134                    }
135
136                    // Add the new node ID
137                    if let Some(dst) = builder.column_mut(self.output_column) {
138                        dst.push_value(Value::Int64(node_id.0 as i64));
139                    }
140
141                    builder.advance_row();
142                }
143
144                return Ok(Some(builder.finish()));
145            }
146            Ok(None)
147        } else {
148            // No input - create a single node
149            if self.executed {
150                return Ok(None);
151            }
152            self.executed = true;
153
154            // Create the node with MVCC versioning
155            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
156            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
157
158            // Set properties from constants only
159            for (prop_name, source) in &self.properties {
160                if let PropertySource::Constant(value) = source {
161                    self.store
162                        .set_node_property(node_id, prop_name, value.clone());
163                }
164            }
165
166            // Build output chunk with just the node ID
167            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
168            if let Some(dst) = builder.column_mut(self.output_column) {
169                dst.push_value(Value::Int64(node_id.0 as i64));
170            }
171            builder.advance_row();
172
173            Ok(Some(builder.finish()))
174        }
175    }
176
177    fn reset(&mut self) {
178        if let Some(ref mut input) = self.input {
179            input.reset();
180        }
181        self.executed = false;
182    }
183
184    fn name(&self) -> &'static str {
185        "CreateNode"
186    }
187}
188
/// Operator that creates new edges.
///
/// For each selected input row, reads the source and target node IDs from
/// the configured columns, creates an edge of `edge_type` between them, and
/// applies the configured properties to it.
pub struct CreateEdgeOperator {
    /// The graph store to modify.
    store: Arc<LpgStore>,
    /// Input operator.
    input: Box<dyn Operator>,
    /// Column index for the source node (value must be a `Value::Int64` node ID).
    from_column: usize,
    /// Column index for the target node (value must be a `Value::Int64` node ID).
    to_column: usize,
    /// Edge type.
    edge_type: String,
    /// Properties to set.
    properties: Vec<(String, PropertySource)>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Column index for the created edge variable (if any); when `Some`, the
    /// new edge ID is written there as a `Value::Int64`.
    output_column: Option<usize>,
    /// Epoch for MVCC versioning; when `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning; when `None`, `TxId::SYSTEM` is used.
    tx_id: Option<TxId>,
}
212
213impl CreateEdgeOperator {
214    /// Creates a new edge creation operator.
215    #[allow(clippy::too_many_arguments)]
216    pub fn new(
217        store: Arc<LpgStore>,
218        input: Box<dyn Operator>,
219        from_column: usize,
220        to_column: usize,
221        edge_type: String,
222        properties: Vec<(String, PropertySource)>,
223        output_schema: Vec<LogicalType>,
224        output_column: Option<usize>,
225    ) -> Self {
226        Self {
227            store,
228            input,
229            from_column,
230            to_column,
231            edge_type,
232            properties,
233            output_schema,
234            output_column,
235            viewing_epoch: None,
236            tx_id: None,
237        }
238    }
239
240    /// Sets the transaction context for MVCC versioning.
241    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
242        self.viewing_epoch = Some(epoch);
243        self.tx_id = tx_id;
244        self
245    }
246}
247
248impl Operator for CreateEdgeOperator {
249    fn next(&mut self) -> OperatorResult {
250        // Get transaction context for versioned creation
251        let epoch = self
252            .viewing_epoch
253            .unwrap_or_else(|| self.store.current_epoch());
254        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
255
256        if let Some(chunk) = self.input.next()? {
257            let mut builder =
258                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
259
260            for row in chunk.selected_indices() {
261                // Get source and target node IDs
262                let from_id = chunk
263                    .column(self.from_column)
264                    .and_then(|c| c.get_value(row))
265                    .ok_or_else(|| {
266                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
267                    })?;
268
269                let to_id = chunk
270                    .column(self.to_column)
271                    .and_then(|c| c.get_value(row))
272                    .ok_or_else(|| {
273                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
274                    })?;
275
276                // Extract node IDs
277                let from_node_id = match from_id {
278                    Value::Int64(id) => NodeId(id as u64),
279                    _ => {
280                        return Err(OperatorError::TypeMismatch {
281                            expected: "Int64 (node ID)".to_string(),
282                            found: format!("{from_id:?}"),
283                        });
284                    }
285                };
286
287                let to_node_id = match to_id {
288                    Value::Int64(id) => NodeId(id as u64),
289                    _ => {
290                        return Err(OperatorError::TypeMismatch {
291                            expected: "Int64 (node ID)".to_string(),
292                            found: format!("{to_id:?}"),
293                        });
294                    }
295                };
296
297                // Create the edge with MVCC versioning
298                let edge_id = self.store.create_edge_versioned(
299                    from_node_id,
300                    to_node_id,
301                    &self.edge_type,
302                    epoch,
303                    tx,
304                );
305
306                // Set properties
307                for (prop_name, source) in &self.properties {
308                    let value = match source {
309                        PropertySource::Column(col_idx) => chunk
310                            .column(*col_idx)
311                            .and_then(|c| c.get_value(row))
312                            .unwrap_or(Value::Null),
313                        PropertySource::Constant(v) => v.clone(),
314                    };
315                    self.store.set_edge_property(edge_id, prop_name, value);
316                }
317
318                // Copy input columns
319                for col_idx in 0..chunk.column_count() {
320                    if let (Some(src), Some(dst)) =
321                        (chunk.column(col_idx), builder.column_mut(col_idx))
322                    {
323                        if let Some(val) = src.get_value(row) {
324                            dst.push_value(val);
325                        } else {
326                            dst.push_value(Value::Null);
327                        }
328                    }
329                }
330
331                // Add edge ID if requested
332                if let Some(out_col) = self.output_column {
333                    if let Some(dst) = builder.column_mut(out_col) {
334                        dst.push_value(Value::Int64(edge_id.0 as i64));
335                    }
336                }
337
338                builder.advance_row();
339            }
340
341            return Ok(Some(builder.finish()));
342        }
343        Ok(None)
344    }
345
346    fn reset(&mut self) {
347        self.input.reset();
348    }
349
350    fn name(&self) -> &'static str {
351        "CreateEdge"
352    }
353}
354
/// Operator that deletes nodes.
///
/// Reads node IDs from `node_column` and deletes each node at the operator's
/// epoch. For every input chunk it emits a single-row chunk whose column 0
/// holds the number of nodes the store reported as deleted.
pub struct DeleteNodeOperator {
    /// The graph store to modify.
    store: Arc<LpgStore>,
    /// Input operator.
    input: Box<dyn Operator>,
    /// Column index for the node to delete (value must be a `Value::Int64` node ID).
    node_column: usize,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Whether to detach (delete connected edges) before deleting.
    detach: bool,
    /// Epoch for MVCC versioning; when `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning (reserved for future use).
    /// It is stored by `with_tx_context` but not read by `next`.
    #[allow(dead_code)]
    tx_id: Option<TxId>,
}
373
374impl DeleteNodeOperator {
375    /// Creates a new node deletion operator.
376    pub fn new(
377        store: Arc<LpgStore>,
378        input: Box<dyn Operator>,
379        node_column: usize,
380        output_schema: Vec<LogicalType>,
381        detach: bool,
382    ) -> Self {
383        Self {
384            store,
385            input,
386            node_column,
387            output_schema,
388            detach,
389            viewing_epoch: None,
390            tx_id: None,
391        }
392    }
393
394    /// Sets the transaction context for MVCC versioning.
395    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
396        self.viewing_epoch = Some(epoch);
397        self.tx_id = tx_id;
398        self
399    }
400}
401
402impl Operator for DeleteNodeOperator {
403    fn next(&mut self) -> OperatorResult {
404        // Get transaction context for versioned deletion
405        let epoch = self
406            .viewing_epoch
407            .unwrap_or_else(|| self.store.current_epoch());
408
409        if let Some(chunk) = self.input.next()? {
410            let mut deleted_count = 0;
411
412            for row in chunk.selected_indices() {
413                let node_val = chunk
414                    .column(self.node_column)
415                    .and_then(|c| c.get_value(row))
416                    .ok_or_else(|| {
417                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
418                    })?;
419
420                let node_id = match node_val {
421                    Value::Int64(id) => NodeId(id as u64),
422                    _ => {
423                        return Err(OperatorError::TypeMismatch {
424                            expected: "Int64 (node ID)".to_string(),
425                            found: format!("{node_val:?}"),
426                        });
427                    }
428                };
429
430                if self.detach {
431                    // Delete all connected edges first
432                    // Note: Edge deletion will use epoch internally
433                    self.store.delete_node_edges(node_id);
434                }
435
436                // Delete the node with MVCC versioning
437                if self.store.delete_node_at_epoch(node_id, epoch) {
438                    deleted_count += 1;
439                }
440            }
441
442            // Return a chunk with the delete count
443            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
444            if let Some(dst) = builder.column_mut(0) {
445                dst.push_value(Value::Int64(deleted_count));
446            }
447            builder.advance_row();
448
449            return Ok(Some(builder.finish()));
450        }
451        Ok(None)
452    }
453
454    fn reset(&mut self) {
455        self.input.reset();
456    }
457
458    fn name(&self) -> &'static str {
459        "DeleteNode"
460    }
461}
462
/// Operator that deletes edges.
///
/// Reads edge IDs from `edge_column` and deletes each edge at the operator's
/// epoch. For every input chunk it emits a single-row chunk whose column 0
/// holds the number of edges the store reported as deleted.
pub struct DeleteEdgeOperator {
    /// The graph store to modify.
    store: Arc<LpgStore>,
    /// Input operator.
    input: Box<dyn Operator>,
    /// Column index for the edge to delete (value must be a `Value::Int64` edge ID).
    edge_column: usize,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Epoch for MVCC versioning; when `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning (reserved for future use).
    /// It is stored by `with_tx_context` but not read by `next`.
    #[allow(dead_code)]
    tx_id: Option<TxId>,
}
479
480impl DeleteEdgeOperator {
481    /// Creates a new edge deletion operator.
482    pub fn new(
483        store: Arc<LpgStore>,
484        input: Box<dyn Operator>,
485        edge_column: usize,
486        output_schema: Vec<LogicalType>,
487    ) -> Self {
488        Self {
489            store,
490            input,
491            edge_column,
492            output_schema,
493            viewing_epoch: None,
494            tx_id: None,
495        }
496    }
497
498    /// Sets the transaction context for MVCC versioning.
499    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
500        self.viewing_epoch = Some(epoch);
501        self.tx_id = tx_id;
502        self
503    }
504}
505
506impl Operator for DeleteEdgeOperator {
507    fn next(&mut self) -> OperatorResult {
508        // Get transaction context for versioned deletion
509        let epoch = self
510            .viewing_epoch
511            .unwrap_or_else(|| self.store.current_epoch());
512
513        if let Some(chunk) = self.input.next()? {
514            let mut deleted_count = 0;
515
516            for row in chunk.selected_indices() {
517                let edge_val = chunk
518                    .column(self.edge_column)
519                    .and_then(|c| c.get_value(row))
520                    .ok_or_else(|| {
521                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
522                    })?;
523
524                let edge_id = match edge_val {
525                    Value::Int64(id) => EdgeId(id as u64),
526                    _ => {
527                        return Err(OperatorError::TypeMismatch {
528                            expected: "Int64 (edge ID)".to_string(),
529                            found: format!("{edge_val:?}"),
530                        });
531                    }
532                };
533
534                // Delete the edge with MVCC versioning
535                if self.store.delete_edge_at_epoch(edge_id, epoch) {
536                    deleted_count += 1;
537                }
538            }
539
540            // Return a chunk with the delete count
541            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
542            if let Some(dst) = builder.column_mut(0) {
543                dst.push_value(Value::Int64(deleted_count));
544            }
545            builder.advance_row();
546
547            return Ok(Some(builder.finish()));
548        }
549        Ok(None)
550    }
551
552    fn reset(&mut self) {
553        self.input.reset();
554    }
555
556    fn name(&self) -> &'static str {
557        "DeleteEdge"
558    }
559}
560
/// Operator that adds labels to nodes.
///
/// For every input chunk it emits a single-row chunk whose column 0 holds
/// the number of `add_label` calls for which the store returned `true`.
pub struct AddLabelOperator {
    /// The graph store.
    store: Arc<LpgStore>,
    /// Child operator providing nodes.
    input: Box<dyn Operator>,
    /// Column index containing node IDs (values must be `Value::Int64`).
    node_column: usize,
    /// Labels to add (each one is applied to every input node).
    labels: Vec<String>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
}
574
impl AddLabelOperator {
    /// Creates a new add label operator.
    ///
    /// # Arguments
    /// * `store` - The graph store to modify.
    /// * `input` - Child operator providing rows that hold node IDs.
    /// * `node_column` - Column index containing the node IDs.
    /// * `labels` - Labels to add to each node.
    /// * `output_schema` - Schema of the output.
    pub fn new(
        store: Arc<LpgStore>,
        input: Box<dyn Operator>,
        node_column: usize,
        labels: Vec<String>,
        output_schema: Vec<LogicalType>,
    ) -> Self {
        Self {
            store,
            input,
            node_column,
            labels,
            output_schema,
        }
    }
}
593
594impl Operator for AddLabelOperator {
595    fn next(&mut self) -> OperatorResult {
596        if let Some(chunk) = self.input.next()? {
597            let mut updated_count = 0;
598
599            for row in chunk.selected_indices() {
600                let node_val = chunk
601                    .column(self.node_column)
602                    .and_then(|c| c.get_value(row))
603                    .ok_or_else(|| {
604                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
605                    })?;
606
607                let node_id = match node_val {
608                    Value::Int64(id) => NodeId(id as u64),
609                    _ => {
610                        return Err(OperatorError::TypeMismatch {
611                            expected: "Int64 (node ID)".to_string(),
612                            found: format!("{node_val:?}"),
613                        });
614                    }
615                };
616
617                // Add all labels
618                for label in &self.labels {
619                    if self.store.add_label(node_id, label) {
620                        updated_count += 1;
621                    }
622                }
623            }
624
625            // Return a chunk with the update count
626            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
627            if let Some(dst) = builder.column_mut(0) {
628                dst.push_value(Value::Int64(updated_count));
629            }
630            builder.advance_row();
631
632            return Ok(Some(builder.finish()));
633        }
634        Ok(None)
635    }
636
637    fn reset(&mut self) {
638        self.input.reset();
639    }
640
641    fn name(&self) -> &'static str {
642        "AddLabel"
643    }
644}
645
/// Operator that removes labels from nodes.
///
/// For every input chunk it emits a single-row chunk whose column 0 holds
/// the number of `remove_label` calls for which the store returned `true`.
pub struct RemoveLabelOperator {
    /// The graph store.
    store: Arc<LpgStore>,
    /// Child operator providing nodes.
    input: Box<dyn Operator>,
    /// Column index containing node IDs (values must be `Value::Int64`).
    node_column: usize,
    /// Labels to remove (each one is removed from every input node).
    labels: Vec<String>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
}
659
impl RemoveLabelOperator {
    /// Creates a new remove label operator.
    ///
    /// # Arguments
    /// * `store` - The graph store to modify.
    /// * `input` - Child operator providing rows that hold node IDs.
    /// * `node_column` - Column index containing the node IDs.
    /// * `labels` - Labels to remove from each node.
    /// * `output_schema` - Schema of the output.
    pub fn new(
        store: Arc<LpgStore>,
        input: Box<dyn Operator>,
        node_column: usize,
        labels: Vec<String>,
        output_schema: Vec<LogicalType>,
    ) -> Self {
        Self {
            store,
            input,
            node_column,
            labels,
            output_schema,
        }
    }
}
678
679impl Operator for RemoveLabelOperator {
680    fn next(&mut self) -> OperatorResult {
681        if let Some(chunk) = self.input.next()? {
682            let mut updated_count = 0;
683
684            for row in chunk.selected_indices() {
685                let node_val = chunk
686                    .column(self.node_column)
687                    .and_then(|c| c.get_value(row))
688                    .ok_or_else(|| {
689                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
690                    })?;
691
692                let node_id = match node_val {
693                    Value::Int64(id) => NodeId(id as u64),
694                    _ => {
695                        return Err(OperatorError::TypeMismatch {
696                            expected: "Int64 (node ID)".to_string(),
697                            found: format!("{node_val:?}"),
698                        });
699                    }
700                };
701
702                // Remove all labels
703                for label in &self.labels {
704                    if self.store.remove_label(node_id, label) {
705                        updated_count += 1;
706                    }
707                }
708            }
709
710            // Return a chunk with the update count
711            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
712            if let Some(dst) = builder.column_mut(0) {
713                dst.push_value(Value::Int64(updated_count));
714            }
715            builder.advance_row();
716
717            return Ok(Some(builder.finish()));
718        }
719        Ok(None)
720    }
721
722    fn reset(&mut self) {
723        self.input.reset();
724    }
725
726    fn name(&self) -> &'static str {
727        "RemoveLabel"
728    }
729}
730
/// Operator that sets properties on nodes or edges.
///
/// This operator reads node/edge IDs from a column and sets the
/// specified properties on each entity. Input columns are copied to the
/// output row by row.
pub struct SetPropertyOperator {
    /// The graph store.
    store: Arc<LpgStore>,
    /// Child operator providing entities.
    input: Box<dyn Operator>,
    /// Column index containing entity IDs (node or edge); values must be `Value::Int64`.
    entity_column: usize,
    /// Whether the entity is an edge (false = node). Selects between
    /// `set_edge_property` and `set_node_property` on the store.
    is_edge: bool,
    /// Properties to set (name -> source).
    properties: Vec<(String, PropertySource)>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
}
749
impl SetPropertyOperator {
    /// Creates a new set property operator for nodes.
    ///
    /// # Arguments
    /// * `store` - The graph store to modify.
    /// * `input` - Child operator providing rows that hold node IDs.
    /// * `node_column` - Column index containing the node IDs.
    /// * `properties` - Properties to set on each node.
    /// * `output_schema` - Schema of the output.
    pub fn new_for_node(
        store: Arc<LpgStore>,
        input: Box<dyn Operator>,
        node_column: usize,
        properties: Vec<(String, PropertySource)>,
        output_schema: Vec<LogicalType>,
    ) -> Self {
        Self {
            store,
            input,
            entity_column: node_column,
            is_edge: false,
            properties,
            output_schema,
        }
    }

    /// Creates a new set property operator for edges.
    ///
    /// # Arguments
    /// * `store` - The graph store to modify.
    /// * `input` - Child operator providing rows that hold edge IDs.
    /// * `edge_column` - Column index containing the edge IDs.
    /// * `properties` - Properties to set on each edge.
    /// * `output_schema` - Schema of the output.
    pub fn new_for_edge(
        store: Arc<LpgStore>,
        input: Box<dyn Operator>,
        edge_column: usize,
        properties: Vec<(String, PropertySource)>,
        output_schema: Vec<LogicalType>,
    ) -> Self {
        Self {
            store,
            input,
            entity_column: edge_column,
            is_edge: true,
            properties,
            output_schema,
        }
    }
}
787
788impl Operator for SetPropertyOperator {
789    fn next(&mut self) -> OperatorResult {
790        if let Some(chunk) = self.input.next()? {
791            let mut builder =
792                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
793
794            for row in chunk.selected_indices() {
795                let entity_val = chunk
796                    .column(self.entity_column)
797                    .and_then(|c| c.get_value(row))
798                    .ok_or_else(|| {
799                        OperatorError::ColumnNotFound(format!(
800                            "entity column {}",
801                            self.entity_column
802                        ))
803                    })?;
804
805                let entity_id = match entity_val {
806                    Value::Int64(id) => id as u64,
807                    _ => {
808                        return Err(OperatorError::TypeMismatch {
809                            expected: "Int64 (entity ID)".to_string(),
810                            found: format!("{entity_val:?}"),
811                        });
812                    }
813                };
814
815                // Set all properties
816                for (prop_name, source) in &self.properties {
817                    let value = match source {
818                        PropertySource::Column(col_idx) => chunk
819                            .column(*col_idx)
820                            .and_then(|c| c.get_value(row))
821                            .unwrap_or(Value::Null),
822                        PropertySource::Constant(v) => v.clone(),
823                    };
824
825                    if self.is_edge {
826                        self.store
827                            .set_edge_property(EdgeId(entity_id), prop_name, value);
828                    } else {
829                        self.store
830                            .set_node_property(NodeId(entity_id), prop_name, value);
831                    }
832                }
833
834                // Copy input columns to output
835                for col_idx in 0..chunk.column_count() {
836                    if let (Some(src), Some(dst)) =
837                        (chunk.column(col_idx), builder.column_mut(col_idx))
838                    {
839                        if let Some(val) = src.get_value(row) {
840                            dst.push_value(val);
841                        } else {
842                            dst.push_value(Value::Null);
843                        }
844                    }
845                }
846
847                builder.advance_row();
848            }
849
850            return Ok(Some(builder.finish()));
851        }
852        Ok(None)
853    }
854
855    fn reset(&mut self) {
856        self.input.reset();
857    }
858
859    fn name(&self) -> &'static str {
860        "SetProperty"
861    }
862}
863
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test input operator that yields one pre-built chunk and then `None`.
    ///
    /// Shared by all tests that need to feed a mutation operator; `reset`
    /// is a no-op, so the chunk is not replayed.
    struct MockInput {
        chunk: Option<DataChunk>,
    }

    impl MockInput {
        /// Wraps `chunk` in a boxed operator ready to pass as an input.
        fn boxed(chunk: DataChunk) -> Box<dyn Operator> {
            Box::new(Self { chunk: Some(chunk) })
        }
    }

    impl Operator for MockInput {
        fn next(&mut self) -> OperatorResult {
            Ok(self.chunk.take())
        }
        fn reset(&mut self) {}
        fn name(&self) -> &'static str {
            "MockInput"
        }
    }

    /// Creates an empty store for a test.
    fn create_test_store() -> Arc<LpgStore> {
        Arc::new(LpgStore::new())
    }

    #[test]
    fn test_create_node_standalone() {
        let store = create_test_store();

        let mut op = CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Person".to_string()],
            vec![(
                "name".to_string(),
                PropertySource::Constant(Value::String("Alice".into())),
            )],
            vec![LogicalType::Int64],
            0,
        );

        // First call should create a node
        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        // Second call should return None
        assert!(op.next().unwrap().is_none());

        // Verify node was created
        assert_eq!(store.node_count(), 1);
    }

    #[test]
    fn test_create_edge() {
        let store = create_test_store();

        // Create two nodes first
        let node1 = store.create_node(&["Person"]);
        let node2 = store.create_node(&["Person"]);

        // Create input chunk with node IDs
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
        builder.advance_row();
        let input_chunk = builder.finish();

        let mut op = CreateEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(input_chunk),
            0, // from column
            1, // to column
            "KNOWS".to_string(),
            vec![],
            vec![LogicalType::Int64, LogicalType::Int64],
            None,
        );

        // Execute
        let _chunk = op.next().unwrap().unwrap();

        // Verify edge was created
        assert_eq!(store.edge_count(), 1);
    }

    #[test]
    fn test_delete_node() {
        let store = create_test_store();

        // Create a node
        let node_id = store.create_node(&["Person"]);
        assert_eq!(store.node_count(), 1);

        // Create input chunk with the node ID
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(node_id.0 as i64);
        builder.advance_row();
        let input_chunk = builder.finish();

        let mut op = DeleteNodeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(input_chunk),
            0,
            vec![LogicalType::Int64],
            false,
        );

        // Execute
        let chunk = op.next().unwrap().unwrap();

        // Verify deletion
        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(store.node_count(), 0);
    }
}