1use std::sync::Arc;
10
11use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TxId, Value};
12
13use super::{Operator, OperatorError, OperatorResult};
14use crate::execution::chunk::DataChunkBuilder;
15use crate::graph::{GraphStore, GraphStoreMut};
16
/// Pluggable constraint checks consulted by the mutation operators in this file.
///
/// Every method returns `Ok(())` to accept a write, or an [`OperatorError`]
/// that the calling operator propagates with `?`.
///
/// NOTE(review): the exact constraint semantics (type checks, NOT NULL,
/// uniqueness scope, ...) are defined by the implementor and are not visible
/// here; the per-method descriptions below follow the method names only.
pub trait ConstraintValidator: Send + Sync {
    /// Checks one property (`key` / `value`) about to be written to a node
    /// carrying `labels`.
    fn validate_node_property(
        &self,
        labels: &[String],
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Checks the complete property list of a node that is being created.
    ///
    /// Called once per created node, after the per-property checks.
    /// Presumably this is where "required property missing" is detected;
    /// confirm against the implementor.
    fn validate_node_complete(
        &self,
        labels: &[String],
        properties: &[(String, Value)],
    ) -> Result<(), OperatorError>;

    /// Checks that writing `key = value` on a node with `labels` does not
    /// break a uniqueness constraint (semantics owned by the implementor).
    fn check_unique_node_property(
        &self,
        labels: &[String],
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Checks one property (`key` / `value`) about to be written to an edge of
    /// type `edge_type`.
    fn validate_edge_property(
        &self,
        edge_type: &str,
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Checks the complete property list of an edge that is being created.
    fn validate_edge_complete(
        &self,
        edge_type: &str,
        properties: &[(String, Value)],
    ) -> Result<(), OperatorError>;
}
66
/// Operator that creates nodes in a graph store.
///
/// With an input operator it creates one node per selected input row; without
/// one it creates a single node on the first call to `next`.
pub struct CreateNodeOperator {
    /// Store the nodes are created in.
    store: Arc<dyn GraphStoreMut>,
    /// Optional upstream operator; `None` selects the standalone (single node) mode.
    input: Option<Box<dyn Operator>>,
    /// Labels attached to every created node.
    labels: Vec<String>,
    /// `(property name, value source)` pairs written to every created node.
    properties: Vec<(String, PropertySource)>,
    /// Column types used to build the output chunk.
    output_schema: Vec<LogicalType>,
    /// Index of the output column that receives the new node id.
    output_column: usize,
    /// Set once the standalone node has been created; later calls yield `None`.
    executed: bool,
    /// Epoch used for versioned creation; falls back to the store's current epoch.
    viewing_epoch: Option<EpochId>,
    /// Transaction used for versioned creation; falls back to `TxId::SYSTEM`.
    tx_id: Option<TxId>,
    /// Optional constraint validator consulted for every created node.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
93
/// Describes where a property value comes from when a mutation operator
/// evaluates a row.
#[derive(Debug, Clone)]
pub enum PropertySource {
    /// Read the value from the input chunk column with this index.
    Column(usize),
    /// Use this fixed value for every row.
    Constant(Value),
    /// Read a named property from the entity (or map) found in an input column.
    PropertyAccess {
        /// Index of the input column holding a node id, an edge id or a map.
        column: usize,
        /// Name of the property (or map key) to read.
        property: String,
    },
}
109
110impl PropertySource {
111 pub fn resolve(
113 &self,
114 chunk: &crate::execution::chunk::DataChunk,
115 row: usize,
116 store: &dyn GraphStore,
117 ) -> Value {
118 match self {
119 PropertySource::Column(col_idx) => chunk
120 .column(*col_idx)
121 .and_then(|c| c.get_value(row))
122 .unwrap_or(Value::Null),
123 PropertySource::Constant(v) => v.clone(),
124 PropertySource::PropertyAccess { column, property } => {
125 let Some(col) = chunk.column(*column) else {
126 return Value::Null;
127 };
128 if let Some(node_id) = col.get_node_id(row) {
130 store
131 .get_node(node_id)
132 .and_then(|node| node.get_property(property).cloned())
133 .unwrap_or(Value::Null)
134 } else if let Some(edge_id) = col.get_edge_id(row) {
135 store
136 .get_edge(edge_id)
137 .and_then(|edge| edge.get_property(property).cloned())
138 .unwrap_or(Value::Null)
139 } else if let Some(Value::Map(map)) = col.get_value(row) {
140 let key = PropertyKey::new(property);
141 map.get(&key).cloned().unwrap_or(Value::Null)
142 } else {
143 Value::Null
144 }
145 }
146 }
147 }
148}
149
150impl CreateNodeOperator {
151 pub fn new(
161 store: Arc<dyn GraphStoreMut>,
162 input: Option<Box<dyn Operator>>,
163 labels: Vec<String>,
164 properties: Vec<(String, PropertySource)>,
165 output_schema: Vec<LogicalType>,
166 output_column: usize,
167 ) -> Self {
168 Self {
169 store,
170 input,
171 labels,
172 properties,
173 output_schema,
174 output_column,
175 executed: false,
176 viewing_epoch: None,
177 tx_id: None,
178 validator: None,
179 }
180 }
181
182 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
184 self.viewing_epoch = Some(epoch);
185 self.tx_id = tx_id;
186 self
187 }
188
189 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
191 self.validator = Some(validator);
192 self
193 }
194}
195
196impl CreateNodeOperator {
197 fn validate_and_set_properties(
199 &self,
200 node_id: NodeId,
201 resolved_props: &[(String, Value)],
202 ) -> Result<(), OperatorError> {
203 if let Some(ref validator) = self.validator {
205 for (name, value) in resolved_props {
206 validator.validate_node_property(&self.labels, name, value)?;
207 validator.check_unique_node_property(&self.labels, name, value)?;
208 }
209 validator.validate_node_complete(&self.labels, resolved_props)?;
211 }
212
213 for (name, value) in resolved_props {
215 self.store.set_node_property(node_id, name, value.clone());
216 }
217 Ok(())
218 }
219}
220
221impl Operator for CreateNodeOperator {
222 fn next(&mut self) -> OperatorResult {
223 let epoch = self
225 .viewing_epoch
226 .unwrap_or_else(|| self.store.current_epoch());
227 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
228
229 if let Some(ref mut input) = self.input {
230 if let Some(chunk) = input.next()? {
232 let mut builder =
233 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
234
235 for row in chunk.selected_indices() {
236 let resolved_props: Vec<(String, Value)> = self
238 .properties
239 .iter()
240 .map(|(name, source)| {
241 let value =
242 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
243 (name.clone(), value)
244 })
245 .collect();
246
247 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
249 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
250
251 self.validate_and_set_properties(node_id, &resolved_props)?;
253
254 for col_idx in 0..chunk.column_count() {
256 if col_idx < self.output_column
257 && let (Some(src), Some(dst)) =
258 (chunk.column(col_idx), builder.column_mut(col_idx))
259 {
260 if let Some(val) = src.get_value(row) {
261 dst.push_value(val);
262 } else {
263 dst.push_value(Value::Null);
264 }
265 }
266 }
267
268 if let Some(dst) = builder.column_mut(self.output_column) {
270 dst.push_value(Value::Int64(node_id.0 as i64));
271 }
272
273 builder.advance_row();
274 }
275
276 return Ok(Some(builder.finish()));
277 }
278 Ok(None)
279 } else {
280 if self.executed {
282 return Ok(None);
283 }
284 self.executed = true;
285
286 let resolved_props: Vec<(String, Value)> = self
288 .properties
289 .iter()
290 .filter_map(|(name, source)| {
291 if let PropertySource::Constant(value) = source {
292 Some((name.clone(), value.clone()))
293 } else {
294 None
295 }
296 })
297 .collect();
298
299 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
301 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
302
303 self.validate_and_set_properties(node_id, &resolved_props)?;
305
306 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
308 if let Some(dst) = builder.column_mut(self.output_column) {
309 dst.push_value(Value::Int64(node_id.0 as i64));
310 }
311 builder.advance_row();
312
313 Ok(Some(builder.finish()))
314 }
315 }
316
317 fn reset(&mut self) {
318 if let Some(ref mut input) = self.input {
319 input.reset();
320 }
321 self.executed = false;
322 }
323
324 fn name(&self) -> &'static str {
325 "CreateNode"
326 }
327}
328
/// Operator that creates one edge per selected input row.
pub struct CreateEdgeOperator {
    /// Store the edges are created in.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the endpoint ids.
    input: Box<dyn Operator>,
    /// Index of the input column holding the source node id (an `Int64`).
    from_column: usize,
    /// Index of the input column holding the target node id (an `Int64`).
    to_column: usize,
    /// Type name given to every created edge.
    edge_type: String,
    /// `(property name, value source)` pairs written to every created edge.
    properties: Vec<(String, PropertySource)>,
    /// Column types used to build the output chunk.
    output_schema: Vec<LogicalType>,
    /// Output column receiving the new edge id, if one was requested.
    output_column: Option<usize>,
    /// Epoch used for versioned creation; falls back to the store's current epoch.
    viewing_epoch: Option<EpochId>,
    /// Transaction used for versioned creation; falls back to `TxId::SYSTEM`.
    tx_id: Option<TxId>,
    /// Optional constraint validator consulted before each edge is created.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
354
355impl CreateEdgeOperator {
356 pub fn new(
363 store: Arc<dyn GraphStoreMut>,
364 input: Box<dyn Operator>,
365 from_column: usize,
366 to_column: usize,
367 edge_type: String,
368 output_schema: Vec<LogicalType>,
369 ) -> Self {
370 Self {
371 store,
372 input,
373 from_column,
374 to_column,
375 edge_type,
376 properties: Vec::new(),
377 output_schema,
378 output_column: None,
379 viewing_epoch: None,
380 tx_id: None,
381 validator: None,
382 }
383 }
384
385 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
387 self.properties = properties;
388 self
389 }
390
391 pub fn with_output_column(mut self, column: usize) -> Self {
393 self.output_column = Some(column);
394 self
395 }
396
397 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
399 self.viewing_epoch = Some(epoch);
400 self.tx_id = tx_id;
401 self
402 }
403
404 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
406 self.validator = Some(validator);
407 self
408 }
409}
410
411impl Operator for CreateEdgeOperator {
412 fn next(&mut self) -> OperatorResult {
413 let epoch = self
415 .viewing_epoch
416 .unwrap_or_else(|| self.store.current_epoch());
417 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
418
419 if let Some(chunk) = self.input.next()? {
420 let mut builder =
421 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
422
423 for row in chunk.selected_indices() {
424 let from_id = chunk
426 .column(self.from_column)
427 .and_then(|c| c.get_value(row))
428 .ok_or_else(|| {
429 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
430 })?;
431
432 let to_id = chunk
433 .column(self.to_column)
434 .and_then(|c| c.get_value(row))
435 .ok_or_else(|| {
436 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
437 })?;
438
439 let from_node_id = match from_id {
441 Value::Int64(id) => NodeId(id as u64),
442 _ => {
443 return Err(OperatorError::TypeMismatch {
444 expected: "Int64 (node ID)".to_string(),
445 found: format!("{from_id:?}"),
446 });
447 }
448 };
449
450 let to_node_id = match to_id {
451 Value::Int64(id) => NodeId(id as u64),
452 _ => {
453 return Err(OperatorError::TypeMismatch {
454 expected: "Int64 (node ID)".to_string(),
455 found: format!("{to_id:?}"),
456 });
457 }
458 };
459
460 let resolved_props: Vec<(String, Value)> = self
462 .properties
463 .iter()
464 .map(|(name, source)| {
465 let value =
466 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
467 (name.clone(), value)
468 })
469 .collect();
470
471 if let Some(ref validator) = self.validator {
473 for (name, value) in &resolved_props {
474 validator.validate_edge_property(&self.edge_type, name, value)?;
475 }
476 validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
477 }
478
479 let edge_id = self.store.create_edge_versioned(
481 from_node_id,
482 to_node_id,
483 &self.edge_type,
484 epoch,
485 tx,
486 );
487
488 for (name, value) in resolved_props {
490 self.store.set_edge_property(edge_id, &name, value);
491 }
492
493 for col_idx in 0..chunk.column_count() {
495 if let (Some(src), Some(dst)) =
496 (chunk.column(col_idx), builder.column_mut(col_idx))
497 {
498 if let Some(val) = src.get_value(row) {
499 dst.push_value(val);
500 } else {
501 dst.push_value(Value::Null);
502 }
503 }
504 }
505
506 if let Some(out_col) = self.output_column
508 && let Some(dst) = builder.column_mut(out_col)
509 {
510 dst.push_value(Value::Int64(edge_id.0 as i64));
511 }
512
513 builder.advance_row();
514 }
515
516 return Ok(Some(builder.finish()));
517 }
518 Ok(None)
519 }
520
521 fn reset(&mut self) {
522 self.input.reset();
523 }
524
525 fn name(&self) -> &'static str {
526 "CreateEdge"
527 }
528}
529
/// Operator that deletes the nodes identified by an input column.
pub struct DeleteNodeOperator {
    /// Store the nodes are deleted from.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the node ids.
    input: Box<dyn Operator>,
    /// Index of the input column holding the node id (an `Int64`).
    node_column: usize,
    /// Column types used to build the output chunk (the count goes to column 0).
    output_schema: Vec<LogicalType>,
    /// When `true`, the node's edges are deleted first; when `false`, deleting
    /// a node that still has edges is an error.
    detach: bool,
    /// Epoch used for versioned deletion; falls back to the store's current epoch.
    viewing_epoch: Option<EpochId>,
    /// Transaction used for versioned deletion; falls back to `TxId::SYSTEM`.
    tx_id: Option<TxId>,
}
547
548impl DeleteNodeOperator {
549 pub fn new(
551 store: Arc<dyn GraphStoreMut>,
552 input: Box<dyn Operator>,
553 node_column: usize,
554 output_schema: Vec<LogicalType>,
555 detach: bool,
556 ) -> Self {
557 Self {
558 store,
559 input,
560 node_column,
561 output_schema,
562 detach,
563 viewing_epoch: None,
564 tx_id: None,
565 }
566 }
567
568 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
570 self.viewing_epoch = Some(epoch);
571 self.tx_id = tx_id;
572 self
573 }
574}
575
576impl Operator for DeleteNodeOperator {
577 fn next(&mut self) -> OperatorResult {
578 let epoch = self
580 .viewing_epoch
581 .unwrap_or_else(|| self.store.current_epoch());
582 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
583
584 if let Some(chunk) = self.input.next()? {
585 let mut deleted_count = 0;
586
587 for row in chunk.selected_indices() {
588 let node_val = chunk
589 .column(self.node_column)
590 .and_then(|c| c.get_value(row))
591 .ok_or_else(|| {
592 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
593 })?;
594
595 let node_id = match node_val {
596 Value::Int64(id) => NodeId(id as u64),
597 _ => {
598 return Err(OperatorError::TypeMismatch {
599 expected: "Int64 (node ID)".to_string(),
600 found: format!("{node_val:?}"),
601 });
602 }
603 };
604
605 if self.detach {
606 self.store.delete_node_edges(node_id);
608 } else {
609 let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
611 if degree > 0 {
612 return Err(OperatorError::ConstraintViolation(format!(
613 "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
614 degree
615 )));
616 }
617 }
618
619 if self.store.delete_node_versioned(node_id, epoch, tx) {
621 deleted_count += 1;
622 }
623 }
624
625 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
627 if let Some(dst) = builder.column_mut(0) {
628 dst.push_value(Value::Int64(deleted_count));
629 }
630 builder.advance_row();
631
632 return Ok(Some(builder.finish()));
633 }
634 Ok(None)
635 }
636
637 fn reset(&mut self) {
638 self.input.reset();
639 }
640
641 fn name(&self) -> &'static str {
642 "DeleteNode"
643 }
644}
645
/// Operator that deletes the edges identified by an input column.
pub struct DeleteEdgeOperator {
    /// Store the edges are deleted from.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the edge ids.
    input: Box<dyn Operator>,
    /// Index of the input column holding the edge id (an `Int64`).
    edge_column: usize,
    /// Column types used to build the output chunk (the count goes to column 0).
    output_schema: Vec<LogicalType>,
    /// Epoch used for versioned deletion; falls back to the store's current epoch.
    viewing_epoch: Option<EpochId>,
    /// Transaction used for versioned deletion; falls back to `TxId::SYSTEM`.
    tx_id: Option<TxId>,
}
661
662impl DeleteEdgeOperator {
663 pub fn new(
665 store: Arc<dyn GraphStoreMut>,
666 input: Box<dyn Operator>,
667 edge_column: usize,
668 output_schema: Vec<LogicalType>,
669 ) -> Self {
670 Self {
671 store,
672 input,
673 edge_column,
674 output_schema,
675 viewing_epoch: None,
676 tx_id: None,
677 }
678 }
679
680 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
682 self.viewing_epoch = Some(epoch);
683 self.tx_id = tx_id;
684 self
685 }
686}
687
688impl Operator for DeleteEdgeOperator {
689 fn next(&mut self) -> OperatorResult {
690 let epoch = self
692 .viewing_epoch
693 .unwrap_or_else(|| self.store.current_epoch());
694 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
695
696 if let Some(chunk) = self.input.next()? {
697 let mut deleted_count = 0;
698
699 for row in chunk.selected_indices() {
700 let edge_val = chunk
701 .column(self.edge_column)
702 .and_then(|c| c.get_value(row))
703 .ok_or_else(|| {
704 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
705 })?;
706
707 let edge_id = match edge_val {
708 Value::Int64(id) => EdgeId(id as u64),
709 _ => {
710 return Err(OperatorError::TypeMismatch {
711 expected: "Int64 (edge ID)".to_string(),
712 found: format!("{edge_val:?}"),
713 });
714 }
715 };
716
717 if self.store.delete_edge_versioned(edge_id, epoch, tx) {
719 deleted_count += 1;
720 }
721 }
722
723 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
725 if let Some(dst) = builder.column_mut(0) {
726 dst.push_value(Value::Int64(deleted_count));
727 }
728 builder.advance_row();
729
730 return Ok(Some(builder.finish()));
731 }
732 Ok(None)
733 }
734
735 fn reset(&mut self) {
736 self.input.reset();
737 }
738
739 fn name(&self) -> &'static str {
740 "DeleteEdge"
741 }
742}
743
/// Operator that adds a fixed set of labels to the nodes identified by an
/// input column.
pub struct AddLabelOperator {
    /// Store holding the nodes to relabel.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the node ids.
    input: Box<dyn Operator>,
    /// Index of the input column holding the node id (an `Int64`).
    node_column: usize,
    /// Labels to add to every referenced node.
    labels: Vec<String>,
    /// Column types used to build the output chunk (the count goes to column 0).
    output_schema: Vec<LogicalType>,
}
757
758impl AddLabelOperator {
759 pub fn new(
761 store: Arc<dyn GraphStoreMut>,
762 input: Box<dyn Operator>,
763 node_column: usize,
764 labels: Vec<String>,
765 output_schema: Vec<LogicalType>,
766 ) -> Self {
767 Self {
768 store,
769 input,
770 node_column,
771 labels,
772 output_schema,
773 }
774 }
775}
776
777impl Operator for AddLabelOperator {
778 fn next(&mut self) -> OperatorResult {
779 if let Some(chunk) = self.input.next()? {
780 let mut updated_count = 0;
781
782 for row in chunk.selected_indices() {
783 let node_val = chunk
784 .column(self.node_column)
785 .and_then(|c| c.get_value(row))
786 .ok_or_else(|| {
787 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
788 })?;
789
790 let node_id = match node_val {
791 Value::Int64(id) => NodeId(id as u64),
792 _ => {
793 return Err(OperatorError::TypeMismatch {
794 expected: "Int64 (node ID)".to_string(),
795 found: format!("{node_val:?}"),
796 });
797 }
798 };
799
800 for label in &self.labels {
802 if self.store.add_label(node_id, label) {
803 updated_count += 1;
804 }
805 }
806 }
807
808 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
810 if let Some(dst) = builder.column_mut(0) {
811 dst.push_value(Value::Int64(updated_count));
812 }
813 builder.advance_row();
814
815 return Ok(Some(builder.finish()));
816 }
817 Ok(None)
818 }
819
820 fn reset(&mut self) {
821 self.input.reset();
822 }
823
824 fn name(&self) -> &'static str {
825 "AddLabel"
826 }
827}
828
/// Operator that removes a fixed set of labels from the nodes identified by an
/// input column.
pub struct RemoveLabelOperator {
    /// Store holding the nodes to relabel.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the node ids.
    input: Box<dyn Operator>,
    /// Index of the input column holding the node id (an `Int64`).
    node_column: usize,
    /// Labels to remove from every referenced node.
    labels: Vec<String>,
    /// Column types used to build the output chunk (the count goes to column 0).
    output_schema: Vec<LogicalType>,
}
842
843impl RemoveLabelOperator {
844 pub fn new(
846 store: Arc<dyn GraphStoreMut>,
847 input: Box<dyn Operator>,
848 node_column: usize,
849 labels: Vec<String>,
850 output_schema: Vec<LogicalType>,
851 ) -> Self {
852 Self {
853 store,
854 input,
855 node_column,
856 labels,
857 output_schema,
858 }
859 }
860}
861
862impl Operator for RemoveLabelOperator {
863 fn next(&mut self) -> OperatorResult {
864 if let Some(chunk) = self.input.next()? {
865 let mut updated_count = 0;
866
867 for row in chunk.selected_indices() {
868 let node_val = chunk
869 .column(self.node_column)
870 .and_then(|c| c.get_value(row))
871 .ok_or_else(|| {
872 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
873 })?;
874
875 let node_id = match node_val {
876 Value::Int64(id) => NodeId(id as u64),
877 _ => {
878 return Err(OperatorError::TypeMismatch {
879 expected: "Int64 (node ID)".to_string(),
880 found: format!("{node_val:?}"),
881 });
882 }
883 };
884
885 for label in &self.labels {
887 if self.store.remove_label(node_id, label) {
888 updated_count += 1;
889 }
890 }
891 }
892
893 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
895 if let Some(dst) = builder.column_mut(0) {
896 dst.push_value(Value::Int64(updated_count));
897 }
898 builder.advance_row();
899
900 return Ok(Some(builder.finish()));
901 }
902 Ok(None)
903 }
904
905 fn reset(&mut self) {
906 self.input.reset();
907 }
908
909 fn name(&self) -> &'static str {
910 "RemoveLabel"
911 }
912}
913
/// Operator that writes properties to the node or edge identified by an input
/// column.
///
/// A property named `"*"` whose value is a map sets every map entry as a
/// property; combined with `replace` the entity's existing properties are
/// removed first.
pub struct SetPropertyOperator {
    /// Store holding the entities to update.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the entity ids.
    input: Box<dyn Operator>,
    /// Index of the input column holding the entity id (an `Int64`).
    entity_column: usize,
    /// `true` when the ids refer to edges, `false` when they refer to nodes.
    is_edge: bool,
    /// `(property name, value source)` pairs to write.
    properties: Vec<(String, PropertySource)>,
    /// Column types used to build the output chunk.
    output_schema: Vec<LogicalType>,
    /// When `true`, a `"*"` map assignment clears existing properties first.
    replace: bool,
    /// Optional constraint validator consulted before properties are written.
    validator: Option<Arc<dyn ConstraintValidator>>,
    /// Labels passed to the validator for node property checks.
    labels: Vec<String>,
    /// Edge type passed to the validator; edge checks are skipped when `None`.
    edge_type_name: Option<String>,
}
940
941impl SetPropertyOperator {
942 pub fn new_for_node(
944 store: Arc<dyn GraphStoreMut>,
945 input: Box<dyn Operator>,
946 node_column: usize,
947 properties: Vec<(String, PropertySource)>,
948 output_schema: Vec<LogicalType>,
949 ) -> Self {
950 Self {
951 store,
952 input,
953 entity_column: node_column,
954 is_edge: false,
955 properties,
956 output_schema,
957 replace: false,
958 validator: None,
959 labels: Vec::new(),
960 edge_type_name: None,
961 }
962 }
963
964 pub fn new_for_edge(
966 store: Arc<dyn GraphStoreMut>,
967 input: Box<dyn Operator>,
968 edge_column: usize,
969 properties: Vec<(String, PropertySource)>,
970 output_schema: Vec<LogicalType>,
971 ) -> Self {
972 Self {
973 store,
974 input,
975 entity_column: edge_column,
976 is_edge: true,
977 properties,
978 output_schema,
979 replace: false,
980 validator: None,
981 labels: Vec::new(),
982 edge_type_name: None,
983 }
984 }
985
986 pub fn with_replace(mut self, replace: bool) -> Self {
988 self.replace = replace;
989 self
990 }
991
992 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
994 self.validator = Some(validator);
995 self
996 }
997
998 pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1000 self.labels = labels;
1001 self
1002 }
1003
1004 pub fn with_edge_type(mut self, edge_type: String) -> Self {
1006 self.edge_type_name = Some(edge_type);
1007 self
1008 }
1009}
1010
impl Operator for SetPropertyOperator {
    /// Applies the configured properties to the entity referenced by each
    /// selected row of the next input chunk and passes the input columns
    /// through to the output. Returns `Ok(None)` once the input is exhausted.
    ///
    /// # Errors
    ///
    /// * `ColumnNotFound` when the entity cell cannot be read.
    /// * `TypeMismatch` when the entity cell is not an `Int64`.
    /// * Any error reported by the input operator or the validator.
    fn next(&mut self) -> OperatorResult {
        if let Some(chunk) = self.input.next()? {
            let mut builder =
                DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());

            for row in chunk.selected_indices() {
                let entity_val = chunk
                    .column(self.entity_column)
                    .and_then(|c| c.get_value(row))
                    .ok_or_else(|| {
                        OperatorError::ColumnNotFound(format!(
                            "entity column {}",
                            self.entity_column
                        ))
                    })?;

                // The raw id is wrapped as `NodeId` or `EdgeId` at each use
                // site, depending on `is_edge`.
                let entity_id = match entity_val {
                    Value::Int64(id) => id as u64,
                    _ => {
                        return Err(OperatorError::TypeMismatch {
                            expected: "Int64 (entity ID)".to_string(),
                            found: format!("{entity_val:?}"),
                        });
                    }
                };

                // Resolve all values up front so validation sees the complete set.
                let resolved_props: Vec<(String, Value)> = self
                    .properties
                    .iter()
                    .map(|(name, source)| {
                        let value =
                            source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
                        (name.clone(), value)
                    })
                    .collect();

                // Validate before writing anything for this row. Note that a
                // `"*"` entry is passed to the validator like any other property.
                if let Some(ref validator) = self.validator {
                    if self.is_edge {
                        // Edge checks need a type name; without one they are skipped.
                        if let Some(ref et) = self.edge_type_name {
                            for (name, value) in &resolved_props {
                                validator.validate_edge_property(et, name, value)?;
                            }
                        }
                    } else {
                        for (name, value) in &resolved_props {
                            validator.validate_node_property(&self.labels, name, value)?;
                            validator.check_unique_node_property(&self.labels, name, value)?;
                        }
                    }
                }

                for (prop_name, value) in resolved_props {
                    if prop_name == "*" {
                        // Map assignment: only map values are applied; a `"*"`
                        // entry with any other value writes nothing.
                        if let Value::Map(map) = value {
                            if self.replace {
                                // Replace mode: drop every existing property
                                // before writing the map entries. Keys are
                                // collected first, then removed one by one.
                                if self.is_edge {
                                    if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
                                        let keys: Vec<String> = edge
                                            .properties
                                            .iter()
                                            .map(|(k, _)| k.as_str().to_string())
                                            .collect();
                                        for key in keys {
                                            self.store
                                                .remove_edge_property(EdgeId(entity_id), &key);
                                        }
                                    }
                                } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
                                    let keys: Vec<String> = node
                                        .properties
                                        .iter()
                                        .map(|(k, _)| k.as_str().to_string())
                                        .collect();
                                    for key in keys {
                                        self.store.remove_node_property(NodeId(entity_id), &key);
                                    }
                                }
                            }
                            // Write every map entry as an individual property.
                            for (key, val) in map.iter() {
                                if self.is_edge {
                                    self.store.set_edge_property(
                                        EdgeId(entity_id),
                                        key.as_str(),
                                        val.clone(),
                                    );
                                } else {
                                    self.store.set_node_property(
                                        NodeId(entity_id),
                                        key.as_str(),
                                        val.clone(),
                                    );
                                }
                            }
                        }
                    } else if self.is_edge {
                        self.store
                            .set_edge_property(EdgeId(entity_id), &prop_name, value);
                    } else {
                        self.store
                            .set_node_property(NodeId(entity_id), &prop_name, value);
                    }
                }

                // Pass every input column through to the output unchanged.
                for col_idx in 0..chunk.column_count() {
                    if let (Some(src), Some(dst)) =
                        (chunk.column(col_idx), builder.column_mut(col_idx))
                    {
                        if let Some(val) = src.get_value(row) {
                            dst.push_value(val);
                        } else {
                            dst.push_value(Value::Null);
                        }
                    }
                }

                builder.advance_row();
            }

            return Ok(Some(builder.finish()));
        }
        Ok(None)
    }

    /// Resets the input operator.
    fn reset(&mut self) {
        self.input.reset();
    }

    /// Static operator name.
    fn name(&self) -> &'static str {
        "SetProperty"
    }
}
1150
1151#[cfg(test)]
1152mod tests {
1153 use super::*;
1154 use crate::execution::DataChunk;
1155 use crate::execution::chunk::DataChunkBuilder;
1156 use crate::graph::lpg::LpgStore;
1157
1158 fn create_test_store() -> Arc<dyn GraphStoreMut> {
1161 Arc::new(LpgStore::new().unwrap())
1162 }
1163
1164 struct MockInput {
1165 chunk: Option<DataChunk>,
1166 }
1167
1168 impl MockInput {
1169 fn boxed(chunk: DataChunk) -> Box<Self> {
1170 Box::new(Self { chunk: Some(chunk) })
1171 }
1172 }
1173
1174 impl Operator for MockInput {
1175 fn next(&mut self) -> OperatorResult {
1176 Ok(self.chunk.take())
1177 }
1178 fn reset(&mut self) {}
1179 fn name(&self) -> &'static str {
1180 "MockInput"
1181 }
1182 }
1183
    /// Test operator that never produces a chunk.
    struct EmptyInput;
    impl Operator for EmptyInput {
        /// Always reports exhaustion.
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }
        /// No-op: there is no state to reset.
        fn reset(&mut self) {}
        fn name(&self) -> &'static str {
            "EmptyInput"
        }
    }
1194
1195 fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1196 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1197 for id in ids {
1198 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1199 builder.advance_row();
1200 }
1201 builder.finish()
1202 }
1203
1204 fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1205 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1206 for id in ids {
1207 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1208 builder.advance_row();
1209 }
1210 builder.finish()
1211 }
1212
1213 #[test]
1216 fn test_create_node_standalone() {
1217 let store = create_test_store();
1218
1219 let mut op = CreateNodeOperator::new(
1220 Arc::clone(&store),
1221 None,
1222 vec!["Person".to_string()],
1223 vec![(
1224 "name".to_string(),
1225 PropertySource::Constant(Value::String("Alix".into())),
1226 )],
1227 vec![LogicalType::Int64],
1228 0,
1229 );
1230
1231 let chunk = op.next().unwrap().unwrap();
1232 assert_eq!(chunk.row_count(), 1);
1233
1234 assert!(op.next().unwrap().is_none());
1236
1237 assert_eq!(store.node_count(), 1);
1238 }
1239
1240 #[test]
1241 fn test_create_edge() {
1242 let store = create_test_store();
1243
1244 let node1 = store.create_node(&["Person"]);
1245 let node2 = store.create_node(&["Person"]);
1246
1247 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1248 builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1249 builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1250 builder.advance_row();
1251
1252 let mut op = CreateEdgeOperator::new(
1253 Arc::clone(&store),
1254 MockInput::boxed(builder.finish()),
1255 0,
1256 1,
1257 "KNOWS".to_string(),
1258 vec![LogicalType::Int64, LogicalType::Int64],
1259 );
1260
1261 let _chunk = op.next().unwrap().unwrap();
1262 assert_eq!(store.edge_count(), 1);
1263 }
1264
1265 #[test]
1266 fn test_delete_node() {
1267 let store = create_test_store();
1268
1269 let node_id = store.create_node(&["Person"]);
1270 assert_eq!(store.node_count(), 1);
1271
1272 let mut op = DeleteNodeOperator::new(
1273 Arc::clone(&store),
1274 MockInput::boxed(node_id_chunk(&[node_id])),
1275 0,
1276 vec![LogicalType::Int64],
1277 false,
1278 );
1279
1280 let chunk = op.next().unwrap().unwrap();
1281 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1282 assert_eq!(deleted, 1);
1283 assert_eq!(store.node_count(), 0);
1284 }
1285
1286 #[test]
1289 fn test_delete_edge() {
1290 let store = create_test_store();
1291
1292 let n1 = store.create_node(&["Person"]);
1293 let n2 = store.create_node(&["Person"]);
1294 let eid = store.create_edge(n1, n2, "KNOWS");
1295 assert_eq!(store.edge_count(), 1);
1296
1297 let mut op = DeleteEdgeOperator::new(
1298 Arc::clone(&store),
1299 MockInput::boxed(edge_id_chunk(&[eid])),
1300 0,
1301 vec![LogicalType::Int64],
1302 );
1303
1304 let chunk = op.next().unwrap().unwrap();
1305 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1306 assert_eq!(deleted, 1);
1307 assert_eq!(store.edge_count(), 0);
1308 }
1309
1310 #[test]
1311 fn test_delete_edge_no_input_returns_none() {
1312 let store = create_test_store();
1313
1314 let mut op = DeleteEdgeOperator::new(
1315 Arc::clone(&store),
1316 Box::new(EmptyInput),
1317 0,
1318 vec![LogicalType::Int64],
1319 );
1320
1321 assert!(op.next().unwrap().is_none());
1322 }
1323
1324 #[test]
1325 fn test_delete_multiple_edges() {
1326 let store = create_test_store();
1327
1328 let n1 = store.create_node(&["N"]);
1329 let n2 = store.create_node(&["N"]);
1330 let e1 = store.create_edge(n1, n2, "R");
1331 let e2 = store.create_edge(n2, n1, "S");
1332 assert_eq!(store.edge_count(), 2);
1333
1334 let mut op = DeleteEdgeOperator::new(
1335 Arc::clone(&store),
1336 MockInput::boxed(edge_id_chunk(&[e1, e2])),
1337 0,
1338 vec![LogicalType::Int64],
1339 );
1340
1341 let chunk = op.next().unwrap().unwrap();
1342 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1343 assert_eq!(deleted, 2);
1344 assert_eq!(store.edge_count(), 0);
1345 }
1346
1347 #[test]
1350 fn test_delete_node_detach() {
1351 let store = create_test_store();
1352
1353 let n1 = store.create_node(&["Person"]);
1354 let n2 = store.create_node(&["Person"]);
1355 store.create_edge(n1, n2, "KNOWS");
1356 store.create_edge(n2, n1, "FOLLOWS");
1357 assert_eq!(store.edge_count(), 2);
1358
1359 let mut op = DeleteNodeOperator::new(
1360 Arc::clone(&store),
1361 MockInput::boxed(node_id_chunk(&[n1])),
1362 0,
1363 vec![LogicalType::Int64],
1364 true, );
1366
1367 let chunk = op.next().unwrap().unwrap();
1368 let deleted = chunk.column(0).unwrap().get_int64(0).unwrap();
1369 assert_eq!(deleted, 1);
1370 assert_eq!(store.node_count(), 1);
1371 assert_eq!(store.edge_count(), 0); }
1373
1374 #[test]
1377 fn test_add_label() {
1378 let store = create_test_store();
1379
1380 let node = store.create_node(&["Person"]);
1381
1382 let mut op = AddLabelOperator::new(
1383 Arc::clone(&store),
1384 MockInput::boxed(node_id_chunk(&[node])),
1385 0,
1386 vec!["Employee".to_string()],
1387 vec![LogicalType::Int64],
1388 );
1389
1390 let chunk = op.next().unwrap().unwrap();
1391 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1392 assert_eq!(updated, 1);
1393
1394 let node_data = store.get_node(node).unwrap();
1396 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1397 assert!(labels.contains(&"Person"));
1398 assert!(labels.contains(&"Employee"));
1399 }
1400
1401 #[test]
1402 fn test_add_multiple_labels() {
1403 let store = create_test_store();
1404
1405 let node = store.create_node(&["Base"]);
1406
1407 let mut op = AddLabelOperator::new(
1408 Arc::clone(&store),
1409 MockInput::boxed(node_id_chunk(&[node])),
1410 0,
1411 vec!["LabelA".to_string(), "LabelB".to_string()],
1412 vec![LogicalType::Int64],
1413 );
1414
1415 let chunk = op.next().unwrap().unwrap();
1416 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1417 assert_eq!(updated, 2); let node_data = store.get_node(node).unwrap();
1420 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1421 assert!(labels.contains(&"LabelA"));
1422 assert!(labels.contains(&"LabelB"));
1423 }
1424
1425 #[test]
1426 fn test_add_label_no_input_returns_none() {
1427 let store = create_test_store();
1428
1429 let mut op = AddLabelOperator::new(
1430 Arc::clone(&store),
1431 Box::new(EmptyInput),
1432 0,
1433 vec!["Foo".to_string()],
1434 vec![LogicalType::Int64],
1435 );
1436
1437 assert!(op.next().unwrap().is_none());
1438 }
1439
1440 #[test]
1443 fn test_remove_label() {
1444 let store = create_test_store();
1445
1446 let node = store.create_node(&["Person", "Employee"]);
1447
1448 let mut op = RemoveLabelOperator::new(
1449 Arc::clone(&store),
1450 MockInput::boxed(node_id_chunk(&[node])),
1451 0,
1452 vec!["Employee".to_string()],
1453 vec![LogicalType::Int64],
1454 );
1455
1456 let chunk = op.next().unwrap().unwrap();
1457 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1458 assert_eq!(updated, 1);
1459
1460 let node_data = store.get_node(node).unwrap();
1462 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1463 assert!(labels.contains(&"Person"));
1464 assert!(!labels.contains(&"Employee"));
1465 }
1466
/// Removing a label the node does not carry is expected to succeed and
/// report a count of 0.
#[test]
fn test_remove_nonexistent_label() {
    let store = create_test_store();
    let person = store.create_node(&["Person"]);

    let mut remove_label = RemoveLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        vec!["NonExistent".to_string()],
        vec![LogicalType::Int64],
    );

    let result = remove_label.next().unwrap().unwrap();
    let count_column = result.column(0).unwrap();
    // Nothing matched, so nothing was removed.
    assert_eq!(count_column.get_int64(0).unwrap(), 0);
}
1485
/// Setting a node property from a constant source is expected to emit one
/// row and to store the constant under the given key.
#[test]
fn test_set_node_property_constant() {
    let store = create_test_store();
    let person = store.create_node(&["Person"]);

    let assignments = vec![(
        "name".to_string(),
        PropertySource::Constant(Value::String("Alix".into())),
    )];
    let mut set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    let result = set_property.next().unwrap().unwrap();
    assert_eq!(result.row_count(), 1);

    // Read the node back and confirm the property landed in the store.
    let stored = store.get_node(person).unwrap();
    let name_key = grafeo_common::types::PropertyKey::new("name");
    let expected_name = Value::String("Alix".into());
    assert_eq!(stored.properties.get(&name_key), Some(&expected_name));
}
1517
/// Setting a node property from `PropertySource::Column(1)` is expected to
/// copy the value found in input column 1 onto the node identified by
/// input column 0.
#[test]
fn test_set_node_property_from_column() {
    let store = create_test_store();
    let person = store.create_node(&["Person"]);

    // One input row: [node id, name value].
    let mut input_rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
    {
        let id_column = input_rows.column_mut(0).unwrap();
        id_column.push_int64(person.0 as i64);
    }
    {
        let name_column = input_rows.column_mut(1).unwrap();
        name_column.push_value(Value::String("Gus".into()));
    }
    input_rows.advance_row();

    let mut set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(input_rows.finish()),
        0,
        vec![("name".to_string(), PropertySource::Column(1))],
        vec![LogicalType::Int64, LogicalType::String],
    );

    let result = set_property.next().unwrap().unwrap();
    assert_eq!(result.row_count(), 1);

    let stored = store.get_node(person).unwrap();
    let name_key = grafeo_common::types::PropertyKey::new("name");
    let expected_name = Value::String("Gus".into());
    assert_eq!(stored.properties.get(&name_key), Some(&expected_name));
}
1552
/// Setting a constant property through `SetPropertyOperator::new_for_edge`
/// is expected to emit one row and to store the value on the edge.
#[test]
fn test_set_edge_property() {
    let store = create_test_store();

    let from = store.create_node(&["N"]);
    let to = store.create_node(&["N"]);
    let knows = store.create_edge(from, to, "KNOWS");

    let assignments = vec![(
        "weight".to_string(),
        PropertySource::Constant(Value::Float64(0.75)),
    )];
    let mut set_property = SetPropertyOperator::new_for_edge(
        Arc::clone(&store),
        MockInput::boxed(edge_id_chunk(&[knows])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    let result = set_property.next().unwrap().unwrap();
    assert_eq!(result.row_count(), 1);

    let stored = store.get_edge(knows).unwrap();
    let weight_key = grafeo_common::types::PropertyKey::new("weight");
    let expected_weight = Value::Float64(0.75);
    assert_eq!(stored.properties.get(&weight_key), Some(&expected_weight));
}
1583
/// A single `SetPropertyOperator` carrying two constant assignments is
/// expected to store both properties on the target node.
#[test]
fn test_set_multiple_properties() {
    let store = create_test_store();
    let person = store.create_node(&["Person"]);

    let assignments = vec![
        (
            "name".to_string(),
            PropertySource::Constant(Value::String("Alix".into())),
        ),
        (
            "age".to_string(),
            PropertySource::Constant(Value::Int64(30)),
        ),
    ];
    let mut set_properties = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    set_properties.next().unwrap().unwrap();

    // Check each expected key/value pair against the stored node.
    let stored = store.get_node(person).unwrap();
    let expectations = [
        ("name", Value::String("Alix".into())),
        ("age", Value::Int64(30)),
    ];
    for (key, expected) in expectations {
        assert_eq!(
            stored
                .properties
                .get(&grafeo_common::types::PropertyKey::new(key)),
            Some(&expected)
        );
    }
}
1623
/// With `EmptyInput` as the child operator, the first `next()` call on a
/// `SetPropertyOperator` is expected to be `Ok(None)`.
#[test]
fn test_set_property_no_input_returns_none() {
    let store = create_test_store();

    let assignments = vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))];
    let mut set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    let outcome = set_property.next().unwrap();
    assert!(outcome.is_none());
}
1638
/// Deleting a node that still has an edge, with the final constructor flag
/// set to `false`, is expected to fail with a `ConstraintViolation` whose
/// message mentions "connected edge", and to leave both nodes in the store.
#[test]
fn test_delete_node_without_detach_errors_when_edges_exist() {
    let store = create_test_store();

    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);
    store.create_edge(source, target, "KNOWS");

    let mut delete_node = DeleteNodeOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[source])),
        0,
        vec![LogicalType::Int64],
        // Presumably the "detach" flag, going by the test name — TODO confirm.
        false,
    );

    match delete_node.next().unwrap_err() {
        OperatorError::ConstraintViolation(msg) => {
            assert!(msg.contains("connected edge"), "unexpected message: {msg}");
        }
        other => panic!("expected ConstraintViolation, got {other:?}"),
    }

    // The failed delete must not have removed anything.
    assert_eq!(store.node_count(), 2);
}
1667
/// A `CreateNodeOperator` fed by a one-row input is expected to emit one
/// row, add exactly one node to the store, and then report exhaustion.
#[test]
fn test_create_node_with_input_operator() {
    let store = create_test_store();

    // Pre-existing node whose ID forms the single input row.
    let seed = store.create_node(&["Seed"]);

    let properties = vec![(
        "source".to_string(),
        PropertySource::Constant(Value::String("from_input".into())),
    )];
    let mut create_node = CreateNodeOperator::new(
        Arc::clone(&store),
        Some(MockInput::boxed(node_id_chunk(&[seed]))),
        vec!["Created".to_string()],
        properties,
        vec![LogicalType::Int64, LogicalType::Int64],
        // Presumably the output column index for the new node — TODO confirm.
        1,
    );

    let first = create_node.next().unwrap().unwrap();
    assert_eq!(first.row_count(), 1);

    // Seed node plus the newly created one.
    assert_eq!(store.node_count(), 2);

    // The mock input is drained, so the operator has nothing more to emit.
    let second = create_node.next().unwrap();
    assert!(second.is_none());
}
1698
/// A `CreateEdgeOperator` configured with a constant property and
/// `with_output_column(2)` is expected to create one edge, expose its ID in
/// output column 2, and store the property on that edge.
#[test]
fn test_create_edge_with_properties_and_output_column() {
    let store = create_test_store();

    let from = store.create_node(&["Person"]);
    let to = store.create_node(&["Person"]);

    // One input row holding the two endpoint IDs.
    let mut endpoints = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
    endpoints.column_mut(0).unwrap().push_int64(from.0 as i64);
    endpoints.column_mut(1).unwrap().push_int64(to.0 as i64);
    endpoints.advance_row();

    let edge_properties = vec![(
        "since".to_string(),
        PropertySource::Constant(Value::Int64(2024)),
    )];
    let mut create_edge = CreateEdgeOperator::new(
        Arc::clone(&store),
        MockInput::boxed(endpoints.finish()),
        0,
        1,
        "KNOWS".to_string(),
        vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
    )
    .with_properties(edge_properties)
    .with_output_column(2);

    let result = create_edge.next().unwrap().unwrap();
    assert_eq!(result.row_count(), 1);
    assert_eq!(store.edge_count(), 1);

    // Recover the new edge's ID from the configured output column.
    let raw_id = result
        .column(2)
        .and_then(|column| column.get_int64(0))
        .expect("edge ID should be in output column 2");
    let created = store
        .get_edge(EdgeId(raw_id as u64))
        .expect("edge should exist");

    let since_key = grafeo_common::types::PropertyKey::new("since");
    assert_eq!(created.properties.get(&since_key), Some(&Value::Int64(2024)));
}
1746
/// Assigning a map under the "*" key with `with_replace(true)` is expected
/// to drop properties that are absent from the map and to store the map's
/// entries.
#[test]
fn test_set_property_map_replace() {
    use std::collections::BTreeMap;

    let store = create_test_store();

    let person = store.create_node(&["Person"]);
    store.set_node_property(person, "old_prop", Value::String("should_be_removed".into()));

    let replacement = BTreeMap::from([(
        PropertyKey::new("new_key"),
        Value::String("new_val".into()),
    )]);

    let mut set_map = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        vec![(
            "*".to_string(),
            PropertySource::Constant(Value::Map(Arc::new(replacement))),
        )],
        vec![LogicalType::Int64],
    )
    .with_replace(true);

    set_map.next().unwrap().unwrap();

    let stored = store.get_node(person).unwrap();

    // The pre-existing property is not in the map, so it must be gone.
    let old_key = PropertyKey::new("old_prop");
    assert!(stored.properties.get(&old_key).is_none());

    // The map's single entry must now be present.
    let new_key = PropertyKey::new("new_key");
    let expected = Value::String("new_val".into());
    assert_eq!(stored.properties.get(&new_key), Some(&expected));
}
1789
/// Assigning a map under the "*" key without calling `with_replace` is
/// expected to keep existing properties and add the map's entries.
#[test]
fn test_set_property_map_merge() {
    use std::collections::BTreeMap;

    let store = create_test_store();

    let person = store.create_node(&["Person"]);
    store.set_node_property(person, "existing", Value::Int64(42));

    let additions = BTreeMap::from([(
        PropertyKey::new("added"),
        Value::String("hello".into()),
    )]);

    // No `with_replace` call here: the operator's default mode is under test.
    let mut set_map = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        vec![(
            "*".to_string(),
            PropertySource::Constant(Value::Map(Arc::new(additions))),
        )],
        vec![LogicalType::Int64],
    );

    set_map.next().unwrap().unwrap();

    let stored = store.get_node(person).unwrap();

    // The property set before the operator ran is still there.
    let existing_key = PropertyKey::new("existing");
    assert_eq!(stored.properties.get(&existing_key), Some(&Value::Int64(42)));

    // The map's entry was added alongside it.
    let added_key = PropertyKey::new("added");
    let expected_added = Value::String("hello".into());
    assert_eq!(stored.properties.get(&added_key), Some(&expected_added));
}
1829
/// `PropertySource::PropertyAccess` is expected to read a property from the
/// node in one input column and let `SetPropertyOperator` write it onto the
/// node identified by another column.
#[test]
fn test_property_source_property_access() {
    let store = create_test_store();

    let source = store.create_node(&["Source"]);
    store.set_node_property(source, "name", Value::String("Alix".into()));

    let target = store.create_node(&["Target"]);

    // One input row: column 0 holds the source node, column 1 the target ID.
    let mut input_rows = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
    {
        let source_column = input_rows.column_mut(0).unwrap();
        source_column.push_node_id(source);
    }
    {
        let target_column = input_rows.column_mut(1).unwrap();
        target_column.push_int64(target.0 as i64);
    }
    input_rows.advance_row();

    let copy_name = PropertySource::PropertyAccess {
        column: 0,
        property: "name".to_string(),
    };
    let mut set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(input_rows.finish()),
        // The entity to update is taken from column 1 (the target node).
        1,
        vec![("copied_name".to_string(), copy_name)],
        vec![LogicalType::Node, LogicalType::Int64],
    );

    set_property.next().unwrap().unwrap();

    let stored_target = store.get_node(target).unwrap();
    let copied_key = PropertyKey::new("copied_name");
    let expected = Value::String("Alix".into());
    assert_eq!(stored_target.properties.get(&copied_key), Some(&expected));
}
1872
/// A `CreateNodeOperator` with an attached validator is expected to succeed
/// for an accepted property and to surface the validator's
/// `ConstraintViolation` for a rejected one.
#[test]
fn test_create_node_with_constraint_validator() {
    /// Validator that rejects only the node property key "forbidden" and
    /// accepts everything else.
    struct ForbiddenKeyValidator;

    impl ConstraintValidator for ForbiddenKeyValidator {
        fn validate_node_property(
            &self,
            _labels: &[String],
            key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            if key == "forbidden" {
                Err(OperatorError::ConstraintViolation(
                    "property 'forbidden' is not allowed".to_string(),
                ))
            } else {
                Ok(())
            }
        }

        fn validate_node_complete(
            &self,
            _labels: &[String],
            _properties: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn check_unique_node_property(
            &self,
            _labels: &[String],
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_property(
            &self,
            _edge_type: &str,
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_complete(
            &self,
            _edge_type: &str,
            _properties: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }
    }

    let store = create_test_store();

    // Builds a validated "Thing" creator with a single constant property.
    let validated_create = |key: &str, value: Value| {
        CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Thing".to_string()],
            vec![(key.to_string(), PropertySource::Constant(value))],
            vec![LogicalType::Int64],
            0,
        )
        .with_validator(Arc::new(ForbiddenKeyValidator))
    };

    // An allowed key passes validation and the node is created.
    let mut accepted = validated_create("name", Value::String("ok".into()));
    assert!(accepted.next().is_ok());
    assert_eq!(store.node_count(), 1);

    // The forbidden key must be reported as a constraint violation.
    let mut rejected = validated_create("forbidden", Value::Int64(1));
    let failure = rejected.next().unwrap_err();
    assert!(matches!(failure, OperatorError::ConstraintViolation(_)));
}
1962
/// An input-less `CreateNodeOperator` is expected to emit once, then report
/// exhaustion, and to emit again (creating a second node) after `reset()`.
#[test]
fn test_create_node_reset_allows_re_execution() {
    let store = create_test_store();

    let mut create_node = CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        vec!["Person".to_string()],
        vec![],
        vec![LogicalType::Int64],
        0,
    );

    // First pull produces a chunk; the second signals exhaustion.
    let first = create_node.next().unwrap();
    assert!(first.is_some());
    let second = create_node.next().unwrap();
    assert!(second.is_none());

    // After a reset the operator runs again.
    create_node.reset();
    let after_reset = create_node.next().unwrap();
    assert!(after_reset.is_some());

    // One node per successful execution.
    assert_eq!(store.node_count(), 2);
}
1988
/// Each mutation operator's `name()` is expected to return its fixed
/// identifier string.
#[test]
fn test_operator_names() {
    let store = create_test_store();

    let create_node = CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        vec![],
        vec![],
        vec![LogicalType::Int64],
        0,
    );
    assert_eq!(create_node.name(), "CreateNode");

    let create_edge = CreateEdgeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        1,
        "R".to_string(),
        vec![LogicalType::Int64],
    );
    assert_eq!(create_edge.name(), "CreateEdge");

    let delete_node = DeleteNodeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![LogicalType::Int64],
        false,
    );
    assert_eq!(delete_node.name(), "DeleteNode");

    let delete_edge = DeleteEdgeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![LogicalType::Int64],
    );
    assert_eq!(delete_edge.name(), "DeleteEdge");

    let add_label = AddLabelOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec!["L".to_string()],
        vec![LogicalType::Int64],
    );
    assert_eq!(add_label.name(), "AddLabel");

    let remove_label = RemoveLabelOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec!["L".to_string()],
        vec![LogicalType::Int64],
    );
    assert_eq!(remove_label.name(), "RemoveLabel");

    let set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![],
        vec![LogicalType::Int64],
    );
    assert_eq!(set_property.name(), "SetProperty");
}
2059}