1use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::{GraphStore, GraphStoreMut};
16
/// Hook that lets mutation operators check writes against schema constraints.
///
/// Operators hold the validator behind an `Arc`, hence the `Send + Sync` bound.
/// Every method returns `Ok(())` to accept the write or an [`OperatorError`]
/// to reject it.
pub trait ConstraintValidator: Send + Sync {
    /// Validates a single property (`key` = `value`) for a node carrying `labels`.
    fn validate_node_property(
        &self,
        labels: &[String],
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Validates the complete property set of a node carrying `labels`.
    ///
    /// The operators in this file call this once, after every property has
    /// passed the per-property checks.
    // NOTE(review): presumably used for checks that need the whole property
    // set (e.g. required properties) — confirm against implementations.
    fn validate_node_complete(
        &self,
        labels: &[String],
        properties: &[(String, Value)],
    ) -> Result<(), OperatorError>;

    /// Checks a uniqueness constraint for `key` = `value` on nodes with `labels`.
    fn check_unique_node_property(
        &self,
        labels: &[String],
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Validates a single property (`key` = `value`) for an edge of `edge_type`.
    fn validate_edge_property(
        &self,
        edge_type: &str,
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Validates the complete property set of an edge of `edge_type`.
    fn validate_edge_complete(
        &self,
        edge_type: &str,
        properties: &[(String, Value)],
    ) -> Result<(), OperatorError>;
}
66
/// Operator that creates nodes in the graph store.
///
/// With an input operator, one node is created per selected input row;
/// without one, a single node is created the first time `next` is called.
pub struct CreateNodeOperator {
    /// Store the nodes are written to.
    store: Arc<dyn GraphStoreMut>,
    /// Optional upstream operator supplying the rows that drive creation.
    input: Option<Box<dyn Operator>>,
    /// Labels applied to every created node.
    labels: Vec<String>,
    /// Property names paired with the source their value is taken from.
    properties: Vec<(String, PropertySource)>,
    /// Column types of the chunks this operator produces.
    output_schema: Vec<LogicalType>,
    /// Index of the output column that receives the new node's ID.
    output_column: usize,
    /// Set once the standalone (no-input) creation has run; cleared by `reset`.
    executed: bool,
    /// Epoch used for versioned writes; the store's current epoch when `None`.
    viewing_epoch: Option<EpochId>,
    /// Transaction the writes belong to; `TxId::SYSTEM` when `None`.
    tx_id: Option<TxId>,
    /// Optional constraint validator consulted for each node's properties.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
93
/// Describes where a property value comes from when a mutation operator runs.
#[derive(Debug, Clone)]
pub enum PropertySource {
    /// Read the value from the input chunk column with this index.
    Column(usize),
    /// Use this literal value.
    Constant(Value),
    /// Read `property` from the node, edge, or map found in `column`.
    PropertyAccess {
        /// Index of the input column holding the node, edge, or map.
        column: usize,
        /// Name of the property to look up on that entity.
        property: String,
    },
}
109
110impl PropertySource {
111 pub fn resolve(
113 &self,
114 chunk: &crate::execution::chunk::DataChunk,
115 row: usize,
116 store: &dyn GraphStore,
117 ) -> Value {
118 match self {
119 PropertySource::Column(col_idx) => chunk
120 .column(*col_idx)
121 .and_then(|c| c.get_value(row))
122 .unwrap_or(Value::Null),
123 PropertySource::Constant(v) => v.clone(),
124 PropertySource::PropertyAccess { column, property } => {
125 let Some(col) = chunk.column(*column) else {
126 return Value::Null;
127 };
128 if let Some(node_id) = col.get_node_id(row) {
130 store
131 .get_node(node_id)
132 .and_then(|node| node.get_property(property).cloned())
133 .unwrap_or(Value::Null)
134 } else if let Some(edge_id) = col.get_edge_id(row) {
135 store
136 .get_edge(edge_id)
137 .and_then(|edge| edge.get_property(property).cloned())
138 .unwrap_or(Value::Null)
139 } else if let Some(Value::Map(map)) = col.get_value(row) {
140 let key = PropertyKey::new(property);
141 map.get(&key).cloned().unwrap_or(Value::Null)
142 } else {
143 Value::Null
144 }
145 }
146 }
147 }
148}
149
150impl CreateNodeOperator {
151 pub fn new(
161 store: Arc<dyn GraphStoreMut>,
162 input: Option<Box<dyn Operator>>,
163 labels: Vec<String>,
164 properties: Vec<(String, PropertySource)>,
165 output_schema: Vec<LogicalType>,
166 output_column: usize,
167 ) -> Self {
168 Self {
169 store,
170 input,
171 labels,
172 properties,
173 output_schema,
174 output_column,
175 executed: false,
176 viewing_epoch: None,
177 tx_id: None,
178 validator: None,
179 }
180 }
181
182 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
184 self.viewing_epoch = Some(epoch);
185 self.tx_id = tx_id;
186 self
187 }
188
189 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
191 self.validator = Some(validator);
192 self
193 }
194}
195
196impl CreateNodeOperator {
197 fn validate_and_set_properties(
199 &self,
200 node_id: NodeId,
201 resolved_props: &[(String, Value)],
202 ) -> Result<(), OperatorError> {
203 if let Some(ref validator) = self.validator {
205 for (name, value) in resolved_props {
206 validator.validate_node_property(&self.labels, name, value)?;
207 validator.check_unique_node_property(&self.labels, name, value)?;
208 }
209 validator.validate_node_complete(&self.labels, resolved_props)?;
211 }
212
213 for (name, value) in resolved_props {
215 self.store.set_node_property(node_id, name, value.clone());
216 }
217 Ok(())
218 }
219}
220
221impl Operator for CreateNodeOperator {
222 fn next(&mut self) -> OperatorResult {
223 let epoch = self
225 .viewing_epoch
226 .unwrap_or_else(|| self.store.current_epoch());
227 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
228
229 if let Some(ref mut input) = self.input {
230 if let Some(chunk) = input.next()? {
232 let mut builder =
233 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
234
235 for row in chunk.selected_indices() {
236 let resolved_props: Vec<(String, Value)> = self
238 .properties
239 .iter()
240 .map(|(name, source)| {
241 let value =
242 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
243 (name.clone(), value)
244 })
245 .collect();
246
247 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
249 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
250
251 self.validate_and_set_properties(node_id, &resolved_props)?;
253
254 for col_idx in 0..chunk.column_count() {
256 if col_idx < self.output_column
257 && let (Some(src), Some(dst)) =
258 (chunk.column(col_idx), builder.column_mut(col_idx))
259 {
260 if let Some(val) = src.get_value(row) {
261 dst.push_value(val);
262 } else {
263 dst.push_value(Value::Null);
264 }
265 }
266 }
267
268 if let Some(dst) = builder.column_mut(self.output_column) {
270 dst.push_value(Value::Int64(node_id.0 as i64));
271 }
272
273 builder.advance_row();
274 }
275
276 return Ok(Some(builder.finish()));
277 }
278 Ok(None)
279 } else {
280 if self.executed {
282 return Ok(None);
283 }
284 self.executed = true;
285
286 let resolved_props: Vec<(String, Value)> = self
288 .properties
289 .iter()
290 .filter_map(|(name, source)| {
291 if let PropertySource::Constant(value) = source {
292 Some((name.clone(), value.clone()))
293 } else {
294 None
295 }
296 })
297 .collect();
298
299 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
301 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
302
303 self.validate_and_set_properties(node_id, &resolved_props)?;
305
306 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
308 if let Some(dst) = builder.column_mut(self.output_column) {
309 dst.push_value(Value::Int64(node_id.0 as i64));
310 }
311 builder.advance_row();
312
313 Ok(Some(builder.finish()))
314 }
315 }
316
317 fn reset(&mut self) {
318 if let Some(ref mut input) = self.input {
319 input.reset();
320 }
321 self.executed = false;
322 }
323
324 fn name(&self) -> &'static str {
325 "CreateNode"
326 }
327}
328
/// Operator that creates one edge per selected input row.
pub struct CreateEdgeOperator {
    /// Store the edges are written to.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the rows with the endpoint node IDs.
    input: Box<dyn Operator>,
    /// Index of the input column holding the source node ID.
    from_column: usize,
    /// Index of the input column holding the target node ID.
    to_column: usize,
    /// Type assigned to every created edge.
    edge_type: String,
    /// Property names paired with the source their value is taken from.
    properties: Vec<(String, PropertySource)>,
    /// Column types of the chunks this operator produces.
    output_schema: Vec<LogicalType>,
    /// Output column that receives the new edge ID, if one was requested.
    output_column: Option<usize>,
    /// Epoch used for versioned writes; the store's current epoch when `None`.
    viewing_epoch: Option<EpochId>,
    /// Transaction the writes belong to; `TxId::SYSTEM` when `None`.
    tx_id: Option<TxId>,
    /// Optional constraint validator consulted for each edge's properties.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
354
355impl CreateEdgeOperator {
356 pub fn new(
363 store: Arc<dyn GraphStoreMut>,
364 input: Box<dyn Operator>,
365 from_column: usize,
366 to_column: usize,
367 edge_type: String,
368 output_schema: Vec<LogicalType>,
369 ) -> Self {
370 Self {
371 store,
372 input,
373 from_column,
374 to_column,
375 edge_type,
376 properties: Vec::new(),
377 output_schema,
378 output_column: None,
379 viewing_epoch: None,
380 tx_id: None,
381 validator: None,
382 }
383 }
384
385 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
387 self.properties = properties;
388 self
389 }
390
391 pub fn with_output_column(mut self, column: usize) -> Self {
393 self.output_column = Some(column);
394 self
395 }
396
397 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
399 self.viewing_epoch = Some(epoch);
400 self.tx_id = tx_id;
401 self
402 }
403
404 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
406 self.validator = Some(validator);
407 self
408 }
409}
410
411impl Operator for CreateEdgeOperator {
412 fn next(&mut self) -> OperatorResult {
413 let epoch = self
415 .viewing_epoch
416 .unwrap_or_else(|| self.store.current_epoch());
417 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
418
419 if let Some(chunk) = self.input.next()? {
420 let mut builder =
421 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
422
423 for row in chunk.selected_indices() {
424 let from_id = chunk
426 .column(self.from_column)
427 .and_then(|c| c.get_value(row))
428 .ok_or_else(|| {
429 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
430 })?;
431
432 let to_id = chunk
433 .column(self.to_column)
434 .and_then(|c| c.get_value(row))
435 .ok_or_else(|| {
436 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
437 })?;
438
439 let from_node_id = match from_id {
441 Value::Int64(id) => NodeId(id as u64),
442 _ => {
443 return Err(OperatorError::TypeMismatch {
444 expected: "Int64 (node ID)".to_string(),
445 found: format!("{from_id:?}"),
446 });
447 }
448 };
449
450 let to_node_id = match to_id {
451 Value::Int64(id) => NodeId(id as u64),
452 _ => {
453 return Err(OperatorError::TypeMismatch {
454 expected: "Int64 (node ID)".to_string(),
455 found: format!("{to_id:?}"),
456 });
457 }
458 };
459
460 let resolved_props: Vec<(String, Value)> = self
462 .properties
463 .iter()
464 .map(|(name, source)| {
465 let value =
466 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
467 (name.clone(), value)
468 })
469 .collect();
470
471 if let Some(ref validator) = self.validator {
473 for (name, value) in &resolved_props {
474 validator.validate_edge_property(&self.edge_type, name, value)?;
475 }
476 validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
477 }
478
479 let edge_id = self.store.create_edge_versioned(
481 from_node_id,
482 to_node_id,
483 &self.edge_type,
484 epoch,
485 tx,
486 );
487
488 for (name, value) in resolved_props {
490 self.store.set_edge_property(edge_id, &name, value);
491 }
492
493 for col_idx in 0..chunk.column_count() {
495 if let (Some(src), Some(dst)) =
496 (chunk.column(col_idx), builder.column_mut(col_idx))
497 {
498 if let Some(val) = src.get_value(row) {
499 dst.push_value(val);
500 } else {
501 dst.push_value(Value::Null);
502 }
503 }
504 }
505
506 if let Some(out_col) = self.output_column
508 && let Some(dst) = builder.column_mut(out_col)
509 {
510 dst.push_value(Value::Int64(edge_id.0 as i64));
511 }
512
513 builder.advance_row();
514 }
515
516 return Ok(Some(builder.finish()));
517 }
518 Ok(None)
519 }
520
521 fn reset(&mut self) {
522 self.input.reset();
523 }
524
525 fn name(&self) -> &'static str {
526 "CreateEdge"
527 }
528}
529
/// Operator that deletes the nodes identified by an input column.
pub struct DeleteNodeOperator {
    /// Store the nodes are deleted from.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the node IDs.
    input: Box<dyn Operator>,
    /// Index of the input column holding the node ID.
    node_column: usize,
    /// Column types of the chunks this operator produces.
    output_schema: Vec<LogicalType>,
    /// When `true`, a node's edges are deleted before the node itself;
    /// when `false`, deleting a node that still has edges is an error.
    detach: bool,
    /// Epoch used for versioned deletes; the store's current epoch when `None`.
    viewing_epoch: Option<EpochId>,
    /// Transaction the deletes belong to; `TxId::SYSTEM` when `None`.
    tx_id: Option<TxId>,
}
547
548impl DeleteNodeOperator {
549 pub fn new(
551 store: Arc<dyn GraphStoreMut>,
552 input: Box<dyn Operator>,
553 node_column: usize,
554 output_schema: Vec<LogicalType>,
555 detach: bool,
556 ) -> Self {
557 Self {
558 store,
559 input,
560 node_column,
561 output_schema,
562 detach,
563 viewing_epoch: None,
564 tx_id: None,
565 }
566 }
567
568 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
570 self.viewing_epoch = Some(epoch);
571 self.tx_id = tx_id;
572 self
573 }
574}
575
576impl Operator for DeleteNodeOperator {
577 fn next(&mut self) -> OperatorResult {
578 let epoch = self
580 .viewing_epoch
581 .unwrap_or_else(|| self.store.current_epoch());
582 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
583
584 if let Some(chunk) = self.input.next()? {
585 let mut deleted_count = 0;
586
587 for row in chunk.selected_indices() {
588 let node_val = chunk
589 .column(self.node_column)
590 .and_then(|c| c.get_value(row))
591 .ok_or_else(|| {
592 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
593 })?;
594
595 let node_id = match node_val {
596 Value::Int64(id) => NodeId(id as u64),
597 _ => {
598 return Err(OperatorError::TypeMismatch {
599 expected: "Int64 (node ID)".to_string(),
600 found: format!("{node_val:?}"),
601 });
602 }
603 };
604
605 if self.detach {
606 self.store.delete_node_edges(node_id);
608 } else {
609 let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
611 if degree > 0 {
612 return Err(OperatorError::ConstraintViolation(format!(
613 "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
614 degree
615 )));
616 }
617 }
618
619 if self.store.delete_node_versioned(node_id, epoch, tx) {
621 deleted_count += 1;
622 }
623 }
624
625 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
627 if let Some(dst) = builder.column_mut(0) {
628 dst.push_value(Value::Int64(deleted_count));
629 }
630 builder.advance_row();
631
632 return Ok(Some(builder.finish()));
633 }
634 Ok(None)
635 }
636
637 fn reset(&mut self) {
638 self.input.reset();
639 }
640
641 fn name(&self) -> &'static str {
642 "DeleteNode"
643 }
644}
645
/// Operator that deletes the edges identified by an input column.
pub struct DeleteEdgeOperator {
    /// Store the edges are deleted from.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the edge IDs.
    input: Box<dyn Operator>,
    /// Index of the input column holding the edge ID.
    edge_column: usize,
    /// Column types of the chunks this operator produces.
    output_schema: Vec<LogicalType>,
    /// Epoch used for versioned deletes; the store's current epoch when `None`.
    viewing_epoch: Option<EpochId>,
    /// Transaction the deletes belong to; `TxId::SYSTEM` when `None`.
    tx_id: Option<TxId>,
}
661
662impl DeleteEdgeOperator {
663 pub fn new(
665 store: Arc<dyn GraphStoreMut>,
666 input: Box<dyn Operator>,
667 edge_column: usize,
668 output_schema: Vec<LogicalType>,
669 ) -> Self {
670 Self {
671 store,
672 input,
673 edge_column,
674 output_schema,
675 viewing_epoch: None,
676 tx_id: None,
677 }
678 }
679
680 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
682 self.viewing_epoch = Some(epoch);
683 self.tx_id = tx_id;
684 self
685 }
686}
687
688impl Operator for DeleteEdgeOperator {
689 fn next(&mut self) -> OperatorResult {
690 let epoch = self
692 .viewing_epoch
693 .unwrap_or_else(|| self.store.current_epoch());
694 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
695
696 if let Some(chunk) = self.input.next()? {
697 let mut deleted_count = 0;
698
699 for row in chunk.selected_indices() {
700 let edge_val = chunk
701 .column(self.edge_column)
702 .and_then(|c| c.get_value(row))
703 .ok_or_else(|| {
704 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
705 })?;
706
707 let edge_id = match edge_val {
708 Value::Int64(id) => EdgeId(id as u64),
709 _ => {
710 return Err(OperatorError::TypeMismatch {
711 expected: "Int64 (edge ID)".to_string(),
712 found: format!("{edge_val:?}"),
713 });
714 }
715 };
716
717 if self.store.delete_edge_versioned(edge_id, epoch, tx) {
719 deleted_count += 1;
720 }
721 }
722
723 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
725 if let Some(dst) = builder.column_mut(0) {
726 dst.push_value(Value::Int64(deleted_count));
727 }
728 builder.advance_row();
729
730 return Ok(Some(builder.finish()));
731 }
732 Ok(None)
733 }
734
735 fn reset(&mut self) {
736 self.input.reset();
737 }
738
739 fn name(&self) -> &'static str {
740 "DeleteEdge"
741 }
742}
743
/// Operator that adds labels to the nodes identified by an input column.
pub struct AddLabelOperator {
    /// Store holding the nodes to update.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the node IDs.
    input: Box<dyn Operator>,
    /// Index of the input column holding the node ID.
    node_column: usize,
    /// Labels to add to each node.
    labels: Vec<String>,
    /// Column types of the chunks this operator produces.
    output_schema: Vec<LogicalType>,
}
757
758impl AddLabelOperator {
759 pub fn new(
761 store: Arc<dyn GraphStoreMut>,
762 input: Box<dyn Operator>,
763 node_column: usize,
764 labels: Vec<String>,
765 output_schema: Vec<LogicalType>,
766 ) -> Self {
767 Self {
768 store,
769 input,
770 node_column,
771 labels,
772 output_schema,
773 }
774 }
775}
776
777impl Operator for AddLabelOperator {
778 fn next(&mut self) -> OperatorResult {
779 if let Some(chunk) = self.input.next()? {
780 let mut updated_count = 0;
781
782 for row in chunk.selected_indices() {
783 let node_val = chunk
784 .column(self.node_column)
785 .and_then(|c| c.get_value(row))
786 .ok_or_else(|| {
787 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
788 })?;
789
790 let node_id = match node_val {
791 Value::Int64(id) => NodeId(id as u64),
792 _ => {
793 return Err(OperatorError::TypeMismatch {
794 expected: "Int64 (node ID)".to_string(),
795 found: format!("{node_val:?}"),
796 });
797 }
798 };
799
800 for label in &self.labels {
802 if self.store.add_label(node_id, label) {
803 updated_count += 1;
804 }
805 }
806 }
807
808 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
810 if let Some(dst) = builder.column_mut(0) {
811 dst.push_value(Value::Int64(updated_count));
812 }
813 builder.advance_row();
814
815 return Ok(Some(builder.finish()));
816 }
817 Ok(None)
818 }
819
820 fn reset(&mut self) {
821 self.input.reset();
822 }
823
824 fn name(&self) -> &'static str {
825 "AddLabel"
826 }
827}
828
/// Operator that removes labels from the nodes identified by an input column.
pub struct RemoveLabelOperator {
    /// Store holding the nodes to update.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the node IDs.
    input: Box<dyn Operator>,
    /// Index of the input column holding the node ID.
    node_column: usize,
    /// Labels to remove from each node.
    labels: Vec<String>,
    /// Column types of the chunks this operator produces.
    output_schema: Vec<LogicalType>,
}
842
843impl RemoveLabelOperator {
844 pub fn new(
846 store: Arc<dyn GraphStoreMut>,
847 input: Box<dyn Operator>,
848 node_column: usize,
849 labels: Vec<String>,
850 output_schema: Vec<LogicalType>,
851 ) -> Self {
852 Self {
853 store,
854 input,
855 node_column,
856 labels,
857 output_schema,
858 }
859 }
860}
861
862impl Operator for RemoveLabelOperator {
863 fn next(&mut self) -> OperatorResult {
864 if let Some(chunk) = self.input.next()? {
865 let mut updated_count = 0;
866
867 for row in chunk.selected_indices() {
868 let node_val = chunk
869 .column(self.node_column)
870 .and_then(|c| c.get_value(row))
871 .ok_or_else(|| {
872 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
873 })?;
874
875 let node_id = match node_val {
876 Value::Int64(id) => NodeId(id as u64),
877 _ => {
878 return Err(OperatorError::TypeMismatch {
879 expected: "Int64 (node ID)".to_string(),
880 found: format!("{node_val:?}"),
881 });
882 }
883 };
884
885 for label in &self.labels {
887 if self.store.remove_label(node_id, label) {
888 updated_count += 1;
889 }
890 }
891 }
892
893 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
895 if let Some(dst) = builder.column_mut(0) {
896 dst.push_value(Value::Int64(updated_count));
897 }
898 builder.advance_row();
899
900 return Ok(Some(builder.finish()));
901 }
902 Ok(None)
903 }
904
905 fn reset(&mut self) {
906 self.input.reset();
907 }
908
909 fn name(&self) -> &'static str {
910 "RemoveLabel"
911 }
912}
913
/// Operator that sets properties on the nodes or edges identified by an
/// input column.
pub struct SetPropertyOperator {
    /// Store holding the entities to update.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the entity IDs.
    input: Box<dyn Operator>,
    /// Index of the input column holding the node or edge ID.
    entity_column: usize,
    /// `true` when the IDs refer to edges, `false` for nodes.
    is_edge: bool,
    /// Property names paired with the source their value is taken from.
    /// The name `"*"` with a map value assigns every entry of the map.
    properties: Vec<(String, PropertySource)>,
    /// Column types of the chunks this operator produces.
    output_schema: Vec<LogicalType>,
    /// When `true`, a `"*"` map assignment removes all existing properties
    /// of the entity before writing the map entries.
    replace: bool,
    /// Optional constraint validator consulted before writing.
    validator: Option<Arc<dyn ConstraintValidator>>,
    /// Labels handed to the validator for node writes.
    labels: Vec<String>,
    /// Edge type handed to the validator for edge writes; edge validation is
    /// skipped when this is `None`.
    edge_type_name: Option<String>,
}
940
941impl SetPropertyOperator {
942 pub fn new_for_node(
944 store: Arc<dyn GraphStoreMut>,
945 input: Box<dyn Operator>,
946 node_column: usize,
947 properties: Vec<(String, PropertySource)>,
948 output_schema: Vec<LogicalType>,
949 ) -> Self {
950 Self {
951 store,
952 input,
953 entity_column: node_column,
954 is_edge: false,
955 properties,
956 output_schema,
957 replace: false,
958 validator: None,
959 labels: Vec::new(),
960 edge_type_name: None,
961 }
962 }
963
964 pub fn new_for_edge(
966 store: Arc<dyn GraphStoreMut>,
967 input: Box<dyn Operator>,
968 edge_column: usize,
969 properties: Vec<(String, PropertySource)>,
970 output_schema: Vec<LogicalType>,
971 ) -> Self {
972 Self {
973 store,
974 input,
975 entity_column: edge_column,
976 is_edge: true,
977 properties,
978 output_schema,
979 replace: false,
980 validator: None,
981 labels: Vec::new(),
982 edge_type_name: None,
983 }
984 }
985
986 pub fn with_replace(mut self, replace: bool) -> Self {
988 self.replace = replace;
989 self
990 }
991
992 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
994 self.validator = Some(validator);
995 self
996 }
997
998 pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1000 self.labels = labels;
1001 self
1002 }
1003
1004 pub fn with_edge_type(mut self, edge_type: String) -> Self {
1006 self.edge_type_name = Some(edge_type);
1007 self
1008 }
1009}
1010
1011impl Operator for SetPropertyOperator {
1012 fn next(&mut self) -> OperatorResult {
1013 if let Some(chunk) = self.input.next()? {
1014 let mut builder =
1015 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1016
1017 for row in chunk.selected_indices() {
1018 let entity_val = chunk
1019 .column(self.entity_column)
1020 .and_then(|c| c.get_value(row))
1021 .ok_or_else(|| {
1022 OperatorError::ColumnNotFound(format!(
1023 "entity column {}",
1024 self.entity_column
1025 ))
1026 })?;
1027
1028 let entity_id = match entity_val {
1029 Value::Int64(id) => id as u64,
1030 _ => {
1031 return Err(OperatorError::TypeMismatch {
1032 expected: "Int64 (entity ID)".to_string(),
1033 found: format!("{entity_val:?}"),
1034 });
1035 }
1036 };
1037
1038 let resolved_props: Vec<(String, Value)> = self
1040 .properties
1041 .iter()
1042 .map(|(name, source)| {
1043 let value =
1044 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1045 (name.clone(), value)
1046 })
1047 .collect();
1048
1049 if let Some(ref validator) = self.validator {
1051 if self.is_edge {
1052 if let Some(ref et) = self.edge_type_name {
1053 for (name, value) in &resolved_props {
1054 validator.validate_edge_property(et, name, value)?;
1055 }
1056 }
1057 } else {
1058 for (name, value) in &resolved_props {
1059 validator.validate_node_property(&self.labels, name, value)?;
1060 validator.check_unique_node_property(&self.labels, name, value)?;
1061 }
1062 }
1063 }
1064
1065 for (prop_name, value) in resolved_props {
1067 if prop_name == "*" {
1068 if let Value::Map(map) = value {
1070 if self.replace {
1071 if self.is_edge {
1073 if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1074 let keys: Vec<String> = edge
1075 .properties
1076 .iter()
1077 .map(|(k, _)| k.as_str().to_string())
1078 .collect();
1079 for key in keys {
1080 self.store
1081 .remove_edge_property(EdgeId(entity_id), &key);
1082 }
1083 }
1084 } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1085 let keys: Vec<String> = node
1086 .properties
1087 .iter()
1088 .map(|(k, _)| k.as_str().to_string())
1089 .collect();
1090 for key in keys {
1091 self.store.remove_node_property(NodeId(entity_id), &key);
1092 }
1093 }
1094 }
1095 for (key, val) in map.iter() {
1097 if self.is_edge {
1098 self.store.set_edge_property(
1099 EdgeId(entity_id),
1100 key.as_str(),
1101 val.clone(),
1102 );
1103 } else {
1104 self.store.set_node_property(
1105 NodeId(entity_id),
1106 key.as_str(),
1107 val.clone(),
1108 );
1109 }
1110 }
1111 }
1112 } else if self.is_edge {
1113 self.store
1114 .set_edge_property(EdgeId(entity_id), &prop_name, value);
1115 } else {
1116 self.store
1117 .set_node_property(NodeId(entity_id), &prop_name, value);
1118 }
1119 }
1120
1121 for col_idx in 0..chunk.column_count() {
1123 if let (Some(src), Some(dst)) =
1124 (chunk.column(col_idx), builder.column_mut(col_idx))
1125 {
1126 if let Some(val) = src.get_value(row) {
1127 dst.push_value(val);
1128 } else {
1129 dst.push_value(Value::Null);
1130 }
1131 }
1132 }
1133
1134 builder.advance_row();
1135 }
1136
1137 return Ok(Some(builder.finish()));
1138 }
1139 Ok(None)
1140 }
1141
1142 fn reset(&mut self) {
1143 self.input.reset();
1144 }
1145
1146 fn name(&self) -> &'static str {
1147 "SetProperty"
1148 }
1149}
1150
1151#[cfg(test)]
1152mod tests {
1153 use super::*;
1154 use crate::execution::DataChunk;
1155 use crate::execution::chunk::DataChunkBuilder;
1156 use crate::graph::lpg::LpgStore;
1157
1158 fn create_test_store() -> Arc<dyn GraphStoreMut> {
1159 Arc::new(LpgStore::new())
1160 }
1161
1162 #[test]
1163 fn test_create_node_standalone() {
1164 let store = create_test_store();
1165
1166 let mut op = CreateNodeOperator::new(
1167 Arc::clone(&store),
1168 None,
1169 vec!["Person".to_string()],
1170 vec![(
1171 "name".to_string(),
1172 PropertySource::Constant(Value::String("Alice".into())),
1173 )],
1174 vec![LogicalType::Int64],
1175 0,
1176 );
1177
1178 let chunk = op.next().unwrap().unwrap();
1180 assert_eq!(chunk.row_count(), 1);
1181
1182 assert!(op.next().unwrap().is_none());
1184
1185 assert_eq!(store.node_count(), 1);
1187 }
1188
1189 #[test]
1190 fn test_create_edge() {
1191 let store = create_test_store();
1192
1193 let node1 = store.create_node(&["Person"]);
1195 let node2 = store.create_node(&["Person"]);
1196
1197 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1199 builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1200 builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1201 builder.advance_row();
1202 let input_chunk = builder.finish();
1203
1204 struct MockInput {
1206 chunk: Option<DataChunk>,
1207 }
1208 impl Operator for MockInput {
1209 fn next(&mut self) -> OperatorResult {
1210 Ok(self.chunk.take())
1211 }
1212 fn reset(&mut self) {}
1213 fn name(&self) -> &'static str {
1214 "MockInput"
1215 }
1216 }
1217
1218 let mut op = CreateEdgeOperator::new(
1219 Arc::clone(&store),
1220 Box::new(MockInput {
1221 chunk: Some(input_chunk),
1222 }),
1223 0, 1, "KNOWS".to_string(),
1226 vec![LogicalType::Int64, LogicalType::Int64],
1227 );
1228
1229 let _chunk = op.next().unwrap().unwrap();
1231
1232 assert_eq!(store.edge_count(), 1);
1234 }
1235
1236 #[test]
1237 fn test_delete_node() {
1238 let store = create_test_store();
1239
1240 let node_id = store.create_node(&["Person"]);
1242 assert_eq!(store.node_count(), 1);
1243
1244 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1246 builder.column_mut(0).unwrap().push_int64(node_id.0 as i64);
1247 builder.advance_row();
1248 let input_chunk = builder.finish();
1249
1250 struct MockInput {
1251 chunk: Option<DataChunk>,
1252 }
1253 impl Operator for MockInput {
1254 fn next(&mut self) -> OperatorResult {
1255 Ok(self.chunk.take())
1256 }
1257 fn reset(&mut self) {}
1258 fn name(&self) -> &'static str {
1259 "MockInput"
1260 }
1261 }
1262
1263 let mut op = DeleteNodeOperator::new(
1264 Arc::clone(&store),
1265 Box::new(MockInput {
1266 chunk: Some(input_chunk),
1267 }),
1268 0,
1269 vec![LogicalType::Int64],
1270 false,
1271 );
1272
1273 let chunk = op.next().unwrap().unwrap();
1275
1276 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1278 assert_eq!(deleted, 1);
1279 assert_eq!(store.node_count(), 0);
1280 }
1281
1282 struct MockInput {
1285 chunk: Option<DataChunk>,
1286 }
1287
1288 impl MockInput {
1289 fn boxed(chunk: DataChunk) -> Box<Self> {
1290 Box::new(Self { chunk: Some(chunk) })
1291 }
1292 }
1293
1294 impl Operator for MockInput {
1295 fn next(&mut self) -> OperatorResult {
1296 Ok(self.chunk.take())
1297 }
1298 fn reset(&mut self) {}
1299 fn name(&self) -> &'static str {
1300 "MockInput"
1301 }
1302 }
1303
1304 #[test]
1307 fn test_delete_edge() {
1308 let store = create_test_store();
1309
1310 let n1 = store.create_node(&["Person"]);
1311 let n2 = store.create_node(&["Person"]);
1312 let eid = store.create_edge(n1, n2, "KNOWS");
1313 assert_eq!(store.edge_count(), 1);
1314
1315 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1316 builder.column_mut(0).unwrap().push_int64(eid.0 as i64);
1317 builder.advance_row();
1318
1319 let mut op = DeleteEdgeOperator::new(
1320 Arc::clone(&store),
1321 MockInput::boxed(builder.finish()),
1322 0,
1323 vec![LogicalType::Int64],
1324 );
1325
1326 let chunk = op.next().unwrap().unwrap();
1327 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1328 assert_eq!(deleted, 1);
1329 assert_eq!(store.edge_count(), 0);
1330 }
1331
1332 #[test]
1333 fn test_delete_edge_no_input_returns_none() {
1334 let store = create_test_store();
1335
1336 struct EmptyInput;
1338 impl Operator for EmptyInput {
1339 fn next(&mut self) -> OperatorResult {
1340 Ok(None)
1341 }
1342 fn reset(&mut self) {}
1343 fn name(&self) -> &'static str {
1344 "EmptyInput"
1345 }
1346 }
1347
1348 let mut op = DeleteEdgeOperator::new(
1349 Arc::clone(&store),
1350 Box::new(EmptyInput),
1351 0,
1352 vec![LogicalType::Int64],
1353 );
1354
1355 assert!(op.next().unwrap().is_none());
1356 }
1357
1358 #[test]
1359 fn test_delete_multiple_edges() {
1360 let store = create_test_store();
1361
1362 let n1 = store.create_node(&["N"]);
1363 let n2 = store.create_node(&["N"]);
1364 let e1 = store.create_edge(n1, n2, "R");
1365 let e2 = store.create_edge(n2, n1, "S");
1366 assert_eq!(store.edge_count(), 2);
1367
1368 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1369 builder.column_mut(0).unwrap().push_int64(e1.0 as i64);
1370 builder.advance_row();
1371 builder.column_mut(0).unwrap().push_int64(e2.0 as i64);
1372 builder.advance_row();
1373
1374 let mut op = DeleteEdgeOperator::new(
1375 Arc::clone(&store),
1376 MockInput::boxed(builder.finish()),
1377 0,
1378 vec![LogicalType::Int64],
1379 );
1380
1381 let chunk = op.next().unwrap().unwrap();
1382 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1383 assert_eq!(deleted, 2);
1384 assert_eq!(store.edge_count(), 0);
1385 }
1386
1387 #[test]
1390 fn test_delete_node_detach() {
1391 let store = create_test_store();
1392
1393 let n1 = store.create_node(&["Person"]);
1394 let n2 = store.create_node(&["Person"]);
1395 store.create_edge(n1, n2, "KNOWS");
1396 store.create_edge(n2, n1, "FOLLOWS");
1397 assert_eq!(store.edge_count(), 2);
1398
1399 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1400 builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
1401 builder.advance_row();
1402
1403 let mut op = DeleteNodeOperator::new(
1404 Arc::clone(&store),
1405 MockInput::boxed(builder.finish()),
1406 0,
1407 vec![LogicalType::Int64],
1408 true, );
1410
1411 let chunk = op.next().unwrap().unwrap();
1412 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1413 assert_eq!(deleted, 1);
1414 assert_eq!(store.node_count(), 1);
1415 assert_eq!(store.edge_count(), 0); }
1417
1418 #[test]
1421 fn test_add_label() {
1422 let store = create_test_store();
1423
1424 let node = store.create_node(&["Person"]);
1425
1426 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1427 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1428 builder.advance_row();
1429
1430 let mut op = AddLabelOperator::new(
1431 Arc::clone(&store),
1432 MockInput::boxed(builder.finish()),
1433 0,
1434 vec!["Employee".to_string()],
1435 vec![LogicalType::Int64],
1436 );
1437
1438 let chunk = op.next().unwrap().unwrap();
1439 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1440 assert_eq!(updated, 1);
1441
1442 let node_data = store.get_node(node).unwrap();
1444 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1445 assert!(labels.contains(&"Person"));
1446 assert!(labels.contains(&"Employee"));
1447 }
1448
1449 #[test]
1450 fn test_add_multiple_labels() {
1451 let store = create_test_store();
1452
1453 let node = store.create_node(&["Base"]);
1454
1455 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1456 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1457 builder.advance_row();
1458
1459 let mut op = AddLabelOperator::new(
1460 Arc::clone(&store),
1461 MockInput::boxed(builder.finish()),
1462 0,
1463 vec!["LabelA".to_string(), "LabelB".to_string()],
1464 vec![LogicalType::Int64],
1465 );
1466
1467 let chunk = op.next().unwrap().unwrap();
1468 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1469 assert_eq!(updated, 2); let node_data = store.get_node(node).unwrap();
1472 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1473 assert!(labels.contains(&"LabelA"));
1474 assert!(labels.contains(&"LabelB"));
1475 }
1476
/// When the input operator yields no chunk, `AddLabelOperator::next`
/// returns `Ok(None)` as well.
#[test]
fn test_add_label_no_input_returns_none() {
    // Input operator that never produces a chunk.
    struct EmptyInput;

    impl Operator for EmptyInput {
        fn name(&self) -> &'static str {
            "EmptyInput"
        }
        fn reset(&mut self) {}
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }
    }

    let store = create_test_store();
    let mut add_label = AddLabelOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![String::from("Foo")],
        vec![LogicalType::Int64],
    );

    let outcome = add_label.next().unwrap();
    assert!(outcome.is_none());
}
1502
/// Removing one of two labels reports a single update, drops that label,
/// and leaves the other label on the node.
#[test]
fn test_remove_label() {
    let store = create_test_store();
    let person = store.create_node(&["Person", "Employee"]);

    // Single-row input chunk carrying the node id in column 0.
    let mut input = DataChunkBuilder::new(&[LogicalType::Int64]);
    input.column_mut(0).unwrap().push_int64(person.0 as i64);
    input.advance_row();

    let mut remove_label = RemoveLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(input.finish()),
        0,
        vec![String::from("Employee")],
        vec![LogicalType::Int64],
    );

    let result = remove_label.next().unwrap().unwrap();
    assert_eq!(result.column(0).unwrap().get_int64(0).unwrap(), 1);

    let stored = store.get_node(person).unwrap();
    let remaining: Vec<&str> = stored.labels.iter().map(|label| label.as_ref()).collect();
    assert!(remaining.contains(&"Person"));
    assert!(!remaining.contains(&"Employee"));
}
1533
/// Removing a label the node does not carry reports zero updates.
#[test]
fn test_remove_nonexistent_label() {
    let store = create_test_store();
    let person = store.create_node(&["Person"]);

    // Single-row input chunk carrying the node id in column 0.
    let mut input = DataChunkBuilder::new(&[LogicalType::Int64]);
    input.column_mut(0).unwrap().push_int64(person.0 as i64);
    input.advance_row();

    let mut remove_label = RemoveLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(input.finish()),
        0,
        vec![String::from("NonExistent")],
        vec![LogicalType::Int64],
    );

    let result = remove_label.next().unwrap().unwrap();
    let removed = result.column(0).unwrap().get_int64(0).unwrap();
    assert_eq!(removed, 0);
}
1556
/// Setting a node property from a constant emits a one-row chunk and
/// stores the constant under the given key.
#[test]
fn test_set_node_property_constant() {
    let store = create_test_store();
    let person = store.create_node(&["Person"]);

    // Single-row input chunk carrying the node id in column 0.
    let mut input = DataChunkBuilder::new(&[LogicalType::Int64]);
    input.column_mut(0).unwrap().push_int64(person.0 as i64);
    input.advance_row();

    let assignments = vec![(
        "name".to_string(),
        PropertySource::Constant(Value::String("Alice".into())),
    )];
    let mut set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(input.finish()),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    let result = set_property.next().unwrap().unwrap();
    assert_eq!(result.row_count(), 1);

    let stored = store.get_node(person).unwrap();
    let name_key = grafeo_common::types::PropertyKey::new("name");
    assert_eq!(
        stored.properties.get(&name_key),
        Some(&Value::String("Alice".into()))
    );
}
1592
/// Setting a node property from an input column stores that row's column
/// value on the node.
#[test]
fn test_set_node_property_from_column() {
    let store = create_test_store();
    let person = store.create_node(&["Person"]);

    // Column 0 holds the node id, column 1 the value to assign.
    let mut input = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
    input.column_mut(0).unwrap().push_int64(person.0 as i64);
    let value_column = input.column_mut(1).unwrap();
    value_column.push_value(Value::String("Bob".into()));
    input.advance_row();

    let assignments = vec![("name".to_string(), PropertySource::Column(1))];
    let mut set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(input.finish()),
        0,
        assignments,
        vec![LogicalType::Int64, LogicalType::String],
    );

    let result = set_property.next().unwrap().unwrap();
    assert_eq!(result.row_count(), 1);

    let stored = store.get_node(person).unwrap();
    let name_key = grafeo_common::types::PropertyKey::new("name");
    assert_eq!(
        stored.properties.get(&name_key),
        Some(&Value::String("Bob".into()))
    );
}
1627
/// Setting a property through the edge constructor stores the value on
/// the edge identified by the input column.
#[test]
fn test_set_edge_property() {
    let store = create_test_store();

    let source = store.create_node(&["N"]);
    let target = store.create_node(&["N"]);
    let knows = store.create_edge(source, target, "KNOWS");

    // Single-row input chunk carrying the edge id in column 0.
    let mut input = DataChunkBuilder::new(&[LogicalType::Int64]);
    input.column_mut(0).unwrap().push_int64(knows.0 as i64);
    input.advance_row();

    let assignments = vec![(
        "weight".to_string(),
        PropertySource::Constant(Value::Float64(0.75)),
    )];
    let mut set_property = SetPropertyOperator::new_for_edge(
        Arc::clone(&store),
        MockInput::boxed(input.finish()),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    let result = set_property.next().unwrap().unwrap();
    assert_eq!(result.row_count(), 1);

    let stored = store.get_edge(knows).unwrap();
    let weight_key = grafeo_common::types::PropertyKey::new("weight");
    assert_eq!(
        stored.properties.get(&weight_key),
        Some(&Value::Float64(0.75))
    );
}
1662
/// Two constant assignments in one operator run are both stored on the
/// node.
#[test]
fn test_set_multiple_properties() {
    let store = create_test_store();
    let person = store.create_node(&["Person"]);

    // Single-row input chunk carrying the node id in column 0.
    let mut input = DataChunkBuilder::new(&[LogicalType::Int64]);
    input.column_mut(0).unwrap().push_int64(person.0 as i64);
    input.advance_row();

    let name_assignment = (
        "name".to_string(),
        PropertySource::Constant(Value::String("Alice".into())),
    );
    let age_assignment = (
        "age".to_string(),
        PropertySource::Constant(Value::Int64(30)),
    );
    let mut set_properties = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(input.finish()),
        0,
        vec![name_assignment, age_assignment],
        vec![LogicalType::Int64],
    );

    set_properties.next().unwrap().unwrap();

    // Check the properties in the same order the assignments were given.
    let stored = store.get_node(person).unwrap();
    let expected = [
        ("name", Value::String("Alice".into())),
        ("age", Value::Int64(30)),
    ];
    for (key, value) in &expected {
        assert_eq!(
            stored
                .properties
                .get(&grafeo_common::types::PropertyKey::new(*key)),
            Some(value)
        );
    }
}
1706
/// When the input operator yields no chunk, `SetPropertyOperator::next`
/// returns `Ok(None)` as well.
#[test]
fn test_set_property_no_input_returns_none() {
    // Input operator that never produces a chunk.
    struct EmptyInput;

    impl Operator for EmptyInput {
        fn name(&self) -> &'static str {
            "EmptyInput"
        }
        fn reset(&mut self) {}
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }
    }

    let store = create_test_store();
    let assignments = vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))];
    let mut set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    let outcome = set_property.next().unwrap();
    assert!(outcome.is_none());
}
1732
/// Each mutation operator reports its expected static name via
/// `Operator::name`.
#[test]
fn test_operator_names() {
    // Input operator that never produces a chunk; only needed to
    // satisfy the constructors.
    struct EmptyInput;

    impl Operator for EmptyInput {
        fn name(&self) -> &'static str {
            "EmptyInput"
        }
        fn reset(&mut self) {}
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }
    }

    let store = create_test_store();

    let delete_edge = DeleteEdgeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![LogicalType::Int64],
    );
    assert_eq!(delete_edge.name(), "DeleteEdge");

    let add_label = AddLabelOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![String::from("L")],
        vec![LogicalType::Int64],
    );
    assert_eq!(add_label.name(), "AddLabel");

    let remove_label = RemoveLabelOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![String::from("L")],
        vec![LogicalType::Int64],
    );
    assert_eq!(remove_label.name(), "RemoveLabel");

    let set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![],
        vec![LogicalType::Int64],
    );
    assert_eq!(set_property.name(), "SetProperty");
}
1785}