1use std::sync::Arc;
10
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19pub trait ConstraintValidator: Send + Sync {
24 fn validate_node_property(
28 &self,
29 labels: &[String],
30 key: &str,
31 value: &Value,
32 ) -> Result<(), OperatorError>;
33
34 fn validate_node_complete(
38 &self,
39 labels: &[String],
40 properties: &[(String, Value)],
41 ) -> Result<(), OperatorError>;
42
43 fn check_unique_node_property(
47 &self,
48 labels: &[String],
49 key: &str,
50 value: &Value,
51 ) -> Result<(), OperatorError>;
52
53 fn validate_edge_property(
55 &self,
56 edge_type: &str,
57 key: &str,
58 value: &Value,
59 ) -> Result<(), OperatorError>;
60
61 fn validate_edge_complete(
63 &self,
64 edge_type: &str,
65 properties: &[(String, Value)],
66 ) -> Result<(), OperatorError>;
67}
68
69pub struct CreateNodeOperator {
74 store: Arc<dyn GraphStoreMut>,
76 input: Option<Box<dyn Operator>>,
78 labels: Vec<String>,
80 properties: Vec<(String, PropertySource)>,
82 output_schema: Vec<LogicalType>,
84 output_column: usize,
86 executed: bool,
88 viewing_epoch: Option<EpochId>,
90 transaction_id: Option<TransactionId>,
92 validator: Option<Arc<dyn ConstraintValidator>>,
94}
95
96#[derive(Debug, Clone)]
98pub enum PropertySource {
99 Column(usize),
101 Constant(Value),
103 PropertyAccess {
105 column: usize,
107 property: String,
109 },
110}
111
112impl PropertySource {
113 pub fn resolve(
115 &self,
116 chunk: &crate::execution::chunk::DataChunk,
117 row: usize,
118 store: &dyn GraphStore,
119 ) -> Value {
120 match self {
121 PropertySource::Column(col_idx) => chunk
122 .column(*col_idx)
123 .and_then(|c| c.get_value(row))
124 .unwrap_or(Value::Null),
125 PropertySource::Constant(v) => v.clone(),
126 PropertySource::PropertyAccess { column, property } => {
127 let Some(col) = chunk.column(*column) else {
128 return Value::Null;
129 };
130 if let Some(node_id) = col.get_node_id(row) {
132 store
133 .get_node(node_id)
134 .and_then(|node| node.get_property(property).cloned())
135 .unwrap_or(Value::Null)
136 } else if let Some(edge_id) = col.get_edge_id(row) {
137 store
138 .get_edge(edge_id)
139 .and_then(|edge| edge.get_property(property).cloned())
140 .unwrap_or(Value::Null)
141 } else if let Some(Value::Map(map)) = col.get_value(row) {
142 let key = PropertyKey::new(property);
143 map.get(&key).cloned().unwrap_or(Value::Null)
144 } else {
145 Value::Null
146 }
147 }
148 }
149 }
150}
151
152impl CreateNodeOperator {
153 pub fn new(
163 store: Arc<dyn GraphStoreMut>,
164 input: Option<Box<dyn Operator>>,
165 labels: Vec<String>,
166 properties: Vec<(String, PropertySource)>,
167 output_schema: Vec<LogicalType>,
168 output_column: usize,
169 ) -> Self {
170 Self {
171 store,
172 input,
173 labels,
174 properties,
175 output_schema,
176 output_column,
177 executed: false,
178 viewing_epoch: None,
179 transaction_id: None,
180 validator: None,
181 }
182 }
183
184 pub fn with_transaction_context(
186 mut self,
187 epoch: EpochId,
188 transaction_id: Option<TransactionId>,
189 ) -> Self {
190 self.viewing_epoch = Some(epoch);
191 self.transaction_id = transaction_id;
192 self
193 }
194
195 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
197 self.validator = Some(validator);
198 self
199 }
200}
201
202impl CreateNodeOperator {
203 fn validate_and_set_properties(
205 &self,
206 node_id: NodeId,
207 resolved_props: &[(String, Value)],
208 ) -> Result<(), OperatorError> {
209 if let Some(ref validator) = self.validator {
211 for (name, value) in resolved_props {
212 validator.validate_node_property(&self.labels, name, value)?;
213 validator.check_unique_node_property(&self.labels, name, value)?;
214 }
215 validator.validate_node_complete(&self.labels, resolved_props)?;
217 }
218
219 for (name, value) in resolved_props {
221 self.store.set_node_property(node_id, name, value.clone());
222 }
223 Ok(())
224 }
225}
226
227impl Operator for CreateNodeOperator {
228 fn next(&mut self) -> OperatorResult {
229 let epoch = self
231 .viewing_epoch
232 .unwrap_or_else(|| self.store.current_epoch());
233 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
234
235 if let Some(ref mut input) = self.input {
236 if let Some(chunk) = input.next()? {
238 let mut builder =
239 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
240
241 for row in chunk.selected_indices() {
242 let resolved_props: Vec<(String, Value)> = self
244 .properties
245 .iter()
246 .map(|(name, source)| {
247 let value =
248 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
249 (name.clone(), value)
250 })
251 .collect();
252
253 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
255 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
256
257 self.validate_and_set_properties(node_id, &resolved_props)?;
259
260 for col_idx in 0..chunk.column_count() {
262 if col_idx < self.output_column
263 && let (Some(src), Some(dst)) =
264 (chunk.column(col_idx), builder.column_mut(col_idx))
265 {
266 if let Some(val) = src.get_value(row) {
267 dst.push_value(val);
268 } else {
269 dst.push_value(Value::Null);
270 }
271 }
272 }
273
274 if let Some(dst) = builder.column_mut(self.output_column) {
276 dst.push_value(Value::Int64(node_id.0 as i64));
277 }
278
279 builder.advance_row();
280 }
281
282 return Ok(Some(builder.finish()));
283 }
284 Ok(None)
285 } else {
286 if self.executed {
288 return Ok(None);
289 }
290 self.executed = true;
291
292 let resolved_props: Vec<(String, Value)> = self
294 .properties
295 .iter()
296 .filter_map(|(name, source)| {
297 if let PropertySource::Constant(value) = source {
298 Some((name.clone(), value.clone()))
299 } else {
300 None
301 }
302 })
303 .collect();
304
305 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
307 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
308
309 self.validate_and_set_properties(node_id, &resolved_props)?;
311
312 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
314 if let Some(dst) = builder.column_mut(self.output_column) {
315 dst.push_value(Value::Int64(node_id.0 as i64));
316 }
317 builder.advance_row();
318
319 Ok(Some(builder.finish()))
320 }
321 }
322
323 fn reset(&mut self) {
324 if let Some(ref mut input) = self.input {
325 input.reset();
326 }
327 self.executed = false;
328 }
329
330 fn name(&self) -> &'static str {
331 "CreateNode"
332 }
333}
334
335pub struct CreateEdgeOperator {
337 store: Arc<dyn GraphStoreMut>,
339 input: Box<dyn Operator>,
341 from_column: usize,
343 to_column: usize,
345 edge_type: String,
347 properties: Vec<(String, PropertySource)>,
349 output_schema: Vec<LogicalType>,
351 output_column: Option<usize>,
353 viewing_epoch: Option<EpochId>,
355 transaction_id: Option<TransactionId>,
357 validator: Option<Arc<dyn ConstraintValidator>>,
359}
360
361impl CreateEdgeOperator {
362 pub fn new(
369 store: Arc<dyn GraphStoreMut>,
370 input: Box<dyn Operator>,
371 from_column: usize,
372 to_column: usize,
373 edge_type: String,
374 output_schema: Vec<LogicalType>,
375 ) -> Self {
376 Self {
377 store,
378 input,
379 from_column,
380 to_column,
381 edge_type,
382 properties: Vec::new(),
383 output_schema,
384 output_column: None,
385 viewing_epoch: None,
386 transaction_id: None,
387 validator: None,
388 }
389 }
390
391 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
393 self.properties = properties;
394 self
395 }
396
397 pub fn with_output_column(mut self, column: usize) -> Self {
399 self.output_column = Some(column);
400 self
401 }
402
403 pub fn with_transaction_context(
405 mut self,
406 epoch: EpochId,
407 transaction_id: Option<TransactionId>,
408 ) -> Self {
409 self.viewing_epoch = Some(epoch);
410 self.transaction_id = transaction_id;
411 self
412 }
413
414 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
416 self.validator = Some(validator);
417 self
418 }
419}
420
421impl Operator for CreateEdgeOperator {
422 fn next(&mut self) -> OperatorResult {
423 let epoch = self
425 .viewing_epoch
426 .unwrap_or_else(|| self.store.current_epoch());
427 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
428
429 if let Some(chunk) = self.input.next()? {
430 let mut builder =
431 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
432
433 for row in chunk.selected_indices() {
434 let from_id = chunk
436 .column(self.from_column)
437 .and_then(|c| c.get_value(row))
438 .ok_or_else(|| {
439 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
440 })?;
441
442 let to_id = chunk
443 .column(self.to_column)
444 .and_then(|c| c.get_value(row))
445 .ok_or_else(|| {
446 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
447 })?;
448
449 let from_node_id = match from_id {
451 Value::Int64(id) => NodeId(id as u64),
452 _ => {
453 return Err(OperatorError::TypeMismatch {
454 expected: "Int64 (node ID)".to_string(),
455 found: format!("{from_id:?}"),
456 });
457 }
458 };
459
460 let to_node_id = match to_id {
461 Value::Int64(id) => NodeId(id as u64),
462 _ => {
463 return Err(OperatorError::TypeMismatch {
464 expected: "Int64 (node ID)".to_string(),
465 found: format!("{to_id:?}"),
466 });
467 }
468 };
469
470 let resolved_props: Vec<(String, Value)> = self
472 .properties
473 .iter()
474 .map(|(name, source)| {
475 let value =
476 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
477 (name.clone(), value)
478 })
479 .collect();
480
481 if let Some(ref validator) = self.validator {
483 for (name, value) in &resolved_props {
484 validator.validate_edge_property(&self.edge_type, name, value)?;
485 }
486 validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
487 }
488
489 let edge_id = self.store.create_edge_versioned(
491 from_node_id,
492 to_node_id,
493 &self.edge_type,
494 epoch,
495 tx,
496 );
497
498 for (name, value) in resolved_props {
500 self.store.set_edge_property(edge_id, &name, value);
501 }
502
503 for col_idx in 0..chunk.column_count() {
505 if let (Some(src), Some(dst)) =
506 (chunk.column(col_idx), builder.column_mut(col_idx))
507 {
508 if let Some(val) = src.get_value(row) {
509 dst.push_value(val);
510 } else {
511 dst.push_value(Value::Null);
512 }
513 }
514 }
515
516 if let Some(out_col) = self.output_column
518 && let Some(dst) = builder.column_mut(out_col)
519 {
520 dst.push_value(Value::Int64(edge_id.0 as i64));
521 }
522
523 builder.advance_row();
524 }
525
526 return Ok(Some(builder.finish()));
527 }
528 Ok(None)
529 }
530
531 fn reset(&mut self) {
532 self.input.reset();
533 }
534
535 fn name(&self) -> &'static str {
536 "CreateEdge"
537 }
538}
539
540pub struct DeleteNodeOperator {
542 store: Arc<dyn GraphStoreMut>,
544 input: Box<dyn Operator>,
546 node_column: usize,
548 output_schema: Vec<LogicalType>,
550 detach: bool,
552 viewing_epoch: Option<EpochId>,
554 transaction_id: Option<TransactionId>,
556}
557
558impl DeleteNodeOperator {
559 pub fn new(
561 store: Arc<dyn GraphStoreMut>,
562 input: Box<dyn Operator>,
563 node_column: usize,
564 output_schema: Vec<LogicalType>,
565 detach: bool,
566 ) -> Self {
567 Self {
568 store,
569 input,
570 node_column,
571 output_schema,
572 detach,
573 viewing_epoch: None,
574 transaction_id: None,
575 }
576 }
577
578 pub fn with_transaction_context(
580 mut self,
581 epoch: EpochId,
582 transaction_id: Option<TransactionId>,
583 ) -> Self {
584 self.viewing_epoch = Some(epoch);
585 self.transaction_id = transaction_id;
586 self
587 }
588}
589
590impl Operator for DeleteNodeOperator {
591 fn next(&mut self) -> OperatorResult {
592 let epoch = self
594 .viewing_epoch
595 .unwrap_or_else(|| self.store.current_epoch());
596 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
597
598 if let Some(chunk) = self.input.next()? {
599 let mut builder =
600 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
601
602 for row in chunk.selected_indices() {
603 let node_val = chunk
604 .column(self.node_column)
605 .and_then(|c| c.get_value(row))
606 .ok_or_else(|| {
607 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
608 })?;
609
610 let node_id = match node_val {
611 Value::Int64(id) => NodeId(id as u64),
612 _ => {
613 return Err(OperatorError::TypeMismatch {
614 expected: "Int64 (node ID)".to_string(),
615 found: format!("{node_val:?}"),
616 });
617 }
618 };
619
620 if self.detach {
621 self.store.delete_node_edges(node_id);
623 } else {
624 let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
626 if degree > 0 {
627 return Err(OperatorError::ConstraintViolation(format!(
628 "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
629 degree
630 )));
631 }
632 }
633
634 self.store.delete_node_versioned(node_id, epoch, tx);
636
637 for col_idx in 0..chunk.column_count() {
640 if let (Some(src), Some(dst)) =
641 (chunk.column(col_idx), builder.column_mut(col_idx))
642 {
643 if let Some(val) = src.get_value(row) {
644 dst.push_value(val);
645 } else {
646 dst.push_value(Value::Null);
647 }
648 }
649 }
650 builder.advance_row();
651 }
652
653 return Ok(Some(builder.finish()));
654 }
655 Ok(None)
656 }
657
658 fn reset(&mut self) {
659 self.input.reset();
660 }
661
662 fn name(&self) -> &'static str {
663 "DeleteNode"
664 }
665}
666
667pub struct DeleteEdgeOperator {
669 store: Arc<dyn GraphStoreMut>,
671 input: Box<dyn Operator>,
673 edge_column: usize,
675 output_schema: Vec<LogicalType>,
677 viewing_epoch: Option<EpochId>,
679 transaction_id: Option<TransactionId>,
681}
682
683impl DeleteEdgeOperator {
684 pub fn new(
686 store: Arc<dyn GraphStoreMut>,
687 input: Box<dyn Operator>,
688 edge_column: usize,
689 output_schema: Vec<LogicalType>,
690 ) -> Self {
691 Self {
692 store,
693 input,
694 edge_column,
695 output_schema,
696 viewing_epoch: None,
697 transaction_id: None,
698 }
699 }
700
701 pub fn with_transaction_context(
703 mut self,
704 epoch: EpochId,
705 transaction_id: Option<TransactionId>,
706 ) -> Self {
707 self.viewing_epoch = Some(epoch);
708 self.transaction_id = transaction_id;
709 self
710 }
711}
712
713impl Operator for DeleteEdgeOperator {
714 fn next(&mut self) -> OperatorResult {
715 let epoch = self
717 .viewing_epoch
718 .unwrap_or_else(|| self.store.current_epoch());
719 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
720
721 if let Some(chunk) = self.input.next()? {
722 let mut builder =
723 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
724
725 for row in chunk.selected_indices() {
726 let edge_val = chunk
727 .column(self.edge_column)
728 .and_then(|c| c.get_value(row))
729 .ok_or_else(|| {
730 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
731 })?;
732
733 let edge_id = match edge_val {
734 Value::Int64(id) => EdgeId(id as u64),
735 _ => {
736 return Err(OperatorError::TypeMismatch {
737 expected: "Int64 (edge ID)".to_string(),
738 found: format!("{edge_val:?}"),
739 });
740 }
741 };
742
743 self.store.delete_edge_versioned(edge_id, epoch, tx);
745
746 for col_idx in 0..chunk.column_count() {
748 if let (Some(src), Some(dst)) =
749 (chunk.column(col_idx), builder.column_mut(col_idx))
750 {
751 if let Some(val) = src.get_value(row) {
752 dst.push_value(val);
753 } else {
754 dst.push_value(Value::Null);
755 }
756 }
757 }
758 builder.advance_row();
759 }
760
761 return Ok(Some(builder.finish()));
762 }
763 Ok(None)
764 }
765
766 fn reset(&mut self) {
767 self.input.reset();
768 }
769
770 fn name(&self) -> &'static str {
771 "DeleteEdge"
772 }
773}
774
775pub struct AddLabelOperator {
777 store: Arc<dyn GraphStoreMut>,
779 input: Box<dyn Operator>,
781 node_column: usize,
783 labels: Vec<String>,
785 output_schema: Vec<LogicalType>,
787 viewing_epoch: Option<EpochId>,
789 transaction_id: Option<TransactionId>,
791}
792
793impl AddLabelOperator {
794 pub fn new(
796 store: Arc<dyn GraphStoreMut>,
797 input: Box<dyn Operator>,
798 node_column: usize,
799 labels: Vec<String>,
800 output_schema: Vec<LogicalType>,
801 ) -> Self {
802 Self {
803 store,
804 input,
805 node_column,
806 labels,
807 output_schema,
808 viewing_epoch: None,
809 transaction_id: None,
810 }
811 }
812
813 pub fn with_transaction_context(
815 mut self,
816 epoch: EpochId,
817 transaction_id: Option<TransactionId>,
818 ) -> Self {
819 self.viewing_epoch = Some(epoch);
820 self.transaction_id = transaction_id;
821 self
822 }
823}
824
825impl Operator for AddLabelOperator {
826 fn next(&mut self) -> OperatorResult {
827 if let Some(chunk) = self.input.next()? {
828 let mut updated_count = 0;
829
830 for row in chunk.selected_indices() {
831 let node_val = chunk
832 .column(self.node_column)
833 .and_then(|c| c.get_value(row))
834 .ok_or_else(|| {
835 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
836 })?;
837
838 let node_id = match node_val {
839 Value::Int64(id) => NodeId(id as u64),
840 _ => {
841 return Err(OperatorError::TypeMismatch {
842 expected: "Int64 (node ID)".to_string(),
843 found: format!("{node_val:?}"),
844 });
845 }
846 };
847
848 for label in &self.labels {
850 let added = if let Some(tid) = self.transaction_id {
851 self.store.add_label_versioned(node_id, label, tid)
852 } else {
853 self.store.add_label(node_id, label)
854 };
855 if added {
856 updated_count += 1;
857 }
858 }
859 }
860
861 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
863 if let Some(dst) = builder.column_mut(0) {
864 dst.push_value(Value::Int64(updated_count));
865 }
866 builder.advance_row();
867
868 return Ok(Some(builder.finish()));
869 }
870 Ok(None)
871 }
872
873 fn reset(&mut self) {
874 self.input.reset();
875 }
876
877 fn name(&self) -> &'static str {
878 "AddLabel"
879 }
880}
881
882pub struct RemoveLabelOperator {
884 store: Arc<dyn GraphStoreMut>,
886 input: Box<dyn Operator>,
888 node_column: usize,
890 labels: Vec<String>,
892 output_schema: Vec<LogicalType>,
894 viewing_epoch: Option<EpochId>,
896 transaction_id: Option<TransactionId>,
898}
899
900impl RemoveLabelOperator {
901 pub fn new(
903 store: Arc<dyn GraphStoreMut>,
904 input: Box<dyn Operator>,
905 node_column: usize,
906 labels: Vec<String>,
907 output_schema: Vec<LogicalType>,
908 ) -> Self {
909 Self {
910 store,
911 input,
912 node_column,
913 labels,
914 output_schema,
915 viewing_epoch: None,
916 transaction_id: None,
917 }
918 }
919
920 pub fn with_transaction_context(
922 mut self,
923 epoch: EpochId,
924 transaction_id: Option<TransactionId>,
925 ) -> Self {
926 self.viewing_epoch = Some(epoch);
927 self.transaction_id = transaction_id;
928 self
929 }
930}
931
932impl Operator for RemoveLabelOperator {
933 fn next(&mut self) -> OperatorResult {
934 if let Some(chunk) = self.input.next()? {
935 let mut updated_count = 0;
936
937 for row in chunk.selected_indices() {
938 let node_val = chunk
939 .column(self.node_column)
940 .and_then(|c| c.get_value(row))
941 .ok_or_else(|| {
942 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
943 })?;
944
945 let node_id = match node_val {
946 Value::Int64(id) => NodeId(id as u64),
947 _ => {
948 return Err(OperatorError::TypeMismatch {
949 expected: "Int64 (node ID)".to_string(),
950 found: format!("{node_val:?}"),
951 });
952 }
953 };
954
955 for label in &self.labels {
957 let removed = if let Some(tid) = self.transaction_id {
958 self.store.remove_label_versioned(node_id, label, tid)
959 } else {
960 self.store.remove_label(node_id, label)
961 };
962 if removed {
963 updated_count += 1;
964 }
965 }
966 }
967
968 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
970 if let Some(dst) = builder.column_mut(0) {
971 dst.push_value(Value::Int64(updated_count));
972 }
973 builder.advance_row();
974
975 return Ok(Some(builder.finish()));
976 }
977 Ok(None)
978 }
979
980 fn reset(&mut self) {
981 self.input.reset();
982 }
983
984 fn name(&self) -> &'static str {
985 "RemoveLabel"
986 }
987}
988
989pub struct SetPropertyOperator {
994 store: Arc<dyn GraphStoreMut>,
996 input: Box<dyn Operator>,
998 entity_column: usize,
1000 is_edge: bool,
1002 properties: Vec<(String, PropertySource)>,
1004 output_schema: Vec<LogicalType>,
1006 replace: bool,
1008 validator: Option<Arc<dyn ConstraintValidator>>,
1010 labels: Vec<String>,
1012 edge_type_name: Option<String>,
1014 viewing_epoch: Option<EpochId>,
1016 transaction_id: Option<TransactionId>,
1018}
1019
1020impl SetPropertyOperator {
1021 pub fn new_for_node(
1023 store: Arc<dyn GraphStoreMut>,
1024 input: Box<dyn Operator>,
1025 node_column: usize,
1026 properties: Vec<(String, PropertySource)>,
1027 output_schema: Vec<LogicalType>,
1028 ) -> Self {
1029 Self {
1030 store,
1031 input,
1032 entity_column: node_column,
1033 is_edge: false,
1034 properties,
1035 output_schema,
1036 replace: false,
1037 validator: None,
1038 labels: Vec::new(),
1039 edge_type_name: None,
1040 viewing_epoch: None,
1041 transaction_id: None,
1042 }
1043 }
1044
1045 pub fn new_for_edge(
1047 store: Arc<dyn GraphStoreMut>,
1048 input: Box<dyn Operator>,
1049 edge_column: usize,
1050 properties: Vec<(String, PropertySource)>,
1051 output_schema: Vec<LogicalType>,
1052 ) -> Self {
1053 Self {
1054 store,
1055 input,
1056 entity_column: edge_column,
1057 is_edge: true,
1058 properties,
1059 output_schema,
1060 replace: false,
1061 validator: None,
1062 labels: Vec::new(),
1063 edge_type_name: None,
1064 viewing_epoch: None,
1065 transaction_id: None,
1066 }
1067 }
1068
1069 pub fn with_replace(mut self, replace: bool) -> Self {
1071 self.replace = replace;
1072 self
1073 }
1074
1075 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1077 self.validator = Some(validator);
1078 self
1079 }
1080
1081 pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1083 self.labels = labels;
1084 self
1085 }
1086
1087 pub fn with_edge_type(mut self, edge_type: String) -> Self {
1089 self.edge_type_name = Some(edge_type);
1090 self
1091 }
1092
1093 pub fn with_transaction_context(
1098 mut self,
1099 epoch: EpochId,
1100 transaction_id: Option<TransactionId>,
1101 ) -> Self {
1102 self.viewing_epoch = Some(epoch);
1103 self.transaction_id = transaction_id;
1104 self
1105 }
1106}
1107
1108impl Operator for SetPropertyOperator {
1109 fn next(&mut self) -> OperatorResult {
1110 if let Some(chunk) = self.input.next()? {
1111 let mut builder =
1112 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1113
1114 for row in chunk.selected_indices() {
1115 let entity_val = chunk
1116 .column(self.entity_column)
1117 .and_then(|c| c.get_value(row))
1118 .ok_or_else(|| {
1119 OperatorError::ColumnNotFound(format!(
1120 "entity column {}",
1121 self.entity_column
1122 ))
1123 })?;
1124
1125 let entity_id = match entity_val {
1126 Value::Int64(id) => id as u64,
1127 _ => {
1128 return Err(OperatorError::TypeMismatch {
1129 expected: "Int64 (entity ID)".to_string(),
1130 found: format!("{entity_val:?}"),
1131 });
1132 }
1133 };
1134
1135 let resolved_props: Vec<(String, Value)> = self
1137 .properties
1138 .iter()
1139 .map(|(name, source)| {
1140 let value =
1141 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1142 (name.clone(), value)
1143 })
1144 .collect();
1145
1146 if let Some(ref validator) = self.validator {
1148 if self.is_edge {
1149 if let Some(ref et) = self.edge_type_name {
1150 for (name, value) in &resolved_props {
1151 validator.validate_edge_property(et, name, value)?;
1152 }
1153 }
1154 } else {
1155 for (name, value) in &resolved_props {
1156 validator.validate_node_property(&self.labels, name, value)?;
1157 validator.check_unique_node_property(&self.labels, name, value)?;
1158 }
1159 }
1160 }
1161
1162 let tx_id = self.transaction_id;
1164 for (prop_name, value) in resolved_props {
1165 if prop_name == "*" {
1166 if let Value::Map(map) = value {
1168 if self.replace {
1169 if self.is_edge {
1171 if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1172 let keys: Vec<String> = edge
1173 .properties
1174 .iter()
1175 .map(|(k, _)| k.as_str().to_string())
1176 .collect();
1177 for key in keys {
1178 if let Some(tid) = tx_id {
1179 self.store.remove_edge_property_versioned(
1180 EdgeId(entity_id),
1181 &key,
1182 tid,
1183 );
1184 } else {
1185 self.store
1186 .remove_edge_property(EdgeId(entity_id), &key);
1187 }
1188 }
1189 }
1190 } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1191 let keys: Vec<String> = node
1192 .properties
1193 .iter()
1194 .map(|(k, _)| k.as_str().to_string())
1195 .collect();
1196 for key in keys {
1197 if let Some(tid) = tx_id {
1198 self.store.remove_node_property_versioned(
1199 NodeId(entity_id),
1200 &key,
1201 tid,
1202 );
1203 } else {
1204 self.store
1205 .remove_node_property(NodeId(entity_id), &key);
1206 }
1207 }
1208 }
1209 }
1210 for (key, val) in map.iter() {
1212 if self.is_edge {
1213 if let Some(tid) = tx_id {
1214 self.store.set_edge_property_versioned(
1215 EdgeId(entity_id),
1216 key.as_str(),
1217 val.clone(),
1218 tid,
1219 );
1220 } else {
1221 self.store.set_edge_property(
1222 EdgeId(entity_id),
1223 key.as_str(),
1224 val.clone(),
1225 );
1226 }
1227 } else if let Some(tid) = tx_id {
1228 self.store.set_node_property_versioned(
1229 NodeId(entity_id),
1230 key.as_str(),
1231 val.clone(),
1232 tid,
1233 );
1234 } else {
1235 self.store.set_node_property(
1236 NodeId(entity_id),
1237 key.as_str(),
1238 val.clone(),
1239 );
1240 }
1241 }
1242 }
1243 } else if self.is_edge {
1244 if let Some(tid) = tx_id {
1245 self.store.set_edge_property_versioned(
1246 EdgeId(entity_id),
1247 &prop_name,
1248 value,
1249 tid,
1250 );
1251 } else {
1252 self.store
1253 .set_edge_property(EdgeId(entity_id), &prop_name, value);
1254 }
1255 } else if let Some(tid) = tx_id {
1256 self.store.set_node_property_versioned(
1257 NodeId(entity_id),
1258 &prop_name,
1259 value,
1260 tid,
1261 );
1262 } else {
1263 self.store
1264 .set_node_property(NodeId(entity_id), &prop_name, value);
1265 }
1266 }
1267
1268 for col_idx in 0..chunk.column_count() {
1270 if let (Some(src), Some(dst)) =
1271 (chunk.column(col_idx), builder.column_mut(col_idx))
1272 {
1273 if let Some(val) = src.get_value(row) {
1274 dst.push_value(val);
1275 } else {
1276 dst.push_value(Value::Null);
1277 }
1278 }
1279 }
1280
1281 builder.advance_row();
1282 }
1283
1284 return Ok(Some(builder.finish()));
1285 }
1286 Ok(None)
1287 }
1288
1289 fn reset(&mut self) {
1290 self.input.reset();
1291 }
1292
1293 fn name(&self) -> &'static str {
1294 "SetProperty"
1295 }
1296}
1297
1298#[cfg(test)]
1299mod tests {
1300 use super::*;
1301 use crate::execution::DataChunk;
1302 use crate::execution::chunk::DataChunkBuilder;
1303 use crate::graph::lpg::LpgStore;
1304
1305 fn create_test_store() -> Arc<dyn GraphStoreMut> {
1308 Arc::new(LpgStore::new().unwrap())
1309 }
1310
1311 struct MockInput {
1312 chunk: Option<DataChunk>,
1313 }
1314
1315 impl MockInput {
1316 fn boxed(chunk: DataChunk) -> Box<Self> {
1317 Box::new(Self { chunk: Some(chunk) })
1318 }
1319 }
1320
1321 impl Operator for MockInput {
1322 fn next(&mut self) -> OperatorResult {
1323 Ok(self.chunk.take())
1324 }
1325 fn reset(&mut self) {}
1326 fn name(&self) -> &'static str {
1327 "MockInput"
1328 }
1329 }
1330
1331 struct EmptyInput;
1332 impl Operator for EmptyInput {
1333 fn next(&mut self) -> OperatorResult {
1334 Ok(None)
1335 }
1336 fn reset(&mut self) {}
1337 fn name(&self) -> &'static str {
1338 "EmptyInput"
1339 }
1340 }
1341
1342 fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1343 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1344 for id in ids {
1345 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1346 builder.advance_row();
1347 }
1348 builder.finish()
1349 }
1350
1351 fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1352 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1353 for id in ids {
1354 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1355 builder.advance_row();
1356 }
1357 builder.finish()
1358 }
1359
1360 #[test]
1363 fn test_create_node_standalone() {
1364 let store = create_test_store();
1365
1366 let mut op = CreateNodeOperator::new(
1367 Arc::clone(&store),
1368 None,
1369 vec!["Person".to_string()],
1370 vec![(
1371 "name".to_string(),
1372 PropertySource::Constant(Value::String("Alix".into())),
1373 )],
1374 vec![LogicalType::Int64],
1375 0,
1376 );
1377
1378 let chunk = op.next().unwrap().unwrap();
1379 assert_eq!(chunk.row_count(), 1);
1380
1381 assert!(op.next().unwrap().is_none());
1383
1384 assert_eq!(store.node_count(), 1);
1385 }
1386
1387 #[test]
1388 fn test_create_edge() {
1389 let store = create_test_store();
1390
1391 let node1 = store.create_node(&["Person"]);
1392 let node2 = store.create_node(&["Person"]);
1393
1394 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1395 builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1396 builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1397 builder.advance_row();
1398
1399 let mut op = CreateEdgeOperator::new(
1400 Arc::clone(&store),
1401 MockInput::boxed(builder.finish()),
1402 0,
1403 1,
1404 "KNOWS".to_string(),
1405 vec![LogicalType::Int64, LogicalType::Int64],
1406 );
1407
1408 let _chunk = op.next().unwrap().unwrap();
1409 assert_eq!(store.edge_count(), 1);
1410 }
1411
1412 #[test]
1413 fn test_delete_node() {
1414 let store = create_test_store();
1415
1416 let node_id = store.create_node(&["Person"]);
1417 assert_eq!(store.node_count(), 1);
1418
1419 let mut op = DeleteNodeOperator::new(
1420 Arc::clone(&store),
1421 MockInput::boxed(node_id_chunk(&[node_id])),
1422 0,
1423 vec![LogicalType::Node],
1424 false,
1425 );
1426
1427 let chunk = op.next().unwrap().unwrap();
1428 assert_eq!(chunk.row_count(), 1);
1430 assert_eq!(store.node_count(), 0);
1431 }
1432
1433 #[test]
1436 fn test_delete_edge() {
1437 let store = create_test_store();
1438
1439 let n1 = store.create_node(&["Person"]);
1440 let n2 = store.create_node(&["Person"]);
1441 let eid = store.create_edge(n1, n2, "KNOWS");
1442 assert_eq!(store.edge_count(), 1);
1443
1444 let mut op = DeleteEdgeOperator::new(
1445 Arc::clone(&store),
1446 MockInput::boxed(edge_id_chunk(&[eid])),
1447 0,
1448 vec![LogicalType::Node],
1449 );
1450
1451 let chunk = op.next().unwrap().unwrap();
1452 assert_eq!(chunk.row_count(), 1);
1453 assert_eq!(store.edge_count(), 0);
1454 }
1455
1456 #[test]
1457 fn test_delete_edge_no_input_returns_none() {
1458 let store = create_test_store();
1459
1460 let mut op = DeleteEdgeOperator::new(
1461 Arc::clone(&store),
1462 Box::new(EmptyInput),
1463 0,
1464 vec![LogicalType::Int64],
1465 );
1466
1467 assert!(op.next().unwrap().is_none());
1468 }
1469
1470 #[test]
1471 fn test_delete_multiple_edges() {
1472 let store = create_test_store();
1473
1474 let n1 = store.create_node(&["N"]);
1475 let n2 = store.create_node(&["N"]);
1476 let e1 = store.create_edge(n1, n2, "R");
1477 let e2 = store.create_edge(n2, n1, "S");
1478 assert_eq!(store.edge_count(), 2);
1479
1480 let mut op = DeleteEdgeOperator::new(
1481 Arc::clone(&store),
1482 MockInput::boxed(edge_id_chunk(&[e1, e2])),
1483 0,
1484 vec![LogicalType::Node],
1485 );
1486
1487 let chunk = op.next().unwrap().unwrap();
1488 assert_eq!(chunk.row_count(), 2);
1489 assert_eq!(store.edge_count(), 0);
1490 }
1491
1492 #[test]
1495 fn test_delete_node_detach() {
1496 let store = create_test_store();
1497
1498 let n1 = store.create_node(&["Person"]);
1499 let n2 = store.create_node(&["Person"]);
1500 store.create_edge(n1, n2, "KNOWS");
1501 store.create_edge(n2, n1, "FOLLOWS");
1502 assert_eq!(store.edge_count(), 2);
1503
1504 let mut op = DeleteNodeOperator::new(
1505 Arc::clone(&store),
1506 MockInput::boxed(node_id_chunk(&[n1])),
1507 0,
1508 vec![LogicalType::Node],
1509 true, );
1511
1512 let chunk = op.next().unwrap().unwrap();
1513 assert_eq!(chunk.row_count(), 1);
1514 assert_eq!(store.node_count(), 1);
1515 assert_eq!(store.edge_count(), 0); }
1517
1518 #[test]
1521 fn test_add_label() {
1522 let store = create_test_store();
1523
1524 let node = store.create_node(&["Person"]);
1525
1526 let mut op = AddLabelOperator::new(
1527 Arc::clone(&store),
1528 MockInput::boxed(node_id_chunk(&[node])),
1529 0,
1530 vec!["Employee".to_string()],
1531 vec![LogicalType::Int64],
1532 );
1533
1534 let chunk = op.next().unwrap().unwrap();
1535 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1536 assert_eq!(updated, 1);
1537
1538 let node_data = store.get_node(node).unwrap();
1540 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1541 assert!(labels.contains(&"Person"));
1542 assert!(labels.contains(&"Employee"));
1543 }
1544
1545 #[test]
1546 fn test_add_multiple_labels() {
1547 let store = create_test_store();
1548
1549 let node = store.create_node(&["Base"]);
1550
1551 let mut op = AddLabelOperator::new(
1552 Arc::clone(&store),
1553 MockInput::boxed(node_id_chunk(&[node])),
1554 0,
1555 vec!["LabelA".to_string(), "LabelB".to_string()],
1556 vec![LogicalType::Int64],
1557 );
1558
1559 let chunk = op.next().unwrap().unwrap();
1560 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1561 assert_eq!(updated, 2); let node_data = store.get_node(node).unwrap();
1564 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1565 assert!(labels.contains(&"LabelA"));
1566 assert!(labels.contains(&"LabelB"));
1567 }
1568
1569 #[test]
1570 fn test_add_label_no_input_returns_none() {
1571 let store = create_test_store();
1572
1573 let mut op = AddLabelOperator::new(
1574 Arc::clone(&store),
1575 Box::new(EmptyInput),
1576 0,
1577 vec!["Foo".to_string()],
1578 vec![LogicalType::Int64],
1579 );
1580
1581 assert!(op.next().unwrap().is_none());
1582 }
1583
1584 #[test]
1587 fn test_remove_label() {
1588 let store = create_test_store();
1589
1590 let node = store.create_node(&["Person", "Employee"]);
1591
1592 let mut op = RemoveLabelOperator::new(
1593 Arc::clone(&store),
1594 MockInput::boxed(node_id_chunk(&[node])),
1595 0,
1596 vec!["Employee".to_string()],
1597 vec![LogicalType::Int64],
1598 );
1599
1600 let chunk = op.next().unwrap().unwrap();
1601 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1602 assert_eq!(updated, 1);
1603
1604 let node_data = store.get_node(node).unwrap();
1606 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1607 assert!(labels.contains(&"Person"));
1608 assert!(!labels.contains(&"Employee"));
1609 }
1610
1611 #[test]
1612 fn test_remove_nonexistent_label() {
1613 let store = create_test_store();
1614
1615 let node = store.create_node(&["Person"]);
1616
1617 let mut op = RemoveLabelOperator::new(
1618 Arc::clone(&store),
1619 MockInput::boxed(node_id_chunk(&[node])),
1620 0,
1621 vec!["NonExistent".to_string()],
1622 vec![LogicalType::Int64],
1623 );
1624
1625 let chunk = op.next().unwrap().unwrap();
1626 let updated = chunk.column(0).unwrap().get_int64(0).unwrap();
1627 assert_eq!(updated, 0); }
1629
1630 #[test]
1633 fn test_set_node_property_constant() {
1634 let store = create_test_store();
1635
1636 let node = store.create_node(&["Person"]);
1637
1638 let mut op = SetPropertyOperator::new_for_node(
1639 Arc::clone(&store),
1640 MockInput::boxed(node_id_chunk(&[node])),
1641 0,
1642 vec![(
1643 "name".to_string(),
1644 PropertySource::Constant(Value::String("Alix".into())),
1645 )],
1646 vec![LogicalType::Int64],
1647 );
1648
1649 let chunk = op.next().unwrap().unwrap();
1650 assert_eq!(chunk.row_count(), 1);
1651
1652 let node_data = store.get_node(node).unwrap();
1654 assert_eq!(
1655 node_data
1656 .properties
1657 .get(&grafeo_common::types::PropertyKey::new("name")),
1658 Some(&Value::String("Alix".into()))
1659 );
1660 }
1661
1662 #[test]
1663 fn test_set_node_property_from_column() {
1664 let store = create_test_store();
1665
1666 let node = store.create_node(&["Person"]);
1667
1668 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1670 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1671 builder
1672 .column_mut(1)
1673 .unwrap()
1674 .push_value(Value::String("Gus".into()));
1675 builder.advance_row();
1676
1677 let mut op = SetPropertyOperator::new_for_node(
1678 Arc::clone(&store),
1679 MockInput::boxed(builder.finish()),
1680 0,
1681 vec![("name".to_string(), PropertySource::Column(1))],
1682 vec![LogicalType::Int64, LogicalType::String],
1683 );
1684
1685 let chunk = op.next().unwrap().unwrap();
1686 assert_eq!(chunk.row_count(), 1);
1687
1688 let node_data = store.get_node(node).unwrap();
1689 assert_eq!(
1690 node_data
1691 .properties
1692 .get(&grafeo_common::types::PropertyKey::new("name")),
1693 Some(&Value::String("Gus".into()))
1694 );
1695 }
1696
1697 #[test]
1698 fn test_set_edge_property() {
1699 let store = create_test_store();
1700
1701 let n1 = store.create_node(&["N"]);
1702 let n2 = store.create_node(&["N"]);
1703 let eid = store.create_edge(n1, n2, "KNOWS");
1704
1705 let mut op = SetPropertyOperator::new_for_edge(
1706 Arc::clone(&store),
1707 MockInput::boxed(edge_id_chunk(&[eid])),
1708 0,
1709 vec![(
1710 "weight".to_string(),
1711 PropertySource::Constant(Value::Float64(0.75)),
1712 )],
1713 vec![LogicalType::Int64],
1714 );
1715
1716 let chunk = op.next().unwrap().unwrap();
1717 assert_eq!(chunk.row_count(), 1);
1718
1719 let edge_data = store.get_edge(eid).unwrap();
1720 assert_eq!(
1721 edge_data
1722 .properties
1723 .get(&grafeo_common::types::PropertyKey::new("weight")),
1724 Some(&Value::Float64(0.75))
1725 );
1726 }
1727
1728 #[test]
1729 fn test_set_multiple_properties() {
1730 let store = create_test_store();
1731
1732 let node = store.create_node(&["Person"]);
1733
1734 let mut op = SetPropertyOperator::new_for_node(
1735 Arc::clone(&store),
1736 MockInput::boxed(node_id_chunk(&[node])),
1737 0,
1738 vec![
1739 (
1740 "name".to_string(),
1741 PropertySource::Constant(Value::String("Alix".into())),
1742 ),
1743 (
1744 "age".to_string(),
1745 PropertySource::Constant(Value::Int64(30)),
1746 ),
1747 ],
1748 vec![LogicalType::Int64],
1749 );
1750
1751 op.next().unwrap().unwrap();
1752
1753 let node_data = store.get_node(node).unwrap();
1754 assert_eq!(
1755 node_data
1756 .properties
1757 .get(&grafeo_common::types::PropertyKey::new("name")),
1758 Some(&Value::String("Alix".into()))
1759 );
1760 assert_eq!(
1761 node_data
1762 .properties
1763 .get(&grafeo_common::types::PropertyKey::new("age")),
1764 Some(&Value::Int64(30))
1765 );
1766 }
1767
1768 #[test]
1769 fn test_set_property_no_input_returns_none() {
1770 let store = create_test_store();
1771
1772 let mut op = SetPropertyOperator::new_for_node(
1773 Arc::clone(&store),
1774 Box::new(EmptyInput),
1775 0,
1776 vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
1777 vec![LogicalType::Int64],
1778 );
1779
1780 assert!(op.next().unwrap().is_none());
1781 }
1782
1783 #[test]
1786 fn test_delete_node_without_detach_errors_when_edges_exist() {
1787 let store = create_test_store();
1788
1789 let n1 = store.create_node(&["Person"]);
1790 let n2 = store.create_node(&["Person"]);
1791 store.create_edge(n1, n2, "KNOWS");
1792
1793 let mut op = DeleteNodeOperator::new(
1794 Arc::clone(&store),
1795 MockInput::boxed(node_id_chunk(&[n1])),
1796 0,
1797 vec![LogicalType::Int64],
1798 false, );
1800
1801 let err = op.next().unwrap_err();
1802 match err {
1803 OperatorError::ConstraintViolation(msg) => {
1804 assert!(msg.contains("connected edge"), "unexpected message: {msg}");
1805 }
1806 other => panic!("expected ConstraintViolation, got {other:?}"),
1807 }
1808 assert_eq!(store.node_count(), 2);
1810 }
1811
1812 #[test]
1815 fn test_create_node_with_input_operator() {
1816 let store = create_test_store();
1817
1818 let existing = store.create_node(&["Seed"]);
1820
1821 let mut op = CreateNodeOperator::new(
1822 Arc::clone(&store),
1823 Some(MockInput::boxed(node_id_chunk(&[existing]))),
1824 vec!["Created".to_string()],
1825 vec![(
1826 "source".to_string(),
1827 PropertySource::Constant(Value::String("from_input".into())),
1828 )],
1829 vec![LogicalType::Int64, LogicalType::Int64], 1, );
1832
1833 let chunk = op.next().unwrap().unwrap();
1834 assert_eq!(chunk.row_count(), 1);
1835
1836 assert_eq!(store.node_count(), 2);
1838
1839 assert!(op.next().unwrap().is_none());
1841 }
1842
1843 #[test]
1846 fn test_create_edge_with_properties_and_output_column() {
1847 let store = create_test_store();
1848
1849 let n1 = store.create_node(&["Person"]);
1850 let n2 = store.create_node(&["Person"]);
1851
1852 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1853 builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
1854 builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
1855 builder.advance_row();
1856
1857 let mut op = CreateEdgeOperator::new(
1858 Arc::clone(&store),
1859 MockInput::boxed(builder.finish()),
1860 0,
1861 1,
1862 "KNOWS".to_string(),
1863 vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
1864 )
1865 .with_properties(vec![(
1866 "since".to_string(),
1867 PropertySource::Constant(Value::Int64(2024)),
1868 )])
1869 .with_output_column(2);
1870
1871 let chunk = op.next().unwrap().unwrap();
1872 assert_eq!(chunk.row_count(), 1);
1873 assert_eq!(store.edge_count(), 1);
1874
1875 let edge_id_raw = chunk
1877 .column(2)
1878 .and_then(|c| c.get_int64(0))
1879 .expect("edge ID should be in output column 2");
1880 let edge_id = EdgeId(edge_id_raw as u64);
1881
1882 let edge = store.get_edge(edge_id).expect("edge should exist");
1884 assert_eq!(
1885 edge.properties
1886 .get(&grafeo_common::types::PropertyKey::new("since")),
1887 Some(&Value::Int64(2024))
1888 );
1889 }
1890
1891 #[test]
1894 fn test_set_property_map_replace() {
1895 use std::collections::BTreeMap;
1896
1897 let store = create_test_store();
1898
1899 let node = store.create_node(&["Person"]);
1900 store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
1901
1902 let mut map = BTreeMap::new();
1903 map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
1904
1905 let mut op = SetPropertyOperator::new_for_node(
1906 Arc::clone(&store),
1907 MockInput::boxed(node_id_chunk(&[node])),
1908 0,
1909 vec![(
1910 "*".to_string(),
1911 PropertySource::Constant(Value::Map(Arc::new(map))),
1912 )],
1913 vec![LogicalType::Int64],
1914 )
1915 .with_replace(true);
1916
1917 op.next().unwrap().unwrap();
1918
1919 let node_data = store.get_node(node).unwrap();
1920 assert!(
1922 node_data
1923 .properties
1924 .get(&PropertyKey::new("old_prop"))
1925 .is_none()
1926 );
1927 assert_eq!(
1929 node_data.properties.get(&PropertyKey::new("new_key")),
1930 Some(&Value::String("new_val".into()))
1931 );
1932 }
1933
1934 #[test]
1937 fn test_set_property_map_merge() {
1938 use std::collections::BTreeMap;
1939
1940 let store = create_test_store();
1941
1942 let node = store.create_node(&["Person"]);
1943 store.set_node_property(node, "existing", Value::Int64(42));
1944
1945 let mut map = BTreeMap::new();
1946 map.insert(PropertyKey::new("added"), Value::String("hello".into()));
1947
1948 let mut op = SetPropertyOperator::new_for_node(
1949 Arc::clone(&store),
1950 MockInput::boxed(node_id_chunk(&[node])),
1951 0,
1952 vec![(
1953 "*".to_string(),
1954 PropertySource::Constant(Value::Map(Arc::new(map))),
1955 )],
1956 vec![LogicalType::Int64],
1957 ); op.next().unwrap().unwrap();
1960
1961 let node_data = store.get_node(node).unwrap();
1962 assert_eq!(
1964 node_data.properties.get(&PropertyKey::new("existing")),
1965 Some(&Value::Int64(42))
1966 );
1967 assert_eq!(
1969 node_data.properties.get(&PropertyKey::new("added")),
1970 Some(&Value::String("hello".into()))
1971 );
1972 }
1973
1974 #[test]
1977 fn test_property_source_property_access() {
1978 let store = create_test_store();
1979
1980 let source_node = store.create_node(&["Source"]);
1981 store.set_node_property(source_node, "name", Value::String("Alix".into()));
1982
1983 let target_node = store.create_node(&["Target"]);
1984
1985 let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
1987 builder.column_mut(0).unwrap().push_node_id(source_node);
1988 builder
1989 .column_mut(1)
1990 .unwrap()
1991 .push_int64(target_node.0 as i64);
1992 builder.advance_row();
1993
1994 let mut op = SetPropertyOperator::new_for_node(
1995 Arc::clone(&store),
1996 MockInput::boxed(builder.finish()),
1997 1, vec![(
1999 "copied_name".to_string(),
2000 PropertySource::PropertyAccess {
2001 column: 0,
2002 property: "name".to_string(),
2003 },
2004 )],
2005 vec![LogicalType::Node, LogicalType::Int64],
2006 );
2007
2008 op.next().unwrap().unwrap();
2009
2010 let target_data = store.get_node(target_node).unwrap();
2011 assert_eq!(
2012 target_data.properties.get(&PropertyKey::new("copied_name")),
2013 Some(&Value::String("Alix".into()))
2014 );
2015 }
2016
2017 #[test]
2020 fn test_create_node_with_constraint_validator() {
2021 let store = create_test_store();
2022
2023 struct RejectAgeValidator;
2024 impl ConstraintValidator for RejectAgeValidator {
2025 fn validate_node_property(
2026 &self,
2027 _labels: &[String],
2028 key: &str,
2029 _value: &Value,
2030 ) -> Result<(), OperatorError> {
2031 if key == "forbidden" {
2032 return Err(OperatorError::ConstraintViolation(
2033 "property 'forbidden' is not allowed".to_string(),
2034 ));
2035 }
2036 Ok(())
2037 }
2038 fn validate_node_complete(
2039 &self,
2040 _labels: &[String],
2041 _properties: &[(String, Value)],
2042 ) -> Result<(), OperatorError> {
2043 Ok(())
2044 }
2045 fn check_unique_node_property(
2046 &self,
2047 _labels: &[String],
2048 _key: &str,
2049 _value: &Value,
2050 ) -> Result<(), OperatorError> {
2051 Ok(())
2052 }
2053 fn validate_edge_property(
2054 &self,
2055 _edge_type: &str,
2056 _key: &str,
2057 _value: &Value,
2058 ) -> Result<(), OperatorError> {
2059 Ok(())
2060 }
2061 fn validate_edge_complete(
2062 &self,
2063 _edge_type: &str,
2064 _properties: &[(String, Value)],
2065 ) -> Result<(), OperatorError> {
2066 Ok(())
2067 }
2068 }
2069
2070 let mut op = CreateNodeOperator::new(
2072 Arc::clone(&store),
2073 None,
2074 vec!["Thing".to_string()],
2075 vec![(
2076 "name".to_string(),
2077 PropertySource::Constant(Value::String("ok".into())),
2078 )],
2079 vec![LogicalType::Int64],
2080 0,
2081 )
2082 .with_validator(Arc::new(RejectAgeValidator));
2083
2084 assert!(op.next().is_ok());
2085 assert_eq!(store.node_count(), 1);
2086
2087 let mut op = CreateNodeOperator::new(
2089 Arc::clone(&store),
2090 None,
2091 vec!["Thing".to_string()],
2092 vec![(
2093 "forbidden".to_string(),
2094 PropertySource::Constant(Value::Int64(1)),
2095 )],
2096 vec![LogicalType::Int64],
2097 0,
2098 )
2099 .with_validator(Arc::new(RejectAgeValidator));
2100
2101 let err = op.next().unwrap_err();
2102 assert!(matches!(err, OperatorError::ConstraintViolation(_)));
2103 }
2106
2107 #[test]
2110 fn test_create_node_reset_allows_re_execution() {
2111 let store = create_test_store();
2112
2113 let mut op = CreateNodeOperator::new(
2114 Arc::clone(&store),
2115 None,
2116 vec!["Person".to_string()],
2117 vec![],
2118 vec![LogicalType::Int64],
2119 0,
2120 );
2121
2122 assert!(op.next().unwrap().is_some());
2124 assert!(op.next().unwrap().is_none());
2125
2126 op.reset();
2128 assert!(op.next().unwrap().is_some());
2129
2130 assert_eq!(store.node_count(), 2);
2131 }
2132
2133 #[test]
2136 fn test_operator_names() {
2137 let store = create_test_store();
2138
2139 let op = CreateNodeOperator::new(
2140 Arc::clone(&store),
2141 None,
2142 vec![],
2143 vec![],
2144 vec![LogicalType::Int64],
2145 0,
2146 );
2147 assert_eq!(op.name(), "CreateNode");
2148
2149 let op = CreateEdgeOperator::new(
2150 Arc::clone(&store),
2151 Box::new(EmptyInput),
2152 0,
2153 1,
2154 "R".to_string(),
2155 vec![LogicalType::Int64],
2156 );
2157 assert_eq!(op.name(), "CreateEdge");
2158
2159 let op = DeleteNodeOperator::new(
2160 Arc::clone(&store),
2161 Box::new(EmptyInput),
2162 0,
2163 vec![LogicalType::Int64],
2164 false,
2165 );
2166 assert_eq!(op.name(), "DeleteNode");
2167
2168 let op = DeleteEdgeOperator::new(
2169 Arc::clone(&store),
2170 Box::new(EmptyInput),
2171 0,
2172 vec![LogicalType::Int64],
2173 );
2174 assert_eq!(op.name(), "DeleteEdge");
2175
2176 let op = AddLabelOperator::new(
2177 Arc::clone(&store),
2178 Box::new(EmptyInput),
2179 0,
2180 vec!["L".to_string()],
2181 vec![LogicalType::Int64],
2182 );
2183 assert_eq!(op.name(), "AddLabel");
2184
2185 let op = RemoveLabelOperator::new(
2186 Arc::clone(&store),
2187 Box::new(EmptyInput),
2188 0,
2189 vec!["L".to_string()],
2190 vec![LogicalType::Int64],
2191 );
2192 assert_eq!(op.name(), "RemoveLabel");
2193
2194 let op = SetPropertyOperator::new_for_node(
2195 Arc::clone(&store),
2196 Box::new(EmptyInput),
2197 0,
2198 vec![],
2199 vec![LogicalType::Int64],
2200 );
2201 assert_eq!(op.name(), "SetProperty");
2202 }
2203}