1use std::sync::Arc;
10
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
/// Hook for enforcing schema constraints during graph mutations.
///
/// The mutation operators in this file call these methods before they write
/// properties to the store; any `Err` is propagated unchanged to the caller of
/// the operator.
pub trait ConstraintValidator: Send + Sync {
    /// Validates a single property (`key`, `value`) for a node carrying `labels`.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the implementation rejects the value.
    fn validate_node_property(
        &self,
        labels: &[String],
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Validates the full set of resolved properties of a node carrying `labels`.
    ///
    /// Called once per node after every property has been checked individually.
    /// Presumably used for checks that need the whole set (e.g. required
    /// properties) — confirm against the implementations.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the property set is rejected.
    fn validate_node_complete(
        &self,
        labels: &[String],
        properties: &[(String, Value)],
    ) -> Result<(), OperatorError>;

    /// Checks a node property value against uniqueness rules.
    ///
    /// NOTE(review): the exact uniqueness semantics live in the implementation;
    /// the operators only forward `labels`, `key` and `value`.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the implementation rejects the value.
    fn check_unique_node_property(
        &self,
        labels: &[String],
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Validates a single property (`key`, `value`) for an edge of type `edge_type`.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the implementation rejects the value.
    fn validate_edge_property(
        &self,
        edge_type: &str,
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Validates the full set of resolved properties of an edge of type `edge_type`.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the property set is rejected.
    fn validate_edge_complete(
        &self,
        edge_type: &str,
        properties: &[(String, Value)],
    ) -> Result<(), OperatorError>;
}
68
/// Operator that creates nodes in the graph store.
///
/// With an input operator it creates one node per selected input row; without
/// one it creates a single node and is then exhausted until `reset` is called.
pub struct CreateNodeOperator {
    /// Store the nodes are written to.
    store: Arc<dyn GraphStoreMut>,
    /// Optional child operator supplying the rows to create nodes for.
    input: Option<Box<dyn Operator>>,
    /// Labels attached to every created node.
    labels: Vec<String>,
    /// Property names paired with the source their value is resolved from.
    properties: Vec<(String, PropertySource)>,
    /// Schema used to build the output chunk.
    output_schema: Vec<LogicalType>,
    /// Index of the output column that receives the new node ID.
    output_column: usize,
    /// Whether the standalone (no-input) creation has already run.
    executed: bool,
    /// Epoch used for versioned creation; falls back to the store's current epoch.
    viewing_epoch: Option<EpochId>,
    /// Transaction the writes belong to; falls back to `TransactionId::SYSTEM`.
    transaction_id: Option<TransactionId>,
    /// Optional constraint validator consulted for every created node.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
95
96#[derive(Debug, Clone)]
98pub enum PropertySource {
99 Column(usize),
101 Constant(Value),
103 PropertyAccess {
105 column: usize,
107 property: String,
109 },
110}
111
112impl PropertySource {
113 pub fn resolve(
115 &self,
116 chunk: &crate::execution::chunk::DataChunk,
117 row: usize,
118 store: &dyn GraphStore,
119 ) -> Value {
120 match self {
121 PropertySource::Column(col_idx) => chunk
122 .column(*col_idx)
123 .and_then(|c| c.get_value(row))
124 .unwrap_or(Value::Null),
125 PropertySource::Constant(v) => v.clone(),
126 PropertySource::PropertyAccess { column, property } => {
127 let Some(col) = chunk.column(*column) else {
128 return Value::Null;
129 };
130 if let Some(node_id) = col.get_node_id(row) {
132 store
133 .get_node(node_id)
134 .and_then(|node| node.get_property(property).cloned())
135 .unwrap_or(Value::Null)
136 } else if let Some(edge_id) = col.get_edge_id(row) {
137 store
138 .get_edge(edge_id)
139 .and_then(|edge| edge.get_property(property).cloned())
140 .unwrap_or(Value::Null)
141 } else if let Some(Value::Map(map)) = col.get_value(row) {
142 let key = PropertyKey::new(property);
143 map.get(&key).cloned().unwrap_or(Value::Null)
144 } else {
145 Value::Null
146 }
147 }
148 }
149 }
150}
151
152impl CreateNodeOperator {
153 pub fn new(
163 store: Arc<dyn GraphStoreMut>,
164 input: Option<Box<dyn Operator>>,
165 labels: Vec<String>,
166 properties: Vec<(String, PropertySource)>,
167 output_schema: Vec<LogicalType>,
168 output_column: usize,
169 ) -> Self {
170 Self {
171 store,
172 input,
173 labels,
174 properties,
175 output_schema,
176 output_column,
177 executed: false,
178 viewing_epoch: None,
179 transaction_id: None,
180 validator: None,
181 }
182 }
183
184 pub fn with_transaction_context(
186 mut self,
187 epoch: EpochId,
188 transaction_id: Option<TransactionId>,
189 ) -> Self {
190 self.viewing_epoch = Some(epoch);
191 self.transaction_id = transaction_id;
192 self
193 }
194
195 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
197 self.validator = Some(validator);
198 self
199 }
200}
201
202impl CreateNodeOperator {
203 fn validate_and_set_properties(
205 &self,
206 node_id: NodeId,
207 resolved_props: &[(String, Value)],
208 ) -> Result<(), OperatorError> {
209 if let Some(ref validator) = self.validator {
211 for (name, value) in resolved_props {
212 validator.validate_node_property(&self.labels, name, value)?;
213 validator.check_unique_node_property(&self.labels, name, value)?;
214 }
215 validator.validate_node_complete(&self.labels, resolved_props)?;
217 }
218
219 for (name, value) in resolved_props {
221 self.store.set_node_property(node_id, name, value.clone());
222 }
223 Ok(())
224 }
225}
226
227impl Operator for CreateNodeOperator {
228 fn next(&mut self) -> OperatorResult {
229 let epoch = self
231 .viewing_epoch
232 .unwrap_or_else(|| self.store.current_epoch());
233 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
234
235 if let Some(ref mut input) = self.input {
236 if let Some(chunk) = input.next()? {
238 let mut builder =
239 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
240
241 for row in chunk.selected_indices() {
242 let resolved_props: Vec<(String, Value)> = self
244 .properties
245 .iter()
246 .map(|(name, source)| {
247 let value =
248 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
249 (name.clone(), value)
250 })
251 .collect();
252
253 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
255 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
256
257 self.validate_and_set_properties(node_id, &resolved_props)?;
259
260 for col_idx in 0..chunk.column_count() {
262 if col_idx < self.output_column
263 && let (Some(src), Some(dst)) =
264 (chunk.column(col_idx), builder.column_mut(col_idx))
265 {
266 if let Some(val) = src.get_value(row) {
267 dst.push_value(val);
268 } else {
269 dst.push_value(Value::Null);
270 }
271 }
272 }
273
274 if let Some(dst) = builder.column_mut(self.output_column) {
276 dst.push_value(Value::Int64(node_id.0 as i64));
277 }
278
279 builder.advance_row();
280 }
281
282 return Ok(Some(builder.finish()));
283 }
284 Ok(None)
285 } else {
286 if self.executed {
288 return Ok(None);
289 }
290 self.executed = true;
291
292 let resolved_props: Vec<(String, Value)> = self
294 .properties
295 .iter()
296 .filter_map(|(name, source)| {
297 if let PropertySource::Constant(value) = source {
298 Some((name.clone(), value.clone()))
299 } else {
300 None
301 }
302 })
303 .collect();
304
305 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
307 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
308
309 self.validate_and_set_properties(node_id, &resolved_props)?;
311
312 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
314 if let Some(dst) = builder.column_mut(self.output_column) {
315 dst.push_value(Value::Int64(node_id.0 as i64));
316 }
317 builder.advance_row();
318
319 Ok(Some(builder.finish()))
320 }
321 }
322
323 fn reset(&mut self) {
324 if let Some(ref mut input) = self.input {
325 input.reset();
326 }
327 self.executed = false;
328 }
329
330 fn name(&self) -> &'static str {
331 "CreateNode"
332 }
333}
334
335pub struct CreateEdgeOperator {
337 store: Arc<dyn GraphStoreMut>,
339 input: Box<dyn Operator>,
341 from_column: usize,
343 to_column: usize,
345 edge_type: String,
347 properties: Vec<(String, PropertySource)>,
349 output_schema: Vec<LogicalType>,
351 output_column: Option<usize>,
353 viewing_epoch: Option<EpochId>,
355 transaction_id: Option<TransactionId>,
357 validator: Option<Arc<dyn ConstraintValidator>>,
359}
360
361impl CreateEdgeOperator {
362 pub fn new(
369 store: Arc<dyn GraphStoreMut>,
370 input: Box<dyn Operator>,
371 from_column: usize,
372 to_column: usize,
373 edge_type: String,
374 output_schema: Vec<LogicalType>,
375 ) -> Self {
376 Self {
377 store,
378 input,
379 from_column,
380 to_column,
381 edge_type,
382 properties: Vec::new(),
383 output_schema,
384 output_column: None,
385 viewing_epoch: None,
386 transaction_id: None,
387 validator: None,
388 }
389 }
390
391 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
393 self.properties = properties;
394 self
395 }
396
397 pub fn with_output_column(mut self, column: usize) -> Self {
399 self.output_column = Some(column);
400 self
401 }
402
403 pub fn with_transaction_context(
405 mut self,
406 epoch: EpochId,
407 transaction_id: Option<TransactionId>,
408 ) -> Self {
409 self.viewing_epoch = Some(epoch);
410 self.transaction_id = transaction_id;
411 self
412 }
413
414 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
416 self.validator = Some(validator);
417 self
418 }
419}
420
421impl Operator for CreateEdgeOperator {
422 fn next(&mut self) -> OperatorResult {
423 let epoch = self
425 .viewing_epoch
426 .unwrap_or_else(|| self.store.current_epoch());
427 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
428
429 if let Some(chunk) = self.input.next()? {
430 let mut builder =
431 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
432
433 for row in chunk.selected_indices() {
434 let from_id = chunk
436 .column(self.from_column)
437 .and_then(|c| c.get_value(row))
438 .ok_or_else(|| {
439 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
440 })?;
441
442 let to_id = chunk
443 .column(self.to_column)
444 .and_then(|c| c.get_value(row))
445 .ok_or_else(|| {
446 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
447 })?;
448
449 let from_node_id = match from_id {
451 Value::Int64(id) => NodeId(id as u64),
452 _ => {
453 return Err(OperatorError::TypeMismatch {
454 expected: "Int64 (node ID)".to_string(),
455 found: format!("{from_id:?}"),
456 });
457 }
458 };
459
460 let to_node_id = match to_id {
461 Value::Int64(id) => NodeId(id as u64),
462 _ => {
463 return Err(OperatorError::TypeMismatch {
464 expected: "Int64 (node ID)".to_string(),
465 found: format!("{to_id:?}"),
466 });
467 }
468 };
469
470 let resolved_props: Vec<(String, Value)> = self
472 .properties
473 .iter()
474 .map(|(name, source)| {
475 let value =
476 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
477 (name.clone(), value)
478 })
479 .collect();
480
481 if let Some(ref validator) = self.validator {
483 for (name, value) in &resolved_props {
484 validator.validate_edge_property(&self.edge_type, name, value)?;
485 }
486 validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
487 }
488
489 let edge_id = self.store.create_edge_versioned(
491 from_node_id,
492 to_node_id,
493 &self.edge_type,
494 epoch,
495 tx,
496 );
497
498 for (name, value) in resolved_props {
500 self.store.set_edge_property(edge_id, &name, value);
501 }
502
503 for col_idx in 0..chunk.column_count() {
505 if let (Some(src), Some(dst)) =
506 (chunk.column(col_idx), builder.column_mut(col_idx))
507 {
508 if let Some(val) = src.get_value(row) {
509 dst.push_value(val);
510 } else {
511 dst.push_value(Value::Null);
512 }
513 }
514 }
515
516 if let Some(out_col) = self.output_column
518 && let Some(dst) = builder.column_mut(out_col)
519 {
520 dst.push_value(Value::Int64(edge_id.0 as i64));
521 }
522
523 builder.advance_row();
524 }
525
526 return Ok(Some(builder.finish()));
527 }
528 Ok(None)
529 }
530
531 fn reset(&mut self) {
532 self.input.reset();
533 }
534
535 fn name(&self) -> &'static str {
536 "CreateEdge"
537 }
538}
539
540pub struct DeleteNodeOperator {
542 store: Arc<dyn GraphStoreMut>,
544 input: Box<dyn Operator>,
546 node_column: usize,
548 output_schema: Vec<LogicalType>,
550 detach: bool,
552 viewing_epoch: Option<EpochId>,
554 transaction_id: Option<TransactionId>,
556}
557
558impl DeleteNodeOperator {
559 pub fn new(
561 store: Arc<dyn GraphStoreMut>,
562 input: Box<dyn Operator>,
563 node_column: usize,
564 output_schema: Vec<LogicalType>,
565 detach: bool,
566 ) -> Self {
567 Self {
568 store,
569 input,
570 node_column,
571 output_schema,
572 detach,
573 viewing_epoch: None,
574 transaction_id: None,
575 }
576 }
577
578 pub fn with_transaction_context(
580 mut self,
581 epoch: EpochId,
582 transaction_id: Option<TransactionId>,
583 ) -> Self {
584 self.viewing_epoch = Some(epoch);
585 self.transaction_id = transaction_id;
586 self
587 }
588}
589
590impl Operator for DeleteNodeOperator {
591 fn next(&mut self) -> OperatorResult {
592 let epoch = self
594 .viewing_epoch
595 .unwrap_or_else(|| self.store.current_epoch());
596 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
597
598 if let Some(chunk) = self.input.next()? {
599 let mut deleted_count = 0;
600
601 for row in chunk.selected_indices() {
602 let node_val = chunk
603 .column(self.node_column)
604 .and_then(|c| c.get_value(row))
605 .ok_or_else(|| {
606 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
607 })?;
608
609 let node_id = match node_val {
610 Value::Int64(id) => NodeId(id as u64),
611 _ => {
612 return Err(OperatorError::TypeMismatch {
613 expected: "Int64 (node ID)".to_string(),
614 found: format!("{node_val:?}"),
615 });
616 }
617 };
618
619 if self.detach {
620 self.store.delete_node_edges(node_id);
622 } else {
623 let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
625 if degree > 0 {
626 return Err(OperatorError::ConstraintViolation(format!(
627 "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
628 degree
629 )));
630 }
631 }
632
633 if self.store.delete_node_versioned(node_id, epoch, tx) {
635 deleted_count += 1;
636 }
637 }
638
639 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
641 if let Some(dst) = builder.column_mut(0) {
642 dst.push_value(Value::Int64(deleted_count));
643 }
644 builder.advance_row();
645
646 return Ok(Some(builder.finish()));
647 }
648 Ok(None)
649 }
650
651 fn reset(&mut self) {
652 self.input.reset();
653 }
654
655 fn name(&self) -> &'static str {
656 "DeleteNode"
657 }
658}
659
660pub struct DeleteEdgeOperator {
662 store: Arc<dyn GraphStoreMut>,
664 input: Box<dyn Operator>,
666 edge_column: usize,
668 output_schema: Vec<LogicalType>,
670 viewing_epoch: Option<EpochId>,
672 transaction_id: Option<TransactionId>,
674}
675
676impl DeleteEdgeOperator {
677 pub fn new(
679 store: Arc<dyn GraphStoreMut>,
680 input: Box<dyn Operator>,
681 edge_column: usize,
682 output_schema: Vec<LogicalType>,
683 ) -> Self {
684 Self {
685 store,
686 input,
687 edge_column,
688 output_schema,
689 viewing_epoch: None,
690 transaction_id: None,
691 }
692 }
693
694 pub fn with_transaction_context(
696 mut self,
697 epoch: EpochId,
698 transaction_id: Option<TransactionId>,
699 ) -> Self {
700 self.viewing_epoch = Some(epoch);
701 self.transaction_id = transaction_id;
702 self
703 }
704}
705
706impl Operator for DeleteEdgeOperator {
707 fn next(&mut self) -> OperatorResult {
708 let epoch = self
710 .viewing_epoch
711 .unwrap_or_else(|| self.store.current_epoch());
712 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
713
714 if let Some(chunk) = self.input.next()? {
715 let mut deleted_count = 0;
716
717 for row in chunk.selected_indices() {
718 let edge_val = chunk
719 .column(self.edge_column)
720 .and_then(|c| c.get_value(row))
721 .ok_or_else(|| {
722 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
723 })?;
724
725 let edge_id = match edge_val {
726 Value::Int64(id) => EdgeId(id as u64),
727 _ => {
728 return Err(OperatorError::TypeMismatch {
729 expected: "Int64 (edge ID)".to_string(),
730 found: format!("{edge_val:?}"),
731 });
732 }
733 };
734
735 if self.store.delete_edge_versioned(edge_id, epoch, tx) {
737 deleted_count += 1;
738 }
739 }
740
741 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
743 if let Some(dst) = builder.column_mut(0) {
744 dst.push_value(Value::Int64(deleted_count));
745 }
746 builder.advance_row();
747
748 return Ok(Some(builder.finish()));
749 }
750 Ok(None)
751 }
752
753 fn reset(&mut self) {
754 self.input.reset();
755 }
756
757 fn name(&self) -> &'static str {
758 "DeleteEdge"
759 }
760}
761
762pub struct AddLabelOperator {
764 store: Arc<dyn GraphStoreMut>,
766 input: Box<dyn Operator>,
768 node_column: usize,
770 labels: Vec<String>,
772 output_schema: Vec<LogicalType>,
774}
775
776impl AddLabelOperator {
777 pub fn new(
779 store: Arc<dyn GraphStoreMut>,
780 input: Box<dyn Operator>,
781 node_column: usize,
782 labels: Vec<String>,
783 output_schema: Vec<LogicalType>,
784 ) -> Self {
785 Self {
786 store,
787 input,
788 node_column,
789 labels,
790 output_schema,
791 }
792 }
793}
794
795impl Operator for AddLabelOperator {
796 fn next(&mut self) -> OperatorResult {
797 if let Some(chunk) = self.input.next()? {
798 let mut updated_count = 0;
799
800 for row in chunk.selected_indices() {
801 let node_val = chunk
802 .column(self.node_column)
803 .and_then(|c| c.get_value(row))
804 .ok_or_else(|| {
805 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
806 })?;
807
808 let node_id = match node_val {
809 Value::Int64(id) => NodeId(id as u64),
810 _ => {
811 return Err(OperatorError::TypeMismatch {
812 expected: "Int64 (node ID)".to_string(),
813 found: format!("{node_val:?}"),
814 });
815 }
816 };
817
818 for label in &self.labels {
820 if self.store.add_label(node_id, label) {
821 updated_count += 1;
822 }
823 }
824 }
825
826 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
828 if let Some(dst) = builder.column_mut(0) {
829 dst.push_value(Value::Int64(updated_count));
830 }
831 builder.advance_row();
832
833 return Ok(Some(builder.finish()));
834 }
835 Ok(None)
836 }
837
838 fn reset(&mut self) {
839 self.input.reset();
840 }
841
842 fn name(&self) -> &'static str {
843 "AddLabel"
844 }
845}
846
847pub struct RemoveLabelOperator {
849 store: Arc<dyn GraphStoreMut>,
851 input: Box<dyn Operator>,
853 node_column: usize,
855 labels: Vec<String>,
857 output_schema: Vec<LogicalType>,
859}
860
861impl RemoveLabelOperator {
862 pub fn new(
864 store: Arc<dyn GraphStoreMut>,
865 input: Box<dyn Operator>,
866 node_column: usize,
867 labels: Vec<String>,
868 output_schema: Vec<LogicalType>,
869 ) -> Self {
870 Self {
871 store,
872 input,
873 node_column,
874 labels,
875 output_schema,
876 }
877 }
878}
879
880impl Operator for RemoveLabelOperator {
881 fn next(&mut self) -> OperatorResult {
882 if let Some(chunk) = self.input.next()? {
883 let mut updated_count = 0;
884
885 for row in chunk.selected_indices() {
886 let node_val = chunk
887 .column(self.node_column)
888 .and_then(|c| c.get_value(row))
889 .ok_or_else(|| {
890 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
891 })?;
892
893 let node_id = match node_val {
894 Value::Int64(id) => NodeId(id as u64),
895 _ => {
896 return Err(OperatorError::TypeMismatch {
897 expected: "Int64 (node ID)".to_string(),
898 found: format!("{node_val:?}"),
899 });
900 }
901 };
902
903 for label in &self.labels {
905 if self.store.remove_label(node_id, label) {
906 updated_count += 1;
907 }
908 }
909 }
910
911 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
913 if let Some(dst) = builder.column_mut(0) {
914 dst.push_value(Value::Int64(updated_count));
915 }
916 builder.advance_row();
917
918 return Ok(Some(builder.finish()));
919 }
920 Ok(None)
921 }
922
923 fn reset(&mut self) {
924 self.input.reset();
925 }
926
927 fn name(&self) -> &'static str {
928 "RemoveLabel"
929 }
930}
931
932pub struct SetPropertyOperator {
937 store: Arc<dyn GraphStoreMut>,
939 input: Box<dyn Operator>,
941 entity_column: usize,
943 is_edge: bool,
945 properties: Vec<(String, PropertySource)>,
947 output_schema: Vec<LogicalType>,
949 replace: bool,
951 validator: Option<Arc<dyn ConstraintValidator>>,
953 labels: Vec<String>,
955 edge_type_name: Option<String>,
957}
958
959impl SetPropertyOperator {
960 pub fn new_for_node(
962 store: Arc<dyn GraphStoreMut>,
963 input: Box<dyn Operator>,
964 node_column: usize,
965 properties: Vec<(String, PropertySource)>,
966 output_schema: Vec<LogicalType>,
967 ) -> Self {
968 Self {
969 store,
970 input,
971 entity_column: node_column,
972 is_edge: false,
973 properties,
974 output_schema,
975 replace: false,
976 validator: None,
977 labels: Vec::new(),
978 edge_type_name: None,
979 }
980 }
981
982 pub fn new_for_edge(
984 store: Arc<dyn GraphStoreMut>,
985 input: Box<dyn Operator>,
986 edge_column: usize,
987 properties: Vec<(String, PropertySource)>,
988 output_schema: Vec<LogicalType>,
989 ) -> Self {
990 Self {
991 store,
992 input,
993 entity_column: edge_column,
994 is_edge: true,
995 properties,
996 output_schema,
997 replace: false,
998 validator: None,
999 labels: Vec::new(),
1000 edge_type_name: None,
1001 }
1002 }
1003
1004 pub fn with_replace(mut self, replace: bool) -> Self {
1006 self.replace = replace;
1007 self
1008 }
1009
1010 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1012 self.validator = Some(validator);
1013 self
1014 }
1015
1016 pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1018 self.labels = labels;
1019 self
1020 }
1021
1022 pub fn with_edge_type(mut self, edge_type: String) -> Self {
1024 self.edge_type_name = Some(edge_type);
1025 self
1026 }
1027}
1028
1029impl Operator for SetPropertyOperator {
1030 fn next(&mut self) -> OperatorResult {
1031 if let Some(chunk) = self.input.next()? {
1032 let mut builder =
1033 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1034
1035 for row in chunk.selected_indices() {
1036 let entity_val = chunk
1037 .column(self.entity_column)
1038 .and_then(|c| c.get_value(row))
1039 .ok_or_else(|| {
1040 OperatorError::ColumnNotFound(format!(
1041 "entity column {}",
1042 self.entity_column
1043 ))
1044 })?;
1045
1046 let entity_id = match entity_val {
1047 Value::Int64(id) => id as u64,
1048 _ => {
1049 return Err(OperatorError::TypeMismatch {
1050 expected: "Int64 (entity ID)".to_string(),
1051 found: format!("{entity_val:?}"),
1052 });
1053 }
1054 };
1055
1056 let resolved_props: Vec<(String, Value)> = self
1058 .properties
1059 .iter()
1060 .map(|(name, source)| {
1061 let value =
1062 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1063 (name.clone(), value)
1064 })
1065 .collect();
1066
1067 if let Some(ref validator) = self.validator {
1069 if self.is_edge {
1070 if let Some(ref et) = self.edge_type_name {
1071 for (name, value) in &resolved_props {
1072 validator.validate_edge_property(et, name, value)?;
1073 }
1074 }
1075 } else {
1076 for (name, value) in &resolved_props {
1077 validator.validate_node_property(&self.labels, name, value)?;
1078 validator.check_unique_node_property(&self.labels, name, value)?;
1079 }
1080 }
1081 }
1082
1083 for (prop_name, value) in resolved_props {
1085 if prop_name == "*" {
1086 if let Value::Map(map) = value {
1088 if self.replace {
1089 if self.is_edge {
1091 if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1092 let keys: Vec<String> = edge
1093 .properties
1094 .iter()
1095 .map(|(k, _)| k.as_str().to_string())
1096 .collect();
1097 for key in keys {
1098 self.store
1099 .remove_edge_property(EdgeId(entity_id), &key);
1100 }
1101 }
1102 } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1103 let keys: Vec<String> = node
1104 .properties
1105 .iter()
1106 .map(|(k, _)| k.as_str().to_string())
1107 .collect();
1108 for key in keys {
1109 self.store.remove_node_property(NodeId(entity_id), &key);
1110 }
1111 }
1112 }
1113 for (key, val) in map.iter() {
1115 if self.is_edge {
1116 self.store.set_edge_property(
1117 EdgeId(entity_id),
1118 key.as_str(),
1119 val.clone(),
1120 );
1121 } else {
1122 self.store.set_node_property(
1123 NodeId(entity_id),
1124 key.as_str(),
1125 val.clone(),
1126 );
1127 }
1128 }
1129 }
1130 } else if self.is_edge {
1131 self.store
1132 .set_edge_property(EdgeId(entity_id), &prop_name, value);
1133 } else {
1134 self.store
1135 .set_node_property(NodeId(entity_id), &prop_name, value);
1136 }
1137 }
1138
1139 for col_idx in 0..chunk.column_count() {
1141 if let (Some(src), Some(dst)) =
1142 (chunk.column(col_idx), builder.column_mut(col_idx))
1143 {
1144 if let Some(val) = src.get_value(row) {
1145 dst.push_value(val);
1146 } else {
1147 dst.push_value(Value::Null);
1148 }
1149 }
1150 }
1151
1152 builder.advance_row();
1153 }
1154
1155 return Ok(Some(builder.finish()));
1156 }
1157 Ok(None)
1158 }
1159
1160 fn reset(&mut self) {
1161 self.input.reset();
1162 }
1163
1164 fn name(&self) -> &'static str {
1165 "SetProperty"
1166 }
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171 use super::*;
1172 use crate::execution::DataChunk;
1173 use crate::execution::chunk::DataChunkBuilder;
1174 use crate::graph::lpg::LpgStore;
1175
1176 fn create_test_store() -> Arc<dyn GraphStoreMut> {
1179 Arc::new(LpgStore::new().unwrap())
1180 }
1181
1182 struct MockInput {
1183 chunk: Option<DataChunk>,
1184 }
1185
1186 impl MockInput {
1187 fn boxed(chunk: DataChunk) -> Box<Self> {
1188 Box::new(Self { chunk: Some(chunk) })
1189 }
1190 }
1191
1192 impl Operator for MockInput {
1193 fn next(&mut self) -> OperatorResult {
1194 Ok(self.chunk.take())
1195 }
1196 fn reset(&mut self) {}
1197 fn name(&self) -> &'static str {
1198 "MockInput"
1199 }
1200 }
1201
1202 struct EmptyInput;
1203 impl Operator for EmptyInput {
1204 fn next(&mut self) -> OperatorResult {
1205 Ok(None)
1206 }
1207 fn reset(&mut self) {}
1208 fn name(&self) -> &'static str {
1209 "EmptyInput"
1210 }
1211 }
1212
1213 fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1214 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1215 for id in ids {
1216 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1217 builder.advance_row();
1218 }
1219 builder.finish()
1220 }
1221
1222 fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1223 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1224 for id in ids {
1225 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1226 builder.advance_row();
1227 }
1228 builder.finish()
1229 }
1230
1231 #[test]
1234 fn test_create_node_standalone() {
1235 let store = create_test_store();
1236
1237 let mut op = CreateNodeOperator::new(
1238 Arc::clone(&store),
1239 None,
1240 vec!["Person".to_string()],
1241 vec![(
1242 "name".to_string(),
1243 PropertySource::Constant(Value::String("Alix".into())),
1244 )],
1245 vec![LogicalType::Int64],
1246 0,
1247 );
1248
1249 let chunk = op.next().unwrap().unwrap();
1250 assert_eq!(chunk.row_count(), 1);
1251
1252 assert!(op.next().unwrap().is_none());
1254
1255 assert_eq!(store.node_count(), 1);
1256 }
1257
1258 #[test]
1259 fn test_create_edge() {
1260 let store = create_test_store();
1261
1262 let node1 = store.create_node(&["Person"]);
1263 let node2 = store.create_node(&["Person"]);
1264
1265 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1266 builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1267 builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1268 builder.advance_row();
1269
1270 let mut op = CreateEdgeOperator::new(
1271 Arc::clone(&store),
1272 MockInput::boxed(builder.finish()),
1273 0,
1274 1,
1275 "KNOWS".to_string(),
1276 vec![LogicalType::Int64, LogicalType::Int64],
1277 );
1278
1279 let _chunk = op.next().unwrap().unwrap();
1280 assert_eq!(store.edge_count(), 1);
1281 }
1282
1283 #[test]
1284 fn test_delete_node() {
1285 let store = create_test_store();
1286
1287 let node_id = store.create_node(&["Person"]);
1288 assert_eq!(store.node_count(), 1);
1289
1290 let mut op = DeleteNodeOperator::new(
1291 Arc::clone(&store),
1292 MockInput::boxed(node_id_chunk(&[node_id])),
1293 0,
1294 vec![LogicalType::Int64],
1295 false,
1296 );
1297
1298 let chunk = op.next().unwrap().unwrap();
1299 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1300 assert_eq!(deleted, 1);
1301 assert_eq!(store.node_count(), 0);
1302 }
1303
1304 #[test]
1307 fn test_delete_edge() {
1308 let store = create_test_store();
1309
1310 let n1 = store.create_node(&["Person"]);
1311 let n2 = store.create_node(&["Person"]);
1312 let eid = store.create_edge(n1, n2, "KNOWS");
1313 assert_eq!(store.edge_count(), 1);
1314
1315 let mut op = DeleteEdgeOperator::new(
1316 Arc::clone(&store),
1317 MockInput::boxed(edge_id_chunk(&[eid])),
1318 0,
1319 vec![LogicalType::Int64],
1320 );
1321
1322 let chunk = op.next().unwrap().unwrap();
1323 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1324 assert_eq!(deleted, 1);
1325 assert_eq!(store.edge_count(), 0);
1326 }
1327
1328 #[test]
1329 fn test_delete_edge_no_input_returns_none() {
1330 let store = create_test_store();
1331
1332 let mut op = DeleteEdgeOperator::new(
1333 Arc::clone(&store),
1334 Box::new(EmptyInput),
1335 0,
1336 vec![LogicalType::Int64],
1337 );
1338
1339 assert!(op.next().unwrap().is_none());
1340 }
1341
1342 #[test]
1343 fn test_delete_multiple_edges() {
1344 let store = create_test_store();
1345
1346 let n1 = store.create_node(&["N"]);
1347 let n2 = store.create_node(&["N"]);
1348 let e1 = store.create_edge(n1, n2, "R");
1349 let e2 = store.create_edge(n2, n1, "S");
1350 assert_eq!(store.edge_count(), 2);
1351
1352 let mut op = DeleteEdgeOperator::new(
1353 Arc::clone(&store),
1354 MockInput::boxed(edge_id_chunk(&[e1, e2])),
1355 0,
1356 vec![LogicalType::Int64],
1357 );
1358
1359 let chunk = op.next().unwrap().unwrap();
1360 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1361 assert_eq!(deleted, 2);
1362 assert_eq!(store.edge_count(), 0);
1363 }
1364
1365 #[test]
1368 fn test_delete_node_detach() {
1369 let store = create_test_store();
1370
1371 let n1 = store.create_node(&["Person"]);
1372 let n2 = store.create_node(&["Person"]);
1373 store.create_edge(n1, n2, "KNOWS");
1374 store.create_edge(n2, n1, "FOLLOWS");
1375 assert_eq!(store.edge_count(), 2);
1376
1377 let mut op = DeleteNodeOperator::new(
1378 Arc::clone(&store),
1379 MockInput::boxed(node_id_chunk(&[n1])),
1380 0,
1381 vec![LogicalType::Int64],
1382 true, );
1384
1385 let chunk = op.next().unwrap().unwrap();
1386 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1387 assert_eq!(deleted, 1);
1388 assert_eq!(store.node_count(), 1);
1389 assert_eq!(store.edge_count(), 0); }
1391
1392 #[test]
1395 fn test_add_label() {
1396 let store = create_test_store();
1397
1398 let node = store.create_node(&["Person"]);
1399
1400 let mut op = AddLabelOperator::new(
1401 Arc::clone(&store),
1402 MockInput::boxed(node_id_chunk(&[node])),
1403 0,
1404 vec!["Employee".to_string()],
1405 vec![LogicalType::Int64],
1406 );
1407
1408 let chunk = op.next().unwrap().unwrap();
1409 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1410 assert_eq!(updated, 1);
1411
1412 let node_data = store.get_node(node).unwrap();
1414 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1415 assert!(labels.contains(&"Person"));
1416 assert!(labels.contains(&"Employee"));
1417 }
1418
1419 #[test]
1420 fn test_add_multiple_labels() {
1421 let store = create_test_store();
1422
1423 let node = store.create_node(&["Base"]);
1424
1425 let mut op = AddLabelOperator::new(
1426 Arc::clone(&store),
1427 MockInput::boxed(node_id_chunk(&[node])),
1428 0,
1429 vec!["LabelA".to_string(), "LabelB".to_string()],
1430 vec![LogicalType::Int64],
1431 );
1432
1433 let chunk = op.next().unwrap().unwrap();
1434 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1435 assert_eq!(updated, 2); let node_data = store.get_node(node).unwrap();
1438 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1439 assert!(labels.contains(&"LabelA"));
1440 assert!(labels.contains(&"LabelB"));
1441 }
1442
1443 #[test]
1444 fn test_add_label_no_input_returns_none() {
1445 let store = create_test_store();
1446
1447 let mut op = AddLabelOperator::new(
1448 Arc::clone(&store),
1449 Box::new(EmptyInput),
1450 0,
1451 vec!["Foo".to_string()],
1452 vec![LogicalType::Int64],
1453 );
1454
1455 assert!(op.next().unwrap().is_none());
1456 }
1457
#[test]
fn test_remove_label() {
    // The node starts with two labels; only "Employee" is targeted.
    let store = create_test_store();
    let person = store.create_node(&["Person", "Employee"]);

    let labels_to_remove = vec!["Employee".to_string()];
    let mut remove_op = RemoveLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        labels_to_remove,
        vec![LogicalType::Int64],
    );

    let result = remove_op.next().unwrap().unwrap();
    let removed_count = result.column(0).unwrap().get_int64(0).unwrap();
    assert_eq!(removed_count, 1);

    // The untouched label stays, the targeted one is gone.
    let stored = store.get_node(person).unwrap();
    let remaining: Vec<&str> = stored.labels.iter().map(|label| label.as_ref()).collect();
    assert!(remaining.contains(&"Person"));
    assert!(!remaining.contains(&"Employee"));
}
1484
#[test]
fn test_remove_nonexistent_label() {
    // Removing a label the node does not carry: the test expects an update
    // count of 0 and the node's stored labels to be unchanged.
    let store = create_test_store();

    let node = store.create_node(&["Person"]);

    let mut op = RemoveLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[node])),
        0,
        vec!["NonExistent".to_string()],
        vec![LogicalType::Int64],
    );

    let chunk = op.next().unwrap().unwrap();
    let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
    assert_eq!(updated, 0);

    // A zero count alone does not prove the store was left alone, so check
    // the node's labels directly.
    let node_data = store.get_node(node).unwrap();
    let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
    assert!(
        labels.contains(&"Person"),
        "existing label was lost: {labels:?}"
    );
    assert!(!labels.contains(&"NonExistent"));
}
1503
#[test]
fn test_set_node_property_constant() {
    // Sets `name` from a constant source and reads it back from the store.
    let store = create_test_store();
    let person = store.create_node(&["Person"]);

    let assignments = vec![(
        "name".to_string(),
        PropertySource::Constant(Value::String("Alix".into())),
    )];
    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    let result = set_op.next().unwrap().unwrap();
    assert_eq!(result.row_count(), 1);

    let stored = store.get_node(person).unwrap();
    let name_key = grafeo_common::types::PropertyKey::new("name");
    let expected = Value::String("Alix".into());
    assert_eq!(stored.properties.get(&name_key), Some(&expected));
}
1535
#[test]
fn test_set_node_property_from_column() {
    let store = create_test_store();
    let person = store.create_node(&["Person"]);

    // One input row: column 0 holds the node id, column 1 the value to assign.
    let mut rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
    rows.column_mut(0).unwrap().push_int64(person.0 as i64);
    rows.column_mut(1)
        .unwrap()
        .push_value(Value::String("Gus".into()));
    rows.advance_row();

    let assignments = vec![("name".to_string(), PropertySource::Column(1))];
    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(rows.finish()),
        0,
        assignments,
        vec![LogicalType::Int64, LogicalType::String],
    );

    let result = set_op.next().unwrap().unwrap();
    assert_eq!(result.row_count(), 1);

    // The value from column 1 should now be stored under `name`.
    let stored = store.get_node(person).unwrap();
    let name_key = grafeo_common::types::PropertyKey::new("name");
    let expected = Value::String("Gus".into());
    assert_eq!(stored.properties.get(&name_key), Some(&expected));
}
1570
#[test]
fn test_set_edge_property() {
    // Build a single KNOWS edge between two nodes, then set `weight` on it.
    let store = create_test_store();
    let from = store.create_node(&["N"]);
    let to = store.create_node(&["N"]);
    let edge = store.create_edge(from, to, "KNOWS");

    let assignments = vec![(
        "weight".to_string(),
        PropertySource::Constant(Value::Float64(0.75)),
    )];
    let mut set_op = SetPropertyOperator::new_for_edge(
        Arc::clone(&store),
        MockInput::boxed(edge_id_chunk(&[edge])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    let result = set_op.next().unwrap().unwrap();
    assert_eq!(result.row_count(), 1);

    let stored = store.get_edge(edge).unwrap();
    let weight_key = grafeo_common::types::PropertyKey::new("weight");
    let expected = Value::Float64(0.75);
    assert_eq!(stored.properties.get(&weight_key), Some(&expected));
}
1601
#[test]
fn test_set_multiple_properties() {
    let store = create_test_store();
    let person = store.create_node(&["Person"]);

    // Two constant assignments applied by a single operator.
    let assignments = vec![
        (
            "name".to_string(),
            PropertySource::Constant(Value::String("Alix".into())),
        ),
        (
            "age".to_string(),
            PropertySource::Constant(Value::Int64(30)),
        ),
    ];
    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    set_op.next().unwrap().unwrap();

    // Check each property in the same order the original assertions ran.
    let stored = store.get_node(person).unwrap();
    let expectations = [
        ("name", Value::String("Alix".into())),
        ("age", Value::Int64(30)),
    ];
    for (key, value) in &expectations {
        assert_eq!(
            stored
                .properties
                .get(&grafeo_common::types::PropertyKey::new(*key)),
            Some(value)
        );
    }
}
1641
#[test]
fn test_set_property_no_input_returns_none() {
    // With an input that yields nothing, the first pull is expected to be `None`.
    let store = create_test_store();
    let assignments = vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))];

    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    let first_pull = set_op.next().unwrap();
    assert!(first_pull.is_none());
}
1656
#[test]
fn test_delete_node_without_detach_errors_when_edges_exist() {
    // Two nodes joined by an edge; the delete targets the edge's source node.
    let store = create_test_store();
    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);
    store.create_edge(source, target, "KNOWS");

    let mut delete_op = DeleteNodeOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[source])),
        0,
        vec![LogicalType::Int64],
        false,
    );

    // The pull must fail with a constraint violation that mentions the edge.
    let failure = delete_op.next().unwrap_err();
    match &failure {
        OperatorError::ConstraintViolation(msg) => {
            assert!(msg.contains("connected edge"), "unexpected message: {msg}");
        }
        other => panic!("expected ConstraintViolation, got {other:?}"),
    }

    // Both nodes are still in the store after the failed delete.
    assert_eq!(store.node_count(), 2);
}
1685
#[test]
fn test_create_node_with_input_operator() {
    let store = create_test_store();

    // A pre-existing node supplies the single input row.
    let seed = store.create_node(&["Seed"]);

    let new_labels = vec!["Created".to_string()];
    let new_props = vec![(
        "source".to_string(),
        PropertySource::Constant(Value::String("from_input".into())),
    )];
    let mut create_op = CreateNodeOperator::new(
        Arc::clone(&store),
        Some(MockInput::boxed(node_id_chunk(&[seed]))),
        new_labels,
        new_props,
        vec![LogicalType::Int64, LogicalType::Int64],
        1,
    );

    let first = create_op.next().unwrap().unwrap();
    assert_eq!(first.row_count(), 1);

    // Seed node plus the one created for the input row.
    assert_eq!(store.node_count(), 2);

    // A second pull is expected to yield nothing further.
    let second = create_op.next().unwrap();
    assert!(second.is_none());
}
1716
#[test]
fn test_create_edge_with_properties_and_output_column() {
    let store = create_test_store();
    let src = store.create_node(&["Person"]);
    let dst = store.create_node(&["Person"]);

    // One input row carrying the two endpoint ids in columns 0 and 1.
    let mut rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
    rows.column_mut(0).unwrap().push_int64(src.0 as i64);
    rows.column_mut(1).unwrap().push_int64(dst.0 as i64);
    rows.advance_row();

    let edge_props = vec![(
        "since".to_string(),
        PropertySource::Constant(Value::Int64(2024)),
    )];
    let mut create_edge = CreateEdgeOperator::new(
        Arc::clone(&store),
        MockInput::boxed(rows.finish()),
        0,
        1,
        "KNOWS".to_string(),
        vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
    )
    .with_properties(edge_props)
    .with_output_column(2);

    let out = create_edge.next().unwrap().unwrap();
    assert_eq!(out.row_count(), 1);
    assert_eq!(store.edge_count(), 1);

    // Read the new edge's id back from output column 2 and look it up.
    let raw_id = out
        .column(2)
        .and_then(|col| col.get_int64(0))
        .expect("edge ID should be in output column 2");
    let created = store
        .get_edge(EdgeId(raw_id as u64))
        .expect("edge should exist");

    let since_key = grafeo_common::types::PropertyKey::new("since");
    assert_eq!(
        created.properties.get(&since_key),
        Some(&Value::Int64(2024))
    );
}
1764
#[test]
fn test_set_property_map_replace() {
    use std::collections::BTreeMap;

    let store = create_test_store();

    // Seed the node with a property that the replace is expected to drop.
    let person = store.create_node(&["Person"]);
    store.set_node_property(person, "old_prop", Value::String("should_be_removed".into()));

    let replacement = BTreeMap::from([(
        PropertyKey::new("new_key"),
        Value::String("new_val".into()),
    )]);

    let assignments = vec![(
        "*".to_string(),
        PropertySource::Constant(Value::Map(Arc::new(replacement))),
    )];
    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        assignments,
        vec![LogicalType::Int64],
    )
    .with_replace(true);

    set_op.next().unwrap().unwrap();

    let stored = store.get_node(person).unwrap();

    // The old property is gone and only the map's entry is present.
    let old_key = PropertyKey::new("old_prop");
    let leftover = stored.properties.get(&old_key);
    assert!(leftover.is_none());

    let new_key = PropertyKey::new("new_key");
    assert_eq!(
        stored.properties.get(&new_key),
        Some(&Value::String("new_val".into()))
    );
}
1807
#[test]
fn test_set_property_map_merge() {
    use std::collections::BTreeMap;

    let store = create_test_store();

    // Seed the node with a property that must survive the map assignment.
    let person = store.create_node(&["Person"]);
    store.set_node_property(person, "existing", Value::Int64(42));

    let additions = BTreeMap::from([(
        PropertyKey::new("added"),
        Value::String("hello".into()),
    )]);

    // No `with_replace` call here, unlike the replace variant of this test.
    let assignments = vec![(
        "*".to_string(),
        PropertySource::Constant(Value::Map(Arc::new(additions))),
    )];
    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    set_op.next().unwrap().unwrap();

    let stored = store.get_node(person).unwrap();

    let existing_key = PropertyKey::new("existing");
    assert_eq!(
        stored.properties.get(&existing_key),
        Some(&Value::Int64(42))
    );

    let added_key = PropertyKey::new("added");
    assert_eq!(
        stored.properties.get(&added_key),
        Some(&Value::String("hello".into()))
    );
}
1847
#[test]
fn test_property_source_property_access() {
    let store = create_test_store();

    // The source node holds the value; the target node receives the copy.
    let source = store.create_node(&["Source"]);
    store.set_node_property(source, "name", Value::String("Alix".into()));
    let target = store.create_node(&["Target"]);

    // One input row: column 0 is the source node, column 1 the target's raw id.
    let mut rows = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
    rows.column_mut(0).unwrap().push_node_id(source);
    rows.column_mut(1).unwrap().push_int64(target.0 as i64);
    rows.advance_row();

    let assignments = vec![(
        "copied_name".to_string(),
        PropertySource::PropertyAccess {
            column: 0,
            property: "name".to_string(),
        },
    )];
    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(rows.finish()),
        1,
        assignments,
        vec![LogicalType::Node, LogicalType::Int64],
    );

    set_op.next().unwrap().unwrap();

    let stored_target = store.get_node(target).unwrap();
    let copied_key = PropertyKey::new("copied_name");
    assert_eq!(
        stored_target.properties.get(&copied_key),
        Some(&Value::String("Alix".into()))
    );
}
1890
#[test]
fn test_create_node_with_constraint_validator() {
    // Validator stub: rejects a node property whose key is "forbidden" and
    // accepts every other check.
    struct RejectForbiddenKeyValidator;

    impl ConstraintValidator for RejectForbiddenKeyValidator {
        fn validate_node_property(
            &self,
            _labels: &[String],
            key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            if key != "forbidden" {
                return Ok(());
            }
            Err(OperatorError::ConstraintViolation(
                "property 'forbidden' is not allowed".to_string(),
            ))
        }

        fn validate_node_complete(
            &self,
            _labels: &[String],
            _properties: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn check_unique_node_property(
            &self,
            _labels: &[String],
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_property(
            &self,
            _edge_type: &str,
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_complete(
            &self,
            _edge_type: &str,
            _properties: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }
    }

    let store = create_test_store();

    // An allowed property key: the pull succeeds and one node exists.
    let allowed_props = vec![(
        "name".to_string(),
        PropertySource::Constant(Value::String("ok".into())),
    )];
    let mut accepted = CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        vec!["Thing".to_string()],
        allowed_props,
        vec![LogicalType::Int64],
        0,
    )
    .with_validator(Arc::new(RejectForbiddenKeyValidator));

    assert!(accepted.next().is_ok());
    assert_eq!(store.node_count(), 1);

    // The rejected key: the pull must fail with a constraint violation.
    let forbidden_props = vec![(
        "forbidden".to_string(),
        PropertySource::Constant(Value::Int64(1)),
    )];
    let mut rejected = CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        vec!["Thing".to_string()],
        forbidden_props,
        vec![LogicalType::Int64],
        0,
    )
    .with_validator(Arc::new(RejectForbiddenKeyValidator));

    let failure = rejected.next().unwrap_err();
    assert!(matches!(failure, OperatorError::ConstraintViolation(_)));
}
1980
#[test]
fn test_create_node_reset_allows_re_execution() {
    let store = create_test_store();

    // No input operator and no properties: a bare "Person" node per run.
    let mut create_op = CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        vec!["Person".to_string()],
        vec![],
        vec![LogicalType::Int64],
        0,
    );

    // First pull produces a chunk, the second is exhausted.
    let first = create_op.next().unwrap();
    assert!(first.is_some());
    let exhausted = create_op.next().unwrap();
    assert!(exhausted.is_none());

    // After a reset the operator produces a chunk again.
    create_op.reset();
    let after_reset = create_op.next().unwrap();
    assert!(after_reset.is_some());

    // One node per productive pull.
    assert_eq!(store.node_count(), 2);
}
2006
#[test]
fn test_operator_names() {
    // Each mutation operator is constructed with minimal arguments purely to
    // check the string returned by `name()`.
    let store = create_test_store();
    let int_schema = || vec![LogicalType::Int64];

    let create_node =
        CreateNodeOperator::new(Arc::clone(&store), None, vec![], vec![], int_schema(), 0);
    assert_eq!(create_node.name(), "CreateNode");

    let create_edge = CreateEdgeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        1,
        "R".to_string(),
        int_schema(),
    );
    assert_eq!(create_edge.name(), "CreateEdge");

    let delete_node = DeleteNodeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        int_schema(),
        false,
    );
    assert_eq!(delete_node.name(), "DeleteNode");

    let delete_edge =
        DeleteEdgeOperator::new(Arc::clone(&store), Box::new(EmptyInput), 0, int_schema());
    assert_eq!(delete_edge.name(), "DeleteEdge");

    let add_label = AddLabelOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec!["L".to_string()],
        int_schema(),
    );
    assert_eq!(add_label.name(), "AddLabel");

    let remove_label = RemoveLabelOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec!["L".to_string()],
        int_schema(),
    );
    assert_eq!(remove_label.name(), "RemoveLabel");

    let set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![],
        int_schema(),
    );
    assert_eq!(set_property.name(), "SetProperty");
}
2077}