1use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::lpg::LpgStore;
16
/// Operator that creates nodes in an [`LpgStore`].
///
/// With an input operator, one node is created per selected input row; without
/// one, a single node is created the first time `next` is called.
pub struct CreateNodeOperator {
    /// Store the nodes are created in.
    store: Arc<LpgStore>,
    /// Optional upstream operator; `None` selects the standalone (single node) mode.
    input: Option<Box<dyn Operator>>,
    /// Labels passed to the store for every created node.
    labels: Vec<String>,
    /// `(property name, value source)` pairs applied to every created node.
    properties: Vec<(String, PropertySource)>,
    /// Column types handed to the output chunk builder.
    output_schema: Vec<LogicalType>,
    /// Output column that receives the new node ID as `Value::Int64`.
    /// Input columns with a lower index are copied through unchanged.
    output_column: usize,
    /// Set once the standalone node has been created; cleared again by `reset`.
    executed: bool,
    /// Epoch used for creation; when `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction used for creation; when `None`, `TxId::SYSTEM` is used.
    tx_id: Option<TxId>,
}
41
/// Where a property value comes from when an operator writes properties.
#[derive(Debug, Clone)]
pub enum PropertySource {
    /// Read the value from the input chunk column with this index, at the row
    /// currently being processed.
    Column(usize),
    /// Use this fixed value (cloned for every write).
    Constant(Value),
}
50
51impl CreateNodeOperator {
52 pub fn new(
62 store: Arc<LpgStore>,
63 input: Option<Box<dyn Operator>>,
64 labels: Vec<String>,
65 properties: Vec<(String, PropertySource)>,
66 output_schema: Vec<LogicalType>,
67 output_column: usize,
68 ) -> Self {
69 Self {
70 store,
71 input,
72 labels,
73 properties,
74 output_schema,
75 output_column,
76 executed: false,
77 viewing_epoch: None,
78 tx_id: None,
79 }
80 }
81
82 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
84 self.viewing_epoch = Some(epoch);
85 self.tx_id = tx_id;
86 self
87 }
88}
89
90impl Operator for CreateNodeOperator {
91 fn next(&mut self) -> OperatorResult {
92 let epoch = self
94 .viewing_epoch
95 .unwrap_or_else(|| self.store.current_epoch());
96 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
97
98 if let Some(ref mut input) = self.input {
99 if let Some(chunk) = input.next()? {
101 let mut builder =
102 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
103
104 for row in chunk.selected_indices() {
105 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
107 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
108
109 for (prop_name, source) in &self.properties {
111 let value = match source {
112 PropertySource::Column(col_idx) => chunk
113 .column(*col_idx)
114 .and_then(|c| c.get_value(row))
115 .unwrap_or(Value::Null),
116 PropertySource::Constant(v) => v.clone(),
117 };
118 self.store.set_node_property(node_id, prop_name, value);
119 }
120
121 for col_idx in 0..chunk.column_count() {
123 if col_idx < self.output_column
124 && let (Some(src), Some(dst)) =
125 (chunk.column(col_idx), builder.column_mut(col_idx))
126 {
127 if let Some(val) = src.get_value(row) {
128 dst.push_value(val);
129 } else {
130 dst.push_value(Value::Null);
131 }
132 }
133 }
134
135 if let Some(dst) = builder.column_mut(self.output_column) {
137 dst.push_value(Value::Int64(node_id.0 as i64));
138 }
139
140 builder.advance_row();
141 }
142
143 return Ok(Some(builder.finish()));
144 }
145 Ok(None)
146 } else {
147 if self.executed {
149 return Ok(None);
150 }
151 self.executed = true;
152
153 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
155 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
156
157 for (prop_name, source) in &self.properties {
159 if let PropertySource::Constant(value) = source {
160 self.store
161 .set_node_property(node_id, prop_name, value.clone());
162 }
163 }
164
165 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
167 if let Some(dst) = builder.column_mut(self.output_column) {
168 dst.push_value(Value::Int64(node_id.0 as i64));
169 }
170 builder.advance_row();
171
172 Ok(Some(builder.finish()))
173 }
174 }
175
176 fn reset(&mut self) {
177 if let Some(ref mut input) = self.input {
178 input.reset();
179 }
180 self.executed = false;
181 }
182
183 fn name(&self) -> &'static str {
184 "CreateNode"
185 }
186}
187
/// Operator that creates one edge per selected input row.
pub struct CreateEdgeOperator {
    /// Store the edges are created in.
    store: Arc<LpgStore>,
    /// Upstream operator supplying the rows with the endpoint node IDs.
    input: Box<dyn Operator>,
    /// Input column holding the source node ID (expected to be `Value::Int64`).
    from_column: usize,
    /// Input column holding the target node ID (expected to be `Value::Int64`).
    to_column: usize,
    /// Edge type passed to the store for every created edge.
    edge_type: String,
    /// `(property name, value source)` pairs written to every created edge.
    properties: Vec<(String, PropertySource)>,
    /// Column types handed to the output chunk builder.
    output_schema: Vec<LogicalType>,
    /// Output column that receives the new edge ID, if one was requested.
    output_column: Option<usize>,
    /// Epoch used for creation; when `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction used for creation; when `None`, `TxId::SYSTEM` is used.
    tx_id: Option<TxId>,
}
211
212impl CreateEdgeOperator {
213 pub fn new(
220 store: Arc<LpgStore>,
221 input: Box<dyn Operator>,
222 from_column: usize,
223 to_column: usize,
224 edge_type: String,
225 output_schema: Vec<LogicalType>,
226 ) -> Self {
227 Self {
228 store,
229 input,
230 from_column,
231 to_column,
232 edge_type,
233 properties: Vec::new(),
234 output_schema,
235 output_column: None,
236 viewing_epoch: None,
237 tx_id: None,
238 }
239 }
240
241 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
243 self.properties = properties;
244 self
245 }
246
247 pub fn with_output_column(mut self, column: usize) -> Self {
249 self.output_column = Some(column);
250 self
251 }
252
253 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
255 self.viewing_epoch = Some(epoch);
256 self.tx_id = tx_id;
257 self
258 }
259}
260
261impl Operator for CreateEdgeOperator {
262 fn next(&mut self) -> OperatorResult {
263 let epoch = self
265 .viewing_epoch
266 .unwrap_or_else(|| self.store.current_epoch());
267 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
268
269 if let Some(chunk) = self.input.next()? {
270 let mut builder =
271 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
272
273 for row in chunk.selected_indices() {
274 let from_id = chunk
276 .column(self.from_column)
277 .and_then(|c| c.get_value(row))
278 .ok_or_else(|| {
279 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
280 })?;
281
282 let to_id = chunk
283 .column(self.to_column)
284 .and_then(|c| c.get_value(row))
285 .ok_or_else(|| {
286 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
287 })?;
288
289 let from_node_id = match from_id {
291 Value::Int64(id) => NodeId(id as u64),
292 _ => {
293 return Err(OperatorError::TypeMismatch {
294 expected: "Int64 (node ID)".to_string(),
295 found: format!("{from_id:?}"),
296 });
297 }
298 };
299
300 let to_node_id = match to_id {
301 Value::Int64(id) => NodeId(id as u64),
302 _ => {
303 return Err(OperatorError::TypeMismatch {
304 expected: "Int64 (node ID)".to_string(),
305 found: format!("{to_id:?}"),
306 });
307 }
308 };
309
310 let edge_id = self.store.create_edge_versioned(
312 from_node_id,
313 to_node_id,
314 &self.edge_type,
315 epoch,
316 tx,
317 );
318
319 for (prop_name, source) in &self.properties {
321 let value = match source {
322 PropertySource::Column(col_idx) => chunk
323 .column(*col_idx)
324 .and_then(|c| c.get_value(row))
325 .unwrap_or(Value::Null),
326 PropertySource::Constant(v) => v.clone(),
327 };
328 self.store.set_edge_property(edge_id, prop_name, value);
329 }
330
331 for col_idx in 0..chunk.column_count() {
333 if let (Some(src), Some(dst)) =
334 (chunk.column(col_idx), builder.column_mut(col_idx))
335 {
336 if let Some(val) = src.get_value(row) {
337 dst.push_value(val);
338 } else {
339 dst.push_value(Value::Null);
340 }
341 }
342 }
343
344 if let Some(out_col) = self.output_column
346 && let Some(dst) = builder.column_mut(out_col)
347 {
348 dst.push_value(Value::Int64(edge_id.0 as i64));
349 }
350
351 builder.advance_row();
352 }
353
354 return Ok(Some(builder.finish()));
355 }
356 Ok(None)
357 }
358
359 fn reset(&mut self) {
360 self.input.reset();
361 }
362
363 fn name(&self) -> &'static str {
364 "CreateEdge"
365 }
366}
367
/// Operator that deletes the nodes identified by an input column.
pub struct DeleteNodeOperator {
    /// Store the nodes are deleted from.
    store: Arc<LpgStore>,
    /// Upstream operator supplying the node IDs.
    input: Box<dyn Operator>,
    /// Input column holding the node ID (expected to be `Value::Int64`).
    node_column: usize,
    /// Column types handed to the output chunk builder.
    output_schema: Vec<LogicalType>,
    /// When `true`, the node's edges are deleted before the node itself.
    detach: bool,
    /// Epoch used for deletion; when `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    // Stored by `with_tx_context` but not read by `next`, hence the lint allowance.
    #[allow(dead_code)]
    tx_id: Option<TxId>,
}
386
387impl DeleteNodeOperator {
388 pub fn new(
390 store: Arc<LpgStore>,
391 input: Box<dyn Operator>,
392 node_column: usize,
393 output_schema: Vec<LogicalType>,
394 detach: bool,
395 ) -> Self {
396 Self {
397 store,
398 input,
399 node_column,
400 output_schema,
401 detach,
402 viewing_epoch: None,
403 tx_id: None,
404 }
405 }
406
407 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
409 self.viewing_epoch = Some(epoch);
410 self.tx_id = tx_id;
411 self
412 }
413}
414
415impl Operator for DeleteNodeOperator {
416 fn next(&mut self) -> OperatorResult {
417 let epoch = self
419 .viewing_epoch
420 .unwrap_or_else(|| self.store.current_epoch());
421
422 if let Some(chunk) = self.input.next()? {
423 let mut deleted_count = 0;
424
425 for row in chunk.selected_indices() {
426 let node_val = chunk
427 .column(self.node_column)
428 .and_then(|c| c.get_value(row))
429 .ok_or_else(|| {
430 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
431 })?;
432
433 let node_id = match node_val {
434 Value::Int64(id) => NodeId(id as u64),
435 _ => {
436 return Err(OperatorError::TypeMismatch {
437 expected: "Int64 (node ID)".to_string(),
438 found: format!("{node_val:?}"),
439 });
440 }
441 };
442
443 if self.detach {
444 self.store.delete_node_edges(node_id);
447 }
448
449 if self.store.delete_node_at_epoch(node_id, epoch) {
451 deleted_count += 1;
452 }
453 }
454
455 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
457 if let Some(dst) = builder.column_mut(0) {
458 dst.push_value(Value::Int64(deleted_count));
459 }
460 builder.advance_row();
461
462 return Ok(Some(builder.finish()));
463 }
464 Ok(None)
465 }
466
467 fn reset(&mut self) {
468 self.input.reset();
469 }
470
471 fn name(&self) -> &'static str {
472 "DeleteNode"
473 }
474}
475
/// Operator that deletes the edges identified by an input column.
pub struct DeleteEdgeOperator {
    /// Store the edges are deleted from.
    store: Arc<LpgStore>,
    /// Upstream operator supplying the edge IDs.
    input: Box<dyn Operator>,
    /// Input column holding the edge ID (expected to be `Value::Int64`).
    edge_column: usize,
    /// Column types handed to the output chunk builder.
    output_schema: Vec<LogicalType>,
    /// Epoch used for deletion; when `None`, the store's current epoch is used.
    viewing_epoch: Option<EpochId>,
    // Stored by `with_tx_context` but not read by `next`, hence the lint allowance.
    #[allow(dead_code)]
    tx_id: Option<TxId>,
}
492
493impl DeleteEdgeOperator {
494 pub fn new(
496 store: Arc<LpgStore>,
497 input: Box<dyn Operator>,
498 edge_column: usize,
499 output_schema: Vec<LogicalType>,
500 ) -> Self {
501 Self {
502 store,
503 input,
504 edge_column,
505 output_schema,
506 viewing_epoch: None,
507 tx_id: None,
508 }
509 }
510
511 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
513 self.viewing_epoch = Some(epoch);
514 self.tx_id = tx_id;
515 self
516 }
517}
518
519impl Operator for DeleteEdgeOperator {
520 fn next(&mut self) -> OperatorResult {
521 let epoch = self
523 .viewing_epoch
524 .unwrap_or_else(|| self.store.current_epoch());
525
526 if let Some(chunk) = self.input.next()? {
527 let mut deleted_count = 0;
528
529 for row in chunk.selected_indices() {
530 let edge_val = chunk
531 .column(self.edge_column)
532 .and_then(|c| c.get_value(row))
533 .ok_or_else(|| {
534 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
535 })?;
536
537 let edge_id = match edge_val {
538 Value::Int64(id) => EdgeId(id as u64),
539 _ => {
540 return Err(OperatorError::TypeMismatch {
541 expected: "Int64 (edge ID)".to_string(),
542 found: format!("{edge_val:?}"),
543 });
544 }
545 };
546
547 if self.store.delete_edge_at_epoch(edge_id, epoch) {
549 deleted_count += 1;
550 }
551 }
552
553 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
555 if let Some(dst) = builder.column_mut(0) {
556 dst.push_value(Value::Int64(deleted_count));
557 }
558 builder.advance_row();
559
560 return Ok(Some(builder.finish()));
561 }
562 Ok(None)
563 }
564
565 fn reset(&mut self) {
566 self.input.reset();
567 }
568
569 fn name(&self) -> &'static str {
570 "DeleteEdge"
571 }
572}
573
/// Operator that adds a fixed set of labels to the nodes identified by an
/// input column.
pub struct AddLabelOperator {
    /// Store holding the nodes to label.
    store: Arc<LpgStore>,
    /// Upstream operator supplying the node IDs.
    input: Box<dyn Operator>,
    /// Input column holding the node ID (expected to be `Value::Int64`).
    node_column: usize,
    /// Labels added to every node.
    labels: Vec<String>,
    /// Column types handed to the output chunk builder.
    output_schema: Vec<LogicalType>,
}
587
588impl AddLabelOperator {
589 pub fn new(
591 store: Arc<LpgStore>,
592 input: Box<dyn Operator>,
593 node_column: usize,
594 labels: Vec<String>,
595 output_schema: Vec<LogicalType>,
596 ) -> Self {
597 Self {
598 store,
599 input,
600 node_column,
601 labels,
602 output_schema,
603 }
604 }
605}
606
607impl Operator for AddLabelOperator {
608 fn next(&mut self) -> OperatorResult {
609 if let Some(chunk) = self.input.next()? {
610 let mut updated_count = 0;
611
612 for row in chunk.selected_indices() {
613 let node_val = chunk
614 .column(self.node_column)
615 .and_then(|c| c.get_value(row))
616 .ok_or_else(|| {
617 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
618 })?;
619
620 let node_id = match node_val {
621 Value::Int64(id) => NodeId(id as u64),
622 _ => {
623 return Err(OperatorError::TypeMismatch {
624 expected: "Int64 (node ID)".to_string(),
625 found: format!("{node_val:?}"),
626 });
627 }
628 };
629
630 for label in &self.labels {
632 if self.store.add_label(node_id, label) {
633 updated_count += 1;
634 }
635 }
636 }
637
638 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
640 if let Some(dst) = builder.column_mut(0) {
641 dst.push_value(Value::Int64(updated_count));
642 }
643 builder.advance_row();
644
645 return Ok(Some(builder.finish()));
646 }
647 Ok(None)
648 }
649
650 fn reset(&mut self) {
651 self.input.reset();
652 }
653
654 fn name(&self) -> &'static str {
655 "AddLabel"
656 }
657}
658
/// Operator that removes a fixed set of labels from the nodes identified by
/// an input column.
pub struct RemoveLabelOperator {
    /// Store holding the nodes to update.
    store: Arc<LpgStore>,
    /// Upstream operator supplying the node IDs.
    input: Box<dyn Operator>,
    /// Input column holding the node ID (expected to be `Value::Int64`).
    node_column: usize,
    /// Labels removed from every node.
    labels: Vec<String>,
    /// Column types handed to the output chunk builder.
    output_schema: Vec<LogicalType>,
}
672
673impl RemoveLabelOperator {
674 pub fn new(
676 store: Arc<LpgStore>,
677 input: Box<dyn Operator>,
678 node_column: usize,
679 labels: Vec<String>,
680 output_schema: Vec<LogicalType>,
681 ) -> Self {
682 Self {
683 store,
684 input,
685 node_column,
686 labels,
687 output_schema,
688 }
689 }
690}
691
692impl Operator for RemoveLabelOperator {
693 fn next(&mut self) -> OperatorResult {
694 if let Some(chunk) = self.input.next()? {
695 let mut updated_count = 0;
696
697 for row in chunk.selected_indices() {
698 let node_val = chunk
699 .column(self.node_column)
700 .and_then(|c| c.get_value(row))
701 .ok_or_else(|| {
702 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
703 })?;
704
705 let node_id = match node_val {
706 Value::Int64(id) => NodeId(id as u64),
707 _ => {
708 return Err(OperatorError::TypeMismatch {
709 expected: "Int64 (node ID)".to_string(),
710 found: format!("{node_val:?}"),
711 });
712 }
713 };
714
715 for label in &self.labels {
717 if self.store.remove_label(node_id, label) {
718 updated_count += 1;
719 }
720 }
721 }
722
723 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
725 if let Some(dst) = builder.column_mut(0) {
726 dst.push_value(Value::Int64(updated_count));
727 }
728 builder.advance_row();
729
730 return Ok(Some(builder.finish()));
731 }
732 Ok(None)
733 }
734
735 fn reset(&mut self) {
736 self.input.reset();
737 }
738
739 fn name(&self) -> &'static str {
740 "RemoveLabel"
741 }
742}
743
/// Operator that writes properties onto the nodes or edges identified by an
/// input column.
pub struct SetPropertyOperator {
    /// Store holding the entities to update.
    store: Arc<LpgStore>,
    /// Upstream operator supplying the entity IDs (and any column-sourced values).
    input: Box<dyn Operator>,
    /// Input column holding the entity ID (expected to be `Value::Int64`).
    entity_column: usize,
    /// `true` when the IDs refer to edges, `false` when they refer to nodes.
    is_edge: bool,
    /// `(property name, value source)` pairs written to every entity.
    properties: Vec<(String, PropertySource)>,
    /// Column types handed to the output chunk builder.
    output_schema: Vec<LogicalType>,
}
762
763impl SetPropertyOperator {
764 pub fn new_for_node(
766 store: Arc<LpgStore>,
767 input: Box<dyn Operator>,
768 node_column: usize,
769 properties: Vec<(String, PropertySource)>,
770 output_schema: Vec<LogicalType>,
771 ) -> Self {
772 Self {
773 store,
774 input,
775 entity_column: node_column,
776 is_edge: false,
777 properties,
778 output_schema,
779 }
780 }
781
782 pub fn new_for_edge(
784 store: Arc<LpgStore>,
785 input: Box<dyn Operator>,
786 edge_column: usize,
787 properties: Vec<(String, PropertySource)>,
788 output_schema: Vec<LogicalType>,
789 ) -> Self {
790 Self {
791 store,
792 input,
793 entity_column: edge_column,
794 is_edge: true,
795 properties,
796 output_schema,
797 }
798 }
799}
800
801impl Operator for SetPropertyOperator {
802 fn next(&mut self) -> OperatorResult {
803 if let Some(chunk) = self.input.next()? {
804 let mut builder =
805 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
806
807 for row in chunk.selected_indices() {
808 let entity_val = chunk
809 .column(self.entity_column)
810 .and_then(|c| c.get_value(row))
811 .ok_or_else(|| {
812 OperatorError::ColumnNotFound(format!(
813 "entity column {}",
814 self.entity_column
815 ))
816 })?;
817
818 let entity_id = match entity_val {
819 Value::Int64(id) => id as u64,
820 _ => {
821 return Err(OperatorError::TypeMismatch {
822 expected: "Int64 (entity ID)".to_string(),
823 found: format!("{entity_val:?}"),
824 });
825 }
826 };
827
828 for (prop_name, source) in &self.properties {
830 let value = match source {
831 PropertySource::Column(col_idx) => chunk
832 .column(*col_idx)
833 .and_then(|c| c.get_value(row))
834 .unwrap_or(Value::Null),
835 PropertySource::Constant(v) => v.clone(),
836 };
837
838 if self.is_edge {
839 self.store
840 .set_edge_property(EdgeId(entity_id), prop_name, value);
841 } else {
842 self.store
843 .set_node_property(NodeId(entity_id), prop_name, value);
844 }
845 }
846
847 for col_idx in 0..chunk.column_count() {
849 if let (Some(src), Some(dst)) =
850 (chunk.column(col_idx), builder.column_mut(col_idx))
851 {
852 if let Some(val) = src.get_value(row) {
853 dst.push_value(val);
854 } else {
855 dst.push_value(Value::Null);
856 }
857 }
858 }
859
860 builder.advance_row();
861 }
862
863 return Ok(Some(builder.finish()));
864 }
865 Ok(None)
866 }
867
868 fn reset(&mut self) {
869 self.input.reset();
870 }
871
872 fn name(&self) -> &'static str {
873 "SetProperty"
874 }
875}
876
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    // === Shared fixtures ===

    /// Creates a fresh store for a single test.
    fn create_test_store() -> Arc<LpgStore> {
        Arc::new(LpgStore::new())
    }

    /// Input operator that yields one pre-built chunk and then `None`.
    struct MockInput {
        chunk: Option<DataChunk>,
    }

    impl MockInput {
        /// Boxes a `MockInput` that will yield `chunk` exactly once.
        fn boxed(chunk: DataChunk) -> Box<Self> {
            Box::new(Self { chunk: Some(chunk) })
        }
    }

    impl Operator for MockInput {
        fn next(&mut self) -> OperatorResult {
            Ok(self.chunk.take())
        }
        fn reset(&mut self) {}
        fn name(&self) -> &'static str {
            "MockInput"
        }
    }

    /// Input operator that never yields a chunk.
    struct EmptyInput;

    impl Operator for EmptyInput {
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }
        fn reset(&mut self) {}
        fn name(&self) -> &'static str {
            "EmptyInput"
        }
    }

    /// Builds a single-column `Int64` chunk with one row per ID.
    fn id_chunk(ids: &[u64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &id in ids {
            builder.column_mut(0).unwrap().push_int64(id as i64);
            builder.advance_row();
        }
        builder.finish()
    }

    // === CreateNode / CreateEdge ===

    #[test]
    fn test_create_node_standalone() {
        let store = create_test_store();

        let mut op = CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Person".to_string()],
            vec![(
                "name".to_string(),
                PropertySource::Constant(Value::String("Alice".into())),
            )],
            vec![LogicalType::Int64],
            0,
        );

        // The first call creates the node and reports one row.
        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        // The second call must not create anything further.
        assert!(op.next().unwrap().is_none());

        assert_eq!(store.node_count(), 1);
    }

    #[test]
    fn test_create_edge() {
        let store = create_test_store();

        let node1 = store.create_node(&["Person"]);
        let node2 = store.create_node(&["Person"]);

        // One input row holding the (from, to) node IDs.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
        builder.advance_row();

        let mut op = CreateEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(builder.finish()),
            0,
            1,
            "KNOWS".to_string(),
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let _chunk = op.next().unwrap().unwrap();

        assert_eq!(store.edge_count(), 1);
    }

    // === DeleteNode / DeleteEdge ===

    #[test]
    fn test_delete_node() {
        let store = create_test_store();

        let node_id = store.create_node(&["Person"]);
        assert_eq!(store.node_count(), 1);

        let mut op = DeleteNodeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node_id.0])),
            0,
            vec![LogicalType::Int64],
            false,
        );

        let chunk = op.next().unwrap().unwrap();

        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(store.node_count(), 0);
    }

    #[test]
    fn test_delete_edge() {
        let store = create_test_store();

        let n1 = store.create_node(&["Person"]);
        let n2 = store.create_node(&["Person"]);
        let eid = store.create_edge(n1, n2, "KNOWS");
        assert_eq!(store.edge_count(), 1);

        let mut op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[eid.0])),
            0,
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(store.edge_count(), 0);
    }

    #[test]
    fn test_delete_edge_no_input_returns_none() {
        let store = create_test_store();

        let mut op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
        );

        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_delete_multiple_edges() {
        let store = create_test_store();

        let n1 = store.create_node(&["N"]);
        let n2 = store.create_node(&["N"]);
        let e1 = store.create_edge(n1, n2, "R");
        let e2 = store.create_edge(n2, n1, "S");
        assert_eq!(store.edge_count(), 2);

        let mut op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[e1.0, e2.0])),
            0,
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 2);
        assert_eq!(store.edge_count(), 0);
    }

    #[test]
    fn test_delete_node_detach() {
        let store = create_test_store();

        let n1 = store.create_node(&["Person"]);
        let n2 = store.create_node(&["Person"]);
        store.create_edge(n1, n2, "KNOWS");
        store.create_edge(n2, n1, "FOLLOWS");
        assert_eq!(store.edge_count(), 2);

        let mut op = DeleteNodeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[n1.0])),
            0,
            vec![LogicalType::Int64],
            true, // detach
        );

        let chunk = op.next().unwrap().unwrap();
        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(store.node_count(), 1);
        // Both edges touched n1, so detach must have removed them.
        assert_eq!(store.edge_count(), 0);
    }

    // === AddLabel ===

    #[test]
    fn test_add_label() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = AddLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0])),
            0,
            vec!["Employee".to_string()],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(updated, 1);

        let node_data = store.get_node(node).unwrap();
        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
        assert!(labels.contains(&"Person"));
        assert!(labels.contains(&"Employee"));
    }

    #[test]
    fn test_add_multiple_labels() {
        let store = create_test_store();

        let node = store.create_node(&["Base"]);

        let mut op = AddLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0])),
            0,
            vec!["LabelA".to_string(), "LabelB".to_string()],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
        // One count per label added.
        assert_eq!(updated, 2);

        let node_data = store.get_node(node).unwrap();
        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
        assert!(labels.contains(&"LabelA"));
        assert!(labels.contains(&"LabelB"));
    }

    #[test]
    fn test_add_label_no_input_returns_none() {
        let store = create_test_store();

        let mut op = AddLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["Foo".to_string()],
            vec![LogicalType::Int64],
        );

        assert!(op.next().unwrap().is_none());
    }

    // === RemoveLabel ===

    #[test]
    fn test_remove_label() {
        let store = create_test_store();

        let node = store.create_node(&["Person", "Employee"]);

        let mut op = RemoveLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0])),
            0,
            vec!["Employee".to_string()],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(updated, 1);

        let node_data = store.get_node(node).unwrap();
        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
        assert!(labels.contains(&"Person"));
        assert!(!labels.contains(&"Employee"));
    }

    #[test]
    fn test_remove_nonexistent_label() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = RemoveLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0])),
            0,
            vec!["NonExistent".to_string()],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
        // Nothing was removed, so nothing is counted.
        assert_eq!(updated, 0);
    }

    // === SetProperty ===

    #[test]
    fn test_set_node_property_constant() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0])),
            0,
            vec![(
                "name".to_string(),
                PropertySource::Constant(Value::String("Alice".into())),
            )],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        let node_data = store.get_node(node).unwrap();
        assert_eq!(
            node_data
                .properties
                .get(&grafeo_common::types::PropertyKey::new("name")),
            Some(&Value::String("Alice".into()))
        );
    }

    #[test]
    fn test_set_node_property_from_column() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        // Column 0 carries the node ID, column 1 the property value.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
        builder
            .column_mut(1)
            .unwrap()
            .push_value(Value::String("Bob".into()));
        builder.advance_row();

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(builder.finish()),
            0,
            vec![("name".to_string(), PropertySource::Column(1))],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        let node_data = store.get_node(node).unwrap();
        assert_eq!(
            node_data
                .properties
                .get(&grafeo_common::types::PropertyKey::new("name")),
            Some(&Value::String("Bob".into()))
        );
    }

    #[test]
    fn test_set_edge_property() {
        let store = create_test_store();

        let n1 = store.create_node(&["N"]);
        let n2 = store.create_node(&["N"]);
        let eid = store.create_edge(n1, n2, "KNOWS");

        let mut op = SetPropertyOperator::new_for_edge(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[eid.0])),
            0,
            vec![(
                "weight".to_string(),
                PropertySource::Constant(Value::Float64(0.75)),
            )],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        let edge_data = store.get_edge(eid).unwrap();
        assert_eq!(
            edge_data
                .properties
                .get(&grafeo_common::types::PropertyKey::new("weight")),
            Some(&Value::Float64(0.75))
        );
    }

    #[test]
    fn test_set_multiple_properties() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0])),
            0,
            vec![
                (
                    "name".to_string(),
                    PropertySource::Constant(Value::String("Alice".into())),
                ),
                (
                    "age".to_string(),
                    PropertySource::Constant(Value::Int64(30)),
                ),
            ],
            vec![LogicalType::Int64],
        );

        op.next().unwrap().unwrap();

        let node_data = store.get_node(node).unwrap();
        assert_eq!(
            node_data
                .properties
                .get(&grafeo_common::types::PropertyKey::new("name")),
            Some(&Value::String("Alice".into()))
        );
        assert_eq!(
            node_data
                .properties
                .get(&grafeo_common::types::PropertyKey::new("age")),
            Some(&Value::Int64(30))
        );
    }

    #[test]
    fn test_set_property_no_input_returns_none() {
        let store = create_test_store();

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
            vec![LogicalType::Int64],
        );

        assert!(op.next().unwrap().is_none());
    }

    // === Operator names ===

    #[test]
    fn test_operator_names() {
        let store = create_test_store();

        let op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "DeleteEdge");

        let op = AddLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["L".to_string()],
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "AddLabel");

        let op = RemoveLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["L".to_string()],
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "RemoveLabel");

        let op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![],
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "SetProperty");
    }
}