Skip to main content

grafeo_core/execution/operators/
mutation.rs

1//! Mutation operators for creating and deleting graph elements.
2//!
3//! These operators modify the graph structure:
4//! - `CreateNodeOperator`: Creates new nodes
5//! - `CreateEdgeOperator`: Creates new edges
6//! - `DeleteNodeOperator`: Deletes nodes
7//! - `DeleteEdgeOperator`: Deletes edges
8
9use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::lpg::LpgStore;
16
17/// Operator that creates new nodes.
18///
19/// For each input row, creates a new node with the specified labels
20/// and properties, then outputs the row with the new node.
21pub struct CreateNodeOperator {
22    /// The graph store to modify.
23    store: Arc<LpgStore>,
24    /// Input operator.
25    input: Option<Box<dyn Operator>>,
26    /// Labels for the new nodes.
27    labels: Vec<String>,
28    /// Properties to set (name -> column index or constant value).
29    properties: Vec<(String, PropertySource)>,
30    /// Output schema.
31    output_schema: Vec<LogicalType>,
32    /// Column index for the created node variable.
33    output_column: usize,
34    /// Whether this operator has been executed (for no-input case).
35    executed: bool,
36    /// Epoch for MVCC versioning.
37    viewing_epoch: Option<EpochId>,
38    /// Transaction ID for MVCC versioning.
39    tx_id: Option<TxId>,
40}
41
42/// Source for a property value.
43#[derive(Debug, Clone)]
44pub enum PropertySource {
45    /// Get value from an input column.
46    Column(usize),
47    /// Use a constant value.
48    Constant(Value),
49}
50
51impl CreateNodeOperator {
52    /// Creates a new node creation operator.
53    ///
54    /// # Arguments
55    /// * `store` - The graph store to modify.
56    /// * `input` - Optional input operator (None for standalone CREATE).
57    /// * `labels` - Labels to assign to created nodes.
58    /// * `properties` - Properties to set on created nodes.
59    /// * `output_schema` - Schema of the output.
60    /// * `output_column` - Column index where the created node ID goes.
61    pub fn new(
62        store: Arc<LpgStore>,
63        input: Option<Box<dyn Operator>>,
64        labels: Vec<String>,
65        properties: Vec<(String, PropertySource)>,
66        output_schema: Vec<LogicalType>,
67        output_column: usize,
68    ) -> Self {
69        Self {
70            store,
71            input,
72            labels,
73            properties,
74            output_schema,
75            output_column,
76            executed: false,
77            viewing_epoch: None,
78            tx_id: None,
79        }
80    }
81
82    /// Sets the transaction context for MVCC versioning.
83    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
84        self.viewing_epoch = Some(epoch);
85        self.tx_id = tx_id;
86        self
87    }
88}
89
90impl Operator for CreateNodeOperator {
91    fn next(&mut self) -> OperatorResult {
92        // Get transaction context for versioned creation
93        let epoch = self
94            .viewing_epoch
95            .unwrap_or_else(|| self.store.current_epoch());
96        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
97
98        if let Some(ref mut input) = self.input {
99            // For each input row, create a node
100            if let Some(chunk) = input.next()? {
101                let mut builder =
102                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
103
104                for row in chunk.selected_indices() {
105                    // Create the node with MVCC versioning
106                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
107                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
108
109                    // Set properties
110                    for (prop_name, source) in &self.properties {
111                        let value = match source {
112                            PropertySource::Column(col_idx) => chunk
113                                .column(*col_idx)
114                                .and_then(|c| c.get_value(row))
115                                .unwrap_or(Value::Null),
116                            PropertySource::Constant(v) => v.clone(),
117                        };
118                        self.store.set_node_property(node_id, prop_name, value);
119                    }
120
121                    // Copy input columns to output
122                    for col_idx in 0..chunk.column_count() {
123                        if col_idx < self.output_column
124                            && let (Some(src), Some(dst)) =
125                                (chunk.column(col_idx), builder.column_mut(col_idx))
126                        {
127                            if let Some(val) = src.get_value(row) {
128                                dst.push_value(val);
129                            } else {
130                                dst.push_value(Value::Null);
131                            }
132                        }
133                    }
134
135                    // Add the new node ID
136                    if let Some(dst) = builder.column_mut(self.output_column) {
137                        dst.push_value(Value::Int64(node_id.0 as i64));
138                    }
139
140                    builder.advance_row();
141                }
142
143                return Ok(Some(builder.finish()));
144            }
145            Ok(None)
146        } else {
147            // No input - create a single node
148            if self.executed {
149                return Ok(None);
150            }
151            self.executed = true;
152
153            // Create the node with MVCC versioning
154            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
155            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
156
157            // Set properties from constants only
158            for (prop_name, source) in &self.properties {
159                if let PropertySource::Constant(value) = source {
160                    self.store
161                        .set_node_property(node_id, prop_name, value.clone());
162                }
163            }
164
165            // Build output chunk with just the node ID
166            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
167            if let Some(dst) = builder.column_mut(self.output_column) {
168                dst.push_value(Value::Int64(node_id.0 as i64));
169            }
170            builder.advance_row();
171
172            Ok(Some(builder.finish()))
173        }
174    }
175
176    fn reset(&mut self) {
177        if let Some(ref mut input) = self.input {
178            input.reset();
179        }
180        self.executed = false;
181    }
182
183    fn name(&self) -> &'static str {
184        "CreateNode"
185    }
186}
187
188/// Operator that creates new edges.
189pub struct CreateEdgeOperator {
190    /// The graph store to modify.
191    store: Arc<LpgStore>,
192    /// Input operator.
193    input: Box<dyn Operator>,
194    /// Column index for the source node.
195    from_column: usize,
196    /// Column index for the target node.
197    to_column: usize,
198    /// Edge type.
199    edge_type: String,
200    /// Properties to set.
201    properties: Vec<(String, PropertySource)>,
202    /// Output schema.
203    output_schema: Vec<LogicalType>,
204    /// Column index for the created edge variable (if any).
205    output_column: Option<usize>,
206    /// Epoch for MVCC versioning.
207    viewing_epoch: Option<EpochId>,
208    /// Transaction ID for MVCC versioning.
209    tx_id: Option<TxId>,
210}
211
212impl CreateEdgeOperator {
213    /// Creates a new edge creation operator.
214    ///
215    /// Use builder methods to set additional options:
216    /// - [`with_properties`](Self::with_properties) - set edge properties
217    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
218    /// - [`with_tx_context`](Self::with_tx_context) - set transaction context
219    pub fn new(
220        store: Arc<LpgStore>,
221        input: Box<dyn Operator>,
222        from_column: usize,
223        to_column: usize,
224        edge_type: String,
225        output_schema: Vec<LogicalType>,
226    ) -> Self {
227        Self {
228            store,
229            input,
230            from_column,
231            to_column,
232            edge_type,
233            properties: Vec::new(),
234            output_schema,
235            output_column: None,
236            viewing_epoch: None,
237            tx_id: None,
238        }
239    }
240
241    /// Sets the properties to assign to created edges.
242    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
243        self.properties = properties;
244        self
245    }
246
247    /// Sets the output column for the created edge ID.
248    pub fn with_output_column(mut self, column: usize) -> Self {
249        self.output_column = Some(column);
250        self
251    }
252
253    /// Sets the transaction context for MVCC versioning.
254    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
255        self.viewing_epoch = Some(epoch);
256        self.tx_id = tx_id;
257        self
258    }
259}
260
261impl Operator for CreateEdgeOperator {
262    fn next(&mut self) -> OperatorResult {
263        // Get transaction context for versioned creation
264        let epoch = self
265            .viewing_epoch
266            .unwrap_or_else(|| self.store.current_epoch());
267        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
268
269        if let Some(chunk) = self.input.next()? {
270            let mut builder =
271                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
272
273            for row in chunk.selected_indices() {
274                // Get source and target node IDs
275                let from_id = chunk
276                    .column(self.from_column)
277                    .and_then(|c| c.get_value(row))
278                    .ok_or_else(|| {
279                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
280                    })?;
281
282                let to_id = chunk
283                    .column(self.to_column)
284                    .and_then(|c| c.get_value(row))
285                    .ok_or_else(|| {
286                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
287                    })?;
288
289                // Extract node IDs
290                let from_node_id = match from_id {
291                    Value::Int64(id) => NodeId(id as u64),
292                    _ => {
293                        return Err(OperatorError::TypeMismatch {
294                            expected: "Int64 (node ID)".to_string(),
295                            found: format!("{from_id:?}"),
296                        });
297                    }
298                };
299
300                let to_node_id = match to_id {
301                    Value::Int64(id) => NodeId(id as u64),
302                    _ => {
303                        return Err(OperatorError::TypeMismatch {
304                            expected: "Int64 (node ID)".to_string(),
305                            found: format!("{to_id:?}"),
306                        });
307                    }
308                };
309
310                // Create the edge with MVCC versioning
311                let edge_id = self.store.create_edge_versioned(
312                    from_node_id,
313                    to_node_id,
314                    &self.edge_type,
315                    epoch,
316                    tx,
317                );
318
319                // Set properties
320                for (prop_name, source) in &self.properties {
321                    let value = match source {
322                        PropertySource::Column(col_idx) => chunk
323                            .column(*col_idx)
324                            .and_then(|c| c.get_value(row))
325                            .unwrap_or(Value::Null),
326                        PropertySource::Constant(v) => v.clone(),
327                    };
328                    self.store.set_edge_property(edge_id, prop_name, value);
329                }
330
331                // Copy input columns
332                for col_idx in 0..chunk.column_count() {
333                    if let (Some(src), Some(dst)) =
334                        (chunk.column(col_idx), builder.column_mut(col_idx))
335                    {
336                        if let Some(val) = src.get_value(row) {
337                            dst.push_value(val);
338                        } else {
339                            dst.push_value(Value::Null);
340                        }
341                    }
342                }
343
344                // Add edge ID if requested
345                if let Some(out_col) = self.output_column
346                    && let Some(dst) = builder.column_mut(out_col)
347                {
348                    dst.push_value(Value::Int64(edge_id.0 as i64));
349                }
350
351                builder.advance_row();
352            }
353
354            return Ok(Some(builder.finish()));
355        }
356        Ok(None)
357    }
358
359    fn reset(&mut self) {
360        self.input.reset();
361    }
362
363    fn name(&self) -> &'static str {
364        "CreateEdge"
365    }
366}
367
368/// Operator that deletes nodes.
369pub struct DeleteNodeOperator {
370    /// The graph store to modify.
371    store: Arc<LpgStore>,
372    /// Input operator.
373    input: Box<dyn Operator>,
374    /// Column index for the node to delete.
375    node_column: usize,
376    /// Output schema.
377    output_schema: Vec<LogicalType>,
378    /// Whether to detach (delete connected edges) before deleting.
379    detach: bool,
380    /// Epoch for MVCC versioning.
381    viewing_epoch: Option<EpochId>,
382    /// Transaction ID for MVCC versioning (reserved for future use).
383    #[allow(dead_code)]
384    tx_id: Option<TxId>,
385}
386
387impl DeleteNodeOperator {
388    /// Creates a new node deletion operator.
389    pub fn new(
390        store: Arc<LpgStore>,
391        input: Box<dyn Operator>,
392        node_column: usize,
393        output_schema: Vec<LogicalType>,
394        detach: bool,
395    ) -> Self {
396        Self {
397            store,
398            input,
399            node_column,
400            output_schema,
401            detach,
402            viewing_epoch: None,
403            tx_id: None,
404        }
405    }
406
407    /// Sets the transaction context for MVCC versioning.
408    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
409        self.viewing_epoch = Some(epoch);
410        self.tx_id = tx_id;
411        self
412    }
413}
414
415impl Operator for DeleteNodeOperator {
416    fn next(&mut self) -> OperatorResult {
417        // Get transaction context for versioned deletion
418        let epoch = self
419            .viewing_epoch
420            .unwrap_or_else(|| self.store.current_epoch());
421
422        if let Some(chunk) = self.input.next()? {
423            let mut deleted_count = 0;
424
425            for row in chunk.selected_indices() {
426                let node_val = chunk
427                    .column(self.node_column)
428                    .and_then(|c| c.get_value(row))
429                    .ok_or_else(|| {
430                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
431                    })?;
432
433                let node_id = match node_val {
434                    Value::Int64(id) => NodeId(id as u64),
435                    _ => {
436                        return Err(OperatorError::TypeMismatch {
437                            expected: "Int64 (node ID)".to_string(),
438                            found: format!("{node_val:?}"),
439                        });
440                    }
441                };
442
443                if self.detach {
444                    // Delete all connected edges first
445                    // Note: Edge deletion will use epoch internally
446                    self.store.delete_node_edges(node_id);
447                }
448
449                // Delete the node with MVCC versioning
450                if self.store.delete_node_at_epoch(node_id, epoch) {
451                    deleted_count += 1;
452                }
453            }
454
455            // Return a chunk with the delete count
456            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
457            if let Some(dst) = builder.column_mut(0) {
458                dst.push_value(Value::Int64(deleted_count));
459            }
460            builder.advance_row();
461
462            return Ok(Some(builder.finish()));
463        }
464        Ok(None)
465    }
466
467    fn reset(&mut self) {
468        self.input.reset();
469    }
470
471    fn name(&self) -> &'static str {
472        "DeleteNode"
473    }
474}
475
476/// Operator that deletes edges.
477pub struct DeleteEdgeOperator {
478    /// The graph store to modify.
479    store: Arc<LpgStore>,
480    /// Input operator.
481    input: Box<dyn Operator>,
482    /// Column index for the edge to delete.
483    edge_column: usize,
484    /// Output schema.
485    output_schema: Vec<LogicalType>,
486    /// Epoch for MVCC versioning.
487    viewing_epoch: Option<EpochId>,
488    /// Transaction ID for MVCC versioning (reserved for future use).
489    #[allow(dead_code)]
490    tx_id: Option<TxId>,
491}
492
493impl DeleteEdgeOperator {
494    /// Creates a new edge deletion operator.
495    pub fn new(
496        store: Arc<LpgStore>,
497        input: Box<dyn Operator>,
498        edge_column: usize,
499        output_schema: Vec<LogicalType>,
500    ) -> Self {
501        Self {
502            store,
503            input,
504            edge_column,
505            output_schema,
506            viewing_epoch: None,
507            tx_id: None,
508        }
509    }
510
511    /// Sets the transaction context for MVCC versioning.
512    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
513        self.viewing_epoch = Some(epoch);
514        self.tx_id = tx_id;
515        self
516    }
517}
518
519impl Operator for DeleteEdgeOperator {
520    fn next(&mut self) -> OperatorResult {
521        // Get transaction context for versioned deletion
522        let epoch = self
523            .viewing_epoch
524            .unwrap_or_else(|| self.store.current_epoch());
525
526        if let Some(chunk) = self.input.next()? {
527            let mut deleted_count = 0;
528
529            for row in chunk.selected_indices() {
530                let edge_val = chunk
531                    .column(self.edge_column)
532                    .and_then(|c| c.get_value(row))
533                    .ok_or_else(|| {
534                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
535                    })?;
536
537                let edge_id = match edge_val {
538                    Value::Int64(id) => EdgeId(id as u64),
539                    _ => {
540                        return Err(OperatorError::TypeMismatch {
541                            expected: "Int64 (edge ID)".to_string(),
542                            found: format!("{edge_val:?}"),
543                        });
544                    }
545                };
546
547                // Delete the edge with MVCC versioning
548                if self.store.delete_edge_at_epoch(edge_id, epoch) {
549                    deleted_count += 1;
550                }
551            }
552
553            // Return a chunk with the delete count
554            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
555            if let Some(dst) = builder.column_mut(0) {
556                dst.push_value(Value::Int64(deleted_count));
557            }
558            builder.advance_row();
559
560            return Ok(Some(builder.finish()));
561        }
562        Ok(None)
563    }
564
565    fn reset(&mut self) {
566        self.input.reset();
567    }
568
569    fn name(&self) -> &'static str {
570        "DeleteEdge"
571    }
572}
573
574/// Operator that adds labels to nodes.
575pub struct AddLabelOperator {
576    /// The graph store.
577    store: Arc<LpgStore>,
578    /// Child operator providing nodes.
579    input: Box<dyn Operator>,
580    /// Column index containing node IDs.
581    node_column: usize,
582    /// Labels to add.
583    labels: Vec<String>,
584    /// Output schema.
585    output_schema: Vec<LogicalType>,
586}
587
588impl AddLabelOperator {
589    /// Creates a new add label operator.
590    pub fn new(
591        store: Arc<LpgStore>,
592        input: Box<dyn Operator>,
593        node_column: usize,
594        labels: Vec<String>,
595        output_schema: Vec<LogicalType>,
596    ) -> Self {
597        Self {
598            store,
599            input,
600            node_column,
601            labels,
602            output_schema,
603        }
604    }
605}
606
607impl Operator for AddLabelOperator {
608    fn next(&mut self) -> OperatorResult {
609        if let Some(chunk) = self.input.next()? {
610            let mut updated_count = 0;
611
612            for row in chunk.selected_indices() {
613                let node_val = chunk
614                    .column(self.node_column)
615                    .and_then(|c| c.get_value(row))
616                    .ok_or_else(|| {
617                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
618                    })?;
619
620                let node_id = match node_val {
621                    Value::Int64(id) => NodeId(id as u64),
622                    _ => {
623                        return Err(OperatorError::TypeMismatch {
624                            expected: "Int64 (node ID)".to_string(),
625                            found: format!("{node_val:?}"),
626                        });
627                    }
628                };
629
630                // Add all labels
631                for label in &self.labels {
632                    if self.store.add_label(node_id, label) {
633                        updated_count += 1;
634                    }
635                }
636            }
637
638            // Return a chunk with the update count
639            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
640            if let Some(dst) = builder.column_mut(0) {
641                dst.push_value(Value::Int64(updated_count));
642            }
643            builder.advance_row();
644
645            return Ok(Some(builder.finish()));
646        }
647        Ok(None)
648    }
649
650    fn reset(&mut self) {
651        self.input.reset();
652    }
653
654    fn name(&self) -> &'static str {
655        "AddLabel"
656    }
657}
658
659/// Operator that removes labels from nodes.
660pub struct RemoveLabelOperator {
661    /// The graph store.
662    store: Arc<LpgStore>,
663    /// Child operator providing nodes.
664    input: Box<dyn Operator>,
665    /// Column index containing node IDs.
666    node_column: usize,
667    /// Labels to remove.
668    labels: Vec<String>,
669    /// Output schema.
670    output_schema: Vec<LogicalType>,
671}
672
673impl RemoveLabelOperator {
674    /// Creates a new remove label operator.
675    pub fn new(
676        store: Arc<LpgStore>,
677        input: Box<dyn Operator>,
678        node_column: usize,
679        labels: Vec<String>,
680        output_schema: Vec<LogicalType>,
681    ) -> Self {
682        Self {
683            store,
684            input,
685            node_column,
686            labels,
687            output_schema,
688        }
689    }
690}
691
692impl Operator for RemoveLabelOperator {
693    fn next(&mut self) -> OperatorResult {
694        if let Some(chunk) = self.input.next()? {
695            let mut updated_count = 0;
696
697            for row in chunk.selected_indices() {
698                let node_val = chunk
699                    .column(self.node_column)
700                    .and_then(|c| c.get_value(row))
701                    .ok_or_else(|| {
702                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
703                    })?;
704
705                let node_id = match node_val {
706                    Value::Int64(id) => NodeId(id as u64),
707                    _ => {
708                        return Err(OperatorError::TypeMismatch {
709                            expected: "Int64 (node ID)".to_string(),
710                            found: format!("{node_val:?}"),
711                        });
712                    }
713                };
714
715                // Remove all labels
716                for label in &self.labels {
717                    if self.store.remove_label(node_id, label) {
718                        updated_count += 1;
719                    }
720                }
721            }
722
723            // Return a chunk with the update count
724            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
725            if let Some(dst) = builder.column_mut(0) {
726                dst.push_value(Value::Int64(updated_count));
727            }
728            builder.advance_row();
729
730            return Ok(Some(builder.finish()));
731        }
732        Ok(None)
733    }
734
735    fn reset(&mut self) {
736        self.input.reset();
737    }
738
739    fn name(&self) -> &'static str {
740        "RemoveLabel"
741    }
742}
743
744/// Operator that sets properties on nodes or edges.
745///
746/// This operator reads node/edge IDs from a column and sets the
747/// specified properties on each entity.
748pub struct SetPropertyOperator {
749    /// The graph store.
750    store: Arc<LpgStore>,
751    /// Child operator providing entities.
752    input: Box<dyn Operator>,
753    /// Column index containing entity IDs (node or edge).
754    entity_column: usize,
755    /// Whether the entity is an edge (false = node).
756    is_edge: bool,
757    /// Properties to set (name -> source).
758    properties: Vec<(String, PropertySource)>,
759    /// Output schema.
760    output_schema: Vec<LogicalType>,
761}
762
763impl SetPropertyOperator {
764    /// Creates a new set property operator for nodes.
765    pub fn new_for_node(
766        store: Arc<LpgStore>,
767        input: Box<dyn Operator>,
768        node_column: usize,
769        properties: Vec<(String, PropertySource)>,
770        output_schema: Vec<LogicalType>,
771    ) -> Self {
772        Self {
773            store,
774            input,
775            entity_column: node_column,
776            is_edge: false,
777            properties,
778            output_schema,
779        }
780    }
781
782    /// Creates a new set property operator for edges.
783    pub fn new_for_edge(
784        store: Arc<LpgStore>,
785        input: Box<dyn Operator>,
786        edge_column: usize,
787        properties: Vec<(String, PropertySource)>,
788        output_schema: Vec<LogicalType>,
789    ) -> Self {
790        Self {
791            store,
792            input,
793            entity_column: edge_column,
794            is_edge: true,
795            properties,
796            output_schema,
797        }
798    }
799}
800
801impl Operator for SetPropertyOperator {
802    fn next(&mut self) -> OperatorResult {
803        if let Some(chunk) = self.input.next()? {
804            let mut builder =
805                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
806
807            for row in chunk.selected_indices() {
808                let entity_val = chunk
809                    .column(self.entity_column)
810                    .and_then(|c| c.get_value(row))
811                    .ok_or_else(|| {
812                        OperatorError::ColumnNotFound(format!(
813                            "entity column {}",
814                            self.entity_column
815                        ))
816                    })?;
817
818                let entity_id = match entity_val {
819                    Value::Int64(id) => id as u64,
820                    _ => {
821                        return Err(OperatorError::TypeMismatch {
822                            expected: "Int64 (entity ID)".to_string(),
823                            found: format!("{entity_val:?}"),
824                        });
825                    }
826                };
827
828                // Set all properties
829                for (prop_name, source) in &self.properties {
830                    let value = match source {
831                        PropertySource::Column(col_idx) => chunk
832                            .column(*col_idx)
833                            .and_then(|c| c.get_value(row))
834                            .unwrap_or(Value::Null),
835                        PropertySource::Constant(v) => v.clone(),
836                    };
837
838                    if self.is_edge {
839                        self.store
840                            .set_edge_property(EdgeId(entity_id), prop_name, value);
841                    } else {
842                        self.store
843                            .set_node_property(NodeId(entity_id), prop_name, value);
844                    }
845                }
846
847                // Copy input columns to output
848                for col_idx in 0..chunk.column_count() {
849                    if let (Some(src), Some(dst)) =
850                        (chunk.column(col_idx), builder.column_mut(col_idx))
851                    {
852                        if let Some(val) = src.get_value(row) {
853                            dst.push_value(val);
854                        } else {
855                            dst.push_value(Value::Null);
856                        }
857                    }
858                }
859
860                builder.advance_row();
861            }
862
863            return Ok(Some(builder.finish()));
864        }
865        Ok(None)
866    }
867
868    fn reset(&mut self) {
869        self.input.reset();
870    }
871
872    fn name(&self) -> &'static str {
873        "SetProperty"
874    }
875}
876
877#[cfg(test)]
878mod tests {
879    use super::*;
880    use crate::execution::DataChunk;
881    use crate::execution::chunk::DataChunkBuilder;
882
883    fn create_test_store() -> Arc<LpgStore> {
884        Arc::new(LpgStore::new())
885    }
886
887    #[test]
888    fn test_create_node_standalone() {
889        let store = create_test_store();
890
891        let mut op = CreateNodeOperator::new(
892            Arc::clone(&store),
893            None,
894            vec!["Person".to_string()],
895            vec![(
896                "name".to_string(),
897                PropertySource::Constant(Value::String("Alice".into())),
898            )],
899            vec![LogicalType::Int64],
900            0,
901        );
902
903        // First call should create a node
904        let chunk = op.next().unwrap().unwrap();
905        assert_eq!(chunk.row_count(), 1);
906
907        // Second call should return None
908        assert!(op.next().unwrap().is_none());
909
910        // Verify node was created
911        assert_eq!(store.node_count(), 1);
912    }
913
914    #[test]
915    fn test_create_edge() {
916        let store = create_test_store();
917
918        // Create two nodes first
919        let node1 = store.create_node(&["Person"]);
920        let node2 = store.create_node(&["Person"]);
921
922        // Create input chunk with node IDs
923        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
924        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
925        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
926        builder.advance_row();
927        let input_chunk = builder.finish();
928
929        // Mock input operator
930        struct MockInput {
931            chunk: Option<DataChunk>,
932        }
933        impl Operator for MockInput {
934            fn next(&mut self) -> OperatorResult {
935                Ok(self.chunk.take())
936            }
937            fn reset(&mut self) {}
938            fn name(&self) -> &'static str {
939                "MockInput"
940            }
941        }
942
943        let mut op = CreateEdgeOperator::new(
944            Arc::clone(&store),
945            Box::new(MockInput {
946                chunk: Some(input_chunk),
947            }),
948            0, // from column
949            1, // to column
950            "KNOWS".to_string(),
951            vec![LogicalType::Int64, LogicalType::Int64],
952        );
953
954        // Execute
955        let _chunk = op.next().unwrap().unwrap();
956
957        // Verify edge was created
958        assert_eq!(store.edge_count(), 1);
959    }
960
961    #[test]
962    fn test_delete_node() {
963        let store = create_test_store();
964
965        // Create a node
966        let node_id = store.create_node(&["Person"]);
967        assert_eq!(store.node_count(), 1);
968
969        // Create input chunk with the node ID
970        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
971        builder.column_mut(0).unwrap().push_int64(node_id.0 as i64);
972        builder.advance_row();
973        let input_chunk = builder.finish();
974
975        struct MockInput {
976            chunk: Option<DataChunk>,
977        }
978        impl Operator for MockInput {
979            fn next(&mut self) -> OperatorResult {
980                Ok(self.chunk.take())
981            }
982            fn reset(&mut self) {}
983            fn name(&self) -> &'static str {
984                "MockInput"
985            }
986        }
987
988        let mut op = DeleteNodeOperator::new(
989            Arc::clone(&store),
990            Box::new(MockInput {
991                chunk: Some(input_chunk),
992            }),
993            0,
994            vec![LogicalType::Int64],
995            false,
996        );
997
998        // Execute
999        let chunk = op.next().unwrap().unwrap();
1000
1001        // Verify deletion
1002        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1003        assert_eq!(deleted, 1);
1004        assert_eq!(store.node_count(), 0);
1005    }
1006
1007    // ── Helper: reusable MockInput ───────────────────────────────
1008
1009    struct MockInput {
1010        chunk: Option<DataChunk>,
1011    }
1012
1013    impl MockInput {
1014        fn boxed(chunk: DataChunk) -> Box<Self> {
1015            Box::new(Self { chunk: Some(chunk) })
1016        }
1017    }
1018
1019    impl Operator for MockInput {
1020        fn next(&mut self) -> OperatorResult {
1021            Ok(self.chunk.take())
1022        }
1023        fn reset(&mut self) {}
1024        fn name(&self) -> &'static str {
1025            "MockInput"
1026        }
1027    }
1028
1029    // ── DeleteEdgeOperator ───────────────────────────────────────
1030
1031    #[test]
1032    fn test_delete_edge() {
1033        let store = create_test_store();
1034
1035        let n1 = store.create_node(&["Person"]);
1036        let n2 = store.create_node(&["Person"]);
1037        let eid = store.create_edge(n1, n2, "KNOWS");
1038        assert_eq!(store.edge_count(), 1);
1039
1040        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1041        builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1042        builder.advance_row();
1043
1044        let mut op = DeleteEdgeOperator::new(
1045            Arc::clone(&store),
1046            MockInput::boxed(builder.finish()),
1047            0,
1048            vec![LogicalType::Int64],
1049        );
1050
1051        let chunk = op.next().unwrap().unwrap();
1052        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1053        assert_eq!(deleted, 1);
1054        assert_eq!(store.edge_count(), 0);
1055    }
1056
1057    #[test]
1058    fn test_delete_edge_no_input_returns_none() {
1059        let store = create_test_store();
1060
1061        // Empty chunk: MockInput returns None immediately
1062        struct EmptyInput;
1063        impl Operator for EmptyInput {
1064            fn next(&mut self) -> OperatorResult {
1065                Ok(None)
1066            }
1067            fn reset(&mut self) {}
1068            fn name(&self) -> &'static str {
1069                "EmptyInput"
1070            }
1071        }
1072
1073        let mut op = DeleteEdgeOperator::new(
1074            Arc::clone(&store),
1075            Box::new(EmptyInput),
1076            0,
1077            vec![LogicalType::Int64],
1078        );
1079
1080        assert!(op.next().unwrap().is_none());
1081    }
1082
1083    #[test]
1084    fn test_delete_multiple_edges() {
1085        let store = create_test_store();
1086
1087        let n1 = store.create_node(&["N"]);
1088        let n2 = store.create_node(&["N"]);
1089        let e1 = store.create_edge(n1, n2, "R");
1090        let e2 = store.create_edge(n2, n1, "S");
1091        assert_eq!(store.edge_count(), 2);
1092
1093        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1094        builder.column_mut(0).unwrap().push_int64(e1.0 as i64);
1095        builder.advance_row();
1096        builder.column_mut(0).unwrap().push_int64(e2.0 as i64);
1097        builder.advance_row();
1098
1099        let mut op = DeleteEdgeOperator::new(
1100            Arc::clone(&store),
1101            MockInput::boxed(builder.finish()),
1102            0,
1103            vec![LogicalType::Int64],
1104        );
1105
1106        let chunk = op.next().unwrap().unwrap();
1107        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1108        assert_eq!(deleted, 2);
1109        assert_eq!(store.edge_count(), 0);
1110    }
1111
1112    // ── DeleteNodeOperator with DETACH ───────────────────────────
1113
1114    #[test]
1115    fn test_delete_node_detach() {
1116        let store = create_test_store();
1117
1118        let n1 = store.create_node(&["Person"]);
1119        let n2 = store.create_node(&["Person"]);
1120        store.create_edge(n1, n2, "KNOWS");
1121        store.create_edge(n2, n1, "FOLLOWS");
1122        assert_eq!(store.edge_count(), 2);
1123
1124        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1125        builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
1126        builder.advance_row();
1127
1128        let mut op = DeleteNodeOperator::new(
1129            Arc::clone(&store),
1130            MockInput::boxed(builder.finish()),
1131            0,
1132            vec![LogicalType::Int64],
1133            true, // detach = true
1134        );
1135
1136        let chunk = op.next().unwrap().unwrap();
1137        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1138        assert_eq!(deleted, 1);
1139        assert_eq!(store.node_count(), 1);
1140        assert_eq!(store.edge_count(), 0); // edges detached
1141    }
1142
1143    // ── AddLabelOperator ─────────────────────────────────────────
1144
1145    #[test]
1146    fn test_add_label() {
1147        let store = create_test_store();
1148
1149        let node = store.create_node(&["Person"]);
1150
1151        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1152        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1153        builder.advance_row();
1154
1155        let mut op = AddLabelOperator::new(
1156            Arc::clone(&store),
1157            MockInput::boxed(builder.finish()),
1158            0,
1159            vec!["Employee".to_string()],
1160            vec![LogicalType::Int64],
1161        );
1162
1163        let chunk = op.next().unwrap().unwrap();
1164        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1165        assert_eq!(updated, 1);
1166
1167        // Verify label was added
1168        let node_data = store.get_node(node).unwrap();
1169        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1170        assert!(labels.contains(&"Person"));
1171        assert!(labels.contains(&"Employee"));
1172    }
1173
1174    #[test]
1175    fn test_add_multiple_labels() {
1176        let store = create_test_store();
1177
1178        let node = store.create_node(&["Base"]);
1179
1180        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1181        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1182        builder.advance_row();
1183
1184        let mut op = AddLabelOperator::new(
1185            Arc::clone(&store),
1186            MockInput::boxed(builder.finish()),
1187            0,
1188            vec!["LabelA".to_string(), "LabelB".to_string()],
1189            vec![LogicalType::Int64],
1190        );
1191
1192        let chunk = op.next().unwrap().unwrap();
1193        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1194        assert_eq!(updated, 2); // 2 labels added
1195
1196        let node_data = store.get_node(node).unwrap();
1197        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1198        assert!(labels.contains(&"LabelA"));
1199        assert!(labels.contains(&"LabelB"));
1200    }
1201
1202    #[test]
1203    fn test_add_label_no_input_returns_none() {
1204        let store = create_test_store();
1205
1206        struct EmptyInput;
1207        impl Operator for EmptyInput {
1208            fn next(&mut self) -> OperatorResult {
1209                Ok(None)
1210            }
1211            fn reset(&mut self) {}
1212            fn name(&self) -> &'static str {
1213                "EmptyInput"
1214            }
1215        }
1216
1217        let mut op = AddLabelOperator::new(
1218            Arc::clone(&store),
1219            Box::new(EmptyInput),
1220            0,
1221            vec!["Foo".to_string()],
1222            vec![LogicalType::Int64],
1223        );
1224
1225        assert!(op.next().unwrap().is_none());
1226    }
1227
1228    // ── RemoveLabelOperator ──────────────────────────────────────
1229
1230    #[test]
1231    fn test_remove_label() {
1232        let store = create_test_store();
1233
1234        let node = store.create_node(&["Person", "Employee"]);
1235
1236        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1237        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1238        builder.advance_row();
1239
1240        let mut op = RemoveLabelOperator::new(
1241            Arc::clone(&store),
1242            MockInput::boxed(builder.finish()),
1243            0,
1244            vec!["Employee".to_string()],
1245            vec![LogicalType::Int64],
1246        );
1247
1248        let chunk = op.next().unwrap().unwrap();
1249        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1250        assert_eq!(updated, 1);
1251
1252        // Verify label was removed
1253        let node_data = store.get_node(node).unwrap();
1254        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1255        assert!(labels.contains(&"Person"));
1256        assert!(!labels.contains(&"Employee"));
1257    }
1258
1259    #[test]
1260    fn test_remove_nonexistent_label() {
1261        let store = create_test_store();
1262
1263        let node = store.create_node(&["Person"]);
1264
1265        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1266        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1267        builder.advance_row();
1268
1269        let mut op = RemoveLabelOperator::new(
1270            Arc::clone(&store),
1271            MockInput::boxed(builder.finish()),
1272            0,
1273            vec!["NonExistent".to_string()],
1274            vec![LogicalType::Int64],
1275        );
1276
1277        let chunk = op.next().unwrap().unwrap();
1278        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1279        assert_eq!(updated, 0); // nothing removed
1280    }
1281
1282    // ── SetPropertyOperator ──────────────────────────────────────
1283
1284    #[test]
1285    fn test_set_node_property_constant() {
1286        let store = create_test_store();
1287
1288        let node = store.create_node(&["Person"]);
1289
1290        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1291        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1292        builder.advance_row();
1293
1294        let mut op = SetPropertyOperator::new_for_node(
1295            Arc::clone(&store),
1296            MockInput::boxed(builder.finish()),
1297            0,
1298            vec![(
1299                "name".to_string(),
1300                PropertySource::Constant(Value::String("Alice".into())),
1301            )],
1302            vec![LogicalType::Int64],
1303        );
1304
1305        let chunk = op.next().unwrap().unwrap();
1306        assert_eq!(chunk.row_count(), 1);
1307
1308        // Verify property was set
1309        let node_data = store.get_node(node).unwrap();
1310        assert_eq!(
1311            node_data
1312                .properties
1313                .get(&grafeo_common::types::PropertyKey::new("name")),
1314            Some(&Value::String("Alice".into()))
1315        );
1316    }
1317
1318    #[test]
1319    fn test_set_node_property_from_column() {
1320        let store = create_test_store();
1321
1322        let node = store.create_node(&["Person"]);
1323
1324        // Input: column 0 = node ID, column 1 = property value
1325        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1326        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1327        builder
1328            .column_mut(1)
1329            .unwrap()
1330            .push_value(Value::String("Bob".into()));
1331        builder.advance_row();
1332
1333        let mut op = SetPropertyOperator::new_for_node(
1334            Arc::clone(&store),
1335            MockInput::boxed(builder.finish()),
1336            0,
1337            vec![("name".to_string(), PropertySource::Column(1))],
1338            vec![LogicalType::Int64, LogicalType::String],
1339        );
1340
1341        let chunk = op.next().unwrap().unwrap();
1342        assert_eq!(chunk.row_count(), 1);
1343
1344        let node_data = store.get_node(node).unwrap();
1345        assert_eq!(
1346            node_data
1347                .properties
1348                .get(&grafeo_common::types::PropertyKey::new("name")),
1349            Some(&Value::String("Bob".into()))
1350        );
1351    }
1352
1353    #[test]
1354    fn test_set_edge_property() {
1355        let store = create_test_store();
1356
1357        let n1 = store.create_node(&["N"]);
1358        let n2 = store.create_node(&["N"]);
1359        let eid = store.create_edge(n1, n2, "KNOWS");
1360
1361        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1362        builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1363        builder.advance_row();
1364
1365        let mut op = SetPropertyOperator::new_for_edge(
1366            Arc::clone(&store),
1367            MockInput::boxed(builder.finish()),
1368            0,
1369            vec![(
1370                "weight".to_string(),
1371                PropertySource::Constant(Value::Float64(0.75)),
1372            )],
1373            vec![LogicalType::Int64],
1374        );
1375
1376        let chunk = op.next().unwrap().unwrap();
1377        assert_eq!(chunk.row_count(), 1);
1378
1379        let edge_data = store.get_edge(eid).unwrap();
1380        assert_eq!(
1381            edge_data
1382                .properties
1383                .get(&grafeo_common::types::PropertyKey::new("weight")),
1384            Some(&Value::Float64(0.75))
1385        );
1386    }
1387
1388    #[test]
1389    fn test_set_multiple_properties() {
1390        let store = create_test_store();
1391
1392        let node = store.create_node(&["Person"]);
1393
1394        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1395        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1396        builder.advance_row();
1397
1398        let mut op = SetPropertyOperator::new_for_node(
1399            Arc::clone(&store),
1400            MockInput::boxed(builder.finish()),
1401            0,
1402            vec![
1403                (
1404                    "name".to_string(),
1405                    PropertySource::Constant(Value::String("Alice".into())),
1406                ),
1407                (
1408                    "age".to_string(),
1409                    PropertySource::Constant(Value::Int64(30)),
1410                ),
1411            ],
1412            vec![LogicalType::Int64],
1413        );
1414
1415        op.next().unwrap().unwrap();
1416
1417        let node_data = store.get_node(node).unwrap();
1418        assert_eq!(
1419            node_data
1420                .properties
1421                .get(&grafeo_common::types::PropertyKey::new("name")),
1422            Some(&Value::String("Alice".into()))
1423        );
1424        assert_eq!(
1425            node_data
1426                .properties
1427                .get(&grafeo_common::types::PropertyKey::new("age")),
1428            Some(&Value::Int64(30))
1429        );
1430    }
1431
1432    #[test]
1433    fn test_set_property_no_input_returns_none() {
1434        let store = create_test_store();
1435
1436        struct EmptyInput;
1437        impl Operator for EmptyInput {
1438            fn next(&mut self) -> OperatorResult {
1439                Ok(None)
1440            }
1441            fn reset(&mut self) {}
1442            fn name(&self) -> &'static str {
1443                "EmptyInput"
1444            }
1445        }
1446
1447        let mut op = SetPropertyOperator::new_for_node(
1448            Arc::clone(&store),
1449            Box::new(EmptyInput),
1450            0,
1451            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
1452            vec![LogicalType::Int64],
1453        );
1454
1455        assert!(op.next().unwrap().is_none());
1456    }
1457
1458    // ── Operator name() ──────────────────────────────────────────
1459
1460    #[test]
1461    fn test_operator_names() {
1462        let store = create_test_store();
1463
1464        struct EmptyInput;
1465        impl Operator for EmptyInput {
1466            fn next(&mut self) -> OperatorResult {
1467                Ok(None)
1468            }
1469            fn reset(&mut self) {}
1470            fn name(&self) -> &'static str {
1471                "EmptyInput"
1472            }
1473        }
1474
1475        let op = DeleteEdgeOperator::new(
1476            Arc::clone(&store),
1477            Box::new(EmptyInput),
1478            0,
1479            vec![LogicalType::Int64],
1480        );
1481        assert_eq!(op.name(), "DeleteEdge");
1482
1483        let op = AddLabelOperator::new(
1484            Arc::clone(&store),
1485            Box::new(EmptyInput),
1486            0,
1487            vec!["L".to_string()],
1488            vec![LogicalType::Int64],
1489        );
1490        assert_eq!(op.name(), "AddLabel");
1491
1492        let op = RemoveLabelOperator::new(
1493            Arc::clone(&store),
1494            Box::new(EmptyInput),
1495            0,
1496            vec!["L".to_string()],
1497            vec![LogicalType::Int64],
1498        );
1499        assert_eq!(op.name(), "RemoveLabel");
1500
1501        let op = SetPropertyOperator::new_for_node(
1502            Arc::clone(&store),
1503            Box::new(EmptyInput),
1504            0,
1505            vec![],
1506            vec![LogicalType::Int64],
1507        );
1508        assert_eq!(op.name(), "SetProperty");
1509    }
1510}