Skip to main content

grafeo_core/execution/operators/
mutation.rs

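//! Mutation operators for creating, updating, and deleting graph elements.
//!
//! These operators modify the graph structure:
//! - `CreateNodeOperator`: Creates new nodes
//! - `CreateEdgeOperator`: Creates new edges
//! - `DeleteNodeOperator`: Deletes nodes
//! - `DeleteEdgeOperator`: Deletes edges
//! - `AddLabelOperator`: Adds labels to nodes
//! - `RemoveLabelOperator`: Removes labels from nodes
//! - `SetPropertyOperator`: Sets properties on nodes or edges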
8
9use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::lpg::LpgStore;
16
/// Operator that creates new nodes.
///
/// With an input operator, one node is created per selected input row and the
/// new node's ID is written to `output_column` next to the copied input
/// columns. Without an input, a single node is created the first time `next`
/// is called.
pub struct CreateNodeOperator {
    /// The graph store to modify.
    store: Arc<LpgStore>,
    /// Upstream operator supplying rows; `None` for a standalone CREATE.
    input: Option<Box<dyn Operator>>,
    /// Labels attached to every created node.
    labels: Vec<String>,
    /// Properties to set, as (property name, value source) pairs.
    properties: Vec<(String, PropertySource)>,
    /// Column types used to build each output chunk.
    output_schema: Vec<LogicalType>,
    /// Column index that receives the created node's ID.
    output_column: usize,
    /// Set once the standalone (no-input) creation has run, so it runs only once
    /// until `reset` clears it.
    executed: bool,
    /// Epoch for MVCC versioning; when `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning; when `None`, `TxId::SYSTEM` is used.
    tx_id: Option<TxId>,
}
41
/// Source for a property value.
///
/// When an operator in this module resolves a source against an input row, a
/// `Column` source that yields no value falls back to `Value::Null`, and a
/// `Constant` source is cloned for each entity it is applied to.
#[derive(Debug, Clone)]
pub enum PropertySource {
    /// Read the value from the input column at this index.
    Column(usize),
    /// Use this constant value.
    Constant(Value),
}
50
51impl CreateNodeOperator {
52    /// Creates a new node creation operator.
53    ///
54    /// # Arguments
55    /// * `store` - The graph store to modify.
56    /// * `input` - Optional input operator (None for standalone CREATE).
57    /// * `labels` - Labels to assign to created nodes.
58    /// * `properties` - Properties to set on created nodes.
59    /// * `output_schema` - Schema of the output.
60    /// * `output_column` - Column index where the created node ID goes.
61    pub fn new(
62        store: Arc<LpgStore>,
63        input: Option<Box<dyn Operator>>,
64        labels: Vec<String>,
65        properties: Vec<(String, PropertySource)>,
66        output_schema: Vec<LogicalType>,
67        output_column: usize,
68    ) -> Self {
69        Self {
70            store,
71            input,
72            labels,
73            properties,
74            output_schema,
75            output_column,
76            executed: false,
77            viewing_epoch: None,
78            tx_id: None,
79        }
80    }
81
82    /// Sets the transaction context for MVCC versioning.
83    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
84        self.viewing_epoch = Some(epoch);
85        self.tx_id = tx_id;
86        self
87    }
88}
89
90impl Operator for CreateNodeOperator {
91    fn next(&mut self) -> OperatorResult {
92        // Get transaction context for versioned creation
93        let epoch = self
94            .viewing_epoch
95            .unwrap_or_else(|| self.store.current_epoch());
96        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
97
98        if let Some(ref mut input) = self.input {
99            // For each input row, create a node
100            if let Some(chunk) = input.next()? {
101                let mut builder =
102                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
103
104                for row in chunk.selected_indices() {
105                    // Create the node with MVCC versioning
106                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
107                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
108
109                    // Set properties
110                    for (prop_name, source) in &self.properties {
111                        let value = match source {
112                            PropertySource::Column(col_idx) => chunk
113                                .column(*col_idx)
114                                .and_then(|c| c.get_value(row))
115                                .unwrap_or(Value::Null),
116                            PropertySource::Constant(v) => v.clone(),
117                        };
118                        self.store.set_node_property(node_id, prop_name, value);
119                    }
120
121                    // Copy input columns to output
122                    for col_idx in 0..chunk.column_count() {
123                        if col_idx < self.output_column {
124                            if let (Some(src), Some(dst)) =
125                                (chunk.column(col_idx), builder.column_mut(col_idx))
126                            {
127                                if let Some(val) = src.get_value(row) {
128                                    dst.push_value(val);
129                                } else {
130                                    dst.push_value(Value::Null);
131                                }
132                            }
133                        }
134                    }
135
136                    // Add the new node ID
137                    if let Some(dst) = builder.column_mut(self.output_column) {
138                        dst.push_value(Value::Int64(node_id.0 as i64));
139                    }
140
141                    builder.advance_row();
142                }
143
144                return Ok(Some(builder.finish()));
145            }
146            Ok(None)
147        } else {
148            // No input - create a single node
149            if self.executed {
150                return Ok(None);
151            }
152            self.executed = true;
153
154            // Create the node with MVCC versioning
155            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
156            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
157
158            // Set properties from constants only
159            for (prop_name, source) in &self.properties {
160                if let PropertySource::Constant(value) = source {
161                    self.store
162                        .set_node_property(node_id, prop_name, value.clone());
163                }
164            }
165
166            // Build output chunk with just the node ID
167            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
168            if let Some(dst) = builder.column_mut(self.output_column) {
169                dst.push_value(Value::Int64(node_id.0 as i64));
170            }
171            builder.advance_row();
172
173            Ok(Some(builder.finish()))
174        }
175    }
176
177    fn reset(&mut self) {
178        if let Some(ref mut input) = self.input {
179            input.reset();
180        }
181        self.executed = false;
182    }
183
184    fn name(&self) -> &'static str {
185        "CreateNode"
186    }
187}
188
/// Operator that creates new edges.
///
/// For every selected input row, reads the source and target node IDs from
/// `from_column` and `to_column`, creates an edge of type `edge_type` between
/// them, and passes the input columns through to the output.
pub struct CreateEdgeOperator {
    /// The graph store to modify.
    store: Arc<LpgStore>,
    /// Input operator supplying the rows to create edges for.
    input: Box<dyn Operator>,
    /// Column index for the source node.
    from_column: usize,
    /// Column index for the target node.
    to_column: usize,
    /// Edge type.
    edge_type: String,
    /// Properties to set, as (property name, value source) pairs.
    properties: Vec<(String, PropertySource)>,
    /// Column types used to build each output chunk.
    output_schema: Vec<LogicalType>,
    /// Column index for the created edge variable; `None` means the edge ID is
    /// not emitted.
    output_column: Option<usize>,
    /// Epoch for MVCC versioning; when `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning; when `None`, `TxId::SYSTEM` is used.
    tx_id: Option<TxId>,
}
212
213impl CreateEdgeOperator {
214    /// Creates a new edge creation operator.
215    ///
216    /// Use builder methods to set additional options:
217    /// - [`with_properties`](Self::with_properties) - set edge properties
218    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
219    /// - [`with_tx_context`](Self::with_tx_context) - set transaction context
220    pub fn new(
221        store: Arc<LpgStore>,
222        input: Box<dyn Operator>,
223        from_column: usize,
224        to_column: usize,
225        edge_type: String,
226        output_schema: Vec<LogicalType>,
227    ) -> Self {
228        Self {
229            store,
230            input,
231            from_column,
232            to_column,
233            edge_type,
234            properties: Vec::new(),
235            output_schema,
236            output_column: None,
237            viewing_epoch: None,
238            tx_id: None,
239        }
240    }
241
242    /// Sets the properties to assign to created edges.
243    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
244        self.properties = properties;
245        self
246    }
247
248    /// Sets the output column for the created edge ID.
249    pub fn with_output_column(mut self, column: usize) -> Self {
250        self.output_column = Some(column);
251        self
252    }
253
254    /// Sets the transaction context for MVCC versioning.
255    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
256        self.viewing_epoch = Some(epoch);
257        self.tx_id = tx_id;
258        self
259    }
260}
261
262impl Operator for CreateEdgeOperator {
263    fn next(&mut self) -> OperatorResult {
264        // Get transaction context for versioned creation
265        let epoch = self
266            .viewing_epoch
267            .unwrap_or_else(|| self.store.current_epoch());
268        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
269
270        if let Some(chunk) = self.input.next()? {
271            let mut builder =
272                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
273
274            for row in chunk.selected_indices() {
275                // Get source and target node IDs
276                let from_id = chunk
277                    .column(self.from_column)
278                    .and_then(|c| c.get_value(row))
279                    .ok_or_else(|| {
280                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
281                    })?;
282
283                let to_id = chunk
284                    .column(self.to_column)
285                    .and_then(|c| c.get_value(row))
286                    .ok_or_else(|| {
287                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
288                    })?;
289
290                // Extract node IDs
291                let from_node_id = match from_id {
292                    Value::Int64(id) => NodeId(id as u64),
293                    _ => {
294                        return Err(OperatorError::TypeMismatch {
295                            expected: "Int64 (node ID)".to_string(),
296                            found: format!("{from_id:?}"),
297                        });
298                    }
299                };
300
301                let to_node_id = match to_id {
302                    Value::Int64(id) => NodeId(id as u64),
303                    _ => {
304                        return Err(OperatorError::TypeMismatch {
305                            expected: "Int64 (node ID)".to_string(),
306                            found: format!("{to_id:?}"),
307                        });
308                    }
309                };
310
311                // Create the edge with MVCC versioning
312                let edge_id = self.store.create_edge_versioned(
313                    from_node_id,
314                    to_node_id,
315                    &self.edge_type,
316                    epoch,
317                    tx,
318                );
319
320                // Set properties
321                for (prop_name, source) in &self.properties {
322                    let value = match source {
323                        PropertySource::Column(col_idx) => chunk
324                            .column(*col_idx)
325                            .and_then(|c| c.get_value(row))
326                            .unwrap_or(Value::Null),
327                        PropertySource::Constant(v) => v.clone(),
328                    };
329                    self.store.set_edge_property(edge_id, prop_name, value);
330                }
331
332                // Copy input columns
333                for col_idx in 0..chunk.column_count() {
334                    if let (Some(src), Some(dst)) =
335                        (chunk.column(col_idx), builder.column_mut(col_idx))
336                    {
337                        if let Some(val) = src.get_value(row) {
338                            dst.push_value(val);
339                        } else {
340                            dst.push_value(Value::Null);
341                        }
342                    }
343                }
344
345                // Add edge ID if requested
346                if let Some(out_col) = self.output_column {
347                    if let Some(dst) = builder.column_mut(out_col) {
348                        dst.push_value(Value::Int64(edge_id.0 as i64));
349                    }
350                }
351
352                builder.advance_row();
353            }
354
355            return Ok(Some(builder.finish()));
356        }
357        Ok(None)
358    }
359
360    fn reset(&mut self) {
361        self.input.reset();
362    }
363
364    fn name(&self) -> &'static str {
365        "CreateEdge"
366    }
367}
368
/// Operator that deletes nodes.
///
/// For each input chunk, deletes the node referenced by `node_column` in every
/// selected row and emits a single-row chunk whose column 0 holds the number
/// of nodes that were actually deleted.
pub struct DeleteNodeOperator {
    /// The graph store to modify.
    store: Arc<LpgStore>,
    /// Input operator supplying the nodes to delete.
    input: Box<dyn Operator>,
    /// Column index for the node to delete.
    node_column: usize,
    /// Column types used to build the count chunk.
    output_schema: Vec<LogicalType>,
    /// Whether to detach (delete connected edges) before deleting.
    detach: bool,
    /// Epoch for MVCC versioning; when `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning (reserved for future use).
    /// Stored by `with_tx_context` but not read by this operator yet, hence
    /// the `dead_code` allowance.
    #[allow(dead_code)]
    tx_id: Option<TxId>,
}
387
388impl DeleteNodeOperator {
389    /// Creates a new node deletion operator.
390    pub fn new(
391        store: Arc<LpgStore>,
392        input: Box<dyn Operator>,
393        node_column: usize,
394        output_schema: Vec<LogicalType>,
395        detach: bool,
396    ) -> Self {
397        Self {
398            store,
399            input,
400            node_column,
401            output_schema,
402            detach,
403            viewing_epoch: None,
404            tx_id: None,
405        }
406    }
407
408    /// Sets the transaction context for MVCC versioning.
409    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
410        self.viewing_epoch = Some(epoch);
411        self.tx_id = tx_id;
412        self
413    }
414}
415
416impl Operator for DeleteNodeOperator {
417    fn next(&mut self) -> OperatorResult {
418        // Get transaction context for versioned deletion
419        let epoch = self
420            .viewing_epoch
421            .unwrap_or_else(|| self.store.current_epoch());
422
423        if let Some(chunk) = self.input.next()? {
424            let mut deleted_count = 0;
425
426            for row in chunk.selected_indices() {
427                let node_val = chunk
428                    .column(self.node_column)
429                    .and_then(|c| c.get_value(row))
430                    .ok_or_else(|| {
431                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
432                    })?;
433
434                let node_id = match node_val {
435                    Value::Int64(id) => NodeId(id as u64),
436                    _ => {
437                        return Err(OperatorError::TypeMismatch {
438                            expected: "Int64 (node ID)".to_string(),
439                            found: format!("{node_val:?}"),
440                        });
441                    }
442                };
443
444                if self.detach {
445                    // Delete all connected edges first
446                    // Note: Edge deletion will use epoch internally
447                    self.store.delete_node_edges(node_id);
448                }
449
450                // Delete the node with MVCC versioning
451                if self.store.delete_node_at_epoch(node_id, epoch) {
452                    deleted_count += 1;
453                }
454            }
455
456            // Return a chunk with the delete count
457            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
458            if let Some(dst) = builder.column_mut(0) {
459                dst.push_value(Value::Int64(deleted_count));
460            }
461            builder.advance_row();
462
463            return Ok(Some(builder.finish()));
464        }
465        Ok(None)
466    }
467
468    fn reset(&mut self) {
469        self.input.reset();
470    }
471
472    fn name(&self) -> &'static str {
473        "DeleteNode"
474    }
475}
476
/// Operator that deletes edges.
///
/// For each input chunk, deletes the edge referenced by `edge_column` in every
/// selected row and emits a single-row chunk whose column 0 holds the number
/// of edges that were actually deleted.
pub struct DeleteEdgeOperator {
    /// The graph store to modify.
    store: Arc<LpgStore>,
    /// Input operator supplying the edges to delete.
    input: Box<dyn Operator>,
    /// Column index for the edge to delete.
    edge_column: usize,
    /// Column types used to build the count chunk.
    output_schema: Vec<LogicalType>,
    /// Epoch for MVCC versioning; when `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning (reserved for future use).
    /// Stored by `with_tx_context` but not read by this operator yet, hence
    /// the `dead_code` allowance.
    #[allow(dead_code)]
    tx_id: Option<TxId>,
}
493
494impl DeleteEdgeOperator {
495    /// Creates a new edge deletion operator.
496    pub fn new(
497        store: Arc<LpgStore>,
498        input: Box<dyn Operator>,
499        edge_column: usize,
500        output_schema: Vec<LogicalType>,
501    ) -> Self {
502        Self {
503            store,
504            input,
505            edge_column,
506            output_schema,
507            viewing_epoch: None,
508            tx_id: None,
509        }
510    }
511
512    /// Sets the transaction context for MVCC versioning.
513    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
514        self.viewing_epoch = Some(epoch);
515        self.tx_id = tx_id;
516        self
517    }
518}
519
520impl Operator for DeleteEdgeOperator {
521    fn next(&mut self) -> OperatorResult {
522        // Get transaction context for versioned deletion
523        let epoch = self
524            .viewing_epoch
525            .unwrap_or_else(|| self.store.current_epoch());
526
527        if let Some(chunk) = self.input.next()? {
528            let mut deleted_count = 0;
529
530            for row in chunk.selected_indices() {
531                let edge_val = chunk
532                    .column(self.edge_column)
533                    .and_then(|c| c.get_value(row))
534                    .ok_or_else(|| {
535                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
536                    })?;
537
538                let edge_id = match edge_val {
539                    Value::Int64(id) => EdgeId(id as u64),
540                    _ => {
541                        return Err(OperatorError::TypeMismatch {
542                            expected: "Int64 (edge ID)".to_string(),
543                            found: format!("{edge_val:?}"),
544                        });
545                    }
546                };
547
548                // Delete the edge with MVCC versioning
549                if self.store.delete_edge_at_epoch(edge_id, epoch) {
550                    deleted_count += 1;
551                }
552            }
553
554            // Return a chunk with the delete count
555            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
556            if let Some(dst) = builder.column_mut(0) {
557                dst.push_value(Value::Int64(deleted_count));
558            }
559            builder.advance_row();
560
561            return Ok(Some(builder.finish()));
562        }
563        Ok(None)
564    }
565
566    fn reset(&mut self) {
567        self.input.reset();
568    }
569
570    fn name(&self) -> &'static str {
571        "DeleteEdge"
572    }
573}
574
/// Operator that adds labels to nodes.
///
/// For each input chunk, adds every configured label to the node referenced by
/// `node_column` in each selected row and emits a single-row chunk whose
/// column 0 counts the (node, label) additions the store reported as applied.
pub struct AddLabelOperator {
    /// The graph store.
    store: Arc<LpgStore>,
    /// Child operator providing nodes.
    input: Box<dyn Operator>,
    /// Column index containing node IDs.
    node_column: usize,
    /// Labels to add.
    labels: Vec<String>,
    /// Column types used to build the count chunk.
    output_schema: Vec<LogicalType>,
}
588
589impl AddLabelOperator {
590    /// Creates a new add label operator.
591    pub fn new(
592        store: Arc<LpgStore>,
593        input: Box<dyn Operator>,
594        node_column: usize,
595        labels: Vec<String>,
596        output_schema: Vec<LogicalType>,
597    ) -> Self {
598        Self {
599            store,
600            input,
601            node_column,
602            labels,
603            output_schema,
604        }
605    }
606}
607
608impl Operator for AddLabelOperator {
609    fn next(&mut self) -> OperatorResult {
610        if let Some(chunk) = self.input.next()? {
611            let mut updated_count = 0;
612
613            for row in chunk.selected_indices() {
614                let node_val = chunk
615                    .column(self.node_column)
616                    .and_then(|c| c.get_value(row))
617                    .ok_or_else(|| {
618                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
619                    })?;
620
621                let node_id = match node_val {
622                    Value::Int64(id) => NodeId(id as u64),
623                    _ => {
624                        return Err(OperatorError::TypeMismatch {
625                            expected: "Int64 (node ID)".to_string(),
626                            found: format!("{node_val:?}"),
627                        });
628                    }
629                };
630
631                // Add all labels
632                for label in &self.labels {
633                    if self.store.add_label(node_id, label) {
634                        updated_count += 1;
635                    }
636                }
637            }
638
639            // Return a chunk with the update count
640            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
641            if let Some(dst) = builder.column_mut(0) {
642                dst.push_value(Value::Int64(updated_count));
643            }
644            builder.advance_row();
645
646            return Ok(Some(builder.finish()));
647        }
648        Ok(None)
649    }
650
651    fn reset(&mut self) {
652        self.input.reset();
653    }
654
655    fn name(&self) -> &'static str {
656        "AddLabel"
657    }
658}
659
/// Operator that removes labels from nodes.
///
/// For each input chunk, removes every configured label from the node
/// referenced by `node_column` in each selected row and emits a single-row
/// chunk whose column 0 counts the (node, label) removals the store reported
/// as applied.
pub struct RemoveLabelOperator {
    /// The graph store.
    store: Arc<LpgStore>,
    /// Child operator providing nodes.
    input: Box<dyn Operator>,
    /// Column index containing node IDs.
    node_column: usize,
    /// Labels to remove.
    labels: Vec<String>,
    /// Column types used to build the count chunk.
    output_schema: Vec<LogicalType>,
}
673
674impl RemoveLabelOperator {
675    /// Creates a new remove label operator.
676    pub fn new(
677        store: Arc<LpgStore>,
678        input: Box<dyn Operator>,
679        node_column: usize,
680        labels: Vec<String>,
681        output_schema: Vec<LogicalType>,
682    ) -> Self {
683        Self {
684            store,
685            input,
686            node_column,
687            labels,
688            output_schema,
689        }
690    }
691}
692
693impl Operator for RemoveLabelOperator {
694    fn next(&mut self) -> OperatorResult {
695        if let Some(chunk) = self.input.next()? {
696            let mut updated_count = 0;
697
698            for row in chunk.selected_indices() {
699                let node_val = chunk
700                    .column(self.node_column)
701                    .and_then(|c| c.get_value(row))
702                    .ok_or_else(|| {
703                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
704                    })?;
705
706                let node_id = match node_val {
707                    Value::Int64(id) => NodeId(id as u64),
708                    _ => {
709                        return Err(OperatorError::TypeMismatch {
710                            expected: "Int64 (node ID)".to_string(),
711                            found: format!("{node_val:?}"),
712                        });
713                    }
714                };
715
716                // Remove all labels
717                for label in &self.labels {
718                    if self.store.remove_label(node_id, label) {
719                        updated_count += 1;
720                    }
721                }
722            }
723
724            // Return a chunk with the update count
725            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
726            if let Some(dst) = builder.column_mut(0) {
727                dst.push_value(Value::Int64(updated_count));
728            }
729            builder.advance_row();
730
731            return Ok(Some(builder.finish()));
732        }
733        Ok(None)
734    }
735
736    fn reset(&mut self) {
737        self.input.reset();
738    }
739
740    fn name(&self) -> &'static str {
741        "RemoveLabel"
742    }
743}
744
/// Operator that sets properties on nodes or edges.
///
/// This operator reads node/edge IDs from a column and sets the
/// specified properties on each entity. The input columns are passed
/// through to the output unchanged.
pub struct SetPropertyOperator {
    /// The graph store.
    store: Arc<LpgStore>,
    /// Child operator providing entities.
    input: Box<dyn Operator>,
    /// Column index containing entity IDs (node or edge).
    entity_column: usize,
    /// Whether the entity is an edge (false = node); selects which store
    /// setter is called.
    is_edge: bool,
    /// Properties to set, as (property name, value source) pairs.
    properties: Vec<(String, PropertySource)>,
    /// Column types used to build each output chunk.
    output_schema: Vec<LogicalType>,
}
763
764impl SetPropertyOperator {
765    /// Creates a new set property operator for nodes.
766    pub fn new_for_node(
767        store: Arc<LpgStore>,
768        input: Box<dyn Operator>,
769        node_column: usize,
770        properties: Vec<(String, PropertySource)>,
771        output_schema: Vec<LogicalType>,
772    ) -> Self {
773        Self {
774            store,
775            input,
776            entity_column: node_column,
777            is_edge: false,
778            properties,
779            output_schema,
780        }
781    }
782
783    /// Creates a new set property operator for edges.
784    pub fn new_for_edge(
785        store: Arc<LpgStore>,
786        input: Box<dyn Operator>,
787        edge_column: usize,
788        properties: Vec<(String, PropertySource)>,
789        output_schema: Vec<LogicalType>,
790    ) -> Self {
791        Self {
792            store,
793            input,
794            entity_column: edge_column,
795            is_edge: true,
796            properties,
797            output_schema,
798        }
799    }
800}
801
802impl Operator for SetPropertyOperator {
803    fn next(&mut self) -> OperatorResult {
804        if let Some(chunk) = self.input.next()? {
805            let mut builder =
806                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
807
808            for row in chunk.selected_indices() {
809                let entity_val = chunk
810                    .column(self.entity_column)
811                    .and_then(|c| c.get_value(row))
812                    .ok_or_else(|| {
813                        OperatorError::ColumnNotFound(format!(
814                            "entity column {}",
815                            self.entity_column
816                        ))
817                    })?;
818
819                let entity_id = match entity_val {
820                    Value::Int64(id) => id as u64,
821                    _ => {
822                        return Err(OperatorError::TypeMismatch {
823                            expected: "Int64 (entity ID)".to_string(),
824                            found: format!("{entity_val:?}"),
825                        });
826                    }
827                };
828
829                // Set all properties
830                for (prop_name, source) in &self.properties {
831                    let value = match source {
832                        PropertySource::Column(col_idx) => chunk
833                            .column(*col_idx)
834                            .and_then(|c| c.get_value(row))
835                            .unwrap_or(Value::Null),
836                        PropertySource::Constant(v) => v.clone(),
837                    };
838
839                    if self.is_edge {
840                        self.store
841                            .set_edge_property(EdgeId(entity_id), prop_name, value);
842                    } else {
843                        self.store
844                            .set_node_property(NodeId(entity_id), prop_name, value);
845                    }
846                }
847
848                // Copy input columns to output
849                for col_idx in 0..chunk.column_count() {
850                    if let (Some(src), Some(dst)) =
851                        (chunk.column(col_idx), builder.column_mut(col_idx))
852                    {
853                        if let Some(val) = src.get_value(row) {
854                            dst.push_value(val);
855                        } else {
856                            dst.push_value(Value::Null);
857                        }
858                    }
859                }
860
861                builder.advance_row();
862            }
863
864            return Ok(Some(builder.finish()));
865        }
866        Ok(None)
867    }
868
869    fn reset(&mut self) {
870        self.input.reset();
871    }
872
873    fn name(&self) -> &'static str {
874        "SetProperty"
875    }
876}
877
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Input operator stub that yields one prepared chunk and then `None`.
    struct MockInput {
        chunk: Option<DataChunk>,
    }

    impl MockInput {
        /// Wraps `chunk` in a boxed stub ready to hand to an operator under test.
        fn boxed(chunk: DataChunk) -> Box<dyn Operator> {
            Box::new(Self { chunk: Some(chunk) })
        }
    }

    impl Operator for MockInput {
        fn next(&mut self) -> OperatorResult {
            Ok(self.chunk.take())
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "MockInput"
        }
    }

    fn create_test_store() -> Arc<LpgStore> {
        Arc::new(LpgStore::new())
    }

    /// A standalone CREATE yields exactly one row, once, and adds one node.
    #[test]
    fn test_create_node_standalone() {
        let store = create_test_store();
        let name_property = (
            "name".to_string(),
            PropertySource::Constant(Value::String("Alice".into())),
        );

        let mut op = CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Person".to_string()],
            vec![name_property],
            vec![LogicalType::Int64],
            0,
        );

        // First pull creates the node.
        let first = op.next().unwrap().unwrap();
        assert_eq!(first.row_count(), 1);

        // Second pull is exhausted.
        assert!(op.next().unwrap().is_none());

        assert_eq!(store.node_count(), 1);
    }

    /// One input row with two node IDs produces one edge in the store.
    #[test]
    fn test_create_edge() {
        let store = create_test_store();

        let source = store.create_node(&["Person"]);
        let target = store.create_node(&["Person"]);

        // Single row: (source id, target id).
        let mut rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
        rows.column_mut(0).unwrap().push_int64(source.0 as i64);
        rows.column_mut(1).unwrap().push_int64(target.0 as i64);
        rows.advance_row();

        let mut op = CreateEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(rows.finish()),
            0, // from column
            1, // to column
            "KNOWS".to_string(),
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let _chunk = op.next().unwrap().unwrap();

        assert_eq!(store.edge_count(), 1);
    }

    /// Deleting the only node reports a count of one and empties the store.
    #[test]
    fn test_delete_node() {
        let store = create_test_store();

        let node_id = store.create_node(&["Person"]);
        assert_eq!(store.node_count(), 1);

        // Single row holding the ID of the node to delete.
        let mut rows = DataChunkBuilder::new(&[LogicalType::Int64]);
        rows.column_mut(0).unwrap().push_int64(node_id.0 as i64);
        rows.advance_row();

        let mut op = DeleteNodeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(rows.finish()),
            0,
            vec![LogicalType::Int64],
            false,
        );

        let result = op.next().unwrap().unwrap();

        let deleted = result.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(store.node_count(), 0);
    }
}
1007}