Skip to main content

grafeo_core/execution/operators/
mutation.rs

1//! Mutation operators for creating and deleting graph elements.
2//!
3//! These operators modify the graph structure:
4//! - `CreateNodeOperator`: Creates new nodes
5//! - `CreateEdgeOperator`: Creates new edges
6//! - `DeleteNodeOperator`: Deletes nodes
7//! - `DeleteEdgeOperator`: Deletes edges
8
9use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::lpg::LpgStore;
16
/// Operator that creates new nodes.
///
/// For each input row, creates a new node with the specified labels
/// and properties, then outputs the row with the new node.
///
/// When no input operator is configured, a single node is created on the
/// first call to `next` and nothing is produced afterwards until `reset`.
pub struct CreateNodeOperator {
    /// The graph store to modify.
    store: Arc<LpgStore>,
    /// Input operator; `None` selects the standalone (single node) mode.
    input: Option<Box<dyn Operator>>,
    /// Labels for the new nodes.
    labels: Vec<String>,
    /// Properties to set, as `(property name, value source)` pairs.
    properties: Vec<(String, PropertySource)>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Column index for the created node variable.
    output_column: usize,
    /// Whether this operator has been executed (for no-input case).
    /// Cleared again by `reset`.
    executed: bool,
    /// Epoch for MVCC versioning; when `None`, `next` asks the store for its
    /// current epoch.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning; when `None`, `next` uses
    /// `TxId::SYSTEM`.
    tx_id: Option<TxId>,
}
41
/// Source for a property value.
///
/// Tells a mutation operator where the value it writes comes from: a column
/// of the current input row, a fixed constant, or a named property reached
/// through a column of the current input row.
#[derive(Debug, Clone)]
pub enum PropertySource {
    /// Get value from an input column (the payload is the column index).
    Column(usize),
    /// Use a constant value.
    Constant(Value),
    /// Access a named property from a map/node/edge in an input column.
    PropertyAccess {
        /// The column containing the map, node ID, or edge ID.
        column: usize,
        /// The property name to extract.
        property: String,
    },
}
57
58impl PropertySource {
59    /// Resolves a property value from a data chunk row.
60    pub fn resolve(
61        &self,
62        chunk: &crate::execution::chunk::DataChunk,
63        row: usize,
64        store: &LpgStore,
65    ) -> Value {
66        match self {
67            PropertySource::Column(col_idx) => chunk
68                .column(*col_idx)
69                .and_then(|c| c.get_value(row))
70                .unwrap_or(Value::Null),
71            PropertySource::Constant(v) => v.clone(),
72            PropertySource::PropertyAccess { column, property } => {
73                let Some(col) = chunk.column(*column) else {
74                    return Value::Null;
75                };
76                // Try node ID first, then edge ID, then map value
77                if let Some(node_id) = col.get_node_id(row) {
78                    store
79                        .get_node(node_id)
80                        .and_then(|node| node.get_property(property).cloned())
81                        .unwrap_or(Value::Null)
82                } else if let Some(edge_id) = col.get_edge_id(row) {
83                    store
84                        .get_edge(edge_id)
85                        .and_then(|edge| edge.get_property(property).cloned())
86                        .unwrap_or(Value::Null)
87                } else if let Some(Value::Map(map)) = col.get_value(row) {
88                    let key = PropertyKey::new(property);
89                    map.get(&key).cloned().unwrap_or(Value::Null)
90                } else {
91                    Value::Null
92                }
93            }
94        }
95    }
96}
97
98impl CreateNodeOperator {
99    /// Creates a new node creation operator.
100    ///
101    /// # Arguments
102    /// * `store` - The graph store to modify.
103    /// * `input` - Optional input operator (None for standalone CREATE).
104    /// * `labels` - Labels to assign to created nodes.
105    /// * `properties` - Properties to set on created nodes.
106    /// * `output_schema` - Schema of the output.
107    /// * `output_column` - Column index where the created node ID goes.
108    pub fn new(
109        store: Arc<LpgStore>,
110        input: Option<Box<dyn Operator>>,
111        labels: Vec<String>,
112        properties: Vec<(String, PropertySource)>,
113        output_schema: Vec<LogicalType>,
114        output_column: usize,
115    ) -> Self {
116        Self {
117            store,
118            input,
119            labels,
120            properties,
121            output_schema,
122            output_column,
123            executed: false,
124            viewing_epoch: None,
125            tx_id: None,
126        }
127    }
128
129    /// Sets the transaction context for MVCC versioning.
130    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
131        self.viewing_epoch = Some(epoch);
132        self.tx_id = tx_id;
133        self
134    }
135}
136
137impl Operator for CreateNodeOperator {
138    fn next(&mut self) -> OperatorResult {
139        // Get transaction context for versioned creation
140        let epoch = self
141            .viewing_epoch
142            .unwrap_or_else(|| self.store.current_epoch());
143        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
144
145        if let Some(ref mut input) = self.input {
146            // For each input row, create a node
147            if let Some(chunk) = input.next()? {
148                let mut builder =
149                    DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
150
151                for row in chunk.selected_indices() {
152                    // Create the node with MVCC versioning
153                    let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
154                    let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
155
156                    // Set properties
157                    for (prop_name, source) in &self.properties {
158                        let value = source.resolve(&chunk, row, &self.store);
159                        self.store.set_node_property(node_id, prop_name, value);
160                    }
161
162                    // Copy input columns to output
163                    for col_idx in 0..chunk.column_count() {
164                        if col_idx < self.output_column
165                            && let (Some(src), Some(dst)) =
166                                (chunk.column(col_idx), builder.column_mut(col_idx))
167                        {
168                            if let Some(val) = src.get_value(row) {
169                                dst.push_value(val);
170                            } else {
171                                dst.push_value(Value::Null);
172                            }
173                        }
174                    }
175
176                    // Add the new node ID
177                    if let Some(dst) = builder.column_mut(self.output_column) {
178                        dst.push_value(Value::Int64(node_id.0 as i64));
179                    }
180
181                    builder.advance_row();
182                }
183
184                return Ok(Some(builder.finish()));
185            }
186            Ok(None)
187        } else {
188            // No input - create a single node
189            if self.executed {
190                return Ok(None);
191            }
192            self.executed = true;
193
194            // Create the node with MVCC versioning
195            let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
196            let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
197
198            // Set properties from constants only
199            for (prop_name, source) in &self.properties {
200                if let PropertySource::Constant(value) = source {
201                    self.store
202                        .set_node_property(node_id, prop_name, value.clone());
203                }
204            }
205
206            // Build output chunk with just the node ID
207            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
208            if let Some(dst) = builder.column_mut(self.output_column) {
209                dst.push_value(Value::Int64(node_id.0 as i64));
210            }
211            builder.advance_row();
212
213            Ok(Some(builder.finish()))
214        }
215    }
216
217    fn reset(&mut self) {
218        if let Some(ref mut input) = self.input {
219            input.reset();
220        }
221        self.executed = false;
222    }
223
224    fn name(&self) -> &'static str {
225        "CreateNode"
226    }
227}
228
/// Operator that creates new edges.
///
/// For each selected input row, an edge of type `edge_type` is created
/// between the nodes whose IDs are read from `from_column` and `to_column`.
pub struct CreateEdgeOperator {
    /// The graph store to modify.
    store: Arc<LpgStore>,
    /// Input operator.
    input: Box<dyn Operator>,
    /// Column index for the source node.
    from_column: usize,
    /// Column index for the target node.
    to_column: usize,
    /// Edge type.
    edge_type: String,
    /// Properties to set, as `(property name, value source)` pairs.
    properties: Vec<(String, PropertySource)>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Column index for the created edge variable (if any).
    output_column: Option<usize>,
    /// Epoch for MVCC versioning; when `None`, `next` asks the store for its
    /// current epoch.
    viewing_epoch: Option<EpochId>,
    /// Transaction ID for MVCC versioning; when `None`, `next` uses
    /// `TxId::SYSTEM`.
    tx_id: Option<TxId>,
}
252
253impl CreateEdgeOperator {
254    /// Creates a new edge creation operator.
255    ///
256    /// Use builder methods to set additional options:
257    /// - [`with_properties`](Self::with_properties) - set edge properties
258    /// - [`with_output_column`](Self::with_output_column) - output the created edge ID
259    /// - [`with_tx_context`](Self::with_tx_context) - set transaction context
260    pub fn new(
261        store: Arc<LpgStore>,
262        input: Box<dyn Operator>,
263        from_column: usize,
264        to_column: usize,
265        edge_type: String,
266        output_schema: Vec<LogicalType>,
267    ) -> Self {
268        Self {
269            store,
270            input,
271            from_column,
272            to_column,
273            edge_type,
274            properties: Vec::new(),
275            output_schema,
276            output_column: None,
277            viewing_epoch: None,
278            tx_id: None,
279        }
280    }
281
282    /// Sets the properties to assign to created edges.
283    pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
284        self.properties = properties;
285        self
286    }
287
288    /// Sets the output column for the created edge ID.
289    pub fn with_output_column(mut self, column: usize) -> Self {
290        self.output_column = Some(column);
291        self
292    }
293
294    /// Sets the transaction context for MVCC versioning.
295    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
296        self.viewing_epoch = Some(epoch);
297        self.tx_id = tx_id;
298        self
299    }
300}
301
302impl Operator for CreateEdgeOperator {
303    fn next(&mut self) -> OperatorResult {
304        // Get transaction context for versioned creation
305        let epoch = self
306            .viewing_epoch
307            .unwrap_or_else(|| self.store.current_epoch());
308        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
309
310        if let Some(chunk) = self.input.next()? {
311            let mut builder =
312                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
313
314            for row in chunk.selected_indices() {
315                // Get source and target node IDs
316                let from_id = chunk
317                    .column(self.from_column)
318                    .and_then(|c| c.get_value(row))
319                    .ok_or_else(|| {
320                        OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
321                    })?;
322
323                let to_id = chunk
324                    .column(self.to_column)
325                    .and_then(|c| c.get_value(row))
326                    .ok_or_else(|| {
327                        OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
328                    })?;
329
330                // Extract node IDs
331                let from_node_id = match from_id {
332                    Value::Int64(id) => NodeId(id as u64),
333                    _ => {
334                        return Err(OperatorError::TypeMismatch {
335                            expected: "Int64 (node ID)".to_string(),
336                            found: format!("{from_id:?}"),
337                        });
338                    }
339                };
340
341                let to_node_id = match to_id {
342                    Value::Int64(id) => NodeId(id as u64),
343                    _ => {
344                        return Err(OperatorError::TypeMismatch {
345                            expected: "Int64 (node ID)".to_string(),
346                            found: format!("{to_id:?}"),
347                        });
348                    }
349                };
350
351                // Create the edge with MVCC versioning
352                let edge_id = self.store.create_edge_versioned(
353                    from_node_id,
354                    to_node_id,
355                    &self.edge_type,
356                    epoch,
357                    tx,
358                );
359
360                // Set properties
361                for (prop_name, source) in &self.properties {
362                    let value = source.resolve(&chunk, row, &self.store);
363                    self.store.set_edge_property(edge_id, prop_name, value);
364                }
365
366                // Copy input columns
367                for col_idx in 0..chunk.column_count() {
368                    if let (Some(src), Some(dst)) =
369                        (chunk.column(col_idx), builder.column_mut(col_idx))
370                    {
371                        if let Some(val) = src.get_value(row) {
372                            dst.push_value(val);
373                        } else {
374                            dst.push_value(Value::Null);
375                        }
376                    }
377                }
378
379                // Add edge ID if requested
380                if let Some(out_col) = self.output_column
381                    && let Some(dst) = builder.column_mut(out_col)
382                {
383                    dst.push_value(Value::Int64(edge_id.0 as i64));
384                }
385
386                builder.advance_row();
387            }
388
389            return Ok(Some(builder.finish()));
390        }
391        Ok(None)
392    }
393
394    fn reset(&mut self) {
395        self.input.reset();
396    }
397
398    fn name(&self) -> &'static str {
399        "CreateEdge"
400    }
401}
402
/// Operator that deletes nodes.
///
/// Node IDs are read from `node_column` of each input chunk; for every input
/// chunk a single-row chunk carrying the number of deleted nodes is emitted.
pub struct DeleteNodeOperator {
    /// The graph store to modify.
    store: Arc<LpgStore>,
    /// Input operator.
    input: Box<dyn Operator>,
    /// Column index for the node to delete.
    node_column: usize,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Whether to detach (delete connected edges) before deleting.
    detach: bool,
    /// Epoch for MVCC versioning; when `None`, `next` asks the store for its
    /// current epoch.
    viewing_epoch: Option<EpochId>,
}
418
419impl DeleteNodeOperator {
420    /// Creates a new node deletion operator.
421    pub fn new(
422        store: Arc<LpgStore>,
423        input: Box<dyn Operator>,
424        node_column: usize,
425        output_schema: Vec<LogicalType>,
426        detach: bool,
427    ) -> Self {
428        Self {
429            store,
430            input,
431            node_column,
432            output_schema,
433            detach,
434            viewing_epoch: None,
435        }
436    }
437
438    /// Sets the transaction context for MVCC versioning.
439    pub fn with_tx_context(mut self, epoch: EpochId, _tx_id: Option<TxId>) -> Self {
440        self.viewing_epoch = Some(epoch);
441        self
442    }
443}
444
445impl Operator for DeleteNodeOperator {
446    fn next(&mut self) -> OperatorResult {
447        // Get transaction context for versioned deletion
448        let epoch = self
449            .viewing_epoch
450            .unwrap_or_else(|| self.store.current_epoch());
451
452        if let Some(chunk) = self.input.next()? {
453            let mut deleted_count = 0;
454
455            for row in chunk.selected_indices() {
456                let node_val = chunk
457                    .column(self.node_column)
458                    .and_then(|c| c.get_value(row))
459                    .ok_or_else(|| {
460                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
461                    })?;
462
463                let node_id = match node_val {
464                    Value::Int64(id) => NodeId(id as u64),
465                    _ => {
466                        return Err(OperatorError::TypeMismatch {
467                            expected: "Int64 (node ID)".to_string(),
468                            found: format!("{node_val:?}"),
469                        });
470                    }
471                };
472
473                if self.detach {
474                    // Delete all connected edges first
475                    // Note: Edge deletion will use epoch internally
476                    self.store.delete_node_edges(node_id);
477                }
478
479                // Delete the node with MVCC versioning
480                if self.store.delete_node_at_epoch(node_id, epoch) {
481                    deleted_count += 1;
482                }
483            }
484
485            // Return a chunk with the delete count
486            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
487            if let Some(dst) = builder.column_mut(0) {
488                dst.push_value(Value::Int64(deleted_count));
489            }
490            builder.advance_row();
491
492            return Ok(Some(builder.finish()));
493        }
494        Ok(None)
495    }
496
497    fn reset(&mut self) {
498        self.input.reset();
499    }
500
501    fn name(&self) -> &'static str {
502        "DeleteNode"
503    }
504}
505
/// Operator that deletes edges.
///
/// Edge IDs are read from `edge_column` of each input chunk; for every input
/// chunk a single-row chunk carrying the number of deleted edges is emitted.
pub struct DeleteEdgeOperator {
    /// The graph store to modify.
    store: Arc<LpgStore>,
    /// Input operator.
    input: Box<dyn Operator>,
    /// Column index for the edge to delete.
    edge_column: usize,
    /// Output schema.
    output_schema: Vec<LogicalType>,
    /// Epoch for MVCC versioning; when `None`, `next` asks the store for its
    /// current epoch.
    viewing_epoch: Option<EpochId>,
}
519
520impl DeleteEdgeOperator {
521    /// Creates a new edge deletion operator.
522    pub fn new(
523        store: Arc<LpgStore>,
524        input: Box<dyn Operator>,
525        edge_column: usize,
526        output_schema: Vec<LogicalType>,
527    ) -> Self {
528        Self {
529            store,
530            input,
531            edge_column,
532            output_schema,
533            viewing_epoch: None,
534        }
535    }
536
537    /// Sets the transaction context for MVCC versioning.
538    pub fn with_tx_context(mut self, epoch: EpochId, _tx_id: Option<TxId>) -> Self {
539        self.viewing_epoch = Some(epoch);
540        self
541    }
542}
543
544impl Operator for DeleteEdgeOperator {
545    fn next(&mut self) -> OperatorResult {
546        // Get transaction context for versioned deletion
547        let epoch = self
548            .viewing_epoch
549            .unwrap_or_else(|| self.store.current_epoch());
550
551        if let Some(chunk) = self.input.next()? {
552            let mut deleted_count = 0;
553
554            for row in chunk.selected_indices() {
555                let edge_val = chunk
556                    .column(self.edge_column)
557                    .and_then(|c| c.get_value(row))
558                    .ok_or_else(|| {
559                        OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
560                    })?;
561
562                let edge_id = match edge_val {
563                    Value::Int64(id) => EdgeId(id as u64),
564                    _ => {
565                        return Err(OperatorError::TypeMismatch {
566                            expected: "Int64 (edge ID)".to_string(),
567                            found: format!("{edge_val:?}"),
568                        });
569                    }
570                };
571
572                // Delete the edge with MVCC versioning
573                if self.store.delete_edge_at_epoch(edge_id, epoch) {
574                    deleted_count += 1;
575                }
576            }
577
578            // Return a chunk with the delete count
579            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
580            if let Some(dst) = builder.column_mut(0) {
581                dst.push_value(Value::Int64(deleted_count));
582            }
583            builder.advance_row();
584
585            return Ok(Some(builder.finish()));
586        }
587        Ok(None)
588    }
589
590    fn reset(&mut self) {
591        self.input.reset();
592    }
593
594    fn name(&self) -> &'static str {
595        "DeleteEdge"
596    }
597}
598
/// Operator that adds labels to nodes.
///
/// Node IDs are read from `node_column` of each input chunk; for every input
/// chunk a single-row chunk carrying the number of successful label
/// additions is emitted.
pub struct AddLabelOperator {
    /// The graph store.
    store: Arc<LpgStore>,
    /// Child operator providing nodes.
    input: Box<dyn Operator>,
    /// Column index containing node IDs.
    node_column: usize,
    /// Labels to add.
    labels: Vec<String>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
}
612
613impl AddLabelOperator {
614    /// Creates a new add label operator.
615    pub fn new(
616        store: Arc<LpgStore>,
617        input: Box<dyn Operator>,
618        node_column: usize,
619        labels: Vec<String>,
620        output_schema: Vec<LogicalType>,
621    ) -> Self {
622        Self {
623            store,
624            input,
625            node_column,
626            labels,
627            output_schema,
628        }
629    }
630}
631
632impl Operator for AddLabelOperator {
633    fn next(&mut self) -> OperatorResult {
634        if let Some(chunk) = self.input.next()? {
635            let mut updated_count = 0;
636
637            for row in chunk.selected_indices() {
638                let node_val = chunk
639                    .column(self.node_column)
640                    .and_then(|c| c.get_value(row))
641                    .ok_or_else(|| {
642                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
643                    })?;
644
645                let node_id = match node_val {
646                    Value::Int64(id) => NodeId(id as u64),
647                    _ => {
648                        return Err(OperatorError::TypeMismatch {
649                            expected: "Int64 (node ID)".to_string(),
650                            found: format!("{node_val:?}"),
651                        });
652                    }
653                };
654
655                // Add all labels
656                for label in &self.labels {
657                    if self.store.add_label(node_id, label) {
658                        updated_count += 1;
659                    }
660                }
661            }
662
663            // Return a chunk with the update count
664            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
665            if let Some(dst) = builder.column_mut(0) {
666                dst.push_value(Value::Int64(updated_count));
667            }
668            builder.advance_row();
669
670            return Ok(Some(builder.finish()));
671        }
672        Ok(None)
673    }
674
675    fn reset(&mut self) {
676        self.input.reset();
677    }
678
679    fn name(&self) -> &'static str {
680        "AddLabel"
681    }
682}
683
/// Operator that removes labels from nodes.
///
/// Node IDs are read from `node_column` of each input chunk; for every input
/// chunk a single-row chunk carrying the number of successful label
/// removals is emitted.
pub struct RemoveLabelOperator {
    /// The graph store.
    store: Arc<LpgStore>,
    /// Child operator providing nodes.
    input: Box<dyn Operator>,
    /// Column index containing node IDs.
    node_column: usize,
    /// Labels to remove.
    labels: Vec<String>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
}
697
698impl RemoveLabelOperator {
699    /// Creates a new remove label operator.
700    pub fn new(
701        store: Arc<LpgStore>,
702        input: Box<dyn Operator>,
703        node_column: usize,
704        labels: Vec<String>,
705        output_schema: Vec<LogicalType>,
706    ) -> Self {
707        Self {
708            store,
709            input,
710            node_column,
711            labels,
712            output_schema,
713        }
714    }
715}
716
717impl Operator for RemoveLabelOperator {
718    fn next(&mut self) -> OperatorResult {
719        if let Some(chunk) = self.input.next()? {
720            let mut updated_count = 0;
721
722            for row in chunk.selected_indices() {
723                let node_val = chunk
724                    .column(self.node_column)
725                    .and_then(|c| c.get_value(row))
726                    .ok_or_else(|| {
727                        OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
728                    })?;
729
730                let node_id = match node_val {
731                    Value::Int64(id) => NodeId(id as u64),
732                    _ => {
733                        return Err(OperatorError::TypeMismatch {
734                            expected: "Int64 (node ID)".to_string(),
735                            found: format!("{node_val:?}"),
736                        });
737                    }
738                };
739
740                // Remove all labels
741                for label in &self.labels {
742                    if self.store.remove_label(node_id, label) {
743                        updated_count += 1;
744                    }
745                }
746            }
747
748            // Return a chunk with the update count
749            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
750            if let Some(dst) = builder.column_mut(0) {
751                dst.push_value(Value::Int64(updated_count));
752            }
753            builder.advance_row();
754
755            return Ok(Some(builder.finish()));
756        }
757        Ok(None)
758    }
759
760    fn reset(&mut self) {
761        self.input.reset();
762    }
763
764    fn name(&self) -> &'static str {
765        "RemoveLabel"
766    }
767}
768
/// Operator that sets properties on nodes or edges.
///
/// This operator reads node/edge IDs from a column and sets the
/// specified properties on each entity.
///
/// Whether the IDs are treated as node or edge IDs is fixed at construction
/// time through `is_edge`.
pub struct SetPropertyOperator {
    /// The graph store.
    store: Arc<LpgStore>,
    /// Child operator providing entities.
    input: Box<dyn Operator>,
    /// Column index containing entity IDs (node or edge).
    entity_column: usize,
    /// Whether the entity is an edge (false = node).
    is_edge: bool,
    /// Properties to set, as `(property name, value source)` pairs.
    properties: Vec<(String, PropertySource)>,
    /// Output schema.
    output_schema: Vec<LogicalType>,
}
787
788impl SetPropertyOperator {
789    /// Creates a new set property operator for nodes.
790    pub fn new_for_node(
791        store: Arc<LpgStore>,
792        input: Box<dyn Operator>,
793        node_column: usize,
794        properties: Vec<(String, PropertySource)>,
795        output_schema: Vec<LogicalType>,
796    ) -> Self {
797        Self {
798            store,
799            input,
800            entity_column: node_column,
801            is_edge: false,
802            properties,
803            output_schema,
804        }
805    }
806
807    /// Creates a new set property operator for edges.
808    pub fn new_for_edge(
809        store: Arc<LpgStore>,
810        input: Box<dyn Operator>,
811        edge_column: usize,
812        properties: Vec<(String, PropertySource)>,
813        output_schema: Vec<LogicalType>,
814    ) -> Self {
815        Self {
816            store,
817            input,
818            entity_column: edge_column,
819            is_edge: true,
820            properties,
821            output_schema,
822        }
823    }
824}
825
826impl Operator for SetPropertyOperator {
827    fn next(&mut self) -> OperatorResult {
828        if let Some(chunk) = self.input.next()? {
829            let mut builder =
830                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
831
832            for row in chunk.selected_indices() {
833                let entity_val = chunk
834                    .column(self.entity_column)
835                    .and_then(|c| c.get_value(row))
836                    .ok_or_else(|| {
837                        OperatorError::ColumnNotFound(format!(
838                            "entity column {}",
839                            self.entity_column
840                        ))
841                    })?;
842
843                let entity_id = match entity_val {
844                    Value::Int64(id) => id as u64,
845                    _ => {
846                        return Err(OperatorError::TypeMismatch {
847                            expected: "Int64 (entity ID)".to_string(),
848                            found: format!("{entity_val:?}"),
849                        });
850                    }
851                };
852
853                // Set all properties
854                for (prop_name, source) in &self.properties {
855                    let value = source.resolve(&chunk, row, &self.store);
856
857                    if self.is_edge {
858                        self.store
859                            .set_edge_property(EdgeId(entity_id), prop_name, value);
860                    } else {
861                        self.store
862                            .set_node_property(NodeId(entity_id), prop_name, value);
863                    }
864                }
865
866                // Copy input columns to output
867                for col_idx in 0..chunk.column_count() {
868                    if let (Some(src), Some(dst)) =
869                        (chunk.column(col_idx), builder.column_mut(col_idx))
870                    {
871                        if let Some(val) = src.get_value(row) {
872                            dst.push_value(val);
873                        } else {
874                            dst.push_value(Value::Null);
875                        }
876                    }
877                }
878
879                builder.advance_row();
880            }
881
882            return Ok(Some(builder.finish()));
883        }
884        Ok(None)
885    }
886
887    fn reset(&mut self) {
888        self.input.reset();
889    }
890
891    fn name(&self) -> &'static str {
892        "SetProperty"
893    }
894}
895
#[cfg(test)]
mod tests {
    //! Unit tests for the mutation operators.
    //!
    //! Each test drives one operator against a fresh in-memory store, feeding
    //! it either a single pre-built chunk (`MockInput`) or no data at all
    //! (`EmptyInput`), and then checks both the emitted chunk and the store.

    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    // ── Shared fixtures ──────────────────────────────────────────

    /// Creates an empty store wrapped in an `Arc` so operators can share it.
    fn create_test_store() -> Arc<LpgStore> {
        Arc::new(LpgStore::new())
    }

    /// Builds a single-column `Int64` chunk holding one row per entry of `ids`.
    fn id_chunk(ids: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &id in ids {
            builder.column_mut(0).unwrap().push_int64(id);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Input operator that yields one pre-built chunk, then `None`.
    struct MockInput {
        chunk: Option<DataChunk>,
    }

    impl MockInput {
        /// Boxes a mock that will hand out `chunk` exactly once.
        fn boxed(chunk: DataChunk) -> Box<Self> {
            Box::new(Self { chunk: Some(chunk) })
        }
    }

    impl Operator for MockInput {
        fn next(&mut self) -> OperatorResult {
            // `take` leaves `None` behind, so the second call signals exhaustion.
            Ok(self.chunk.take())
        }
        fn reset(&mut self) {}
        fn name(&self) -> &'static str {
            "MockInput"
        }
    }

    /// Input operator that is exhausted from the very first call.
    struct EmptyInput;

    impl Operator for EmptyInput {
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }
        fn reset(&mut self) {}
        fn name(&self) -> &'static str {
            "EmptyInput"
        }
    }

    // ── CreateNodeOperator / CreateEdgeOperator ──────────────────

    #[test]
    fn test_create_node_standalone() {
        let store = create_test_store();

        let mut op = CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Person".to_string()],
            vec![(
                "name".to_string(),
                PropertySource::Constant(Value::String("Alice".into())),
            )],
            vec![LogicalType::Int64],
            0,
        );

        // First call should create a node
        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        // Second call should return None
        assert!(op.next().unwrap().is_none());

        // Verify node was created
        assert_eq!(store.node_count(), 1);
    }

    #[test]
    fn test_create_edge() {
        let store = create_test_store();

        // Create two nodes first
        let node1 = store.create_node(&["Person"]);
        let node2 = store.create_node(&["Person"]);

        // Input row: column 0 = source node ID, column 1 = target node ID
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
        builder.advance_row();

        let mut op = CreateEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(builder.finish()),
            0, // from column
            1, // to column
            "KNOWS".to_string(),
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        // Execute
        let _chunk = op.next().unwrap().unwrap();

        // Verify edge was created
        assert_eq!(store.edge_count(), 1);
    }

    // ── DeleteNodeOperator ───────────────────────────────────────

    #[test]
    fn test_delete_node() {
        let store = create_test_store();

        // Create a node
        let node_id = store.create_node(&["Person"]);
        assert_eq!(store.node_count(), 1);

        let mut op = DeleteNodeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node_id.0 as i64])),
            0,
            vec![LogicalType::Int64],
            false,
        );

        // Execute
        let chunk = op.next().unwrap().unwrap();

        // Verify deletion
        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(store.node_count(), 0);
    }

    #[test]
    fn test_delete_node_detach() {
        let store = create_test_store();

        let n1 = store.create_node(&["Person"]);
        let n2 = store.create_node(&["Person"]);
        store.create_edge(n1, n2, "KNOWS");
        store.create_edge(n2, n1, "FOLLOWS");
        assert_eq!(store.edge_count(), 2);

        let mut op = DeleteNodeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[n1.0 as i64])),
            0,
            vec![LogicalType::Int64],
            true, // detach = true
        );

        let chunk = op.next().unwrap().unwrap();
        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(store.node_count(), 1);
        assert_eq!(store.edge_count(), 0); // edges detached
    }

    // ── DeleteEdgeOperator ───────────────────────────────────────

    #[test]
    fn test_delete_edge() {
        let store = create_test_store();

        let n1 = store.create_node(&["Person"]);
        let n2 = store.create_node(&["Person"]);
        let eid = store.create_edge(n1, n2, "KNOWS");
        assert_eq!(store.edge_count(), 1);

        let mut op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[eid.0 as i64])),
            0,
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(store.edge_count(), 0);
    }

    #[test]
    fn test_delete_edge_no_input_returns_none() {
        let store = create_test_store();

        let mut op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
        );

        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_delete_multiple_edges() {
        let store = create_test_store();

        let n1 = store.create_node(&["N"]);
        let n2 = store.create_node(&["N"]);
        let e1 = store.create_edge(n1, n2, "R");
        let e2 = store.create_edge(n2, n1, "S");
        assert_eq!(store.edge_count(), 2);

        let mut op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[e1.0 as i64, e2.0 as i64])),
            0,
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 2);
        assert_eq!(store.edge_count(), 0);
    }

    // ── AddLabelOperator ─────────────────────────────────────────

    #[test]
    fn test_add_label() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = AddLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0 as i64])),
            0,
            vec!["Employee".to_string()],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(updated, 1);

        // Verify label was added
        let node_data = store.get_node(node).unwrap();
        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
        assert!(labels.contains(&"Person"));
        assert!(labels.contains(&"Employee"));
    }

    #[test]
    fn test_add_multiple_labels() {
        let store = create_test_store();

        let node = store.create_node(&["Base"]);

        let mut op = AddLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0 as i64])),
            0,
            vec!["LabelA".to_string(), "LabelB".to_string()],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(updated, 2); // 2 labels added

        let node_data = store.get_node(node).unwrap();
        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
        assert!(labels.contains(&"LabelA"));
        assert!(labels.contains(&"LabelB"));
    }

    #[test]
    fn test_add_label_no_input_returns_none() {
        let store = create_test_store();

        let mut op = AddLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["Foo".to_string()],
            vec![LogicalType::Int64],
        );

        assert!(op.next().unwrap().is_none());
    }

    // ── RemoveLabelOperator ──────────────────────────────────────

    #[test]
    fn test_remove_label() {
        let store = create_test_store();

        let node = store.create_node(&["Person", "Employee"]);

        let mut op = RemoveLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0 as i64])),
            0,
            vec!["Employee".to_string()],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(updated, 1);

        // Verify label was removed
        let node_data = store.get_node(node).unwrap();
        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
        assert!(labels.contains(&"Person"));
        assert!(!labels.contains(&"Employee"));
    }

    #[test]
    fn test_remove_nonexistent_label() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = RemoveLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0 as i64])),
            0,
            vec!["NonExistent".to_string()],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(updated, 0); // nothing removed
    }

    // ── SetPropertyOperator ──────────────────────────────────────

    #[test]
    fn test_set_node_property_constant() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0 as i64])),
            0,
            vec![(
                "name".to_string(),
                PropertySource::Constant(Value::String("Alice".into())),
            )],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        // Verify property was set
        let node_data = store.get_node(node).unwrap();
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alice".into()))
        );
    }

    #[test]
    fn test_set_node_property_from_column() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        // Input: column 0 = node ID, column 1 = property value
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
        builder
            .column_mut(1)
            .unwrap()
            .push_value(Value::String("Bob".into()));
        builder.advance_row();

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(builder.finish()),
            0,
            vec![("name".to_string(), PropertySource::Column(1))],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        let node_data = store.get_node(node).unwrap();
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Bob".into()))
        );
    }

    #[test]
    fn test_set_edge_property() {
        let store = create_test_store();

        let n1 = store.create_node(&["N"]);
        let n2 = store.create_node(&["N"]);
        let eid = store.create_edge(n1, n2, "KNOWS");

        let mut op = SetPropertyOperator::new_for_edge(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[eid.0 as i64])),
            0,
            vec![(
                "weight".to_string(),
                PropertySource::Constant(Value::Float64(0.75)),
            )],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        let edge_data = store.get_edge(eid).unwrap();
        assert_eq!(
            edge_data.properties.get(&PropertyKey::new("weight")),
            Some(&Value::Float64(0.75))
        );
    }

    #[test]
    fn test_set_multiple_properties() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0 as i64])),
            0,
            vec![
                (
                    "name".to_string(),
                    PropertySource::Constant(Value::String("Alice".into())),
                ),
                (
                    "age".to_string(),
                    PropertySource::Constant(Value::Int64(30)),
                ),
            ],
            vec![LogicalType::Int64],
        );

        op.next().unwrap().unwrap();

        let node_data = store.get_node(node).unwrap();
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alice".into()))
        );
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("age")),
            Some(&Value::Int64(30))
        );
    }

    #[test]
    fn test_set_property_no_input_returns_none() {
        let store = create_test_store();

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
            vec![LogicalType::Int64],
        );

        assert!(op.next().unwrap().is_none());
    }

    // ── Operator name() ──────────────────────────────────────────

    #[test]
    fn test_operator_names() {
        let store = create_test_store();

        let op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "DeleteEdge");

        let op = AddLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["L".to_string()],
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "AddLabel");

        let op = RemoveLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["L".to_string()],
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "RemoveLabel");

        let op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![],
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "SetProperty");
    }
}