1use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::lpg::LpgStore;
16
17pub struct CreateNodeOperator {
22 store: Arc<LpgStore>,
24 input: Option<Box<dyn Operator>>,
26 labels: Vec<String>,
28 properties: Vec<(String, PropertySource)>,
30 output_schema: Vec<LogicalType>,
32 output_column: usize,
34 executed: bool,
36 viewing_epoch: Option<EpochId>,
38 tx_id: Option<TxId>,
40}
41
42#[derive(Debug, Clone)]
44pub enum PropertySource {
45 Column(usize),
47 Constant(Value),
49 PropertyAccess {
51 column: usize,
53 property: String,
55 },
56}
57
58impl PropertySource {
59 pub fn resolve(
61 &self,
62 chunk: &crate::execution::chunk::DataChunk,
63 row: usize,
64 store: &LpgStore,
65 ) -> Value {
66 match self {
67 PropertySource::Column(col_idx) => chunk
68 .column(*col_idx)
69 .and_then(|c| c.get_value(row))
70 .unwrap_or(Value::Null),
71 PropertySource::Constant(v) => v.clone(),
72 PropertySource::PropertyAccess { column, property } => {
73 let Some(col) = chunk.column(*column) else {
74 return Value::Null;
75 };
76 if let Some(node_id) = col.get_node_id(row) {
78 store
79 .get_node(node_id)
80 .and_then(|node| node.get_property(property).cloned())
81 .unwrap_or(Value::Null)
82 } else if let Some(edge_id) = col.get_edge_id(row) {
83 store
84 .get_edge(edge_id)
85 .and_then(|edge| edge.get_property(property).cloned())
86 .unwrap_or(Value::Null)
87 } else if let Some(Value::Map(map)) = col.get_value(row) {
88 let key = PropertyKey::new(property);
89 map.get(&key).cloned().unwrap_or(Value::Null)
90 } else {
91 Value::Null
92 }
93 }
94 }
95 }
96}
97
98impl CreateNodeOperator {
99 pub fn new(
109 store: Arc<LpgStore>,
110 input: Option<Box<dyn Operator>>,
111 labels: Vec<String>,
112 properties: Vec<(String, PropertySource)>,
113 output_schema: Vec<LogicalType>,
114 output_column: usize,
115 ) -> Self {
116 Self {
117 store,
118 input,
119 labels,
120 properties,
121 output_schema,
122 output_column,
123 executed: false,
124 viewing_epoch: None,
125 tx_id: None,
126 }
127 }
128
129 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
131 self.viewing_epoch = Some(epoch);
132 self.tx_id = tx_id;
133 self
134 }
135}
136
137impl Operator for CreateNodeOperator {
138 fn next(&mut self) -> OperatorResult {
139 let epoch = self
141 .viewing_epoch
142 .unwrap_or_else(|| self.store.current_epoch());
143 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
144
145 if let Some(ref mut input) = self.input {
146 if let Some(chunk) = input.next()? {
148 let mut builder =
149 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
150
151 for row in chunk.selected_indices() {
152 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
154 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
155
156 for (prop_name, source) in &self.properties {
158 let value = source.resolve(&chunk, row, &self.store);
159 self.store.set_node_property(node_id, prop_name, value);
160 }
161
162 for col_idx in 0..chunk.column_count() {
164 if col_idx < self.output_column
165 && let (Some(src), Some(dst)) =
166 (chunk.column(col_idx), builder.column_mut(col_idx))
167 {
168 if let Some(val) = src.get_value(row) {
169 dst.push_value(val);
170 } else {
171 dst.push_value(Value::Null);
172 }
173 }
174 }
175
176 if let Some(dst) = builder.column_mut(self.output_column) {
178 dst.push_value(Value::Int64(node_id.0 as i64));
179 }
180
181 builder.advance_row();
182 }
183
184 return Ok(Some(builder.finish()));
185 }
186 Ok(None)
187 } else {
188 if self.executed {
190 return Ok(None);
191 }
192 self.executed = true;
193
194 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
196 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
197
198 for (prop_name, source) in &self.properties {
200 if let PropertySource::Constant(value) = source {
201 self.store
202 .set_node_property(node_id, prop_name, value.clone());
203 }
204 }
205
206 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
208 if let Some(dst) = builder.column_mut(self.output_column) {
209 dst.push_value(Value::Int64(node_id.0 as i64));
210 }
211 builder.advance_row();
212
213 Ok(Some(builder.finish()))
214 }
215 }
216
217 fn reset(&mut self) {
218 if let Some(ref mut input) = self.input {
219 input.reset();
220 }
221 self.executed = false;
222 }
223
224 fn name(&self) -> &'static str {
225 "CreateNode"
226 }
227}
228
229pub struct CreateEdgeOperator {
231 store: Arc<LpgStore>,
233 input: Box<dyn Operator>,
235 from_column: usize,
237 to_column: usize,
239 edge_type: String,
241 properties: Vec<(String, PropertySource)>,
243 output_schema: Vec<LogicalType>,
245 output_column: Option<usize>,
247 viewing_epoch: Option<EpochId>,
249 tx_id: Option<TxId>,
251}
252
253impl CreateEdgeOperator {
254 pub fn new(
261 store: Arc<LpgStore>,
262 input: Box<dyn Operator>,
263 from_column: usize,
264 to_column: usize,
265 edge_type: String,
266 output_schema: Vec<LogicalType>,
267 ) -> Self {
268 Self {
269 store,
270 input,
271 from_column,
272 to_column,
273 edge_type,
274 properties: Vec::new(),
275 output_schema,
276 output_column: None,
277 viewing_epoch: None,
278 tx_id: None,
279 }
280 }
281
282 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
284 self.properties = properties;
285 self
286 }
287
288 pub fn with_output_column(mut self, column: usize) -> Self {
290 self.output_column = Some(column);
291 self
292 }
293
294 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
296 self.viewing_epoch = Some(epoch);
297 self.tx_id = tx_id;
298 self
299 }
300}
301
302impl Operator for CreateEdgeOperator {
303 fn next(&mut self) -> OperatorResult {
304 let epoch = self
306 .viewing_epoch
307 .unwrap_or_else(|| self.store.current_epoch());
308 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
309
310 if let Some(chunk) = self.input.next()? {
311 let mut builder =
312 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
313
314 for row in chunk.selected_indices() {
315 let from_id = chunk
317 .column(self.from_column)
318 .and_then(|c| c.get_value(row))
319 .ok_or_else(|| {
320 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
321 })?;
322
323 let to_id = chunk
324 .column(self.to_column)
325 .and_then(|c| c.get_value(row))
326 .ok_or_else(|| {
327 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
328 })?;
329
330 let from_node_id = match from_id {
332 Value::Int64(id) => NodeId(id as u64),
333 _ => {
334 return Err(OperatorError::TypeMismatch {
335 expected: "Int64 (node ID)".to_string(),
336 found: format!("{from_id:?}"),
337 });
338 }
339 };
340
341 let to_node_id = match to_id {
342 Value::Int64(id) => NodeId(id as u64),
343 _ => {
344 return Err(OperatorError::TypeMismatch {
345 expected: "Int64 (node ID)".to_string(),
346 found: format!("{to_id:?}"),
347 });
348 }
349 };
350
351 let edge_id = self.store.create_edge_versioned(
353 from_node_id,
354 to_node_id,
355 &self.edge_type,
356 epoch,
357 tx,
358 );
359
360 for (prop_name, source) in &self.properties {
362 let value = source.resolve(&chunk, row, &self.store);
363 self.store.set_edge_property(edge_id, prop_name, value);
364 }
365
366 for col_idx in 0..chunk.column_count() {
368 if let (Some(src), Some(dst)) =
369 (chunk.column(col_idx), builder.column_mut(col_idx))
370 {
371 if let Some(val) = src.get_value(row) {
372 dst.push_value(val);
373 } else {
374 dst.push_value(Value::Null);
375 }
376 }
377 }
378
379 if let Some(out_col) = self.output_column
381 && let Some(dst) = builder.column_mut(out_col)
382 {
383 dst.push_value(Value::Int64(edge_id.0 as i64));
384 }
385
386 builder.advance_row();
387 }
388
389 return Ok(Some(builder.finish()));
390 }
391 Ok(None)
392 }
393
394 fn reset(&mut self) {
395 self.input.reset();
396 }
397
398 fn name(&self) -> &'static str {
399 "CreateEdge"
400 }
401}
402
403pub struct DeleteNodeOperator {
405 store: Arc<LpgStore>,
407 input: Box<dyn Operator>,
409 node_column: usize,
411 output_schema: Vec<LogicalType>,
413 detach: bool,
415 viewing_epoch: Option<EpochId>,
417}
418
419impl DeleteNodeOperator {
420 pub fn new(
422 store: Arc<LpgStore>,
423 input: Box<dyn Operator>,
424 node_column: usize,
425 output_schema: Vec<LogicalType>,
426 detach: bool,
427 ) -> Self {
428 Self {
429 store,
430 input,
431 node_column,
432 output_schema,
433 detach,
434 viewing_epoch: None,
435 }
436 }
437
438 pub fn with_tx_context(mut self, epoch: EpochId, _tx_id: Option<TxId>) -> Self {
440 self.viewing_epoch = Some(epoch);
441 self
442 }
443}
444
445impl Operator for DeleteNodeOperator {
446 fn next(&mut self) -> OperatorResult {
447 let epoch = self
449 .viewing_epoch
450 .unwrap_or_else(|| self.store.current_epoch());
451
452 if let Some(chunk) = self.input.next()? {
453 let mut deleted_count = 0;
454
455 for row in chunk.selected_indices() {
456 let node_val = chunk
457 .column(self.node_column)
458 .and_then(|c| c.get_value(row))
459 .ok_or_else(|| {
460 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
461 })?;
462
463 let node_id = match node_val {
464 Value::Int64(id) => NodeId(id as u64),
465 _ => {
466 return Err(OperatorError::TypeMismatch {
467 expected: "Int64 (node ID)".to_string(),
468 found: format!("{node_val:?}"),
469 });
470 }
471 };
472
473 if self.detach {
474 self.store.delete_node_edges(node_id);
477 }
478
479 if self.store.delete_node_at_epoch(node_id, epoch) {
481 deleted_count += 1;
482 }
483 }
484
485 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
487 if let Some(dst) = builder.column_mut(0) {
488 dst.push_value(Value::Int64(deleted_count));
489 }
490 builder.advance_row();
491
492 return Ok(Some(builder.finish()));
493 }
494 Ok(None)
495 }
496
497 fn reset(&mut self) {
498 self.input.reset();
499 }
500
501 fn name(&self) -> &'static str {
502 "DeleteNode"
503 }
504}
505
506pub struct DeleteEdgeOperator {
508 store: Arc<LpgStore>,
510 input: Box<dyn Operator>,
512 edge_column: usize,
514 output_schema: Vec<LogicalType>,
516 viewing_epoch: Option<EpochId>,
518}
519
520impl DeleteEdgeOperator {
521 pub fn new(
523 store: Arc<LpgStore>,
524 input: Box<dyn Operator>,
525 edge_column: usize,
526 output_schema: Vec<LogicalType>,
527 ) -> Self {
528 Self {
529 store,
530 input,
531 edge_column,
532 output_schema,
533 viewing_epoch: None,
534 }
535 }
536
537 pub fn with_tx_context(mut self, epoch: EpochId, _tx_id: Option<TxId>) -> Self {
539 self.viewing_epoch = Some(epoch);
540 self
541 }
542}
543
544impl Operator for DeleteEdgeOperator {
545 fn next(&mut self) -> OperatorResult {
546 let epoch = self
548 .viewing_epoch
549 .unwrap_or_else(|| self.store.current_epoch());
550
551 if let Some(chunk) = self.input.next()? {
552 let mut deleted_count = 0;
553
554 for row in chunk.selected_indices() {
555 let edge_val = chunk
556 .column(self.edge_column)
557 .and_then(|c| c.get_value(row))
558 .ok_or_else(|| {
559 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
560 })?;
561
562 let edge_id = match edge_val {
563 Value::Int64(id) => EdgeId(id as u64),
564 _ => {
565 return Err(OperatorError::TypeMismatch {
566 expected: "Int64 (edge ID)".to_string(),
567 found: format!("{edge_val:?}"),
568 });
569 }
570 };
571
572 if self.store.delete_edge_at_epoch(edge_id, epoch) {
574 deleted_count += 1;
575 }
576 }
577
578 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
580 if let Some(dst) = builder.column_mut(0) {
581 dst.push_value(Value::Int64(deleted_count));
582 }
583 builder.advance_row();
584
585 return Ok(Some(builder.finish()));
586 }
587 Ok(None)
588 }
589
590 fn reset(&mut self) {
591 self.input.reset();
592 }
593
594 fn name(&self) -> &'static str {
595 "DeleteEdge"
596 }
597}
598
599pub struct AddLabelOperator {
601 store: Arc<LpgStore>,
603 input: Box<dyn Operator>,
605 node_column: usize,
607 labels: Vec<String>,
609 output_schema: Vec<LogicalType>,
611}
612
613impl AddLabelOperator {
614 pub fn new(
616 store: Arc<LpgStore>,
617 input: Box<dyn Operator>,
618 node_column: usize,
619 labels: Vec<String>,
620 output_schema: Vec<LogicalType>,
621 ) -> Self {
622 Self {
623 store,
624 input,
625 node_column,
626 labels,
627 output_schema,
628 }
629 }
630}
631
632impl Operator for AddLabelOperator {
633 fn next(&mut self) -> OperatorResult {
634 if let Some(chunk) = self.input.next()? {
635 let mut updated_count = 0;
636
637 for row in chunk.selected_indices() {
638 let node_val = chunk
639 .column(self.node_column)
640 .and_then(|c| c.get_value(row))
641 .ok_or_else(|| {
642 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
643 })?;
644
645 let node_id = match node_val {
646 Value::Int64(id) => NodeId(id as u64),
647 _ => {
648 return Err(OperatorError::TypeMismatch {
649 expected: "Int64 (node ID)".to_string(),
650 found: format!("{node_val:?}"),
651 });
652 }
653 };
654
655 for label in &self.labels {
657 if self.store.add_label(node_id, label) {
658 updated_count += 1;
659 }
660 }
661 }
662
663 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
665 if let Some(dst) = builder.column_mut(0) {
666 dst.push_value(Value::Int64(updated_count));
667 }
668 builder.advance_row();
669
670 return Ok(Some(builder.finish()));
671 }
672 Ok(None)
673 }
674
675 fn reset(&mut self) {
676 self.input.reset();
677 }
678
679 fn name(&self) -> &'static str {
680 "AddLabel"
681 }
682}
683
684pub struct RemoveLabelOperator {
686 store: Arc<LpgStore>,
688 input: Box<dyn Operator>,
690 node_column: usize,
692 labels: Vec<String>,
694 output_schema: Vec<LogicalType>,
696}
697
698impl RemoveLabelOperator {
699 pub fn new(
701 store: Arc<LpgStore>,
702 input: Box<dyn Operator>,
703 node_column: usize,
704 labels: Vec<String>,
705 output_schema: Vec<LogicalType>,
706 ) -> Self {
707 Self {
708 store,
709 input,
710 node_column,
711 labels,
712 output_schema,
713 }
714 }
715}
716
717impl Operator for RemoveLabelOperator {
718 fn next(&mut self) -> OperatorResult {
719 if let Some(chunk) = self.input.next()? {
720 let mut updated_count = 0;
721
722 for row in chunk.selected_indices() {
723 let node_val = chunk
724 .column(self.node_column)
725 .and_then(|c| c.get_value(row))
726 .ok_or_else(|| {
727 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
728 })?;
729
730 let node_id = match node_val {
731 Value::Int64(id) => NodeId(id as u64),
732 _ => {
733 return Err(OperatorError::TypeMismatch {
734 expected: "Int64 (node ID)".to_string(),
735 found: format!("{node_val:?}"),
736 });
737 }
738 };
739
740 for label in &self.labels {
742 if self.store.remove_label(node_id, label) {
743 updated_count += 1;
744 }
745 }
746 }
747
748 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
750 if let Some(dst) = builder.column_mut(0) {
751 dst.push_value(Value::Int64(updated_count));
752 }
753 builder.advance_row();
754
755 return Ok(Some(builder.finish()));
756 }
757 Ok(None)
758 }
759
760 fn reset(&mut self) {
761 self.input.reset();
762 }
763
764 fn name(&self) -> &'static str {
765 "RemoveLabel"
766 }
767}
768
769pub struct SetPropertyOperator {
774 store: Arc<LpgStore>,
776 input: Box<dyn Operator>,
778 entity_column: usize,
780 is_edge: bool,
782 properties: Vec<(String, PropertySource)>,
784 output_schema: Vec<LogicalType>,
786}
787
788impl SetPropertyOperator {
789 pub fn new_for_node(
791 store: Arc<LpgStore>,
792 input: Box<dyn Operator>,
793 node_column: usize,
794 properties: Vec<(String, PropertySource)>,
795 output_schema: Vec<LogicalType>,
796 ) -> Self {
797 Self {
798 store,
799 input,
800 entity_column: node_column,
801 is_edge: false,
802 properties,
803 output_schema,
804 }
805 }
806
807 pub fn new_for_edge(
809 store: Arc<LpgStore>,
810 input: Box<dyn Operator>,
811 edge_column: usize,
812 properties: Vec<(String, PropertySource)>,
813 output_schema: Vec<LogicalType>,
814 ) -> Self {
815 Self {
816 store,
817 input,
818 entity_column: edge_column,
819 is_edge: true,
820 properties,
821 output_schema,
822 }
823 }
824}
825
826impl Operator for SetPropertyOperator {
827 fn next(&mut self) -> OperatorResult {
828 if let Some(chunk) = self.input.next()? {
829 let mut builder =
830 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
831
832 for row in chunk.selected_indices() {
833 let entity_val = chunk
834 .column(self.entity_column)
835 .and_then(|c| c.get_value(row))
836 .ok_or_else(|| {
837 OperatorError::ColumnNotFound(format!(
838 "entity column {}",
839 self.entity_column
840 ))
841 })?;
842
843 let entity_id = match entity_val {
844 Value::Int64(id) => id as u64,
845 _ => {
846 return Err(OperatorError::TypeMismatch {
847 expected: "Int64 (entity ID)".to_string(),
848 found: format!("{entity_val:?}"),
849 });
850 }
851 };
852
853 for (prop_name, source) in &self.properties {
855 let value = source.resolve(&chunk, row, &self.store);
856
857 if self.is_edge {
858 self.store
859 .set_edge_property(EdgeId(entity_id), prop_name, value);
860 } else {
861 self.store
862 .set_node_property(NodeId(entity_id), prop_name, value);
863 }
864 }
865
866 for col_idx in 0..chunk.column_count() {
868 if let (Some(src), Some(dst)) =
869 (chunk.column(col_idx), builder.column_mut(col_idx))
870 {
871 if let Some(val) = src.get_value(row) {
872 dst.push_value(val);
873 } else {
874 dst.push_value(Value::Null);
875 }
876 }
877 }
878
879 builder.advance_row();
880 }
881
882 return Ok(Some(builder.finish()));
883 }
884 Ok(None)
885 }
886
887 fn reset(&mut self) {
888 self.input.reset();
889 }
890
891 fn name(&self) -> &'static str {
892 "SetProperty"
893 }
894}
895
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Upstream operator that yields one prepared chunk, then reports exhaustion.
    struct MockInput {
        chunk: Option<DataChunk>,
    }

    impl MockInput {
        fn boxed(chunk: DataChunk) -> Box<Self> {
            Box::new(Self { chunk: Some(chunk) })
        }
    }

    impl Operator for MockInput {
        fn next(&mut self) -> OperatorResult {
            Ok(self.chunk.take())
        }
        fn reset(&mut self) {}
        fn name(&self) -> &'static str {
            "MockInput"
        }
    }

    /// Upstream operator that never yields a chunk.
    struct EmptyInput;

    impl Operator for EmptyInput {
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }
        fn reset(&mut self) {}
        fn name(&self) -> &'static str {
            "EmptyInput"
        }
    }

    fn create_test_store() -> Arc<LpgStore> {
        Arc::new(LpgStore::new())
    }

    /// Builds a chunk with a single `Int64` column containing one row per ID.
    fn id_chunk(ids: &[u64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &id in ids {
            builder.column_mut(0).unwrap().push_int64(id as i64);
            builder.advance_row();
        }
        builder.finish()
    }

    // --- CreateNode / CreateEdge ---

    #[test]
    fn test_create_node_standalone() {
        let store = create_test_store();

        let mut op = CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Person".to_string()],
            vec![(
                "name".to_string(),
                PropertySource::Constant(Value::String("Alice".into())),
            )],
            vec![LogicalType::Int64],
            0,
        );

        // The first call creates the node and reports one row.
        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        // The standalone operator is exhausted after one call.
        assert!(op.next().unwrap().is_none());

        assert_eq!(store.node_count(), 1);
    }

    #[test]
    fn test_create_edge() {
        let store = create_test_store();

        let node1 = store.create_node(&["Person"]);
        let node2 = store.create_node(&["Person"]);

        // One input row carrying the (from, to) node IDs.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
        builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
        builder.advance_row();

        let mut op = CreateEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(builder.finish()),
            0,
            1,
            "KNOWS".to_string(),
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let _chunk = op.next().unwrap().unwrap();

        assert_eq!(store.edge_count(), 1);
    }

    // --- DeleteNode ---

    #[test]
    fn test_delete_node() {
        let store = create_test_store();

        let node_id = store.create_node(&["Person"]);
        assert_eq!(store.node_count(), 1);

        let mut op = DeleteNodeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node_id.0])),
            0,
            vec![LogicalType::Int64],
            false,
        );

        let chunk = op.next().unwrap().unwrap();

        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(store.node_count(), 0);
    }

    #[test]
    fn test_delete_node_detach() {
        let store = create_test_store();

        let n1 = store.create_node(&["Person"]);
        let n2 = store.create_node(&["Person"]);
        store.create_edge(n1, n2, "KNOWS");
        store.create_edge(n2, n1, "FOLLOWS");
        assert_eq!(store.edge_count(), 2);

        let mut op = DeleteNodeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[n1.0])),
            0,
            vec![LogicalType::Int64],
            true, // detach
        );

        let chunk = op.next().unwrap().unwrap();
        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(store.node_count(), 1);
        // Both edges touched n1, so detach removed them.
        assert_eq!(store.edge_count(), 0);
    }

    // --- DeleteEdge ---

    #[test]
    fn test_delete_edge() {
        let store = create_test_store();

        let n1 = store.create_node(&["Person"]);
        let n2 = store.create_node(&["Person"]);
        let eid = store.create_edge(n1, n2, "KNOWS");
        assert_eq!(store.edge_count(), 1);

        let mut op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[eid.0])),
            0,
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 1);
        assert_eq!(store.edge_count(), 0);
    }

    #[test]
    fn test_delete_edge_no_input_returns_none() {
        let store = create_test_store();

        let mut op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
        );

        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_delete_multiple_edges() {
        let store = create_test_store();

        let n1 = store.create_node(&["N"]);
        let n2 = store.create_node(&["N"]);
        let e1 = store.create_edge(n1, n2, "R");
        let e2 = store.create_edge(n2, n1, "S");
        assert_eq!(store.edge_count(), 2);

        let mut op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[e1.0, e2.0])),
            0,
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(deleted, 2);
        assert_eq!(store.edge_count(), 0);
    }

    // --- AddLabel ---

    #[test]
    fn test_add_label() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = AddLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0])),
            0,
            vec!["Employee".to_string()],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(updated, 1);

        let node_data = store.get_node(node).unwrap();
        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
        assert!(labels.contains(&"Person"));
        assert!(labels.contains(&"Employee"));
    }

    #[test]
    fn test_add_multiple_labels() {
        let store = create_test_store();

        let node = store.create_node(&["Base"]);

        let mut op = AddLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0])),
            0,
            vec!["LabelA".to_string(), "LabelB".to_string()],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
        // One count per label added.
        assert_eq!(updated, 2);

        let node_data = store.get_node(node).unwrap();
        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
        assert!(labels.contains(&"LabelA"));
        assert!(labels.contains(&"LabelB"));
    }

    #[test]
    fn test_add_label_no_input_returns_none() {
        let store = create_test_store();

        let mut op = AddLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["Foo".to_string()],
            vec![LogicalType::Int64],
        );

        assert!(op.next().unwrap().is_none());
    }

    // --- RemoveLabel ---

    #[test]
    fn test_remove_label() {
        let store = create_test_store();

        let node = store.create_node(&["Person", "Employee"]);

        let mut op = RemoveLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0])),
            0,
            vec!["Employee".to_string()],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
        assert_eq!(updated, 1);

        let node_data = store.get_node(node).unwrap();
        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
        assert!(labels.contains(&"Person"));
        assert!(!labels.contains(&"Employee"));
    }

    #[test]
    fn test_remove_nonexistent_label() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = RemoveLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0])),
            0,
            vec!["NonExistent".to_string()],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
        // Nothing was removed, so nothing is counted.
        assert_eq!(updated, 0);
    }

    // --- SetProperty ---

    #[test]
    fn test_set_node_property_constant() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0])),
            0,
            vec![(
                "name".to_string(),
                PropertySource::Constant(Value::String("Alice".into())),
            )],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        let node_data = store.get_node(node).unwrap();
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alice".into()))
        );
    }

    #[test]
    fn test_set_node_property_from_column() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        // Column 0 holds the node ID, column 1 the value to assign.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
        builder
            .column_mut(1)
            .unwrap()
            .push_value(Value::String("Bob".into()));
        builder.advance_row();

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(builder.finish()),
            0,
            vec![("name".to_string(), PropertySource::Column(1))],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        let node_data = store.get_node(node).unwrap();
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Bob".into()))
        );
    }

    #[test]
    fn test_set_edge_property() {
        let store = create_test_store();

        let n1 = store.create_node(&["N"]);
        let n2 = store.create_node(&["N"]);
        let eid = store.create_edge(n1, n2, "KNOWS");

        let mut op = SetPropertyOperator::new_for_edge(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[eid.0])),
            0,
            vec![(
                "weight".to_string(),
                PropertySource::Constant(Value::Float64(0.75)),
            )],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        let edge_data = store.get_edge(eid).unwrap();
        assert_eq!(
            edge_data.properties.get(&PropertyKey::new("weight")),
            Some(&Value::Float64(0.75))
        );
    }

    #[test]
    fn test_set_multiple_properties() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(id_chunk(&[node.0])),
            0,
            vec![
                (
                    "name".to_string(),
                    PropertySource::Constant(Value::String("Alice".into())),
                ),
                (
                    "age".to_string(),
                    PropertySource::Constant(Value::Int64(30)),
                ),
            ],
            vec![LogicalType::Int64],
        );

        op.next().unwrap().unwrap();

        let node_data = store.get_node(node).unwrap();
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alice".into()))
        );
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("age")),
            Some(&Value::Int64(30))
        );
    }

    #[test]
    fn test_set_property_no_input_returns_none() {
        let store = create_test_store();

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
            vec![LogicalType::Int64],
        );

        assert!(op.next().unwrap().is_none());
    }

    // --- Operator names ---

    #[test]
    fn test_operator_names() {
        let store = create_test_store();

        let op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "DeleteEdge");

        let op = AddLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["L".to_string()],
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "AddLabel");

        let op = RemoveLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["L".to_string()],
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "RemoveLabel");

        let op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![],
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "SetProperty");
    }
}