1use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::lpg::LpgStore;
16
17pub struct CreateNodeOperator {
22 store: Arc<LpgStore>,
24 input: Option<Box<dyn Operator>>,
26 labels: Vec<String>,
28 properties: Vec<(String, PropertySource)>,
30 output_schema: Vec<LogicalType>,
32 output_column: usize,
34 executed: bool,
36 viewing_epoch: Option<EpochId>,
38 tx_id: Option<TxId>,
40}
41
42#[derive(Debug, Clone)]
44pub enum PropertySource {
45 Column(usize),
47 Constant(Value),
49}
50
51impl CreateNodeOperator {
52 pub fn new(
62 store: Arc<LpgStore>,
63 input: Option<Box<dyn Operator>>,
64 labels: Vec<String>,
65 properties: Vec<(String, PropertySource)>,
66 output_schema: Vec<LogicalType>,
67 output_column: usize,
68 ) -> Self {
69 Self {
70 store,
71 input,
72 labels,
73 properties,
74 output_schema,
75 output_column,
76 executed: false,
77 viewing_epoch: None,
78 tx_id: None,
79 }
80 }
81
82 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
84 self.viewing_epoch = Some(epoch);
85 self.tx_id = tx_id;
86 self
87 }
88}
89
90impl Operator for CreateNodeOperator {
91 fn next(&mut self) -> OperatorResult {
92 let epoch = self
94 .viewing_epoch
95 .unwrap_or_else(|| self.store.current_epoch());
96 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
97
98 if let Some(ref mut input) = self.input {
99 if let Some(chunk) = input.next()? {
101 let mut builder =
102 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
103
104 for row in chunk.selected_indices() {
105 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
107 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
108
109 for (prop_name, source) in &self.properties {
111 let value = match source {
112 PropertySource::Column(col_idx) => chunk
113 .column(*col_idx)
114 .and_then(|c| c.get_value(row))
115 .unwrap_or(Value::Null),
116 PropertySource::Constant(v) => v.clone(),
117 };
118 self.store.set_node_property(node_id, prop_name, value);
119 }
120
121 for col_idx in 0..chunk.column_count() {
123 if col_idx < self.output_column {
124 if let (Some(src), Some(dst)) =
125 (chunk.column(col_idx), builder.column_mut(col_idx))
126 {
127 if let Some(val) = src.get_value(row) {
128 dst.push_value(val);
129 } else {
130 dst.push_value(Value::Null);
131 }
132 }
133 }
134 }
135
136 if let Some(dst) = builder.column_mut(self.output_column) {
138 dst.push_value(Value::Int64(node_id.0 as i64));
139 }
140
141 builder.advance_row();
142 }
143
144 return Ok(Some(builder.finish()));
145 }
146 Ok(None)
147 } else {
148 if self.executed {
150 return Ok(None);
151 }
152 self.executed = true;
153
154 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
156 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
157
158 for (prop_name, source) in &self.properties {
160 if let PropertySource::Constant(value) = source {
161 self.store
162 .set_node_property(node_id, prop_name, value.clone());
163 }
164 }
165
166 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
168 if let Some(dst) = builder.column_mut(self.output_column) {
169 dst.push_value(Value::Int64(node_id.0 as i64));
170 }
171 builder.advance_row();
172
173 Ok(Some(builder.finish()))
174 }
175 }
176
177 fn reset(&mut self) {
178 if let Some(ref mut input) = self.input {
179 input.reset();
180 }
181 self.executed = false;
182 }
183
184 fn name(&self) -> &'static str {
185 "CreateNode"
186 }
187}
188
189pub struct CreateEdgeOperator {
191 store: Arc<LpgStore>,
193 input: Box<dyn Operator>,
195 from_column: usize,
197 to_column: usize,
199 edge_type: String,
201 properties: Vec<(String, PropertySource)>,
203 output_schema: Vec<LogicalType>,
205 output_column: Option<usize>,
207 viewing_epoch: Option<EpochId>,
209 tx_id: Option<TxId>,
211}
212
213impl CreateEdgeOperator {
214 pub fn new(
221 store: Arc<LpgStore>,
222 input: Box<dyn Operator>,
223 from_column: usize,
224 to_column: usize,
225 edge_type: String,
226 output_schema: Vec<LogicalType>,
227 ) -> Self {
228 Self {
229 store,
230 input,
231 from_column,
232 to_column,
233 edge_type,
234 properties: Vec::new(),
235 output_schema,
236 output_column: None,
237 viewing_epoch: None,
238 tx_id: None,
239 }
240 }
241
242 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
244 self.properties = properties;
245 self
246 }
247
248 pub fn with_output_column(mut self, column: usize) -> Self {
250 self.output_column = Some(column);
251 self
252 }
253
254 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
256 self.viewing_epoch = Some(epoch);
257 self.tx_id = tx_id;
258 self
259 }
260}
261
262impl Operator for CreateEdgeOperator {
263 fn next(&mut self) -> OperatorResult {
264 let epoch = self
266 .viewing_epoch
267 .unwrap_or_else(|| self.store.current_epoch());
268 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
269
270 if let Some(chunk) = self.input.next()? {
271 let mut builder =
272 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
273
274 for row in chunk.selected_indices() {
275 let from_id = chunk
277 .column(self.from_column)
278 .and_then(|c| c.get_value(row))
279 .ok_or_else(|| {
280 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
281 })?;
282
283 let to_id = chunk
284 .column(self.to_column)
285 .and_then(|c| c.get_value(row))
286 .ok_or_else(|| {
287 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
288 })?;
289
290 let from_node_id = match from_id {
292 Value::Int64(id) => NodeId(id as u64),
293 _ => {
294 return Err(OperatorError::TypeMismatch {
295 expected: "Int64 (node ID)".to_string(),
296 found: format!("{from_id:?}"),
297 });
298 }
299 };
300
301 let to_node_id = match to_id {
302 Value::Int64(id) => NodeId(id as u64),
303 _ => {
304 return Err(OperatorError::TypeMismatch {
305 expected: "Int64 (node ID)".to_string(),
306 found: format!("{to_id:?}"),
307 });
308 }
309 };
310
311 let edge_id = self.store.create_edge_versioned(
313 from_node_id,
314 to_node_id,
315 &self.edge_type,
316 epoch,
317 tx,
318 );
319
320 for (prop_name, source) in &self.properties {
322 let value = match source {
323 PropertySource::Column(col_idx) => chunk
324 .column(*col_idx)
325 .and_then(|c| c.get_value(row))
326 .unwrap_or(Value::Null),
327 PropertySource::Constant(v) => v.clone(),
328 };
329 self.store.set_edge_property(edge_id, prop_name, value);
330 }
331
332 for col_idx in 0..chunk.column_count() {
334 if let (Some(src), Some(dst)) =
335 (chunk.column(col_idx), builder.column_mut(col_idx))
336 {
337 if let Some(val) = src.get_value(row) {
338 dst.push_value(val);
339 } else {
340 dst.push_value(Value::Null);
341 }
342 }
343 }
344
345 if let Some(out_col) = self.output_column {
347 if let Some(dst) = builder.column_mut(out_col) {
348 dst.push_value(Value::Int64(edge_id.0 as i64));
349 }
350 }
351
352 builder.advance_row();
353 }
354
355 return Ok(Some(builder.finish()));
356 }
357 Ok(None)
358 }
359
360 fn reset(&mut self) {
361 self.input.reset();
362 }
363
364 fn name(&self) -> &'static str {
365 "CreateEdge"
366 }
367}
368
369pub struct DeleteNodeOperator {
371 store: Arc<LpgStore>,
373 input: Box<dyn Operator>,
375 node_column: usize,
377 output_schema: Vec<LogicalType>,
379 detach: bool,
381 viewing_epoch: Option<EpochId>,
383 #[allow(dead_code)]
385 tx_id: Option<TxId>,
386}
387
388impl DeleteNodeOperator {
389 pub fn new(
391 store: Arc<LpgStore>,
392 input: Box<dyn Operator>,
393 node_column: usize,
394 output_schema: Vec<LogicalType>,
395 detach: bool,
396 ) -> Self {
397 Self {
398 store,
399 input,
400 node_column,
401 output_schema,
402 detach,
403 viewing_epoch: None,
404 tx_id: None,
405 }
406 }
407
408 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
410 self.viewing_epoch = Some(epoch);
411 self.tx_id = tx_id;
412 self
413 }
414}
415
416impl Operator for DeleteNodeOperator {
417 fn next(&mut self) -> OperatorResult {
418 let epoch = self
420 .viewing_epoch
421 .unwrap_or_else(|| self.store.current_epoch());
422
423 if let Some(chunk) = self.input.next()? {
424 let mut deleted_count = 0;
425
426 for row in chunk.selected_indices() {
427 let node_val = chunk
428 .column(self.node_column)
429 .and_then(|c| c.get_value(row))
430 .ok_or_else(|| {
431 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
432 })?;
433
434 let node_id = match node_val {
435 Value::Int64(id) => NodeId(id as u64),
436 _ => {
437 return Err(OperatorError::TypeMismatch {
438 expected: "Int64 (node ID)".to_string(),
439 found: format!("{node_val:?}"),
440 });
441 }
442 };
443
444 if self.detach {
445 self.store.delete_node_edges(node_id);
448 }
449
450 if self.store.delete_node_at_epoch(node_id, epoch) {
452 deleted_count += 1;
453 }
454 }
455
456 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
458 if let Some(dst) = builder.column_mut(0) {
459 dst.push_value(Value::Int64(deleted_count));
460 }
461 builder.advance_row();
462
463 return Ok(Some(builder.finish()));
464 }
465 Ok(None)
466 }
467
468 fn reset(&mut self) {
469 self.input.reset();
470 }
471
472 fn name(&self) -> &'static str {
473 "DeleteNode"
474 }
475}
476
477pub struct DeleteEdgeOperator {
479 store: Arc<LpgStore>,
481 input: Box<dyn Operator>,
483 edge_column: usize,
485 output_schema: Vec<LogicalType>,
487 viewing_epoch: Option<EpochId>,
489 #[allow(dead_code)]
491 tx_id: Option<TxId>,
492}
493
494impl DeleteEdgeOperator {
495 pub fn new(
497 store: Arc<LpgStore>,
498 input: Box<dyn Operator>,
499 edge_column: usize,
500 output_schema: Vec<LogicalType>,
501 ) -> Self {
502 Self {
503 store,
504 input,
505 edge_column,
506 output_schema,
507 viewing_epoch: None,
508 tx_id: None,
509 }
510 }
511
512 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
514 self.viewing_epoch = Some(epoch);
515 self.tx_id = tx_id;
516 self
517 }
518}
519
520impl Operator for DeleteEdgeOperator {
521 fn next(&mut self) -> OperatorResult {
522 let epoch = self
524 .viewing_epoch
525 .unwrap_or_else(|| self.store.current_epoch());
526
527 if let Some(chunk) = self.input.next()? {
528 let mut deleted_count = 0;
529
530 for row in chunk.selected_indices() {
531 let edge_val = chunk
532 .column(self.edge_column)
533 .and_then(|c| c.get_value(row))
534 .ok_or_else(|| {
535 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
536 })?;
537
538 let edge_id = match edge_val {
539 Value::Int64(id) => EdgeId(id as u64),
540 _ => {
541 return Err(OperatorError::TypeMismatch {
542 expected: "Int64 (edge ID)".to_string(),
543 found: format!("{edge_val:?}"),
544 });
545 }
546 };
547
548 if self.store.delete_edge_at_epoch(edge_id, epoch) {
550 deleted_count += 1;
551 }
552 }
553
554 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
556 if let Some(dst) = builder.column_mut(0) {
557 dst.push_value(Value::Int64(deleted_count));
558 }
559 builder.advance_row();
560
561 return Ok(Some(builder.finish()));
562 }
563 Ok(None)
564 }
565
566 fn reset(&mut self) {
567 self.input.reset();
568 }
569
570 fn name(&self) -> &'static str {
571 "DeleteEdge"
572 }
573}
574
575pub struct AddLabelOperator {
577 store: Arc<LpgStore>,
579 input: Box<dyn Operator>,
581 node_column: usize,
583 labels: Vec<String>,
585 output_schema: Vec<LogicalType>,
587}
588
589impl AddLabelOperator {
590 pub fn new(
592 store: Arc<LpgStore>,
593 input: Box<dyn Operator>,
594 node_column: usize,
595 labels: Vec<String>,
596 output_schema: Vec<LogicalType>,
597 ) -> Self {
598 Self {
599 store,
600 input,
601 node_column,
602 labels,
603 output_schema,
604 }
605 }
606}
607
608impl Operator for AddLabelOperator {
609 fn next(&mut self) -> OperatorResult {
610 if let Some(chunk) = self.input.next()? {
611 let mut updated_count = 0;
612
613 for row in chunk.selected_indices() {
614 let node_val = chunk
615 .column(self.node_column)
616 .and_then(|c| c.get_value(row))
617 .ok_or_else(|| {
618 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
619 })?;
620
621 let node_id = match node_val {
622 Value::Int64(id) => NodeId(id as u64),
623 _ => {
624 return Err(OperatorError::TypeMismatch {
625 expected: "Int64 (node ID)".to_string(),
626 found: format!("{node_val:?}"),
627 });
628 }
629 };
630
631 for label in &self.labels {
633 if self.store.add_label(node_id, label) {
634 updated_count += 1;
635 }
636 }
637 }
638
639 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
641 if let Some(dst) = builder.column_mut(0) {
642 dst.push_value(Value::Int64(updated_count));
643 }
644 builder.advance_row();
645
646 return Ok(Some(builder.finish()));
647 }
648 Ok(None)
649 }
650
651 fn reset(&mut self) {
652 self.input.reset();
653 }
654
655 fn name(&self) -> &'static str {
656 "AddLabel"
657 }
658}
659
660pub struct RemoveLabelOperator {
662 store: Arc<LpgStore>,
664 input: Box<dyn Operator>,
666 node_column: usize,
668 labels: Vec<String>,
670 output_schema: Vec<LogicalType>,
672}
673
674impl RemoveLabelOperator {
675 pub fn new(
677 store: Arc<LpgStore>,
678 input: Box<dyn Operator>,
679 node_column: usize,
680 labels: Vec<String>,
681 output_schema: Vec<LogicalType>,
682 ) -> Self {
683 Self {
684 store,
685 input,
686 node_column,
687 labels,
688 output_schema,
689 }
690 }
691}
692
693impl Operator for RemoveLabelOperator {
694 fn next(&mut self) -> OperatorResult {
695 if let Some(chunk) = self.input.next()? {
696 let mut updated_count = 0;
697
698 for row in chunk.selected_indices() {
699 let node_val = chunk
700 .column(self.node_column)
701 .and_then(|c| c.get_value(row))
702 .ok_or_else(|| {
703 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
704 })?;
705
706 let node_id = match node_val {
707 Value::Int64(id) => NodeId(id as u64),
708 _ => {
709 return Err(OperatorError::TypeMismatch {
710 expected: "Int64 (node ID)".to_string(),
711 found: format!("{node_val:?}"),
712 });
713 }
714 };
715
716 for label in &self.labels {
718 if self.store.remove_label(node_id, label) {
719 updated_count += 1;
720 }
721 }
722 }
723
724 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
726 if let Some(dst) = builder.column_mut(0) {
727 dst.push_value(Value::Int64(updated_count));
728 }
729 builder.advance_row();
730
731 return Ok(Some(builder.finish()));
732 }
733 Ok(None)
734 }
735
736 fn reset(&mut self) {
737 self.input.reset();
738 }
739
740 fn name(&self) -> &'static str {
741 "RemoveLabel"
742 }
743}
744
745pub struct SetPropertyOperator {
750 store: Arc<LpgStore>,
752 input: Box<dyn Operator>,
754 entity_column: usize,
756 is_edge: bool,
758 properties: Vec<(String, PropertySource)>,
760 output_schema: Vec<LogicalType>,
762}
763
764impl SetPropertyOperator {
765 pub fn new_for_node(
767 store: Arc<LpgStore>,
768 input: Box<dyn Operator>,
769 node_column: usize,
770 properties: Vec<(String, PropertySource)>,
771 output_schema: Vec<LogicalType>,
772 ) -> Self {
773 Self {
774 store,
775 input,
776 entity_column: node_column,
777 is_edge: false,
778 properties,
779 output_schema,
780 }
781 }
782
783 pub fn new_for_edge(
785 store: Arc<LpgStore>,
786 input: Box<dyn Operator>,
787 edge_column: usize,
788 properties: Vec<(String, PropertySource)>,
789 output_schema: Vec<LogicalType>,
790 ) -> Self {
791 Self {
792 store,
793 input,
794 entity_column: edge_column,
795 is_edge: true,
796 properties,
797 output_schema,
798 }
799 }
800}
801
802impl Operator for SetPropertyOperator {
803 fn next(&mut self) -> OperatorResult {
804 if let Some(chunk) = self.input.next()? {
805 let mut builder =
806 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
807
808 for row in chunk.selected_indices() {
809 let entity_val = chunk
810 .column(self.entity_column)
811 .and_then(|c| c.get_value(row))
812 .ok_or_else(|| {
813 OperatorError::ColumnNotFound(format!(
814 "entity column {}",
815 self.entity_column
816 ))
817 })?;
818
819 let entity_id = match entity_val {
820 Value::Int64(id) => id as u64,
821 _ => {
822 return Err(OperatorError::TypeMismatch {
823 expected: "Int64 (entity ID)".to_string(),
824 found: format!("{entity_val:?}"),
825 });
826 }
827 };
828
829 for (prop_name, source) in &self.properties {
831 let value = match source {
832 PropertySource::Column(col_idx) => chunk
833 .column(*col_idx)
834 .and_then(|c| c.get_value(row))
835 .unwrap_or(Value::Null),
836 PropertySource::Constant(v) => v.clone(),
837 };
838
839 if self.is_edge {
840 self.store
841 .set_edge_property(EdgeId(entity_id), prop_name, value);
842 } else {
843 self.store
844 .set_node_property(NodeId(entity_id), prop_name, value);
845 }
846 }
847
848 for col_idx in 0..chunk.column_count() {
850 if let (Some(src), Some(dst)) =
851 (chunk.column(col_idx), builder.column_mut(col_idx))
852 {
853 if let Some(val) = src.get_value(row) {
854 dst.push_value(val);
855 } else {
856 dst.push_value(Value::Null);
857 }
858 }
859 }
860
861 builder.advance_row();
862 }
863
864 return Ok(Some(builder.finish()));
865 }
866 Ok(None)
867 }
868
869 fn reset(&mut self) {
870 self.input.reset();
871 }
872
873 fn name(&self) -> &'static str {
874 "SetProperty"
875 }
876}
877
878#[cfg(test)]
879mod tests {
880 use super::*;
881 use crate::execution::DataChunk;
882 use crate::execution::chunk::DataChunkBuilder;
883
884 fn create_test_store() -> Arc<LpgStore> {
885 Arc::new(LpgStore::new())
886 }
887
888 #[test]
889 fn test_create_node_standalone() {
890 let store = create_test_store();
891
892 let mut op = CreateNodeOperator::new(
893 Arc::clone(&store),
894 None,
895 vec!["Person".to_string()],
896 vec![(
897 "name".to_string(),
898 PropertySource::Constant(Value::String("Alice".into())),
899 )],
900 vec![LogicalType::Int64],
901 0,
902 );
903
904 let chunk = op.next().unwrap().unwrap();
906 assert_eq!(chunk.row_count(), 1);
907
908 assert!(op.next().unwrap().is_none());
910
911 assert_eq!(store.node_count(), 1);
913 }
914
915 #[test]
916 fn test_create_edge() {
917 let store = create_test_store();
918
919 let node1 = store.create_node(&["Person"]);
921 let node2 = store.create_node(&["Person"]);
922
923 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
925 builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
926 builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
927 builder.advance_row();
928 let input_chunk = builder.finish();
929
930 struct MockInput {
932 chunk: Option<DataChunk>,
933 }
934 impl Operator for MockInput {
935 fn next(&mut self) -> OperatorResult {
936 Ok(self.chunk.take())
937 }
938 fn reset(&mut self) {}
939 fn name(&self) -> &'static str {
940 "MockInput"
941 }
942 }
943
944 let mut op = CreateEdgeOperator::new(
945 Arc::clone(&store),
946 Box::new(MockInput {
947 chunk: Some(input_chunk),
948 }),
949 0, 1, "KNOWS".to_string(),
952 vec![LogicalType::Int64, LogicalType::Int64],
953 );
954
955 let _chunk = op.next().unwrap().unwrap();
957
958 assert_eq!(store.edge_count(), 1);
960 }
961
962 #[test]
963 fn test_delete_node() {
964 let store = create_test_store();
965
966 let node_id = store.create_node(&["Person"]);
968 assert_eq!(store.node_count(), 1);
969
970 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
972 builder.column_mut(0).unwrap().push_int64(node_id.0 as i64);
973 builder.advance_row();
974 let input_chunk = builder.finish();
975
976 struct MockInput {
977 chunk: Option<DataChunk>,
978 }
979 impl Operator for MockInput {
980 fn next(&mut self) -> OperatorResult {
981 Ok(self.chunk.take())
982 }
983 fn reset(&mut self) {}
984 fn name(&self) -> &'static str {
985 "MockInput"
986 }
987 }
988
989 let mut op = DeleteNodeOperator::new(
990 Arc::clone(&store),
991 Box::new(MockInput {
992 chunk: Some(input_chunk),
993 }),
994 0,
995 vec![LogicalType::Int64],
996 false,
997 );
998
999 let chunk = op.next().unwrap().unwrap();
1001
1002 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1004 assert_eq!(deleted, 1);
1005 assert_eq!(store.node_count(), 0);
1006 }
1007}