1use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::{GraphStore, GraphStoreMut};
16
17pub struct CreateNodeOperator {
22 store: Arc<dyn GraphStoreMut>,
24 input: Option<Box<dyn Operator>>,
26 labels: Vec<String>,
28 properties: Vec<(String, PropertySource)>,
30 output_schema: Vec<LogicalType>,
32 output_column: usize,
34 executed: bool,
36 viewing_epoch: Option<EpochId>,
38 tx_id: Option<TxId>,
40}
41
42#[derive(Debug, Clone)]
44pub enum PropertySource {
45 Column(usize),
47 Constant(Value),
49 PropertyAccess {
51 column: usize,
53 property: String,
55 },
56}
57
58impl PropertySource {
59 pub fn resolve(
61 &self,
62 chunk: &crate::execution::chunk::DataChunk,
63 row: usize,
64 store: &dyn GraphStore,
65 ) -> Value {
66 match self {
67 PropertySource::Column(col_idx) => chunk
68 .column(*col_idx)
69 .and_then(|c| c.get_value(row))
70 .unwrap_or(Value::Null),
71 PropertySource::Constant(v) => v.clone(),
72 PropertySource::PropertyAccess { column, property } => {
73 let Some(col) = chunk.column(*column) else {
74 return Value::Null;
75 };
76 if let Some(node_id) = col.get_node_id(row) {
78 store
79 .get_node(node_id)
80 .and_then(|node| node.get_property(property).cloned())
81 .unwrap_or(Value::Null)
82 } else if let Some(edge_id) = col.get_edge_id(row) {
83 store
84 .get_edge(edge_id)
85 .and_then(|edge| edge.get_property(property).cloned())
86 .unwrap_or(Value::Null)
87 } else if let Some(Value::Map(map)) = col.get_value(row) {
88 let key = PropertyKey::new(property);
89 map.get(&key).cloned().unwrap_or(Value::Null)
90 } else {
91 Value::Null
92 }
93 }
94 }
95 }
96}
97
98impl CreateNodeOperator {
99 pub fn new(
109 store: Arc<dyn GraphStoreMut>,
110 input: Option<Box<dyn Operator>>,
111 labels: Vec<String>,
112 properties: Vec<(String, PropertySource)>,
113 output_schema: Vec<LogicalType>,
114 output_column: usize,
115 ) -> Self {
116 Self {
117 store,
118 input,
119 labels,
120 properties,
121 output_schema,
122 output_column,
123 executed: false,
124 viewing_epoch: None,
125 tx_id: None,
126 }
127 }
128
129 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
131 self.viewing_epoch = Some(epoch);
132 self.tx_id = tx_id;
133 self
134 }
135}
136
137impl Operator for CreateNodeOperator {
138 fn next(&mut self) -> OperatorResult {
139 let epoch = self
141 .viewing_epoch
142 .unwrap_or_else(|| self.store.current_epoch());
143 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
144
145 if let Some(ref mut input) = self.input {
146 if let Some(chunk) = input.next()? {
148 let mut builder =
149 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
150
151 for row in chunk.selected_indices() {
152 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
154 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
155
156 for (prop_name, source) in &self.properties {
158 let value =
159 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
160 self.store.set_node_property(node_id, prop_name, value);
161 }
162
163 for col_idx in 0..chunk.column_count() {
165 if col_idx < self.output_column
166 && let (Some(src), Some(dst)) =
167 (chunk.column(col_idx), builder.column_mut(col_idx))
168 {
169 if let Some(val) = src.get_value(row) {
170 dst.push_value(val);
171 } else {
172 dst.push_value(Value::Null);
173 }
174 }
175 }
176
177 if let Some(dst) = builder.column_mut(self.output_column) {
179 dst.push_value(Value::Int64(node_id.0 as i64));
180 }
181
182 builder.advance_row();
183 }
184
185 return Ok(Some(builder.finish()));
186 }
187 Ok(None)
188 } else {
189 if self.executed {
191 return Ok(None);
192 }
193 self.executed = true;
194
195 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
197 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
198
199 for (prop_name, source) in &self.properties {
201 if let PropertySource::Constant(value) = source {
202 self.store
203 .set_node_property(node_id, prop_name, value.clone());
204 }
205 }
206
207 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
209 if let Some(dst) = builder.column_mut(self.output_column) {
210 dst.push_value(Value::Int64(node_id.0 as i64));
211 }
212 builder.advance_row();
213
214 Ok(Some(builder.finish()))
215 }
216 }
217
218 fn reset(&mut self) {
219 if let Some(ref mut input) = self.input {
220 input.reset();
221 }
222 self.executed = false;
223 }
224
225 fn name(&self) -> &'static str {
226 "CreateNode"
227 }
228}
229
230pub struct CreateEdgeOperator {
232 store: Arc<dyn GraphStoreMut>,
234 input: Box<dyn Operator>,
236 from_column: usize,
238 to_column: usize,
240 edge_type: String,
242 properties: Vec<(String, PropertySource)>,
244 output_schema: Vec<LogicalType>,
246 output_column: Option<usize>,
248 viewing_epoch: Option<EpochId>,
250 tx_id: Option<TxId>,
252}
253
254impl CreateEdgeOperator {
255 pub fn new(
262 store: Arc<dyn GraphStoreMut>,
263 input: Box<dyn Operator>,
264 from_column: usize,
265 to_column: usize,
266 edge_type: String,
267 output_schema: Vec<LogicalType>,
268 ) -> Self {
269 Self {
270 store,
271 input,
272 from_column,
273 to_column,
274 edge_type,
275 properties: Vec::new(),
276 output_schema,
277 output_column: None,
278 viewing_epoch: None,
279 tx_id: None,
280 }
281 }
282
283 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
285 self.properties = properties;
286 self
287 }
288
289 pub fn with_output_column(mut self, column: usize) -> Self {
291 self.output_column = Some(column);
292 self
293 }
294
295 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
297 self.viewing_epoch = Some(epoch);
298 self.tx_id = tx_id;
299 self
300 }
301}
302
303impl Operator for CreateEdgeOperator {
304 fn next(&mut self) -> OperatorResult {
305 let epoch = self
307 .viewing_epoch
308 .unwrap_or_else(|| self.store.current_epoch());
309 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
310
311 if let Some(chunk) = self.input.next()? {
312 let mut builder =
313 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
314
315 for row in chunk.selected_indices() {
316 let from_id = chunk
318 .column(self.from_column)
319 .and_then(|c| c.get_value(row))
320 .ok_or_else(|| {
321 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
322 })?;
323
324 let to_id = chunk
325 .column(self.to_column)
326 .and_then(|c| c.get_value(row))
327 .ok_or_else(|| {
328 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
329 })?;
330
331 let from_node_id = match from_id {
333 Value::Int64(id) => NodeId(id as u64),
334 _ => {
335 return Err(OperatorError::TypeMismatch {
336 expected: "Int64 (node ID)".to_string(),
337 found: format!("{from_id:?}"),
338 });
339 }
340 };
341
342 let to_node_id = match to_id {
343 Value::Int64(id) => NodeId(id as u64),
344 _ => {
345 return Err(OperatorError::TypeMismatch {
346 expected: "Int64 (node ID)".to_string(),
347 found: format!("{to_id:?}"),
348 });
349 }
350 };
351
352 let edge_id = self.store.create_edge_versioned(
354 from_node_id,
355 to_node_id,
356 &self.edge_type,
357 epoch,
358 tx,
359 );
360
361 for (prop_name, source) in &self.properties {
363 let value = source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
364 self.store.set_edge_property(edge_id, prop_name, value);
365 }
366
367 for col_idx in 0..chunk.column_count() {
369 if let (Some(src), Some(dst)) =
370 (chunk.column(col_idx), builder.column_mut(col_idx))
371 {
372 if let Some(val) = src.get_value(row) {
373 dst.push_value(val);
374 } else {
375 dst.push_value(Value::Null);
376 }
377 }
378 }
379
380 if let Some(out_col) = self.output_column
382 && let Some(dst) = builder.column_mut(out_col)
383 {
384 dst.push_value(Value::Int64(edge_id.0 as i64));
385 }
386
387 builder.advance_row();
388 }
389
390 return Ok(Some(builder.finish()));
391 }
392 Ok(None)
393 }
394
395 fn reset(&mut self) {
396 self.input.reset();
397 }
398
399 fn name(&self) -> &'static str {
400 "CreateEdge"
401 }
402}
403
404pub struct DeleteNodeOperator {
406 store: Arc<dyn GraphStoreMut>,
408 input: Box<dyn Operator>,
410 node_column: usize,
412 output_schema: Vec<LogicalType>,
414 detach: bool,
416 viewing_epoch: Option<EpochId>,
418 tx_id: Option<TxId>,
420}
421
422impl DeleteNodeOperator {
423 pub fn new(
425 store: Arc<dyn GraphStoreMut>,
426 input: Box<dyn Operator>,
427 node_column: usize,
428 output_schema: Vec<LogicalType>,
429 detach: bool,
430 ) -> Self {
431 Self {
432 store,
433 input,
434 node_column,
435 output_schema,
436 detach,
437 viewing_epoch: None,
438 tx_id: None,
439 }
440 }
441
442 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
444 self.viewing_epoch = Some(epoch);
445 self.tx_id = tx_id;
446 self
447 }
448}
449
450impl Operator for DeleteNodeOperator {
451 fn next(&mut self) -> OperatorResult {
452 let epoch = self
454 .viewing_epoch
455 .unwrap_or_else(|| self.store.current_epoch());
456 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
457
458 if let Some(chunk) = self.input.next()? {
459 let mut deleted_count = 0;
460
461 for row in chunk.selected_indices() {
462 let node_val = chunk
463 .column(self.node_column)
464 .and_then(|c| c.get_value(row))
465 .ok_or_else(|| {
466 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
467 })?;
468
469 let node_id = match node_val {
470 Value::Int64(id) => NodeId(id as u64),
471 _ => {
472 return Err(OperatorError::TypeMismatch {
473 expected: "Int64 (node ID)".to_string(),
474 found: format!("{node_val:?}"),
475 });
476 }
477 };
478
479 if self.detach {
480 self.store.delete_node_edges(node_id);
482 }
483
484 if self.store.delete_node_versioned(node_id, epoch, tx) {
486 deleted_count += 1;
487 }
488 }
489
490 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
492 if let Some(dst) = builder.column_mut(0) {
493 dst.push_value(Value::Int64(deleted_count));
494 }
495 builder.advance_row();
496
497 return Ok(Some(builder.finish()));
498 }
499 Ok(None)
500 }
501
502 fn reset(&mut self) {
503 self.input.reset();
504 }
505
506 fn name(&self) -> &'static str {
507 "DeleteNode"
508 }
509}
510
511pub struct DeleteEdgeOperator {
513 store: Arc<dyn GraphStoreMut>,
515 input: Box<dyn Operator>,
517 edge_column: usize,
519 output_schema: Vec<LogicalType>,
521 viewing_epoch: Option<EpochId>,
523 tx_id: Option<TxId>,
525}
526
527impl DeleteEdgeOperator {
528 pub fn new(
530 store: Arc<dyn GraphStoreMut>,
531 input: Box<dyn Operator>,
532 edge_column: usize,
533 output_schema: Vec<LogicalType>,
534 ) -> Self {
535 Self {
536 store,
537 input,
538 edge_column,
539 output_schema,
540 viewing_epoch: None,
541 tx_id: None,
542 }
543 }
544
545 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
547 self.viewing_epoch = Some(epoch);
548 self.tx_id = tx_id;
549 self
550 }
551}
552
553impl Operator for DeleteEdgeOperator {
554 fn next(&mut self) -> OperatorResult {
555 let epoch = self
557 .viewing_epoch
558 .unwrap_or_else(|| self.store.current_epoch());
559 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
560
561 if let Some(chunk) = self.input.next()? {
562 let mut deleted_count = 0;
563
564 for row in chunk.selected_indices() {
565 let edge_val = chunk
566 .column(self.edge_column)
567 .and_then(|c| c.get_value(row))
568 .ok_or_else(|| {
569 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
570 })?;
571
572 let edge_id = match edge_val {
573 Value::Int64(id) => EdgeId(id as u64),
574 _ => {
575 return Err(OperatorError::TypeMismatch {
576 expected: "Int64 (edge ID)".to_string(),
577 found: format!("{edge_val:?}"),
578 });
579 }
580 };
581
582 if self.store.delete_edge_versioned(edge_id, epoch, tx) {
584 deleted_count += 1;
585 }
586 }
587
588 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
590 if let Some(dst) = builder.column_mut(0) {
591 dst.push_value(Value::Int64(deleted_count));
592 }
593 builder.advance_row();
594
595 return Ok(Some(builder.finish()));
596 }
597 Ok(None)
598 }
599
600 fn reset(&mut self) {
601 self.input.reset();
602 }
603
604 fn name(&self) -> &'static str {
605 "DeleteEdge"
606 }
607}
608
609pub struct AddLabelOperator {
611 store: Arc<dyn GraphStoreMut>,
613 input: Box<dyn Operator>,
615 node_column: usize,
617 labels: Vec<String>,
619 output_schema: Vec<LogicalType>,
621}
622
623impl AddLabelOperator {
624 pub fn new(
626 store: Arc<dyn GraphStoreMut>,
627 input: Box<dyn Operator>,
628 node_column: usize,
629 labels: Vec<String>,
630 output_schema: Vec<LogicalType>,
631 ) -> Self {
632 Self {
633 store,
634 input,
635 node_column,
636 labels,
637 output_schema,
638 }
639 }
640}
641
642impl Operator for AddLabelOperator {
643 fn next(&mut self) -> OperatorResult {
644 if let Some(chunk) = self.input.next()? {
645 let mut updated_count = 0;
646
647 for row in chunk.selected_indices() {
648 let node_val = chunk
649 .column(self.node_column)
650 .and_then(|c| c.get_value(row))
651 .ok_or_else(|| {
652 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
653 })?;
654
655 let node_id = match node_val {
656 Value::Int64(id) => NodeId(id as u64),
657 _ => {
658 return Err(OperatorError::TypeMismatch {
659 expected: "Int64 (node ID)".to_string(),
660 found: format!("{node_val:?}"),
661 });
662 }
663 };
664
665 for label in &self.labels {
667 if self.store.add_label(node_id, label) {
668 updated_count += 1;
669 }
670 }
671 }
672
673 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
675 if let Some(dst) = builder.column_mut(0) {
676 dst.push_value(Value::Int64(updated_count));
677 }
678 builder.advance_row();
679
680 return Ok(Some(builder.finish()));
681 }
682 Ok(None)
683 }
684
685 fn reset(&mut self) {
686 self.input.reset();
687 }
688
689 fn name(&self) -> &'static str {
690 "AddLabel"
691 }
692}
693
694pub struct RemoveLabelOperator {
696 store: Arc<dyn GraphStoreMut>,
698 input: Box<dyn Operator>,
700 node_column: usize,
702 labels: Vec<String>,
704 output_schema: Vec<LogicalType>,
706}
707
708impl RemoveLabelOperator {
709 pub fn new(
711 store: Arc<dyn GraphStoreMut>,
712 input: Box<dyn Operator>,
713 node_column: usize,
714 labels: Vec<String>,
715 output_schema: Vec<LogicalType>,
716 ) -> Self {
717 Self {
718 store,
719 input,
720 node_column,
721 labels,
722 output_schema,
723 }
724 }
725}
726
727impl Operator for RemoveLabelOperator {
728 fn next(&mut self) -> OperatorResult {
729 if let Some(chunk) = self.input.next()? {
730 let mut updated_count = 0;
731
732 for row in chunk.selected_indices() {
733 let node_val = chunk
734 .column(self.node_column)
735 .and_then(|c| c.get_value(row))
736 .ok_or_else(|| {
737 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
738 })?;
739
740 let node_id = match node_val {
741 Value::Int64(id) => NodeId(id as u64),
742 _ => {
743 return Err(OperatorError::TypeMismatch {
744 expected: "Int64 (node ID)".to_string(),
745 found: format!("{node_val:?}"),
746 });
747 }
748 };
749
750 for label in &self.labels {
752 if self.store.remove_label(node_id, label) {
753 updated_count += 1;
754 }
755 }
756 }
757
758 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
760 if let Some(dst) = builder.column_mut(0) {
761 dst.push_value(Value::Int64(updated_count));
762 }
763 builder.advance_row();
764
765 return Ok(Some(builder.finish()));
766 }
767 Ok(None)
768 }
769
770 fn reset(&mut self) {
771 self.input.reset();
772 }
773
774 fn name(&self) -> &'static str {
775 "RemoveLabel"
776 }
777}
778
779pub struct SetPropertyOperator {
784 store: Arc<dyn GraphStoreMut>,
786 input: Box<dyn Operator>,
788 entity_column: usize,
790 is_edge: bool,
792 properties: Vec<(String, PropertySource)>,
794 output_schema: Vec<LogicalType>,
796}
797
798impl SetPropertyOperator {
799 pub fn new_for_node(
801 store: Arc<dyn GraphStoreMut>,
802 input: Box<dyn Operator>,
803 node_column: usize,
804 properties: Vec<(String, PropertySource)>,
805 output_schema: Vec<LogicalType>,
806 ) -> Self {
807 Self {
808 store,
809 input,
810 entity_column: node_column,
811 is_edge: false,
812 properties,
813 output_schema,
814 }
815 }
816
817 pub fn new_for_edge(
819 store: Arc<dyn GraphStoreMut>,
820 input: Box<dyn Operator>,
821 edge_column: usize,
822 properties: Vec<(String, PropertySource)>,
823 output_schema: Vec<LogicalType>,
824 ) -> Self {
825 Self {
826 store,
827 input,
828 entity_column: edge_column,
829 is_edge: true,
830 properties,
831 output_schema,
832 }
833 }
834}
835
836impl Operator for SetPropertyOperator {
837 fn next(&mut self) -> OperatorResult {
838 if let Some(chunk) = self.input.next()? {
839 let mut builder =
840 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
841
842 for row in chunk.selected_indices() {
843 let entity_val = chunk
844 .column(self.entity_column)
845 .and_then(|c| c.get_value(row))
846 .ok_or_else(|| {
847 OperatorError::ColumnNotFound(format!(
848 "entity column {}",
849 self.entity_column
850 ))
851 })?;
852
853 let entity_id = match entity_val {
854 Value::Int64(id) => id as u64,
855 _ => {
856 return Err(OperatorError::TypeMismatch {
857 expected: "Int64 (entity ID)".to_string(),
858 found: format!("{entity_val:?}"),
859 });
860 }
861 };
862
863 for (prop_name, source) in &self.properties {
865 let value = source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
866
867 if self.is_edge {
868 self.store
869 .set_edge_property(EdgeId(entity_id), prop_name, value);
870 } else {
871 self.store
872 .set_node_property(NodeId(entity_id), prop_name, value);
873 }
874 }
875
876 for col_idx in 0..chunk.column_count() {
878 if let (Some(src), Some(dst)) =
879 (chunk.column(col_idx), builder.column_mut(col_idx))
880 {
881 if let Some(val) = src.get_value(row) {
882 dst.push_value(val);
883 } else {
884 dst.push_value(Value::Null);
885 }
886 }
887 }
888
889 builder.advance_row();
890 }
891
892 return Ok(Some(builder.finish()));
893 }
894 Ok(None)
895 }
896
897 fn reset(&mut self) {
898 self.input.reset();
899 }
900
901 fn name(&self) -> &'static str {
902 "SetProperty"
903 }
904}
905
906#[cfg(test)]
907mod tests {
908 use super::*;
909 use crate::execution::DataChunk;
910 use crate::execution::chunk::DataChunkBuilder;
911 use crate::graph::lpg::LpgStore;
912
913 fn create_test_store() -> Arc<dyn GraphStoreMut> {
914 Arc::new(LpgStore::new())
915 }
916
917 #[test]
918 fn test_create_node_standalone() {
919 let store = create_test_store();
920
921 let mut op = CreateNodeOperator::new(
922 Arc::clone(&store),
923 None,
924 vec!["Person".to_string()],
925 vec![(
926 "name".to_string(),
927 PropertySource::Constant(Value::String("Alice".into())),
928 )],
929 vec![LogicalType::Int64],
930 0,
931 );
932
933 let chunk = op.next().unwrap().unwrap();
935 assert_eq!(chunk.row_count(), 1);
936
937 assert!(op.next().unwrap().is_none());
939
940 assert_eq!(store.node_count(), 1);
942 }
943
944 #[test]
945 fn test_create_edge() {
946 let store = create_test_store();
947
948 let node1 = store.create_node(&["Person"]);
950 let node2 = store.create_node(&["Person"]);
951
952 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
954 builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
955 builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
956 builder.advance_row();
957 let input_chunk = builder.finish();
958
959 struct MockInput {
961 chunk: Option<DataChunk>,
962 }
963 impl Operator for MockInput {
964 fn next(&mut self) -> OperatorResult {
965 Ok(self.chunk.take())
966 }
967 fn reset(&mut self) {}
968 fn name(&self) -> &'static str {
969 "MockInput"
970 }
971 }
972
973 let mut op = CreateEdgeOperator::new(
974 Arc::clone(&store),
975 Box::new(MockInput {
976 chunk: Some(input_chunk),
977 }),
978 0, 1, "KNOWS".to_string(),
981 vec![LogicalType::Int64, LogicalType::Int64],
982 );
983
984 let _chunk = op.next().unwrap().unwrap();
986
987 assert_eq!(store.edge_count(), 1);
989 }
990
991 #[test]
992 fn test_delete_node() {
993 let store = create_test_store();
994
995 let node_id = store.create_node(&["Person"]);
997 assert_eq!(store.node_count(), 1);
998
999 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1001 builder.column_mut(0).unwrap().push_int64(node_id.0 as i64);
1002 builder.advance_row();
1003 let input_chunk = builder.finish();
1004
1005 struct MockInput {
1006 chunk: Option<DataChunk>,
1007 }
1008 impl Operator for MockInput {
1009 fn next(&mut self) -> OperatorResult {
1010 Ok(self.chunk.take())
1011 }
1012 fn reset(&mut self) {}
1013 fn name(&self) -> &'static str {
1014 "MockInput"
1015 }
1016 }
1017
1018 let mut op = DeleteNodeOperator::new(
1019 Arc::clone(&store),
1020 Box::new(MockInput {
1021 chunk: Some(input_chunk),
1022 }),
1023 0,
1024 vec![LogicalType::Int64],
1025 false,
1026 );
1027
1028 let chunk = op.next().unwrap().unwrap();
1030
1031 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1033 assert_eq!(deleted, 1);
1034 assert_eq!(store.node_count(), 0);
1035 }
1036
1037 struct MockInput {
1040 chunk: Option<DataChunk>,
1041 }
1042
1043 impl MockInput {
1044 fn boxed(chunk: DataChunk) -> Box<Self> {
1045 Box::new(Self { chunk: Some(chunk) })
1046 }
1047 }
1048
1049 impl Operator for MockInput {
1050 fn next(&mut self) -> OperatorResult {
1051 Ok(self.chunk.take())
1052 }
1053 fn reset(&mut self) {}
1054 fn name(&self) -> &'static str {
1055 "MockInput"
1056 }
1057 }
1058
1059 #[test]
1062 fn test_delete_edge() {
1063 let store = create_test_store();
1064
1065 let n1 = store.create_node(&["Person"]);
1066 let n2 = store.create_node(&["Person"]);
1067 let eid = store.create_edge(n1, n2, "KNOWS");
1068 assert_eq!(store.edge_count(), 1);
1069
1070 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1071 builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1072 builder.advance_row();
1073
1074 let mut op = DeleteEdgeOperator::new(
1075 Arc::clone(&store),
1076 MockInput::boxed(builder.finish()),
1077 0,
1078 vec![LogicalType::Int64],
1079 );
1080
1081 let chunk = op.next().unwrap().unwrap();
1082 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1083 assert_eq!(deleted, 1);
1084 assert_eq!(store.edge_count(), 0);
1085 }
1086
1087 #[test]
1088 fn test_delete_edge_no_input_returns_none() {
1089 let store = create_test_store();
1090
1091 struct EmptyInput;
1093 impl Operator for EmptyInput {
1094 fn next(&mut self) -> OperatorResult {
1095 Ok(None)
1096 }
1097 fn reset(&mut self) {}
1098 fn name(&self) -> &'static str {
1099 "EmptyInput"
1100 }
1101 }
1102
1103 let mut op = DeleteEdgeOperator::new(
1104 Arc::clone(&store),
1105 Box::new(EmptyInput),
1106 0,
1107 vec![LogicalType::Int64],
1108 );
1109
1110 assert!(op.next().unwrap().is_none());
1111 }
1112
1113 #[test]
1114 fn test_delete_multiple_edges() {
1115 let store = create_test_store();
1116
1117 let n1 = store.create_node(&["N"]);
1118 let n2 = store.create_node(&["N"]);
1119 let e1 = store.create_edge(n1, n2, "R");
1120 let e2 = store.create_edge(n2, n1, "S");
1121 assert_eq!(store.edge_count(), 2);
1122
1123 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1124 builder.column_mut(0).unwrap().push_int64(e1.0 as i64);
1125 builder.advance_row();
1126 builder.column_mut(0).unwrap().push_int64(e2.0 as i64);
1127 builder.advance_row();
1128
1129 let mut op = DeleteEdgeOperator::new(
1130 Arc::clone(&store),
1131 MockInput::boxed(builder.finish()),
1132 0,
1133 vec![LogicalType::Int64],
1134 );
1135
1136 let chunk = op.next().unwrap().unwrap();
1137 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1138 assert_eq!(deleted, 2);
1139 assert_eq!(store.edge_count(), 0);
1140 }
1141
1142 #[test]
1145 fn test_delete_node_detach() {
1146 let store = create_test_store();
1147
1148 let n1 = store.create_node(&["Person"]);
1149 let n2 = store.create_node(&["Person"]);
1150 store.create_edge(n1, n2, "KNOWS");
1151 store.create_edge(n2, n1, "FOLLOWS");
1152 assert_eq!(store.edge_count(), 2);
1153
1154 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1155 builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
1156 builder.advance_row();
1157
1158 let mut op = DeleteNodeOperator::new(
1159 Arc::clone(&store),
1160 MockInput::boxed(builder.finish()),
1161 0,
1162 vec![LogicalType::Int64],
1163 true, );
1165
1166 let chunk = op.next().unwrap().unwrap();
1167 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1168 assert_eq!(deleted, 1);
1169 assert_eq!(store.node_count(), 1);
1170 assert_eq!(store.edge_count(), 0); }
1172
1173 #[test]
1176 fn test_add_label() {
1177 let store = create_test_store();
1178
1179 let node = store.create_node(&["Person"]);
1180
1181 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1182 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1183 builder.advance_row();
1184
1185 let mut op = AddLabelOperator::new(
1186 Arc::clone(&store),
1187 MockInput::boxed(builder.finish()),
1188 0,
1189 vec!["Employee".to_string()],
1190 vec![LogicalType::Int64],
1191 );
1192
1193 let chunk = op.next().unwrap().unwrap();
1194 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1195 assert_eq!(updated, 1);
1196
1197 let node_data = store.get_node(node).unwrap();
1199 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1200 assert!(labels.contains(&"Person"));
1201 assert!(labels.contains(&"Employee"));
1202 }
1203
1204 #[test]
1205 fn test_add_multiple_labels() {
1206 let store = create_test_store();
1207
1208 let node = store.create_node(&["Base"]);
1209
1210 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1211 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1212 builder.advance_row();
1213
1214 let mut op = AddLabelOperator::new(
1215 Arc::clone(&store),
1216 MockInput::boxed(builder.finish()),
1217 0,
1218 vec!["LabelA".to_string(), "LabelB".to_string()],
1219 vec![LogicalType::Int64],
1220 );
1221
1222 let chunk = op.next().unwrap().unwrap();
1223 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1224 assert_eq!(updated, 2); let node_data = store.get_node(node).unwrap();
1227 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1228 assert!(labels.contains(&"LabelA"));
1229 assert!(labels.contains(&"LabelB"));
1230 }
1231
1232 #[test]
1233 fn test_add_label_no_input_returns_none() {
1234 let store = create_test_store();
1235
1236 struct EmptyInput;
1237 impl Operator for EmptyInput {
1238 fn next(&mut self) -> OperatorResult {
1239 Ok(None)
1240 }
1241 fn reset(&mut self) {}
1242 fn name(&self) -> &'static str {
1243 "EmptyInput"
1244 }
1245 }
1246
1247 let mut op = AddLabelOperator::new(
1248 Arc::clone(&store),
1249 Box::new(EmptyInput),
1250 0,
1251 vec!["Foo".to_string()],
1252 vec![LogicalType::Int64],
1253 );
1254
1255 assert!(op.next().unwrap().is_none());
1256 }
1257
1258 #[test]
1261 fn test_remove_label() {
1262 let store = create_test_store();
1263
1264 let node = store.create_node(&["Person", "Employee"]);
1265
1266 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1267 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1268 builder.advance_row();
1269
1270 let mut op = RemoveLabelOperator::new(
1271 Arc::clone(&store),
1272 MockInput::boxed(builder.finish()),
1273 0,
1274 vec!["Employee".to_string()],
1275 vec![LogicalType::Int64],
1276 );
1277
1278 let chunk = op.next().unwrap().unwrap();
1279 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1280 assert_eq!(updated, 1);
1281
1282 let node_data = store.get_node(node).unwrap();
1284 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1285 assert!(labels.contains(&"Person"));
1286 assert!(!labels.contains(&"Employee"));
1287 }
1288
1289 #[test]
1290 fn test_remove_nonexistent_label() {
1291 let store = create_test_store();
1292
1293 let node = store.create_node(&["Person"]);
1294
1295 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1296 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1297 builder.advance_row();
1298
1299 let mut op = RemoveLabelOperator::new(
1300 Arc::clone(&store),
1301 MockInput::boxed(builder.finish()),
1302 0,
1303 vec!["NonExistent".to_string()],
1304 vec![LogicalType::Int64],
1305 );
1306
1307 let chunk = op.next().unwrap().unwrap();
1308 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1309 assert_eq!(updated, 0); }
1311
1312 #[test]
1315 fn test_set_node_property_constant() {
1316 let store = create_test_store();
1317
1318 let node = store.create_node(&["Person"]);
1319
1320 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1321 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1322 builder.advance_row();
1323
1324 let mut op = SetPropertyOperator::new_for_node(
1325 Arc::clone(&store),
1326 MockInput::boxed(builder.finish()),
1327 0,
1328 vec![(
1329 "name".to_string(),
1330 PropertySource::Constant(Value::String("Alice".into())),
1331 )],
1332 vec![LogicalType::Int64],
1333 );
1334
1335 let chunk = op.next().unwrap().unwrap();
1336 assert_eq!(chunk.row_count(), 1);
1337
1338 let node_data = store.get_node(node).unwrap();
1340 assert_eq!(
1341 node_data
1342 .properties
1343 .get(&grafeo_common::types::PropertyKey::new("name")),
1344 Some(&Value::String("Alice".into()))
1345 );
1346 }
1347
1348 #[test]
1349 fn test_set_node_property_from_column() {
1350 let store = create_test_store();
1351
1352 let node = store.create_node(&["Person"]);
1353
1354 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1356 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1357 builder
1358 .column_mut(1)
1359 .unwrap()
1360 .push_value(Value::String("Bob".into()));
1361 builder.advance_row();
1362
1363 let mut op = SetPropertyOperator::new_for_node(
1364 Arc::clone(&store),
1365 MockInput::boxed(builder.finish()),
1366 0,
1367 vec![("name".to_string(), PropertySource::Column(1))],
1368 vec![LogicalType::Int64, LogicalType::String],
1369 );
1370
1371 let chunk = op.next().unwrap().unwrap();
1372 assert_eq!(chunk.row_count(), 1);
1373
1374 let node_data = store.get_node(node).unwrap();
1375 assert_eq!(
1376 node_data
1377 .properties
1378 .get(&grafeo_common::types::PropertyKey::new("name")),
1379 Some(&Value::String("Bob".into()))
1380 );
1381 }
1382
1383 #[test]
1384 fn test_set_edge_property() {
1385 let store = create_test_store();
1386
1387 let n1 = store.create_node(&["N"]);
1388 let n2 = store.create_node(&["N"]);
1389 let eid = store.create_edge(n1, n2, "KNOWS");
1390
1391 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1392 builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1393 builder.advance_row();
1394
1395 let mut op = SetPropertyOperator::new_for_edge(
1396 Arc::clone(&store),
1397 MockInput::boxed(builder.finish()),
1398 0,
1399 vec![(
1400 "weight".to_string(),
1401 PropertySource::Constant(Value::Float64(0.75)),
1402 )],
1403 vec![LogicalType::Int64],
1404 );
1405
1406 let chunk = op.next().unwrap().unwrap();
1407 assert_eq!(chunk.row_count(), 1);
1408
1409 let edge_data = store.get_edge(eid).unwrap();
1410 assert_eq!(
1411 edge_data
1412 .properties
1413 .get(&grafeo_common::types::PropertyKey::new("weight")),
1414 Some(&Value::Float64(0.75))
1415 );
1416 }
1417
1418 #[test]
1419 fn test_set_multiple_properties() {
1420 let store = create_test_store();
1421
1422 let node = store.create_node(&["Person"]);
1423
1424 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1425 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1426 builder.advance_row();
1427
1428 let mut op = SetPropertyOperator::new_for_node(
1429 Arc::clone(&store),
1430 MockInput::boxed(builder.finish()),
1431 0,
1432 vec![
1433 (
1434 "name".to_string(),
1435 PropertySource::Constant(Value::String("Alice".into())),
1436 ),
1437 (
1438 "age".to_string(),
1439 PropertySource::Constant(Value::Int64(30)),
1440 ),
1441 ],
1442 vec![LogicalType::Int64],
1443 );
1444
1445 op.next().unwrap().unwrap();
1446
1447 let node_data = store.get_node(node).unwrap();
1448 assert_eq!(
1449 node_data
1450 .properties
1451 .get(&grafeo_common::types::PropertyKey::new("name")),
1452 Some(&Value::String("Alice".into()))
1453 );
1454 assert_eq!(
1455 node_data
1456 .properties
1457 .get(&grafeo_common::types::PropertyKey::new("age")),
1458 Some(&Value::Int64(30))
1459 );
1460 }
1461
1462 #[test]
1463 fn test_set_property_no_input_returns_none() {
1464 let store = create_test_store();
1465
1466 struct EmptyInput;
1467 impl Operator for EmptyInput {
1468 fn next(&mut self) -> OperatorResult {
1469 Ok(None)
1470 }
1471 fn reset(&mut self) {}
1472 fn name(&self) -> &'static str {
1473 "EmptyInput"
1474 }
1475 }
1476
1477 let mut op = SetPropertyOperator::new_for_node(
1478 Arc::clone(&store),
1479 Box::new(EmptyInput),
1480 0,
1481 vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
1482 vec![LogicalType::Int64],
1483 );
1484
1485 assert!(op.next().unwrap().is_none());
1486 }
1487
1488 #[test]
1491 fn test_operator_names() {
1492 let store = create_test_store();
1493
1494 struct EmptyInput;
1495 impl Operator for EmptyInput {
1496 fn next(&mut self) -> OperatorResult {
1497 Ok(None)
1498 }
1499 fn reset(&mut self) {}
1500 fn name(&self) -> &'static str {
1501 "EmptyInput"
1502 }
1503 }
1504
1505 let op = DeleteEdgeOperator::new(
1506 Arc::clone(&store),
1507 Box::new(EmptyInput),
1508 0,
1509 vec![LogicalType::Int64],
1510 );
1511 assert_eq!(op.name(), "DeleteEdge");
1512
1513 let op = AddLabelOperator::new(
1514 Arc::clone(&store),
1515 Box::new(EmptyInput),
1516 0,
1517 vec!["L".to_string()],
1518 vec![LogicalType::Int64],
1519 );
1520 assert_eq!(op.name(), "AddLabel");
1521
1522 let op = RemoveLabelOperator::new(
1523 Arc::clone(&store),
1524 Box::new(EmptyInput),
1525 0,
1526 vec!["L".to_string()],
1527 vec![LogicalType::Int64],
1528 );
1529 assert_eq!(op.name(), "RemoveLabel");
1530
1531 let op = SetPropertyOperator::new_for_node(
1532 Arc::clone(&store),
1533 Box::new(EmptyInput),
1534 0,
1535 vec![],
1536 vec![LogicalType::Int64],
1537 );
1538 assert_eq!(op.name(), "SetProperty");
1539 }
1540}