1use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::lpg::LpgStore;
16
17pub struct CreateNodeOperator {
22 store: Arc<LpgStore>,
24 input: Option<Box<dyn Operator>>,
26 labels: Vec<String>,
28 properties: Vec<(String, PropertySource)>,
30 output_schema: Vec<LogicalType>,
32 output_column: usize,
34 executed: bool,
36 viewing_epoch: Option<EpochId>,
38 tx_id: Option<TxId>,
40}
41
42#[derive(Debug, Clone)]
44pub enum PropertySource {
45 Column(usize),
47 Constant(Value),
49}
50
51impl CreateNodeOperator {
52 pub fn new(
62 store: Arc<LpgStore>,
63 input: Option<Box<dyn Operator>>,
64 labels: Vec<String>,
65 properties: Vec<(String, PropertySource)>,
66 output_schema: Vec<LogicalType>,
67 output_column: usize,
68 ) -> Self {
69 Self {
70 store,
71 input,
72 labels,
73 properties,
74 output_schema,
75 output_column,
76 executed: false,
77 viewing_epoch: None,
78 tx_id: None,
79 }
80 }
81
82 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
84 self.viewing_epoch = Some(epoch);
85 self.tx_id = tx_id;
86 self
87 }
88}
89
90impl Operator for CreateNodeOperator {
91 fn next(&mut self) -> OperatorResult {
92 let epoch = self
94 .viewing_epoch
95 .unwrap_or_else(|| self.store.current_epoch());
96 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
97
98 if let Some(ref mut input) = self.input {
99 if let Some(chunk) = input.next()? {
101 let mut builder =
102 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
103
104 for row in chunk.selected_indices() {
105 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
107 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
108
109 for (prop_name, source) in &self.properties {
111 let value = match source {
112 PropertySource::Column(col_idx) => chunk
113 .column(*col_idx)
114 .and_then(|c| c.get_value(row))
115 .unwrap_or(Value::Null),
116 PropertySource::Constant(v) => v.clone(),
117 };
118 self.store.set_node_property(node_id, prop_name, value);
119 }
120
121 for col_idx in 0..chunk.column_count() {
123 if col_idx < self.output_column {
124 if let (Some(src), Some(dst)) =
125 (chunk.column(col_idx), builder.column_mut(col_idx))
126 {
127 if let Some(val) = src.get_value(row) {
128 dst.push_value(val);
129 } else {
130 dst.push_value(Value::Null);
131 }
132 }
133 }
134 }
135
136 if let Some(dst) = builder.column_mut(self.output_column) {
138 dst.push_value(Value::Int64(node_id.0 as i64));
139 }
140
141 builder.advance_row();
142 }
143
144 return Ok(Some(builder.finish()));
145 }
146 Ok(None)
147 } else {
148 if self.executed {
150 return Ok(None);
151 }
152 self.executed = true;
153
154 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
156 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
157
158 for (prop_name, source) in &self.properties {
160 if let PropertySource::Constant(value) = source {
161 self.store
162 .set_node_property(node_id, prop_name, value.clone());
163 }
164 }
165
166 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
168 if let Some(dst) = builder.column_mut(self.output_column) {
169 dst.push_value(Value::Int64(node_id.0 as i64));
170 }
171 builder.advance_row();
172
173 Ok(Some(builder.finish()))
174 }
175 }
176
177 fn reset(&mut self) {
178 if let Some(ref mut input) = self.input {
179 input.reset();
180 }
181 self.executed = false;
182 }
183
184 fn name(&self) -> &'static str {
185 "CreateNode"
186 }
187}
188
189pub struct CreateEdgeOperator {
191 store: Arc<LpgStore>,
193 input: Box<dyn Operator>,
195 from_column: usize,
197 to_column: usize,
199 edge_type: String,
201 properties: Vec<(String, PropertySource)>,
203 output_schema: Vec<LogicalType>,
205 output_column: Option<usize>,
207 viewing_epoch: Option<EpochId>,
209 tx_id: Option<TxId>,
211}
212
213impl CreateEdgeOperator {
214 #[allow(clippy::too_many_arguments)]
216 pub fn new(
217 store: Arc<LpgStore>,
218 input: Box<dyn Operator>,
219 from_column: usize,
220 to_column: usize,
221 edge_type: String,
222 properties: Vec<(String, PropertySource)>,
223 output_schema: Vec<LogicalType>,
224 output_column: Option<usize>,
225 ) -> Self {
226 Self {
227 store,
228 input,
229 from_column,
230 to_column,
231 edge_type,
232 properties,
233 output_schema,
234 output_column,
235 viewing_epoch: None,
236 tx_id: None,
237 }
238 }
239
240 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
242 self.viewing_epoch = Some(epoch);
243 self.tx_id = tx_id;
244 self
245 }
246}
247
248impl Operator for CreateEdgeOperator {
249 fn next(&mut self) -> OperatorResult {
250 let epoch = self
252 .viewing_epoch
253 .unwrap_or_else(|| self.store.current_epoch());
254 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
255
256 if let Some(chunk) = self.input.next()? {
257 let mut builder =
258 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
259
260 for row in chunk.selected_indices() {
261 let from_id = chunk
263 .column(self.from_column)
264 .and_then(|c| c.get_value(row))
265 .ok_or_else(|| {
266 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
267 })?;
268
269 let to_id = chunk
270 .column(self.to_column)
271 .and_then(|c| c.get_value(row))
272 .ok_or_else(|| {
273 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
274 })?;
275
276 let from_node_id = match from_id {
278 Value::Int64(id) => NodeId(id as u64),
279 _ => {
280 return Err(OperatorError::TypeMismatch {
281 expected: "Int64 (node ID)".to_string(),
282 found: format!("{from_id:?}"),
283 });
284 }
285 };
286
287 let to_node_id = match to_id {
288 Value::Int64(id) => NodeId(id as u64),
289 _ => {
290 return Err(OperatorError::TypeMismatch {
291 expected: "Int64 (node ID)".to_string(),
292 found: format!("{to_id:?}"),
293 });
294 }
295 };
296
297 let edge_id = self.store.create_edge_versioned(
299 from_node_id,
300 to_node_id,
301 &self.edge_type,
302 epoch,
303 tx,
304 );
305
306 for (prop_name, source) in &self.properties {
308 let value = match source {
309 PropertySource::Column(col_idx) => chunk
310 .column(*col_idx)
311 .and_then(|c| c.get_value(row))
312 .unwrap_or(Value::Null),
313 PropertySource::Constant(v) => v.clone(),
314 };
315 self.store.set_edge_property(edge_id, prop_name, value);
316 }
317
318 for col_idx in 0..chunk.column_count() {
320 if let (Some(src), Some(dst)) =
321 (chunk.column(col_idx), builder.column_mut(col_idx))
322 {
323 if let Some(val) = src.get_value(row) {
324 dst.push_value(val);
325 } else {
326 dst.push_value(Value::Null);
327 }
328 }
329 }
330
331 if let Some(out_col) = self.output_column {
333 if let Some(dst) = builder.column_mut(out_col) {
334 dst.push_value(Value::Int64(edge_id.0 as i64));
335 }
336 }
337
338 builder.advance_row();
339 }
340
341 return Ok(Some(builder.finish()));
342 }
343 Ok(None)
344 }
345
346 fn reset(&mut self) {
347 self.input.reset();
348 }
349
350 fn name(&self) -> &'static str {
351 "CreateEdge"
352 }
353}
354
355pub struct DeleteNodeOperator {
357 store: Arc<LpgStore>,
359 input: Box<dyn Operator>,
361 node_column: usize,
363 output_schema: Vec<LogicalType>,
365 detach: bool,
367 viewing_epoch: Option<EpochId>,
369 #[allow(dead_code)]
371 tx_id: Option<TxId>,
372}
373
374impl DeleteNodeOperator {
375 pub fn new(
377 store: Arc<LpgStore>,
378 input: Box<dyn Operator>,
379 node_column: usize,
380 output_schema: Vec<LogicalType>,
381 detach: bool,
382 ) -> Self {
383 Self {
384 store,
385 input,
386 node_column,
387 output_schema,
388 detach,
389 viewing_epoch: None,
390 tx_id: None,
391 }
392 }
393
394 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
396 self.viewing_epoch = Some(epoch);
397 self.tx_id = tx_id;
398 self
399 }
400}
401
402impl Operator for DeleteNodeOperator {
403 fn next(&mut self) -> OperatorResult {
404 let epoch = self
406 .viewing_epoch
407 .unwrap_or_else(|| self.store.current_epoch());
408
409 if let Some(chunk) = self.input.next()? {
410 let mut deleted_count = 0;
411
412 for row in chunk.selected_indices() {
413 let node_val = chunk
414 .column(self.node_column)
415 .and_then(|c| c.get_value(row))
416 .ok_or_else(|| {
417 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
418 })?;
419
420 let node_id = match node_val {
421 Value::Int64(id) => NodeId(id as u64),
422 _ => {
423 return Err(OperatorError::TypeMismatch {
424 expected: "Int64 (node ID)".to_string(),
425 found: format!("{node_val:?}"),
426 });
427 }
428 };
429
430 if self.detach {
431 self.store.delete_node_edges(node_id);
434 }
435
436 if self.store.delete_node_at_epoch(node_id, epoch) {
438 deleted_count += 1;
439 }
440 }
441
442 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
444 if let Some(dst) = builder.column_mut(0) {
445 dst.push_value(Value::Int64(deleted_count));
446 }
447 builder.advance_row();
448
449 return Ok(Some(builder.finish()));
450 }
451 Ok(None)
452 }
453
454 fn reset(&mut self) {
455 self.input.reset();
456 }
457
458 fn name(&self) -> &'static str {
459 "DeleteNode"
460 }
461}
462
463pub struct DeleteEdgeOperator {
465 store: Arc<LpgStore>,
467 input: Box<dyn Operator>,
469 edge_column: usize,
471 output_schema: Vec<LogicalType>,
473 viewing_epoch: Option<EpochId>,
475 #[allow(dead_code)]
477 tx_id: Option<TxId>,
478}
479
480impl DeleteEdgeOperator {
481 pub fn new(
483 store: Arc<LpgStore>,
484 input: Box<dyn Operator>,
485 edge_column: usize,
486 output_schema: Vec<LogicalType>,
487 ) -> Self {
488 Self {
489 store,
490 input,
491 edge_column,
492 output_schema,
493 viewing_epoch: None,
494 tx_id: None,
495 }
496 }
497
498 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
500 self.viewing_epoch = Some(epoch);
501 self.tx_id = tx_id;
502 self
503 }
504}
505
506impl Operator for DeleteEdgeOperator {
507 fn next(&mut self) -> OperatorResult {
508 let epoch = self
510 .viewing_epoch
511 .unwrap_or_else(|| self.store.current_epoch());
512
513 if let Some(chunk) = self.input.next()? {
514 let mut deleted_count = 0;
515
516 for row in chunk.selected_indices() {
517 let edge_val = chunk
518 .column(self.edge_column)
519 .and_then(|c| c.get_value(row))
520 .ok_or_else(|| {
521 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
522 })?;
523
524 let edge_id = match edge_val {
525 Value::Int64(id) => EdgeId(id as u64),
526 _ => {
527 return Err(OperatorError::TypeMismatch {
528 expected: "Int64 (edge ID)".to_string(),
529 found: format!("{edge_val:?}"),
530 });
531 }
532 };
533
534 if self.store.delete_edge_at_epoch(edge_id, epoch) {
536 deleted_count += 1;
537 }
538 }
539
540 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
542 if let Some(dst) = builder.column_mut(0) {
543 dst.push_value(Value::Int64(deleted_count));
544 }
545 builder.advance_row();
546
547 return Ok(Some(builder.finish()));
548 }
549 Ok(None)
550 }
551
552 fn reset(&mut self) {
553 self.input.reset();
554 }
555
556 fn name(&self) -> &'static str {
557 "DeleteEdge"
558 }
559}
560
561pub struct AddLabelOperator {
563 store: Arc<LpgStore>,
565 input: Box<dyn Operator>,
567 node_column: usize,
569 labels: Vec<String>,
571 output_schema: Vec<LogicalType>,
573}
574
575impl AddLabelOperator {
576 pub fn new(
578 store: Arc<LpgStore>,
579 input: Box<dyn Operator>,
580 node_column: usize,
581 labels: Vec<String>,
582 output_schema: Vec<LogicalType>,
583 ) -> Self {
584 Self {
585 store,
586 input,
587 node_column,
588 labels,
589 output_schema,
590 }
591 }
592}
593
594impl Operator for AddLabelOperator {
595 fn next(&mut self) -> OperatorResult {
596 if let Some(chunk) = self.input.next()? {
597 let mut updated_count = 0;
598
599 for row in chunk.selected_indices() {
600 let node_val = chunk
601 .column(self.node_column)
602 .and_then(|c| c.get_value(row))
603 .ok_or_else(|| {
604 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
605 })?;
606
607 let node_id = match node_val {
608 Value::Int64(id) => NodeId(id as u64),
609 _ => {
610 return Err(OperatorError::TypeMismatch {
611 expected: "Int64 (node ID)".to_string(),
612 found: format!("{node_val:?}"),
613 });
614 }
615 };
616
617 for label in &self.labels {
619 if self.store.add_label(node_id, label) {
620 updated_count += 1;
621 }
622 }
623 }
624
625 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
627 if let Some(dst) = builder.column_mut(0) {
628 dst.push_value(Value::Int64(updated_count));
629 }
630 builder.advance_row();
631
632 return Ok(Some(builder.finish()));
633 }
634 Ok(None)
635 }
636
637 fn reset(&mut self) {
638 self.input.reset();
639 }
640
641 fn name(&self) -> &'static str {
642 "AddLabel"
643 }
644}
645
646pub struct RemoveLabelOperator {
648 store: Arc<LpgStore>,
650 input: Box<dyn Operator>,
652 node_column: usize,
654 labels: Vec<String>,
656 output_schema: Vec<LogicalType>,
658}
659
660impl RemoveLabelOperator {
661 pub fn new(
663 store: Arc<LpgStore>,
664 input: Box<dyn Operator>,
665 node_column: usize,
666 labels: Vec<String>,
667 output_schema: Vec<LogicalType>,
668 ) -> Self {
669 Self {
670 store,
671 input,
672 node_column,
673 labels,
674 output_schema,
675 }
676 }
677}
678
679impl Operator for RemoveLabelOperator {
680 fn next(&mut self) -> OperatorResult {
681 if let Some(chunk) = self.input.next()? {
682 let mut updated_count = 0;
683
684 for row in chunk.selected_indices() {
685 let node_val = chunk
686 .column(self.node_column)
687 .and_then(|c| c.get_value(row))
688 .ok_or_else(|| {
689 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
690 })?;
691
692 let node_id = match node_val {
693 Value::Int64(id) => NodeId(id as u64),
694 _ => {
695 return Err(OperatorError::TypeMismatch {
696 expected: "Int64 (node ID)".to_string(),
697 found: format!("{node_val:?}"),
698 });
699 }
700 };
701
702 for label in &self.labels {
704 if self.store.remove_label(node_id, label) {
705 updated_count += 1;
706 }
707 }
708 }
709
710 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
712 if let Some(dst) = builder.column_mut(0) {
713 dst.push_value(Value::Int64(updated_count));
714 }
715 builder.advance_row();
716
717 return Ok(Some(builder.finish()));
718 }
719 Ok(None)
720 }
721
722 fn reset(&mut self) {
723 self.input.reset();
724 }
725
726 fn name(&self) -> &'static str {
727 "RemoveLabel"
728 }
729}
730
731pub struct SetPropertyOperator {
736 store: Arc<LpgStore>,
738 input: Box<dyn Operator>,
740 entity_column: usize,
742 is_edge: bool,
744 properties: Vec<(String, PropertySource)>,
746 output_schema: Vec<LogicalType>,
748}
749
750impl SetPropertyOperator {
751 pub fn new_for_node(
753 store: Arc<LpgStore>,
754 input: Box<dyn Operator>,
755 node_column: usize,
756 properties: Vec<(String, PropertySource)>,
757 output_schema: Vec<LogicalType>,
758 ) -> Self {
759 Self {
760 store,
761 input,
762 entity_column: node_column,
763 is_edge: false,
764 properties,
765 output_schema,
766 }
767 }
768
769 pub fn new_for_edge(
771 store: Arc<LpgStore>,
772 input: Box<dyn Operator>,
773 edge_column: usize,
774 properties: Vec<(String, PropertySource)>,
775 output_schema: Vec<LogicalType>,
776 ) -> Self {
777 Self {
778 store,
779 input,
780 entity_column: edge_column,
781 is_edge: true,
782 properties,
783 output_schema,
784 }
785 }
786}
787
788impl Operator for SetPropertyOperator {
789 fn next(&mut self) -> OperatorResult {
790 if let Some(chunk) = self.input.next()? {
791 let mut builder =
792 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
793
794 for row in chunk.selected_indices() {
795 let entity_val = chunk
796 .column(self.entity_column)
797 .and_then(|c| c.get_value(row))
798 .ok_or_else(|| {
799 OperatorError::ColumnNotFound(format!(
800 "entity column {}",
801 self.entity_column
802 ))
803 })?;
804
805 let entity_id = match entity_val {
806 Value::Int64(id) => id as u64,
807 _ => {
808 return Err(OperatorError::TypeMismatch {
809 expected: "Int64 (entity ID)".to_string(),
810 found: format!("{entity_val:?}"),
811 });
812 }
813 };
814
815 for (prop_name, source) in &self.properties {
817 let value = match source {
818 PropertySource::Column(col_idx) => chunk
819 .column(*col_idx)
820 .and_then(|c| c.get_value(row))
821 .unwrap_or(Value::Null),
822 PropertySource::Constant(v) => v.clone(),
823 };
824
825 if self.is_edge {
826 self.store
827 .set_edge_property(EdgeId(entity_id), prop_name, value);
828 } else {
829 self.store
830 .set_node_property(NodeId(entity_id), prop_name, value);
831 }
832 }
833
834 for col_idx in 0..chunk.column_count() {
836 if let (Some(src), Some(dst)) =
837 (chunk.column(col_idx), builder.column_mut(col_idx))
838 {
839 if let Some(val) = src.get_value(row) {
840 dst.push_value(val);
841 } else {
842 dst.push_value(Value::Null);
843 }
844 }
845 }
846
847 builder.advance_row();
848 }
849
850 return Ok(Some(builder.finish()));
851 }
852 Ok(None)
853 }
854
855 fn reset(&mut self) {
856 self.input.reset();
857 }
858
859 fn name(&self) -> &'static str {
860 "SetProperty"
861 }
862}
863
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Upstream stub shared by the tests: hands out its chunk on the first
    /// `next` call and reports exhaustion afterwards.
    struct MockInput {
        chunk: Option<DataChunk>,
    }

    impl MockInput {
        /// Wraps `chunk` in a boxed operator ready to be used as an input.
        fn boxed(chunk: DataChunk) -> Box<dyn Operator> {
            Box::new(Self { chunk: Some(chunk) })
        }
    }

    impl Operator for MockInput {
        fn next(&mut self) -> OperatorResult {
            Ok(self.chunk.take())
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "MockInput"
        }
    }

    fn create_test_store() -> Arc<LpgStore> {
        Arc::new(LpgStore::new())
    }

    #[test]
    fn test_create_node_standalone() {
        let store = create_test_store();

        let mut op = CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Person".to_string()],
            vec![(
                "name".to_string(),
                PropertySource::Constant(Value::String("Alice".into())),
            )],
            vec![LogicalType::Int64],
            0,
        );

        // First call creates the node and reports it in a one-row chunk.
        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        // The input-less operator runs exactly once.
        assert!(op.next().unwrap().is_none());

        assert_eq!(store.node_count(), 1);
    }

    #[test]
    fn test_create_edge() {
        let store = create_test_store();

        let node1 = store.create_node(&["Person"]);
        let node2 = store.create_node(&["Person"]);

        // One input row: (source node ID, target node ID).
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
        builder.advance_row();
        let input_chunk = builder.finish();

        let mut op = CreateEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(input_chunk),
            0,
            1,
            "KNOWS".to_string(),
            vec![],
            vec![LogicalType::Int64, LogicalType::Int64],
            None,
        );

        // The single input row is passed through and yields one edge.
        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        assert_eq!(store.edge_count(), 1);
    }

    #[test]
    fn test_delete_node() {
        let store = create_test_store();

        let node_id = store.create_node(&["Person"]);
        assert_eq!(store.node_count(), 1);

        // One input row carrying the ID of the node to delete.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(node_id.0 as i64);
        builder.advance_row();
        let input_chunk = builder.finish();

        let mut op = DeleteNodeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(input_chunk),
            0,
            vec![LogicalType::Int64],
            false,
        );

        let chunk = op.next().unwrap().unwrap();

        // Column 0 of the result holds the number of deleted nodes.
        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(store.node_count(), 0);
    }
}