1use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::lpg::LpgStore;
16
17pub struct CreateNodeOperator {
22 store: Arc<LpgStore>,
24 input: Option<Box<dyn Operator>>,
26 labels: Vec<String>,
28 properties: Vec<(String, PropertySource)>,
30 output_schema: Vec<LogicalType>,
32 output_column: usize,
34 executed: bool,
36 viewing_epoch: Option<EpochId>,
38 tx_id: Option<TxId>,
40}
41
42#[derive(Debug, Clone)]
44pub enum PropertySource {
45 Column(usize),
47 Constant(Value),
49}
50
51impl CreateNodeOperator {
52 pub fn new(
62 store: Arc<LpgStore>,
63 input: Option<Box<dyn Operator>>,
64 labels: Vec<String>,
65 properties: Vec<(String, PropertySource)>,
66 output_schema: Vec<LogicalType>,
67 output_column: usize,
68 ) -> Self {
69 Self {
70 store,
71 input,
72 labels,
73 properties,
74 output_schema,
75 output_column,
76 executed: false,
77 viewing_epoch: None,
78 tx_id: None,
79 }
80 }
81
82 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
84 self.viewing_epoch = Some(epoch);
85 self.tx_id = tx_id;
86 self
87 }
88}
89
90impl Operator for CreateNodeOperator {
91 fn next(&mut self) -> OperatorResult {
92 let epoch = self
94 .viewing_epoch
95 .unwrap_or_else(|| self.store.current_epoch());
96 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
97
98 if let Some(ref mut input) = self.input {
99 if let Some(chunk) = input.next()? {
101 let mut builder =
102 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
103
104 for row in chunk.selected_indices() {
105 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
107 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
108
109 for (prop_name, source) in &self.properties {
111 let value = match source {
112 PropertySource::Column(col_idx) => chunk
113 .column(*col_idx)
114 .and_then(|c| c.get_value(row))
115 .unwrap_or(Value::Null),
116 PropertySource::Constant(v) => v.clone(),
117 };
118 self.store.set_node_property(node_id, prop_name, value);
119 }
120
121 for col_idx in 0..chunk.column_count() {
123 if col_idx < self.output_column
124 && let (Some(src), Some(dst)) =
125 (chunk.column(col_idx), builder.column_mut(col_idx))
126 {
127 if let Some(val) = src.get_value(row) {
128 dst.push_value(val);
129 } else {
130 dst.push_value(Value::Null);
131 }
132 }
133 }
134
135 if let Some(dst) = builder.column_mut(self.output_column) {
137 dst.push_value(Value::Int64(node_id.0 as i64));
138 }
139
140 builder.advance_row();
141 }
142
143 return Ok(Some(builder.finish()));
144 }
145 Ok(None)
146 } else {
147 if self.executed {
149 return Ok(None);
150 }
151 self.executed = true;
152
153 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
155 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
156
157 for (prop_name, source) in &self.properties {
159 if let PropertySource::Constant(value) = source {
160 self.store
161 .set_node_property(node_id, prop_name, value.clone());
162 }
163 }
164
165 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
167 if let Some(dst) = builder.column_mut(self.output_column) {
168 dst.push_value(Value::Int64(node_id.0 as i64));
169 }
170 builder.advance_row();
171
172 Ok(Some(builder.finish()))
173 }
174 }
175
176 fn reset(&mut self) {
177 if let Some(ref mut input) = self.input {
178 input.reset();
179 }
180 self.executed = false;
181 }
182
183 fn name(&self) -> &'static str {
184 "CreateNode"
185 }
186}
187
188pub struct CreateEdgeOperator {
190 store: Arc<LpgStore>,
192 input: Box<dyn Operator>,
194 from_column: usize,
196 to_column: usize,
198 edge_type: String,
200 properties: Vec<(String, PropertySource)>,
202 output_schema: Vec<LogicalType>,
204 output_column: Option<usize>,
206 viewing_epoch: Option<EpochId>,
208 tx_id: Option<TxId>,
210}
211
212impl CreateEdgeOperator {
213 pub fn new(
220 store: Arc<LpgStore>,
221 input: Box<dyn Operator>,
222 from_column: usize,
223 to_column: usize,
224 edge_type: String,
225 output_schema: Vec<LogicalType>,
226 ) -> Self {
227 Self {
228 store,
229 input,
230 from_column,
231 to_column,
232 edge_type,
233 properties: Vec::new(),
234 output_schema,
235 output_column: None,
236 viewing_epoch: None,
237 tx_id: None,
238 }
239 }
240
241 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
243 self.properties = properties;
244 self
245 }
246
247 pub fn with_output_column(mut self, column: usize) -> Self {
249 self.output_column = Some(column);
250 self
251 }
252
253 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
255 self.viewing_epoch = Some(epoch);
256 self.tx_id = tx_id;
257 self
258 }
259}
260
261impl Operator for CreateEdgeOperator {
262 fn next(&mut self) -> OperatorResult {
263 let epoch = self
265 .viewing_epoch
266 .unwrap_or_else(|| self.store.current_epoch());
267 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
268
269 if let Some(chunk) = self.input.next()? {
270 let mut builder =
271 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
272
273 for row in chunk.selected_indices() {
274 let from_id = chunk
276 .column(self.from_column)
277 .and_then(|c| c.get_value(row))
278 .ok_or_else(|| {
279 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
280 })?;
281
282 let to_id = chunk
283 .column(self.to_column)
284 .and_then(|c| c.get_value(row))
285 .ok_or_else(|| {
286 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
287 })?;
288
289 let from_node_id = match from_id {
291 Value::Int64(id) => NodeId(id as u64),
292 _ => {
293 return Err(OperatorError::TypeMismatch {
294 expected: "Int64 (node ID)".to_string(),
295 found: format!("{from_id:?}"),
296 });
297 }
298 };
299
300 let to_node_id = match to_id {
301 Value::Int64(id) => NodeId(id as u64),
302 _ => {
303 return Err(OperatorError::TypeMismatch {
304 expected: "Int64 (node ID)".to_string(),
305 found: format!("{to_id:?}"),
306 });
307 }
308 };
309
310 let edge_id = self.store.create_edge_versioned(
312 from_node_id,
313 to_node_id,
314 &self.edge_type,
315 epoch,
316 tx,
317 );
318
319 for (prop_name, source) in &self.properties {
321 let value = match source {
322 PropertySource::Column(col_idx) => chunk
323 .column(*col_idx)
324 .and_then(|c| c.get_value(row))
325 .unwrap_or(Value::Null),
326 PropertySource::Constant(v) => v.clone(),
327 };
328 self.store.set_edge_property(edge_id, prop_name, value);
329 }
330
331 for col_idx in 0..chunk.column_count() {
333 if let (Some(src), Some(dst)) =
334 (chunk.column(col_idx), builder.column_mut(col_idx))
335 {
336 if let Some(val) = src.get_value(row) {
337 dst.push_value(val);
338 } else {
339 dst.push_value(Value::Null);
340 }
341 }
342 }
343
344 if let Some(out_col) = self.output_column
346 && let Some(dst) = builder.column_mut(out_col)
347 {
348 dst.push_value(Value::Int64(edge_id.0 as i64));
349 }
350
351 builder.advance_row();
352 }
353
354 return Ok(Some(builder.finish()));
355 }
356 Ok(None)
357 }
358
359 fn reset(&mut self) {
360 self.input.reset();
361 }
362
363 fn name(&self) -> &'static str {
364 "CreateEdge"
365 }
366}
367
368pub struct DeleteNodeOperator {
370 store: Arc<LpgStore>,
372 input: Box<dyn Operator>,
374 node_column: usize,
376 output_schema: Vec<LogicalType>,
378 detach: bool,
380 viewing_epoch: Option<EpochId>,
382}
383
384impl DeleteNodeOperator {
385 pub fn new(
387 store: Arc<LpgStore>,
388 input: Box<dyn Operator>,
389 node_column: usize,
390 output_schema: Vec<LogicalType>,
391 detach: bool,
392 ) -> Self {
393 Self {
394 store,
395 input,
396 node_column,
397 output_schema,
398 detach,
399 viewing_epoch: None,
400 }
401 }
402
403 pub fn with_tx_context(mut self, epoch: EpochId, _tx_id: Option<TxId>) -> Self {
405 self.viewing_epoch = Some(epoch);
406 self
407 }
408}
409
410impl Operator for DeleteNodeOperator {
411 fn next(&mut self) -> OperatorResult {
412 let epoch = self
414 .viewing_epoch
415 .unwrap_or_else(|| self.store.current_epoch());
416
417 if let Some(chunk) = self.input.next()? {
418 let mut deleted_count = 0;
419
420 for row in chunk.selected_indices() {
421 let node_val = chunk
422 .column(self.node_column)
423 .and_then(|c| c.get_value(row))
424 .ok_or_else(|| {
425 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
426 })?;
427
428 let node_id = match node_val {
429 Value::Int64(id) => NodeId(id as u64),
430 _ => {
431 return Err(OperatorError::TypeMismatch {
432 expected: "Int64 (node ID)".to_string(),
433 found: format!("{node_val:?}"),
434 });
435 }
436 };
437
438 if self.detach {
439 self.store.delete_node_edges(node_id);
442 }
443
444 if self.store.delete_node_at_epoch(node_id, epoch) {
446 deleted_count += 1;
447 }
448 }
449
450 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
452 if let Some(dst) = builder.column_mut(0) {
453 dst.push_value(Value::Int64(deleted_count));
454 }
455 builder.advance_row();
456
457 return Ok(Some(builder.finish()));
458 }
459 Ok(None)
460 }
461
462 fn reset(&mut self) {
463 self.input.reset();
464 }
465
466 fn name(&self) -> &'static str {
467 "DeleteNode"
468 }
469}
470
471pub struct DeleteEdgeOperator {
473 store: Arc<LpgStore>,
475 input: Box<dyn Operator>,
477 edge_column: usize,
479 output_schema: Vec<LogicalType>,
481 viewing_epoch: Option<EpochId>,
483}
484
485impl DeleteEdgeOperator {
486 pub fn new(
488 store: Arc<LpgStore>,
489 input: Box<dyn Operator>,
490 edge_column: usize,
491 output_schema: Vec<LogicalType>,
492 ) -> Self {
493 Self {
494 store,
495 input,
496 edge_column,
497 output_schema,
498 viewing_epoch: None,
499 }
500 }
501
502 pub fn with_tx_context(mut self, epoch: EpochId, _tx_id: Option<TxId>) -> Self {
504 self.viewing_epoch = Some(epoch);
505 self
506 }
507}
508
509impl Operator for DeleteEdgeOperator {
510 fn next(&mut self) -> OperatorResult {
511 let epoch = self
513 .viewing_epoch
514 .unwrap_or_else(|| self.store.current_epoch());
515
516 if let Some(chunk) = self.input.next()? {
517 let mut deleted_count = 0;
518
519 for row in chunk.selected_indices() {
520 let edge_val = chunk
521 .column(self.edge_column)
522 .and_then(|c| c.get_value(row))
523 .ok_or_else(|| {
524 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
525 })?;
526
527 let edge_id = match edge_val {
528 Value::Int64(id) => EdgeId(id as u64),
529 _ => {
530 return Err(OperatorError::TypeMismatch {
531 expected: "Int64 (edge ID)".to_string(),
532 found: format!("{edge_val:?}"),
533 });
534 }
535 };
536
537 if self.store.delete_edge_at_epoch(edge_id, epoch) {
539 deleted_count += 1;
540 }
541 }
542
543 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
545 if let Some(dst) = builder.column_mut(0) {
546 dst.push_value(Value::Int64(deleted_count));
547 }
548 builder.advance_row();
549
550 return Ok(Some(builder.finish()));
551 }
552 Ok(None)
553 }
554
555 fn reset(&mut self) {
556 self.input.reset();
557 }
558
559 fn name(&self) -> &'static str {
560 "DeleteEdge"
561 }
562}
563
564pub struct AddLabelOperator {
566 store: Arc<LpgStore>,
568 input: Box<dyn Operator>,
570 node_column: usize,
572 labels: Vec<String>,
574 output_schema: Vec<LogicalType>,
576}
577
578impl AddLabelOperator {
579 pub fn new(
581 store: Arc<LpgStore>,
582 input: Box<dyn Operator>,
583 node_column: usize,
584 labels: Vec<String>,
585 output_schema: Vec<LogicalType>,
586 ) -> Self {
587 Self {
588 store,
589 input,
590 node_column,
591 labels,
592 output_schema,
593 }
594 }
595}
596
597impl Operator for AddLabelOperator {
598 fn next(&mut self) -> OperatorResult {
599 if let Some(chunk) = self.input.next()? {
600 let mut updated_count = 0;
601
602 for row in chunk.selected_indices() {
603 let node_val = chunk
604 .column(self.node_column)
605 .and_then(|c| c.get_value(row))
606 .ok_or_else(|| {
607 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
608 })?;
609
610 let node_id = match node_val {
611 Value::Int64(id) => NodeId(id as u64),
612 _ => {
613 return Err(OperatorError::TypeMismatch {
614 expected: "Int64 (node ID)".to_string(),
615 found: format!("{node_val:?}"),
616 });
617 }
618 };
619
620 for label in &self.labels {
622 if self.store.add_label(node_id, label) {
623 updated_count += 1;
624 }
625 }
626 }
627
628 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
630 if let Some(dst) = builder.column_mut(0) {
631 dst.push_value(Value::Int64(updated_count));
632 }
633 builder.advance_row();
634
635 return Ok(Some(builder.finish()));
636 }
637 Ok(None)
638 }
639
640 fn reset(&mut self) {
641 self.input.reset();
642 }
643
644 fn name(&self) -> &'static str {
645 "AddLabel"
646 }
647}
648
649pub struct RemoveLabelOperator {
651 store: Arc<LpgStore>,
653 input: Box<dyn Operator>,
655 node_column: usize,
657 labels: Vec<String>,
659 output_schema: Vec<LogicalType>,
661}
662
663impl RemoveLabelOperator {
664 pub fn new(
666 store: Arc<LpgStore>,
667 input: Box<dyn Operator>,
668 node_column: usize,
669 labels: Vec<String>,
670 output_schema: Vec<LogicalType>,
671 ) -> Self {
672 Self {
673 store,
674 input,
675 node_column,
676 labels,
677 output_schema,
678 }
679 }
680}
681
682impl Operator for RemoveLabelOperator {
683 fn next(&mut self) -> OperatorResult {
684 if let Some(chunk) = self.input.next()? {
685 let mut updated_count = 0;
686
687 for row in chunk.selected_indices() {
688 let node_val = chunk
689 .column(self.node_column)
690 .and_then(|c| c.get_value(row))
691 .ok_or_else(|| {
692 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
693 })?;
694
695 let node_id = match node_val {
696 Value::Int64(id) => NodeId(id as u64),
697 _ => {
698 return Err(OperatorError::TypeMismatch {
699 expected: "Int64 (node ID)".to_string(),
700 found: format!("{node_val:?}"),
701 });
702 }
703 };
704
705 for label in &self.labels {
707 if self.store.remove_label(node_id, label) {
708 updated_count += 1;
709 }
710 }
711 }
712
713 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
715 if let Some(dst) = builder.column_mut(0) {
716 dst.push_value(Value::Int64(updated_count));
717 }
718 builder.advance_row();
719
720 return Ok(Some(builder.finish()));
721 }
722 Ok(None)
723 }
724
725 fn reset(&mut self) {
726 self.input.reset();
727 }
728
729 fn name(&self) -> &'static str {
730 "RemoveLabel"
731 }
732}
733
734pub struct SetPropertyOperator {
739 store: Arc<LpgStore>,
741 input: Box<dyn Operator>,
743 entity_column: usize,
745 is_edge: bool,
747 properties: Vec<(String, PropertySource)>,
749 output_schema: Vec<LogicalType>,
751}
752
753impl SetPropertyOperator {
754 pub fn new_for_node(
756 store: Arc<LpgStore>,
757 input: Box<dyn Operator>,
758 node_column: usize,
759 properties: Vec<(String, PropertySource)>,
760 output_schema: Vec<LogicalType>,
761 ) -> Self {
762 Self {
763 store,
764 input,
765 entity_column: node_column,
766 is_edge: false,
767 properties,
768 output_schema,
769 }
770 }
771
772 pub fn new_for_edge(
774 store: Arc<LpgStore>,
775 input: Box<dyn Operator>,
776 edge_column: usize,
777 properties: Vec<(String, PropertySource)>,
778 output_schema: Vec<LogicalType>,
779 ) -> Self {
780 Self {
781 store,
782 input,
783 entity_column: edge_column,
784 is_edge: true,
785 properties,
786 output_schema,
787 }
788 }
789}
790
791impl Operator for SetPropertyOperator {
792 fn next(&mut self) -> OperatorResult {
793 if let Some(chunk) = self.input.next()? {
794 let mut builder =
795 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
796
797 for row in chunk.selected_indices() {
798 let entity_val = chunk
799 .column(self.entity_column)
800 .and_then(|c| c.get_value(row))
801 .ok_or_else(|| {
802 OperatorError::ColumnNotFound(format!(
803 "entity column {}",
804 self.entity_column
805 ))
806 })?;
807
808 let entity_id = match entity_val {
809 Value::Int64(id) => id as u64,
810 _ => {
811 return Err(OperatorError::TypeMismatch {
812 expected: "Int64 (entity ID)".to_string(),
813 found: format!("{entity_val:?}"),
814 });
815 }
816 };
817
818 for (prop_name, source) in &self.properties {
820 let value = match source {
821 PropertySource::Column(col_idx) => chunk
822 .column(*col_idx)
823 .and_then(|c| c.get_value(row))
824 .unwrap_or(Value::Null),
825 PropertySource::Constant(v) => v.clone(),
826 };
827
828 if self.is_edge {
829 self.store
830 .set_edge_property(EdgeId(entity_id), prop_name, value);
831 } else {
832 self.store
833 .set_node_property(NodeId(entity_id), prop_name, value);
834 }
835 }
836
837 for col_idx in 0..chunk.column_count() {
839 if let (Some(src), Some(dst)) =
840 (chunk.column(col_idx), builder.column_mut(col_idx))
841 {
842 if let Some(val) = src.get_value(row) {
843 dst.push_value(val);
844 } else {
845 dst.push_value(Value::Null);
846 }
847 }
848 }
849
850 builder.advance_row();
851 }
852
853 return Ok(Some(builder.finish()));
854 }
855 Ok(None)
856 }
857
858 fn reset(&mut self) {
859 self.input.reset();
860 }
861
862 fn name(&self) -> &'static str {
863 "SetProperty"
864 }
865}
866
867#[cfg(test)]
868mod tests {
869 use super::*;
870 use crate::execution::DataChunk;
871 use crate::execution::chunk::DataChunkBuilder;
872
873 fn create_test_store() -> Arc<LpgStore> {
874 Arc::new(LpgStore::new())
875 }
876
877 #[test]
878 fn test_create_node_standalone() {
879 let store = create_test_store();
880
881 let mut op = CreateNodeOperator::new(
882 Arc::clone(&store),
883 None,
884 vec!["Person".to_string()],
885 vec![(
886 "name".to_string(),
887 PropertySource::Constant(Value::String("Alice".into())),
888 )],
889 vec![LogicalType::Int64],
890 0,
891 );
892
893 let chunk = op.next().unwrap().unwrap();
895 assert_eq!(chunk.row_count(), 1);
896
897 assert!(op.next().unwrap().is_none());
899
900 assert_eq!(store.node_count(), 1);
902 }
903
904 #[test]
905 fn test_create_edge() {
906 let store = create_test_store();
907
908 let node1 = store.create_node(&["Person"]);
910 let node2 = store.create_node(&["Person"]);
911
912 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
914 builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
915 builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
916 builder.advance_row();
917 let input_chunk = builder.finish();
918
919 struct MockInput {
921 chunk: Option<DataChunk>,
922 }
923 impl Operator for MockInput {
924 fn next(&mut self) -> OperatorResult {
925 Ok(self.chunk.take())
926 }
927 fn reset(&mut self) {}
928 fn name(&self) -> &'static str {
929 "MockInput"
930 }
931 }
932
933 let mut op = CreateEdgeOperator::new(
934 Arc::clone(&store),
935 Box::new(MockInput {
936 chunk: Some(input_chunk),
937 }),
938 0, 1, "KNOWS".to_string(),
941 vec![LogicalType::Int64, LogicalType::Int64],
942 );
943
944 let _chunk = op.next().unwrap().unwrap();
946
947 assert_eq!(store.edge_count(), 1);
949 }
950
951 #[test]
952 fn test_delete_node() {
953 let store = create_test_store();
954
955 let node_id = store.create_node(&["Person"]);
957 assert_eq!(store.node_count(), 1);
958
959 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
961 builder.column_mut(0).unwrap().push_int64(node_id.0 as i64);
962 builder.advance_row();
963 let input_chunk = builder.finish();
964
965 struct MockInput {
966 chunk: Option<DataChunk>,
967 }
968 impl Operator for MockInput {
969 fn next(&mut self) -> OperatorResult {
970 Ok(self.chunk.take())
971 }
972 fn reset(&mut self) {}
973 fn name(&self) -> &'static str {
974 "MockInput"
975 }
976 }
977
978 let mut op = DeleteNodeOperator::new(
979 Arc::clone(&store),
980 Box::new(MockInput {
981 chunk: Some(input_chunk),
982 }),
983 0,
984 vec![LogicalType::Int64],
985 false,
986 );
987
988 let chunk = op.next().unwrap().unwrap();
990
991 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
993 assert_eq!(deleted, 1);
994 assert_eq!(store.node_count(), 0);
995 }
996
997 struct MockInput {
1000 chunk: Option<DataChunk>,
1001 }
1002
1003 impl MockInput {
1004 fn boxed(chunk: DataChunk) -> Box<Self> {
1005 Box::new(Self { chunk: Some(chunk) })
1006 }
1007 }
1008
1009 impl Operator for MockInput {
1010 fn next(&mut self) -> OperatorResult {
1011 Ok(self.chunk.take())
1012 }
1013 fn reset(&mut self) {}
1014 fn name(&self) -> &'static str {
1015 "MockInput"
1016 }
1017 }
1018
1019 #[test]
1022 fn test_delete_edge() {
1023 let store = create_test_store();
1024
1025 let n1 = store.create_node(&["Person"]);
1026 let n2 = store.create_node(&["Person"]);
1027 let eid = store.create_edge(n1, n2, "KNOWS");
1028 assert_eq!(store.edge_count(), 1);
1029
1030 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1031 builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1032 builder.advance_row();
1033
1034 let mut op = DeleteEdgeOperator::new(
1035 Arc::clone(&store),
1036 MockInput::boxed(builder.finish()),
1037 0,
1038 vec![LogicalType::Int64],
1039 );
1040
1041 let chunk = op.next().unwrap().unwrap();
1042 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1043 assert_eq!(deleted, 1);
1044 assert_eq!(store.edge_count(), 0);
1045 }
1046
1047 #[test]
1048 fn test_delete_edge_no_input_returns_none() {
1049 let store = create_test_store();
1050
1051 struct EmptyInput;
1053 impl Operator for EmptyInput {
1054 fn next(&mut self) -> OperatorResult {
1055 Ok(None)
1056 }
1057 fn reset(&mut self) {}
1058 fn name(&self) -> &'static str {
1059 "EmptyInput"
1060 }
1061 }
1062
1063 let mut op = DeleteEdgeOperator::new(
1064 Arc::clone(&store),
1065 Box::new(EmptyInput),
1066 0,
1067 vec![LogicalType::Int64],
1068 );
1069
1070 assert!(op.next().unwrap().is_none());
1071 }
1072
1073 #[test]
1074 fn test_delete_multiple_edges() {
1075 let store = create_test_store();
1076
1077 let n1 = store.create_node(&["N"]);
1078 let n2 = store.create_node(&["N"]);
1079 let e1 = store.create_edge(n1, n2, "R");
1080 let e2 = store.create_edge(n2, n1, "S");
1081 assert_eq!(store.edge_count(), 2);
1082
1083 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1084 builder.column_mut(0).unwrap().push_int64(e1.0 as i64);
1085 builder.advance_row();
1086 builder.column_mut(0).unwrap().push_int64(e2.0 as i64);
1087 builder.advance_row();
1088
1089 let mut op = DeleteEdgeOperator::new(
1090 Arc::clone(&store),
1091 MockInput::boxed(builder.finish()),
1092 0,
1093 vec![LogicalType::Int64],
1094 );
1095
1096 let chunk = op.next().unwrap().unwrap();
1097 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1098 assert_eq!(deleted, 2);
1099 assert_eq!(store.edge_count(), 0);
1100 }
1101
1102 #[test]
1105 fn test_delete_node_detach() {
1106 let store = create_test_store();
1107
1108 let n1 = store.create_node(&["Person"]);
1109 let n2 = store.create_node(&["Person"]);
1110 store.create_edge(n1, n2, "KNOWS");
1111 store.create_edge(n2, n1, "FOLLOWS");
1112 assert_eq!(store.edge_count(), 2);
1113
1114 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1115 builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
1116 builder.advance_row();
1117
1118 let mut op = DeleteNodeOperator::new(
1119 Arc::clone(&store),
1120 MockInput::boxed(builder.finish()),
1121 0,
1122 vec![LogicalType::Int64],
1123 true, );
1125
1126 let chunk = op.next().unwrap().unwrap();
1127 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1128 assert_eq!(deleted, 1);
1129 assert_eq!(store.node_count(), 1);
1130 assert_eq!(store.edge_count(), 0); }
1132
1133 #[test]
1136 fn test_add_label() {
1137 let store = create_test_store();
1138
1139 let node = store.create_node(&["Person"]);
1140
1141 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1142 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1143 builder.advance_row();
1144
1145 let mut op = AddLabelOperator::new(
1146 Arc::clone(&store),
1147 MockInput::boxed(builder.finish()),
1148 0,
1149 vec!["Employee".to_string()],
1150 vec![LogicalType::Int64],
1151 );
1152
1153 let chunk = op.next().unwrap().unwrap();
1154 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1155 assert_eq!(updated, 1);
1156
1157 let node_data = store.get_node(node).unwrap();
1159 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1160 assert!(labels.contains(&"Person"));
1161 assert!(labels.contains(&"Employee"));
1162 }
1163
1164 #[test]
1165 fn test_add_multiple_labels() {
1166 let store = create_test_store();
1167
1168 let node = store.create_node(&["Base"]);
1169
1170 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1171 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1172 builder.advance_row();
1173
1174 let mut op = AddLabelOperator::new(
1175 Arc::clone(&store),
1176 MockInput::boxed(builder.finish()),
1177 0,
1178 vec!["LabelA".to_string(), "LabelB".to_string()],
1179 vec![LogicalType::Int64],
1180 );
1181
1182 let chunk = op.next().unwrap().unwrap();
1183 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1184 assert_eq!(updated, 2); let node_data = store.get_node(node).unwrap();
1187 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1188 assert!(labels.contains(&"LabelA"));
1189 assert!(labels.contains(&"LabelB"));
1190 }
1191
1192 #[test]
1193 fn test_add_label_no_input_returns_none() {
1194 let store = create_test_store();
1195
1196 struct EmptyInput;
1197 impl Operator for EmptyInput {
1198 fn next(&mut self) -> OperatorResult {
1199 Ok(None)
1200 }
1201 fn reset(&mut self) {}
1202 fn name(&self) -> &'static str {
1203 "EmptyInput"
1204 }
1205 }
1206
1207 let mut op = AddLabelOperator::new(
1208 Arc::clone(&store),
1209 Box::new(EmptyInput),
1210 0,
1211 vec!["Foo".to_string()],
1212 vec![LogicalType::Int64],
1213 );
1214
1215 assert!(op.next().unwrap().is_none());
1216 }
1217
1218 #[test]
1221 fn test_remove_label() {
1222 let store = create_test_store();
1223
1224 let node = store.create_node(&["Person", "Employee"]);
1225
1226 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1227 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1228 builder.advance_row();
1229
1230 let mut op = RemoveLabelOperator::new(
1231 Arc::clone(&store),
1232 MockInput::boxed(builder.finish()),
1233 0,
1234 vec!["Employee".to_string()],
1235 vec![LogicalType::Int64],
1236 );
1237
1238 let chunk = op.next().unwrap().unwrap();
1239 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1240 assert_eq!(updated, 1);
1241
1242 let node_data = store.get_node(node).unwrap();
1244 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1245 assert!(labels.contains(&"Person"));
1246 assert!(!labels.contains(&"Employee"));
1247 }
1248
1249 #[test]
1250 fn test_remove_nonexistent_label() {
1251 let store = create_test_store();
1252
1253 let node = store.create_node(&["Person"]);
1254
1255 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1256 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1257 builder.advance_row();
1258
1259 let mut op = RemoveLabelOperator::new(
1260 Arc::clone(&store),
1261 MockInput::boxed(builder.finish()),
1262 0,
1263 vec!["NonExistent".to_string()],
1264 vec![LogicalType::Int64],
1265 );
1266
1267 let chunk = op.next().unwrap().unwrap();
1268 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1269 assert_eq!(updated, 0); }
1271
1272 #[test]
1275 fn test_set_node_property_constant() {
1276 let store = create_test_store();
1277
1278 let node = store.create_node(&["Person"]);
1279
1280 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1281 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1282 builder.advance_row();
1283
1284 let mut op = SetPropertyOperator::new_for_node(
1285 Arc::clone(&store),
1286 MockInput::boxed(builder.finish()),
1287 0,
1288 vec![(
1289 "name".to_string(),
1290 PropertySource::Constant(Value::String("Alice".into())),
1291 )],
1292 vec![LogicalType::Int64],
1293 );
1294
1295 let chunk = op.next().unwrap().unwrap();
1296 assert_eq!(chunk.row_count(), 1);
1297
1298 let node_data = store.get_node(node).unwrap();
1300 assert_eq!(
1301 node_data
1302 .properties
1303 .get(&grafeo_common::types::PropertyKey::new("name")),
1304 Some(&Value::String("Alice".into()))
1305 );
1306 }
1307
1308 #[test]
1309 fn test_set_node_property_from_column() {
1310 let store = create_test_store();
1311
1312 let node = store.create_node(&["Person"]);
1313
1314 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1316 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1317 builder
1318 .column_mut(1)
1319 .unwrap()
1320 .push_value(Value::String("Bob".into()));
1321 builder.advance_row();
1322
1323 let mut op = SetPropertyOperator::new_for_node(
1324 Arc::clone(&store),
1325 MockInput::boxed(builder.finish()),
1326 0,
1327 vec![("name".to_string(), PropertySource::Column(1))],
1328 vec![LogicalType::Int64, LogicalType::String],
1329 );
1330
1331 let chunk = op.next().unwrap().unwrap();
1332 assert_eq!(chunk.row_count(), 1);
1333
1334 let node_data = store.get_node(node).unwrap();
1335 assert_eq!(
1336 node_data
1337 .properties
1338 .get(&grafeo_common::types::PropertyKey::new("name")),
1339 Some(&Value::String("Bob".into()))
1340 );
1341 }
1342
1343 #[test]
1344 fn test_set_edge_property() {
1345 let store = create_test_store();
1346
1347 let n1 = store.create_node(&["N"]);
1348 let n2 = store.create_node(&["N"]);
1349 let eid = store.create_edge(n1, n2, "KNOWS");
1350
1351 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1352 builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1353 builder.advance_row();
1354
1355 let mut op = SetPropertyOperator::new_for_edge(
1356 Arc::clone(&store),
1357 MockInput::boxed(builder.finish()),
1358 0,
1359 vec![(
1360 "weight".to_string(),
1361 PropertySource::Constant(Value::Float64(0.75)),
1362 )],
1363 vec![LogicalType::Int64],
1364 );
1365
1366 let chunk = op.next().unwrap().unwrap();
1367 assert_eq!(chunk.row_count(), 1);
1368
1369 let edge_data = store.get_edge(eid).unwrap();
1370 assert_eq!(
1371 edge_data
1372 .properties
1373 .get(&grafeo_common::types::PropertyKey::new("weight")),
1374 Some(&Value::Float64(0.75))
1375 );
1376 }
1377
1378 #[test]
1379 fn test_set_multiple_properties() {
1380 let store = create_test_store();
1381
1382 let node = store.create_node(&["Person"]);
1383
1384 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1385 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1386 builder.advance_row();
1387
1388 let mut op = SetPropertyOperator::new_for_node(
1389 Arc::clone(&store),
1390 MockInput::boxed(builder.finish()),
1391 0,
1392 vec![
1393 (
1394 "name".to_string(),
1395 PropertySource::Constant(Value::String("Alice".into())),
1396 ),
1397 (
1398 "age".to_string(),
1399 PropertySource::Constant(Value::Int64(30)),
1400 ),
1401 ],
1402 vec![LogicalType::Int64],
1403 );
1404
1405 op.next().unwrap().unwrap();
1406
1407 let node_data = store.get_node(node).unwrap();
1408 assert_eq!(
1409 node_data
1410 .properties
1411 .get(&grafeo_common::types::PropertyKey::new("name")),
1412 Some(&Value::String("Alice".into()))
1413 );
1414 assert_eq!(
1415 node_data
1416 .properties
1417 .get(&grafeo_common::types::PropertyKey::new("age")),
1418 Some(&Value::Int64(30))
1419 );
1420 }
1421
1422 #[test]
1423 fn test_set_property_no_input_returns_none() {
1424 let store = create_test_store();
1425
1426 struct EmptyInput;
1427 impl Operator for EmptyInput {
1428 fn next(&mut self) -> OperatorResult {
1429 Ok(None)
1430 }
1431 fn reset(&mut self) {}
1432 fn name(&self) -> &'static str {
1433 "EmptyInput"
1434 }
1435 }
1436
1437 let mut op = SetPropertyOperator::new_for_node(
1438 Arc::clone(&store),
1439 Box::new(EmptyInput),
1440 0,
1441 vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
1442 vec![LogicalType::Int64],
1443 );
1444
1445 assert!(op.next().unwrap().is_none());
1446 }
1447
1448 #[test]
1451 fn test_operator_names() {
1452 let store = create_test_store();
1453
1454 struct EmptyInput;
1455 impl Operator for EmptyInput {
1456 fn next(&mut self) -> OperatorResult {
1457 Ok(None)
1458 }
1459 fn reset(&mut self) {}
1460 fn name(&self) -> &'static str {
1461 "EmptyInput"
1462 }
1463 }
1464
1465 let op = DeleteEdgeOperator::new(
1466 Arc::clone(&store),
1467 Box::new(EmptyInput),
1468 0,
1469 vec![LogicalType::Int64],
1470 );
1471 assert_eq!(op.name(), "DeleteEdge");
1472
1473 let op = AddLabelOperator::new(
1474 Arc::clone(&store),
1475 Box::new(EmptyInput),
1476 0,
1477 vec!["L".to_string()],
1478 vec![LogicalType::Int64],
1479 );
1480 assert_eq!(op.name(), "AddLabel");
1481
1482 let op = RemoveLabelOperator::new(
1483 Arc::clone(&store),
1484 Box::new(EmptyInput),
1485 0,
1486 vec!["L".to_string()],
1487 vec![LogicalType::Int64],
1488 );
1489 assert_eq!(op.name(), "RemoveLabel");
1490
1491 let op = SetPropertyOperator::new_for_node(
1492 Arc::clone(&store),
1493 Box::new(EmptyInput),
1494 0,
1495 vec![],
1496 vec![LogicalType::Int64],
1497 );
1498 assert_eq!(op.name(), "SetProperty");
1499 }
1500}