1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{
13 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
14};
15
16use super::filter::FilterExpression;
17use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
18use crate::execution::chunk::DataChunkBuilder;
19use crate::graph::{GraphStore, GraphStoreMut};
20
21pub trait ConstraintValidator: Send + Sync {
26 fn validate_node_property(
34 &self,
35 labels: &[String],
36 key: &str,
37 value: &Value,
38 ) -> Result<(), OperatorError>;
39
40 fn validate_node_complete(
48 &self,
49 labels: &[String],
50 properties: &[(String, Value)],
51 ) -> Result<(), OperatorError>;
52
53 fn check_unique_node_property(
59 &self,
60 labels: &[String],
61 key: &str,
62 value: &Value,
63 ) -> Result<(), OperatorError>;
64
65 fn validate_edge_property(
71 &self,
72 edge_type: &str,
73 key: &str,
74 value: &Value,
75 ) -> Result<(), OperatorError>;
76
77 fn validate_edge_complete(
83 &self,
84 edge_type: &str,
85 properties: &[(String, Value)],
86 ) -> Result<(), OperatorError>;
87
88 fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
94 let _ = labels;
95 Ok(())
96 }
97
98 fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
104 let _ = edge_type;
105 Ok(())
106 }
107
108 fn validate_edge_endpoints(
114 &self,
115 edge_type: &str,
116 source_labels: &[String],
117 target_labels: &[String],
118 ) -> Result<(), OperatorError> {
119 let _ = (edge_type, source_labels, target_labels);
120 Ok(())
121 }
122
123 fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
126 let _ = (labels, properties);
127 }
128}
129
130pub struct CreateNodeOperator {
135 store: Arc<dyn GraphStoreMut>,
137 input: Option<Box<dyn Operator>>,
139 labels: Vec<String>,
141 properties: Vec<(String, PropertySource)>,
143 output_schema: Vec<LogicalType>,
145 output_column: usize,
147 executed: bool,
149 viewing_epoch: Option<EpochId>,
151 transaction_id: Option<TransactionId>,
153 validator: Option<Arc<dyn ConstraintValidator>>,
155 write_tracker: Option<SharedWriteTracker>,
157}
158
159#[derive(Debug, Clone)]
161#[non_exhaustive]
162pub enum PropertySource {
163 Column(usize),
165 Constant(Value),
167 PropertyAccess {
169 column: usize,
171 property: String,
173 },
174 Expression {
184 expr: Box<FilterExpression>,
186 variable_columns: HashMap<String, usize>,
188 },
189}
190
191impl PropertySource {
192 pub fn resolve(
198 &self,
199 chunk: &crate::execution::chunk::DataChunk,
200 row: usize,
201 store: &dyn GraphStore,
202 ) -> Value {
203 match self {
204 PropertySource::Column(col_idx) => chunk
205 .column(*col_idx)
206 .and_then(|c| c.get_value(row))
207 .unwrap_or(Value::Null),
208 PropertySource::Constant(v) => v.clone(),
209 PropertySource::PropertyAccess { column, property } => {
210 let Some(col) = chunk.column(*column) else {
211 return Value::Null;
212 };
213 if let Some(node_id) = col.get_node_id(row) {
215 store
216 .get_node(node_id)
217 .and_then(|node| node.get_property(property).cloned())
218 .unwrap_or(Value::Null)
219 } else if let Some(edge_id) = col.get_edge_id(row) {
220 store
221 .get_edge(edge_id)
222 .and_then(|edge| edge.get_property(property).cloned())
223 .unwrap_or(Value::Null)
224 } else if let Some(Value::Map(map)) = col.get_value(row) {
225 let key = PropertyKey::new(property);
226 map.get(&key).cloned().unwrap_or(Value::Null)
227 } else {
228 Value::Null
229 }
230 }
231 PropertySource::Expression { .. } => Value::Null,
234 }
235 }
236}
237
238impl CreateNodeOperator {
239 pub fn new(
249 store: Arc<dyn GraphStoreMut>,
250 input: Option<Box<dyn Operator>>,
251 labels: Vec<String>,
252 properties: Vec<(String, PropertySource)>,
253 output_schema: Vec<LogicalType>,
254 output_column: usize,
255 ) -> Self {
256 Self {
257 store,
258 input,
259 labels,
260 properties,
261 output_schema,
262 output_column,
263 executed: false,
264 viewing_epoch: None,
265 transaction_id: None,
266 validator: None,
267 write_tracker: None,
268 }
269 }
270
271 pub fn with_transaction_context(
273 mut self,
274 epoch: EpochId,
275 transaction_id: Option<TransactionId>,
276 ) -> Self {
277 self.viewing_epoch = Some(epoch);
278 self.transaction_id = transaction_id;
279 self
280 }
281
282 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
284 self.validator = Some(validator);
285 self
286 }
287
288 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
290 self.write_tracker = Some(tracker);
291 self
292 }
293}
294
295impl CreateNodeOperator {
296 fn validate_and_set_properties(
298 &self,
299 node_id: NodeId,
300 resolved_props: &mut Vec<(String, Value)>,
301 ) -> Result<(), OperatorError> {
302 if let Some(ref validator) = self.validator {
304 validator.validate_node_labels_allowed(&self.labels)?;
305 }
306
307 if let Some(ref validator) = self.validator {
309 validator.inject_defaults(&self.labels, resolved_props);
310 }
311
312 if let Some(ref validator) = self.validator {
314 for (name, value) in resolved_props.iter() {
315 validator.validate_node_property(&self.labels, name, value)?;
316 validator.check_unique_node_property(&self.labels, name, value)?;
317 }
318 validator.validate_node_complete(&self.labels, resolved_props)?;
320 }
321
322 if let Some(tid) = self.transaction_id {
324 for (name, value) in resolved_props.iter() {
325 self.store
326 .set_node_property_versioned(node_id, name, value.clone(), tid);
327 }
328 } else {
329 for (name, value) in resolved_props.iter() {
330 self.store.set_node_property(node_id, name, value.clone());
331 }
332 }
333 Ok(())
334 }
335}
336
337impl Operator for CreateNodeOperator {
338 fn next(&mut self) -> OperatorResult {
339 let epoch = self
341 .viewing_epoch
342 .unwrap_or_else(|| self.store.current_epoch());
343 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
344
345 if let Some(ref mut input) = self.input {
346 if let Some(chunk) = input.next()? {
348 let mut builder =
349 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
350
351 for row in chunk.selected_indices() {
352 let mut resolved_props: Vec<(String, Value)> = self
354 .properties
355 .iter()
356 .map(|(name, source)| {
357 let value =
358 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
359 (name.clone(), value)
360 })
361 .collect();
362
363 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
365 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
366
367 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
369 tracker.record_node_write(tid, node_id)?;
370 }
371
372 self.validate_and_set_properties(node_id, &mut resolved_props)?;
374
375 for col_idx in 0..chunk.column_count() {
377 if col_idx < self.output_column
378 && let (Some(src), Some(dst)) =
379 (chunk.column(col_idx), builder.column_mut(col_idx))
380 {
381 if let Some(val) = src.get_value(row) {
382 dst.push_value(val);
383 } else {
384 dst.push_value(Value::Null);
385 }
386 }
387 }
388
389 if let Some(dst) = builder.column_mut(self.output_column) {
391 #[allow(clippy::cast_possible_wrap)]
393 #[allow(clippy::cast_possible_wrap)]
395 dst.push_value(Value::Int64(node_id.0 as i64));
396 }
397
398 builder.advance_row();
399 }
400
401 return Ok(Some(builder.finish()));
402 }
403 Ok(None)
404 } else {
405 if self.executed {
407 return Ok(None);
408 }
409 self.executed = true;
410
411 let mut resolved_props: Vec<(String, Value)> = self
413 .properties
414 .iter()
415 .filter_map(|(name, source)| {
416 if let PropertySource::Constant(value) = source {
417 Some((name.clone(), value.clone()))
418 } else {
419 None
420 }
421 })
422 .collect();
423
424 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
426 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
427
428 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
430 tracker.record_node_write(tid, node_id)?;
431 }
432
433 self.validate_and_set_properties(node_id, &mut resolved_props)?;
435
436 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
438 if let Some(dst) = builder.column_mut(self.output_column) {
439 #[allow(clippy::cast_possible_wrap)]
441 dst.push_value(Value::Int64(node_id.0 as i64));
442 }
443 builder.advance_row();
444
445 Ok(Some(builder.finish()))
446 }
447 }
448
449 fn reset(&mut self) {
450 if let Some(ref mut input) = self.input {
451 input.reset();
452 }
453 self.executed = false;
454 }
455
456 fn name(&self) -> &'static str {
457 "CreateNode"
458 }
459
460 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
461 self
462 }
463}
464
465pub struct CreateEdgeOperator {
467 store: Arc<dyn GraphStoreMut>,
469 input: Box<dyn Operator>,
471 from_column: usize,
473 to_column: usize,
475 edge_type: String,
477 properties: Vec<(String, PropertySource)>,
479 output_schema: Vec<LogicalType>,
481 output_column: Option<usize>,
483 viewing_epoch: Option<EpochId>,
485 transaction_id: Option<TransactionId>,
487 validator: Option<Arc<dyn ConstraintValidator>>,
489 write_tracker: Option<SharedWriteTracker>,
491}
492
493impl CreateEdgeOperator {
494 pub fn new(
501 store: Arc<dyn GraphStoreMut>,
502 input: Box<dyn Operator>,
503 from_column: usize,
504 to_column: usize,
505 edge_type: String,
506 output_schema: Vec<LogicalType>,
507 ) -> Self {
508 Self {
509 store,
510 input,
511 from_column,
512 to_column,
513 edge_type,
514 properties: Vec::new(),
515 output_schema,
516 output_column: None,
517 viewing_epoch: None,
518 transaction_id: None,
519 validator: None,
520 write_tracker: None,
521 }
522 }
523
524 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
526 self.properties = properties;
527 self
528 }
529
530 pub fn with_output_column(mut self, column: usize) -> Self {
532 self.output_column = Some(column);
533 self
534 }
535
536 pub fn with_transaction_context(
538 mut self,
539 epoch: EpochId,
540 transaction_id: Option<TransactionId>,
541 ) -> Self {
542 self.viewing_epoch = Some(epoch);
543 self.transaction_id = transaction_id;
544 self
545 }
546
547 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
549 self.validator = Some(validator);
550 self
551 }
552
553 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
555 self.write_tracker = Some(tracker);
556 self
557 }
558}
559
560impl Operator for CreateEdgeOperator {
561 fn next(&mut self) -> OperatorResult {
562 let epoch = self
564 .viewing_epoch
565 .unwrap_or_else(|| self.store.current_epoch());
566 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
567
568 if let Some(chunk) = self.input.next()? {
569 let mut builder =
570 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
571
572 for row in chunk.selected_indices() {
573 let from_id = chunk
575 .column(self.from_column)
576 .and_then(|c| c.get_value(row))
577 .ok_or_else(|| {
578 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
579 })?;
580
581 let to_id = chunk
582 .column(self.to_column)
583 .and_then(|c| c.get_value(row))
584 .ok_or_else(|| {
585 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
586 })?;
587
588 let from_node_id = match from_id {
590 #[allow(clippy::cast_sign_loss)]
592 Value::Int64(id) => NodeId(id as u64),
593 _ => {
594 return Err(OperatorError::TypeMismatch {
595 expected: "Int64 (node ID)".to_string(),
596 found: format!("{from_id:?}"),
597 });
598 }
599 };
600
601 let to_node_id = match to_id {
602 #[allow(clippy::cast_sign_loss)]
604 Value::Int64(id) => NodeId(id as u64),
605 _ => {
606 return Err(OperatorError::TypeMismatch {
607 expected: "Int64 (node ID)".to_string(),
608 found: format!("{to_id:?}"),
609 });
610 }
611 };
612
613 if let Some(ref validator) = self.validator {
615 validator.validate_edge_type_allowed(&self.edge_type)?;
616
617 let source_labels: Vec<String> = self
619 .store
620 .get_node(from_node_id)
621 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
622 .unwrap_or_default();
623 let target_labels: Vec<String> = self
624 .store
625 .get_node(to_node_id)
626 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
627 .unwrap_or_default();
628 validator.validate_edge_endpoints(
629 &self.edge_type,
630 &source_labels,
631 &target_labels,
632 )?;
633 }
634
635 let resolved_props: Vec<(String, Value)> = self
637 .properties
638 .iter()
639 .map(|(name, source)| {
640 let value =
641 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
642 (name.clone(), value)
643 })
644 .collect();
645
646 if let Some(ref validator) = self.validator {
648 for (name, value) in &resolved_props {
649 validator.validate_edge_property(&self.edge_type, name, value)?;
650 }
651 validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
652 }
653
654 let edge_id = self.store.create_edge_versioned(
656 from_node_id,
657 to_node_id,
658 &self.edge_type,
659 epoch,
660 tx,
661 );
662
663 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
665 tracker.record_edge_write(tid, edge_id)?;
666 }
667
668 if let Some(tid) = self.transaction_id {
670 for (name, value) in resolved_props {
671 self.store
672 .set_edge_property_versioned(edge_id, &name, value, tid);
673 }
674 } else {
675 for (name, value) in resolved_props {
676 self.store.set_edge_property(edge_id, &name, value);
677 }
678 }
679
680 for col_idx in 0..chunk.column_count() {
682 if let (Some(src), Some(dst)) =
683 (chunk.column(col_idx), builder.column_mut(col_idx))
684 {
685 if let Some(val) = src.get_value(row) {
686 dst.push_value(val);
687 } else {
688 dst.push_value(Value::Null);
689 }
690 }
691 }
692
693 if let Some(out_col) = self.output_column
695 && let Some(dst) = builder.column_mut(out_col)
696 {
697 #[allow(clippy::cast_possible_wrap)]
699 dst.push_value(Value::Int64(edge_id.0 as i64));
700 }
701
702 builder.advance_row();
703 }
704
705 return Ok(Some(builder.finish()));
706 }
707 Ok(None)
708 }
709
710 fn reset(&mut self) {
711 self.input.reset();
712 }
713
714 fn name(&self) -> &'static str {
715 "CreateEdge"
716 }
717
718 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
719 self
720 }
721}
722
723pub struct DeleteNodeOperator {
725 store: Arc<dyn GraphStoreMut>,
727 input: Box<dyn Operator>,
729 node_column: usize,
731 output_schema: Vec<LogicalType>,
733 detach: bool,
735 viewing_epoch: Option<EpochId>,
737 transaction_id: Option<TransactionId>,
739 write_tracker: Option<SharedWriteTracker>,
741}
742
743impl DeleteNodeOperator {
744 pub fn new(
746 store: Arc<dyn GraphStoreMut>,
747 input: Box<dyn Operator>,
748 node_column: usize,
749 output_schema: Vec<LogicalType>,
750 detach: bool,
751 ) -> Self {
752 Self {
753 store,
754 input,
755 node_column,
756 output_schema,
757 detach,
758 viewing_epoch: None,
759 transaction_id: None,
760 write_tracker: None,
761 }
762 }
763
764 pub fn with_transaction_context(
766 mut self,
767 epoch: EpochId,
768 transaction_id: Option<TransactionId>,
769 ) -> Self {
770 self.viewing_epoch = Some(epoch);
771 self.transaction_id = transaction_id;
772 self
773 }
774
775 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
777 self.write_tracker = Some(tracker);
778 self
779 }
780}
781
782impl Operator for DeleteNodeOperator {
783 fn next(&mut self) -> OperatorResult {
784 let epoch = self
786 .viewing_epoch
787 .unwrap_or_else(|| self.store.current_epoch());
788 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
789
790 if let Some(chunk) = self.input.next()? {
791 let mut builder =
792 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
793
794 for row in chunk.selected_indices() {
795 let node_val = chunk
796 .column(self.node_column)
797 .and_then(|c| c.get_value(row))
798 .ok_or_else(|| {
799 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
800 })?;
801
802 let node_id = match node_val {
803 #[allow(clippy::cast_sign_loss)]
805 Value::Int64(id) => NodeId(id as u64),
806 _ => {
807 return Err(OperatorError::TypeMismatch {
808 expected: "Int64 (node ID)".to_string(),
809 found: format!("{node_val:?}"),
810 });
811 }
812 };
813
814 if self.detach {
815 let outgoing = self
818 .store
819 .edges_from(node_id, crate::graph::Direction::Outgoing);
820 let incoming = self
821 .store
822 .edges_from(node_id, crate::graph::Direction::Incoming);
823 for (_, edge_id) in outgoing.into_iter().chain(incoming) {
824 self.store.delete_edge_versioned(edge_id, epoch, tx);
825 if let (Some(tracker), Some(tid)) =
826 (&self.write_tracker, self.transaction_id)
827 {
828 tracker.record_edge_write(tid, edge_id)?;
829 }
830 }
831 } else {
832 let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
834 if degree > 0 {
835 return Err(OperatorError::ConstraintViolation(format!(
836 "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
837 degree
838 )));
839 }
840 }
841
842 self.store.delete_node_versioned(node_id, epoch, tx);
844
845 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
847 tracker.record_node_write(tid, node_id)?;
848 }
849
850 for col_idx in 0..chunk.column_count() {
853 if let (Some(src), Some(dst)) =
854 (chunk.column(col_idx), builder.column_mut(col_idx))
855 {
856 if let Some(val) = src.get_value(row) {
857 dst.push_value(val);
858 } else {
859 dst.push_value(Value::Null);
860 }
861 }
862 }
863 builder.advance_row();
864 }
865
866 return Ok(Some(builder.finish()));
867 }
868 Ok(None)
869 }
870
871 fn reset(&mut self) {
872 self.input.reset();
873 }
874
875 fn name(&self) -> &'static str {
876 "DeleteNode"
877 }
878
879 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
880 self
881 }
882}
883
884pub struct DeleteEdgeOperator {
886 store: Arc<dyn GraphStoreMut>,
888 input: Box<dyn Operator>,
890 edge_column: usize,
892 output_schema: Vec<LogicalType>,
894 viewing_epoch: Option<EpochId>,
896 transaction_id: Option<TransactionId>,
898 write_tracker: Option<SharedWriteTracker>,
900}
901
902impl DeleteEdgeOperator {
903 pub fn new(
905 store: Arc<dyn GraphStoreMut>,
906 input: Box<dyn Operator>,
907 edge_column: usize,
908 output_schema: Vec<LogicalType>,
909 ) -> Self {
910 Self {
911 store,
912 input,
913 edge_column,
914 output_schema,
915 viewing_epoch: None,
916 transaction_id: None,
917 write_tracker: None,
918 }
919 }
920
921 pub fn with_transaction_context(
923 mut self,
924 epoch: EpochId,
925 transaction_id: Option<TransactionId>,
926 ) -> Self {
927 self.viewing_epoch = Some(epoch);
928 self.transaction_id = transaction_id;
929 self
930 }
931
932 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
934 self.write_tracker = Some(tracker);
935 self
936 }
937}
938
939impl Operator for DeleteEdgeOperator {
940 fn next(&mut self) -> OperatorResult {
941 let epoch = self
943 .viewing_epoch
944 .unwrap_or_else(|| self.store.current_epoch());
945 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
946
947 if let Some(chunk) = self.input.next()? {
948 let mut builder =
949 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
950
951 for row in chunk.selected_indices() {
952 let edge_val = chunk
953 .column(self.edge_column)
954 .and_then(|c| c.get_value(row))
955 .ok_or_else(|| {
956 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
957 })?;
958
959 let edge_id = match edge_val {
960 #[allow(clippy::cast_sign_loss)]
962 Value::Int64(id) => EdgeId(id as u64),
963 _ => {
964 return Err(OperatorError::TypeMismatch {
965 expected: "Int64 (edge ID)".to_string(),
966 found: format!("{edge_val:?}"),
967 });
968 }
969 };
970
971 self.store.delete_edge_versioned(edge_id, epoch, tx);
973
974 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
976 tracker.record_edge_write(tid, edge_id)?;
977 }
978
979 for col_idx in 0..chunk.column_count() {
981 if let (Some(src), Some(dst)) =
982 (chunk.column(col_idx), builder.column_mut(col_idx))
983 {
984 if let Some(val) = src.get_value(row) {
985 dst.push_value(val);
986 } else {
987 dst.push_value(Value::Null);
988 }
989 }
990 }
991 builder.advance_row();
992 }
993
994 return Ok(Some(builder.finish()));
995 }
996 Ok(None)
997 }
998
999 fn reset(&mut self) {
1000 self.input.reset();
1001 }
1002
1003 fn name(&self) -> &'static str {
1004 "DeleteEdge"
1005 }
1006
1007 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1008 self
1009 }
1010}
1011
1012pub struct AddLabelOperator {
1014 store: Arc<dyn GraphStoreMut>,
1016 input: Box<dyn Operator>,
1018 node_column: usize,
1020 labels: Vec<String>,
1022 output_schema: Vec<LogicalType>,
1024 count_column: usize,
1026 viewing_epoch: Option<EpochId>,
1028 transaction_id: Option<TransactionId>,
1030 write_tracker: Option<SharedWriteTracker>,
1032}
1033
1034impl AddLabelOperator {
1035 pub fn new(
1037 store: Arc<dyn GraphStoreMut>,
1038 input: Box<dyn Operator>,
1039 node_column: usize,
1040 labels: Vec<String>,
1041 output_schema: Vec<LogicalType>,
1042 ) -> Self {
1043 let count_column = output_schema.len() - 1;
1044 Self {
1045 store,
1046 input,
1047 node_column,
1048 labels,
1049 count_column,
1050 output_schema,
1051 viewing_epoch: None,
1052 transaction_id: None,
1053 write_tracker: None,
1054 }
1055 }
1056
1057 pub fn with_transaction_context(
1059 mut self,
1060 epoch: EpochId,
1061 transaction_id: Option<TransactionId>,
1062 ) -> Self {
1063 self.viewing_epoch = Some(epoch);
1064 self.transaction_id = transaction_id;
1065 self
1066 }
1067
1068 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1070 self.write_tracker = Some(tracker);
1071 self
1072 }
1073}
1074
1075impl Operator for AddLabelOperator {
1076 fn next(&mut self) -> OperatorResult {
1077 if let Some(chunk) = self.input.next()? {
1078 let mut builder =
1079 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1080
1081 for row in chunk.selected_indices() {
1082 let node_val = chunk
1083 .column(self.node_column)
1084 .and_then(|c| c.get_value(row))
1085 .ok_or_else(|| {
1086 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1087 })?;
1088
1089 let node_id = match node_val {
1090 #[allow(clippy::cast_sign_loss)]
1092 Value::Int64(id) => NodeId(id as u64),
1093 _ => {
1094 return Err(OperatorError::TypeMismatch {
1095 expected: "Int64 (node ID)".to_string(),
1096 found: format!("{node_val:?}"),
1097 });
1098 }
1099 };
1100
1101 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1103 tracker.record_node_write(tid, node_id)?;
1104 }
1105
1106 let mut row_count: i64 = 0;
1108 for label in &self.labels {
1109 let added = if let Some(tid) = self.transaction_id {
1110 self.store.add_label_versioned(node_id, label, tid)
1111 } else {
1112 self.store.add_label(node_id, label)
1113 };
1114 if added {
1115 row_count += 1;
1116 }
1117 }
1118
1119 for col_idx in 0..chunk.column_count() {
1121 if let (Some(src), Some(dst)) =
1122 (chunk.column(col_idx), builder.column_mut(col_idx))
1123 {
1124 if let Some(val) = src.get_value(row) {
1125 dst.push_value(val);
1126 } else {
1127 dst.push_value(Value::Null);
1128 }
1129 }
1130 }
1131 if let Some(dst) = builder.column_mut(self.count_column) {
1133 dst.push_value(Value::Int64(row_count));
1134 }
1135
1136 builder.advance_row();
1137 }
1138
1139 return Ok(Some(builder.finish()));
1140 }
1141 Ok(None)
1142 }
1143
1144 fn reset(&mut self) {
1145 self.input.reset();
1146 }
1147
1148 fn name(&self) -> &'static str {
1149 "AddLabel"
1150 }
1151
1152 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1153 self
1154 }
1155}
1156
1157pub struct RemoveLabelOperator {
1159 store: Arc<dyn GraphStoreMut>,
1161 input: Box<dyn Operator>,
1163 node_column: usize,
1165 labels: Vec<String>,
1167 output_schema: Vec<LogicalType>,
1169 count_column: usize,
1171 viewing_epoch: Option<EpochId>,
1173 transaction_id: Option<TransactionId>,
1175 write_tracker: Option<SharedWriteTracker>,
1177}
1178
1179impl RemoveLabelOperator {
1180 pub fn new(
1182 store: Arc<dyn GraphStoreMut>,
1183 input: Box<dyn Operator>,
1184 node_column: usize,
1185 labels: Vec<String>,
1186 output_schema: Vec<LogicalType>,
1187 ) -> Self {
1188 let count_column = output_schema.len() - 1;
1189 Self {
1190 store,
1191 input,
1192 node_column,
1193 labels,
1194 count_column,
1195 output_schema,
1196 viewing_epoch: None,
1197 transaction_id: None,
1198 write_tracker: None,
1199 }
1200 }
1201
1202 pub fn with_transaction_context(
1204 mut self,
1205 epoch: EpochId,
1206 transaction_id: Option<TransactionId>,
1207 ) -> Self {
1208 self.viewing_epoch = Some(epoch);
1209 self.transaction_id = transaction_id;
1210 self
1211 }
1212
1213 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1215 self.write_tracker = Some(tracker);
1216 self
1217 }
1218}
1219
1220impl Operator for RemoveLabelOperator {
1221 fn next(&mut self) -> OperatorResult {
1222 if let Some(chunk) = self.input.next()? {
1223 let mut builder =
1224 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1225
1226 for row in chunk.selected_indices() {
1227 let node_val = chunk
1228 .column(self.node_column)
1229 .and_then(|c| c.get_value(row))
1230 .ok_or_else(|| {
1231 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1232 })?;
1233
1234 let node_id = match node_val {
1235 #[allow(clippy::cast_sign_loss)]
1237 Value::Int64(id) => NodeId(id as u64),
1238 _ => {
1239 return Err(OperatorError::TypeMismatch {
1240 expected: "Int64 (node ID)".to_string(),
1241 found: format!("{node_val:?}"),
1242 });
1243 }
1244 };
1245
1246 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1248 tracker.record_node_write(tid, node_id)?;
1249 }
1250
1251 let mut row_count: i64 = 0;
1253 for label in &self.labels {
1254 let removed = if let Some(tid) = self.transaction_id {
1255 self.store.remove_label_versioned(node_id, label, tid)
1256 } else {
1257 self.store.remove_label(node_id, label)
1258 };
1259 if removed {
1260 row_count += 1;
1261 }
1262 }
1263
1264 for col_idx in 0..chunk.column_count() {
1266 if let (Some(src), Some(dst)) =
1267 (chunk.column(col_idx), builder.column_mut(col_idx))
1268 {
1269 if let Some(val) = src.get_value(row) {
1270 dst.push_value(val);
1271 } else {
1272 dst.push_value(Value::Null);
1273 }
1274 }
1275 }
1276 if let Some(dst) = builder.column_mut(self.count_column) {
1278 dst.push_value(Value::Int64(row_count));
1279 }
1280
1281 builder.advance_row();
1282 }
1283
1284 return Ok(Some(builder.finish()));
1285 }
1286 Ok(None)
1287 }
1288
1289 fn reset(&mut self) {
1290 self.input.reset();
1291 }
1292
1293 fn name(&self) -> &'static str {
1294 "RemoveLabel"
1295 }
1296
1297 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1298 self
1299 }
1300}
1301
1302pub struct SetPropertyOperator {
1307 store: Arc<dyn GraphStoreMut>,
1309 input: Box<dyn Operator>,
1311 entity_column: usize,
1313 is_edge: bool,
1315 properties: Vec<(String, PropertySource)>,
1317 output_schema: Vec<LogicalType>,
1319 replace: bool,
1321 validator: Option<Arc<dyn ConstraintValidator>>,
1323 labels: Vec<String>,
1325 edge_type_name: Option<String>,
1327 viewing_epoch: Option<EpochId>,
1329 transaction_id: Option<TransactionId>,
1331 write_tracker: Option<SharedWriteTracker>,
1333}
1334
1335impl SetPropertyOperator {
1336 pub fn new_for_node(
1338 store: Arc<dyn GraphStoreMut>,
1339 input: Box<dyn Operator>,
1340 node_column: usize,
1341 properties: Vec<(String, PropertySource)>,
1342 output_schema: Vec<LogicalType>,
1343 ) -> Self {
1344 Self {
1345 store,
1346 input,
1347 entity_column: node_column,
1348 is_edge: false,
1349 properties,
1350 output_schema,
1351 replace: false,
1352 validator: None,
1353 labels: Vec::new(),
1354 edge_type_name: None,
1355 viewing_epoch: None,
1356 transaction_id: None,
1357 write_tracker: None,
1358 }
1359 }
1360
1361 pub fn new_for_edge(
1363 store: Arc<dyn GraphStoreMut>,
1364 input: Box<dyn Operator>,
1365 edge_column: usize,
1366 properties: Vec<(String, PropertySource)>,
1367 output_schema: Vec<LogicalType>,
1368 ) -> Self {
1369 Self {
1370 store,
1371 input,
1372 entity_column: edge_column,
1373 is_edge: true,
1374 properties,
1375 output_schema,
1376 replace: false,
1377 validator: None,
1378 labels: Vec::new(),
1379 edge_type_name: None,
1380 viewing_epoch: None,
1381 transaction_id: None,
1382 write_tracker: None,
1383 }
1384 }
1385
1386 pub fn with_replace(mut self, replace: bool) -> Self {
1388 self.replace = replace;
1389 self
1390 }
1391
1392 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1394 self.validator = Some(validator);
1395 self
1396 }
1397
1398 pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1400 self.labels = labels;
1401 self
1402 }
1403
1404 pub fn with_edge_type(mut self, edge_type: String) -> Self {
1406 self.edge_type_name = Some(edge_type);
1407 self
1408 }
1409
1410 pub fn with_transaction_context(
1415 mut self,
1416 epoch: EpochId,
1417 transaction_id: Option<TransactionId>,
1418 ) -> Self {
1419 self.viewing_epoch = Some(epoch);
1420 self.transaction_id = transaction_id;
1421 self
1422 }
1423
1424 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1426 self.write_tracker = Some(tracker);
1427 self
1428 }
1429}
1430
1431impl Operator for SetPropertyOperator {
1432 fn next(&mut self) -> OperatorResult {
1433 if let Some(chunk) = self.input.next()? {
1434 let mut builder =
1435 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1436
1437 for row in chunk.selected_indices() {
1438 let entity_val = chunk
1439 .column(self.entity_column)
1440 .and_then(|c| c.get_value(row))
1441 .ok_or_else(|| {
1442 OperatorError::ColumnNotFound(format!(
1443 "entity column {}",
1444 self.entity_column
1445 ))
1446 })?;
1447
1448 let entity_id = match entity_val {
1449 #[allow(clippy::cast_sign_loss)]
1451 Value::Int64(id) => id as u64,
1452 _ => {
1453 return Err(OperatorError::TypeMismatch {
1454 expected: "Int64 (entity ID)".to_string(),
1455 found: format!("{entity_val:?}"),
1456 });
1457 }
1458 };
1459
1460 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1462 if self.is_edge {
1463 tracker.record_edge_write(tid, EdgeId(entity_id))?;
1464 } else {
1465 tracker.record_node_write(tid, NodeId(entity_id))?;
1466 }
1467 }
1468
1469 let resolved_props: Vec<(String, Value)> = self
1471 .properties
1472 .iter()
1473 .map(|(name, source)| {
1474 let value =
1475 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1476 (name.clone(), value)
1477 })
1478 .collect();
1479
1480 if let Some(ref validator) = self.validator {
1482 if self.is_edge {
1483 if let Some(ref et) = self.edge_type_name {
1484 for (name, value) in &resolved_props {
1485 validator.validate_edge_property(et, name, value)?;
1486 }
1487 }
1488 } else {
1489 for (name, value) in &resolved_props {
1490 validator.validate_node_property(&self.labels, name, value)?;
1491 validator.check_unique_node_property(&self.labels, name, value)?;
1492 }
1493 }
1494 }
1495
1496 let tx_id = self.transaction_id;
1498 for (prop_name, value) in resolved_props {
1499 if prop_name == "*" {
1500 if let Value::Map(map) = value {
1502 if self.replace {
1503 if self.is_edge {
1505 if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1506 let keys: Vec<String> = edge
1507 .properties
1508 .iter()
1509 .map(|(k, _)| k.as_str().to_string())
1510 .collect();
1511 for key in keys {
1512 if let Some(tid) = tx_id {
1513 self.store.remove_edge_property_versioned(
1514 EdgeId(entity_id),
1515 &key,
1516 tid,
1517 );
1518 } else {
1519 self.store
1520 .remove_edge_property(EdgeId(entity_id), &key);
1521 }
1522 }
1523 }
1524 } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1525 let keys: Vec<String> = node
1526 .properties
1527 .iter()
1528 .map(|(k, _)| k.as_str().to_string())
1529 .collect();
1530 for key in keys {
1531 if let Some(tid) = tx_id {
1532 self.store.remove_node_property_versioned(
1533 NodeId(entity_id),
1534 &key,
1535 tid,
1536 );
1537 } else {
1538 self.store
1539 .remove_node_property(NodeId(entity_id), &key);
1540 }
1541 }
1542 }
1543 }
1544 for (key, val) in map.iter() {
1546 if val.is_null() {
1547 if self.is_edge {
1549 if let Some(tid) = tx_id {
1550 self.store.remove_edge_property_versioned(
1551 EdgeId(entity_id),
1552 key.as_str(),
1553 tid,
1554 );
1555 } else {
1556 self.store.remove_edge_property(
1557 EdgeId(entity_id),
1558 key.as_str(),
1559 );
1560 }
1561 } else if let Some(tid) = tx_id {
1562 self.store.remove_node_property_versioned(
1563 NodeId(entity_id),
1564 key.as_str(),
1565 tid,
1566 );
1567 } else {
1568 self.store
1569 .remove_node_property(NodeId(entity_id), key.as_str());
1570 }
1571 } else if self.is_edge {
1572 if let Some(tid) = tx_id {
1573 self.store.set_edge_property_versioned(
1574 EdgeId(entity_id),
1575 key.as_str(),
1576 val.clone(),
1577 tid,
1578 );
1579 } else {
1580 self.store.set_edge_property(
1581 EdgeId(entity_id),
1582 key.as_str(),
1583 val.clone(),
1584 );
1585 }
1586 } else if let Some(tid) = tx_id {
1587 self.store.set_node_property_versioned(
1588 NodeId(entity_id),
1589 key.as_str(),
1590 val.clone(),
1591 tid,
1592 );
1593 } else {
1594 self.store.set_node_property(
1595 NodeId(entity_id),
1596 key.as_str(),
1597 val.clone(),
1598 );
1599 }
1600 }
1601 }
1602 } else if self.is_edge {
1603 if let Some(tid) = tx_id {
1604 self.store.set_edge_property_versioned(
1605 EdgeId(entity_id),
1606 &prop_name,
1607 value,
1608 tid,
1609 );
1610 } else {
1611 self.store
1612 .set_edge_property(EdgeId(entity_id), &prop_name, value);
1613 }
1614 } else if let Some(tid) = tx_id {
1615 self.store.set_node_property_versioned(
1616 NodeId(entity_id),
1617 &prop_name,
1618 value,
1619 tid,
1620 );
1621 } else {
1622 self.store
1623 .set_node_property(NodeId(entity_id), &prop_name, value);
1624 }
1625 }
1626
1627 for col_idx in 0..chunk.column_count() {
1629 if let (Some(src), Some(dst)) =
1630 (chunk.column(col_idx), builder.column_mut(col_idx))
1631 {
1632 if let Some(val) = src.get_value(row) {
1633 dst.push_value(val);
1634 } else {
1635 dst.push_value(Value::Null);
1636 }
1637 }
1638 }
1639
1640 builder.advance_row();
1641 }
1642
1643 return Ok(Some(builder.finish()));
1644 }
1645 Ok(None)
1646 }
1647
1648 fn reset(&mut self) {
1649 self.input.reset();
1650 }
1651
1652 fn name(&self) -> &'static str {
1653 "SetProperty"
1654 }
1655
1656 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1657 self
1658 }
1659}
1660
#[cfg(all(test, feature = "lpg"))]
mod tests {
    //! Unit tests for the graph mutation operators (create/delete node and
    //! edge, add/remove label, set property), for the default methods of
    //! `ConstraintValidator`, and for `PropertySource` resolution.
    //!
    //! Every test runs against a fresh `LpgStore`.

    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;

    // ---------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------

    /// Builds a fresh, empty `LpgStore` behind the mutable graph-store trait object.
    fn create_test_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Input operator that yields one pre-built chunk and is exhausted afterwards.
    struct MockInput {
        /// The chunk to hand out; `None` once it has been taken.
        chunk: Option<DataChunk>,
    }

    impl MockInput {
        /// Wraps `chunk` in a boxed `MockInput`, ready to be used as an operator input.
        fn boxed(chunk: DataChunk) -> Box<Self> {
            Box::new(Self { chunk: Some(chunk) })
        }
    }

    impl Operator for MockInput {
        /// Returns the stored chunk on the first call and `None` on every later call.
        fn next(&mut self) -> OperatorResult {
            Ok(self.chunk.take())
        }

        /// No-op: the chunk is not restored after it has been taken.
        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "MockInput"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Input operator that never produces a chunk.
    struct EmptyInput;

    impl Operator for EmptyInput {
        /// Always reports end of input.
        fn next(&mut self) -> OperatorResult {
            Ok(None)
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "EmptyInput"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Builds a single-column `Int64` chunk with one row per raw entity ID.
    ///
    /// Shared by [`node_id_chunk`] and [`edge_id_chunk`], which only differ in
    /// the ID newtype they unwrap.
    // IDs handed out by a fresh test store are expected to be tiny, so the
    // u64 -> i64 cast is not expected to wrap.
    #[allow(clippy::cast_possible_wrap)]
    fn raw_id_chunk(raw_ids: impl IntoIterator<Item = u64>) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for raw in raw_ids {
            builder.column_mut(0).unwrap().push_int64(raw as i64);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Builds a single-column `Int64` chunk holding the given node IDs, one per row.
    fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
        raw_id_chunk(ids.iter().map(|id| id.0))
    }

    /// Builds a single-column `Int64` chunk holding the given edge IDs, one per row.
    fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
        raw_id_chunk(ids.iter().map(|id| id.0))
    }

    /// Builds a one-row, two-column `Int64` chunk: column 0 holds `source`,
    /// column 1 holds `target`.
    // Same reasoning as `raw_id_chunk`: test IDs are expected to be tiny.
    #[allow(clippy::cast_possible_wrap)]
    fn endpoint_chunk(source: NodeId, target: NodeId) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(source.0 as i64);
        builder.column_mut(1).unwrap().push_int64(target.0 as i64);
        builder.advance_row();
        builder.finish()
    }

    // ---------------------------------------------------------------------
    // CreateNode / CreateEdge
    // ---------------------------------------------------------------------

    /// Without an input operator, `CreateNodeOperator` emits one row, creates
    /// one node, and is exhausted on the next pull.
    #[test]
    fn test_create_node_standalone() {
        let store = create_test_store();

        let mut op = CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Person".to_string()],
            vec![(
                "name".to_string(),
                PropertySource::Constant(Value::String("Alix".into())),
            )],
            vec![LogicalType::Int64],
            0,
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        // A second pull must report exhaustion.
        assert!(op.next().unwrap().is_none());

        assert_eq!(store.node_count(), 1);
    }

    /// One input row with two node IDs results in exactly one edge in the store.
    #[test]
    fn test_create_edge() {
        let store = create_test_store();

        let node1 = store.create_node(&["Person"]);
        let node2 = store.create_node(&["Person"]);

        let mut op = CreateEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(endpoint_chunk(node1, node2)),
            0,
            1,
            "KNOWS".to_string(),
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let _chunk = op.next().unwrap().unwrap();
        assert_eq!(store.edge_count(), 1);
    }

    // ---------------------------------------------------------------------
    // DeleteNode / DeleteEdge
    // ---------------------------------------------------------------------

    /// Deleting an unconnected node emits one row and leaves the store empty.
    #[test]
    fn test_delete_node() {
        let store = create_test_store();

        let node_id = store.create_node(&["Person"]);
        assert_eq!(store.node_count(), 1);

        let mut op = DeleteNodeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(node_id_chunk(&[node_id])),
            0,
            vec![LogicalType::Node],
            false,
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);
        assert_eq!(store.node_count(), 0);
    }

    /// Deleting a single edge emits one row and removes the edge from the store.
    #[test]
    fn test_delete_edge() {
        let store = create_test_store();

        let n1 = store.create_node(&["Person"]);
        let n2 = store.create_node(&["Person"]);
        let eid = store.create_edge(n1, n2, "KNOWS");
        assert_eq!(store.edge_count(), 1);

        let mut op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(edge_id_chunk(&[eid])),
            0,
            vec![LogicalType::Node],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);
        assert_eq!(store.edge_count(), 0);
    }

    /// With an exhausted input, `DeleteEdgeOperator` yields no chunk.
    #[test]
    fn test_delete_edge_no_input_returns_none() {
        let store = create_test_store();

        let mut op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
        );

        assert!(op.next().unwrap().is_none());
    }

    /// Two edge IDs in one input chunk are both deleted in a single pull.
    #[test]
    fn test_delete_multiple_edges() {
        let store = create_test_store();

        let n1 = store.create_node(&["N"]);
        let n2 = store.create_node(&["N"]);
        let e1 = store.create_edge(n1, n2, "R");
        let e2 = store.create_edge(n2, n1, "S");
        assert_eq!(store.edge_count(), 2);

        let mut op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(edge_id_chunk(&[e1, e2])),
            0,
            vec![LogicalType::Node],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 2);
        assert_eq!(store.edge_count(), 0);
    }

    /// With the final constructor flag set to `true` (detach mode, going by
    /// the test names), deleting a connected node also removes both of its edges.
    #[test]
    fn test_delete_node_detach() {
        let store = create_test_store();

        let n1 = store.create_node(&["Person"]);
        let n2 = store.create_node(&["Person"]);
        store.create_edge(n1, n2, "KNOWS");
        store.create_edge(n2, n1, "FOLLOWS");
        assert_eq!(store.edge_count(), 2);

        let mut op = DeleteNodeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(node_id_chunk(&[n1])),
            0,
            vec![LogicalType::Node],
            true,
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);
        // Only `n2` survives, and no edge may remain.
        assert_eq!(store.node_count(), 1);
        assert_eq!(store.edge_count(), 0);
    }

    // ---------------------------------------------------------------------
    // AddLabel / RemoveLabel
    // ---------------------------------------------------------------------

    /// Adding one new label reports `1` in output column 1 and keeps the
    /// node's existing label.
    #[test]
    fn test_add_label() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = AddLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(node_id_chunk(&[node])),
            0,
            vec!["Employee".to_string()],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
        assert_eq!(updated, 1);

        let node_data = store.get_node(node).unwrap();
        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
        assert!(labels.contains(&"Person"));
        assert!(labels.contains(&"Employee"));
    }

    /// Adding two new labels reports `2` in output column 1 and both labels
    /// end up on the node.
    #[test]
    fn test_add_multiple_labels() {
        let store = create_test_store();

        let node = store.create_node(&["Base"]);

        let mut op = AddLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(node_id_chunk(&[node])),
            0,
            vec!["LabelA".to_string(), "LabelB".to_string()],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
        assert_eq!(updated, 2);

        let node_data = store.get_node(node).unwrap();
        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
        assert!(labels.contains(&"LabelA"));
        assert!(labels.contains(&"LabelB"));
    }

    /// With an exhausted input, `AddLabelOperator` yields no chunk.
    #[test]
    fn test_add_label_no_input_returns_none() {
        let store = create_test_store();

        let mut op = AddLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["Foo".to_string()],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        assert!(op.next().unwrap().is_none());
    }

    /// Removing an existing label reports `1` in output column 1 and leaves
    /// the node's other label in place.
    #[test]
    fn test_remove_label() {
        let store = create_test_store();

        let node = store.create_node(&["Person", "Employee"]);

        let mut op = RemoveLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(node_id_chunk(&[node])),
            0,
            vec!["Employee".to_string()],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
        assert_eq!(updated, 1);

        let node_data = store.get_node(node).unwrap();
        let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
        assert!(labels.contains(&"Person"));
        assert!(!labels.contains(&"Employee"));
    }

    /// Removing a label the node does not carry reports `0` in output column 1.
    #[test]
    fn test_remove_nonexistent_label() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = RemoveLabelOperator::new(
            Arc::clone(&store),
            MockInput::boxed(node_id_chunk(&[node])),
            0,
            vec!["NonExistent".to_string()],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
        assert_eq!(updated, 0);
    }

    // ---------------------------------------------------------------------
    // SetProperty
    // ---------------------------------------------------------------------

    /// A constant property source is written to the targeted node.
    #[test]
    fn test_set_node_property_constant() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(node_id_chunk(&[node])),
            0,
            vec![(
                "name".to_string(),
                PropertySource::Constant(Value::String("Alix".into())),
            )],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        let node_data = store.get_node(node).unwrap();
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    /// A column property source takes the value from the same input row.
    #[test]
    #[allow(clippy::cast_possible_wrap)] // fresh test-store IDs are expected to be tiny
    fn test_set_node_property_from_column() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        // Column 0: target node ID, column 1: the value to store.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(node.0 as i64);
        builder
            .column_mut(1)
            .unwrap()
            .push_value(Value::String("Gus".into()));
        builder.advance_row();

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(builder.finish()),
            0,
            vec![("name".to_string(), PropertySource::Column(1))],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        let node_data = store.get_node(node).unwrap();
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Gus".into()))
        );
    }

    /// The edge variant of the operator writes the property onto the edge.
    #[test]
    fn test_set_edge_property() {
        let store = create_test_store();

        let n1 = store.create_node(&["N"]);
        let n2 = store.create_node(&["N"]);
        let eid = store.create_edge(n1, n2, "KNOWS");

        let mut op = SetPropertyOperator::new_for_edge(
            Arc::clone(&store),
            MockInput::boxed(edge_id_chunk(&[eid])),
            0,
            vec![(
                "weight".to_string(),
                PropertySource::Constant(Value::Float64(0.75)),
            )],
            vec![LogicalType::Int64],
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        let edge_data = store.get_edge(eid).unwrap();
        assert_eq!(
            edge_data.properties.get(&PropertyKey::new("weight")),
            Some(&Value::Float64(0.75))
        );
    }

    /// Several properties given to one operator are all written.
    #[test]
    fn test_set_multiple_properties() {
        let store = create_test_store();

        let node = store.create_node(&["Person"]);

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(node_id_chunk(&[node])),
            0,
            vec![
                (
                    "name".to_string(),
                    PropertySource::Constant(Value::String("Alix".into())),
                ),
                (
                    "age".to_string(),
                    PropertySource::Constant(Value::Int64(30)),
                ),
            ],
            vec![LogicalType::Int64],
        );

        op.next().unwrap().unwrap();

        let node_data = store.get_node(node).unwrap();
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("age")),
            Some(&Value::Int64(30))
        );
    }

    /// With an exhausted input, `SetPropertyOperator` yields no chunk.
    #[test]
    fn test_set_property_no_input_returns_none() {
        let store = create_test_store();

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
            vec![LogicalType::Int64],
        );

        assert!(op.next().unwrap().is_none());
    }

    // ---------------------------------------------------------------------
    // Error paths and operators with extra configuration
    // ---------------------------------------------------------------------

    /// With the final constructor flag set to `false`, deleting a node that
    /// still has an edge fails with a `ConstraintViolation` and deletes nothing.
    #[test]
    fn test_delete_node_without_detach_errors_when_edges_exist() {
        let store = create_test_store();

        let n1 = store.create_node(&["Person"]);
        let n2 = store.create_node(&["Person"]);
        store.create_edge(n1, n2, "KNOWS");

        let mut op = DeleteNodeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(node_id_chunk(&[n1])),
            0,
            vec![LogicalType::Int64],
            false,
        );

        let err = op.next().unwrap_err();
        match err {
            OperatorError::ConstraintViolation(msg) => {
                assert!(msg.contains("connected edge"), "unexpected message: {msg}");
            }
            other => panic!("expected ConstraintViolation, got {other:?}"),
        }
        // Both nodes must still be present after the failed delete.
        assert_eq!(store.node_count(), 2);
    }

    /// With an input operator, `CreateNodeOperator` creates one node per input
    /// row and is exhausted once the input is.
    #[test]
    fn test_create_node_with_input_operator() {
        let store = create_test_store();

        let existing = store.create_node(&["Seed"]);

        let mut op = CreateNodeOperator::new(
            Arc::clone(&store),
            Some(MockInput::boxed(node_id_chunk(&[existing]))),
            vec!["Created".to_string()],
            vec![(
                "source".to_string(),
                PropertySource::Constant(Value::String("from_input".into())),
            )],
            // Two output columns; the last argument selects column 1 as the
            // operator's output column.
            vec![LogicalType::Int64, LogicalType::Int64],
            1,
        );

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);

        // The seed node plus the newly created one.
        assert_eq!(store.node_count(), 2);

        assert!(op.next().unwrap().is_none());
    }

    /// `with_properties` stores the property on the new edge and
    /// `with_output_column` exposes the new edge's ID in that column.
    #[test]
    #[allow(clippy::cast_sign_loss)] // the ID read back was produced from a u64 edge ID
    fn test_create_edge_with_properties_and_output_column() {
        let store = create_test_store();

        let n1 = store.create_node(&["Person"]);
        let n2 = store.create_node(&["Person"]);

        let mut op = CreateEdgeOperator::new(
            Arc::clone(&store),
            MockInput::boxed(endpoint_chunk(n1, n2)),
            0,
            1,
            "KNOWS".to_string(),
            vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
        )
        .with_properties(vec![(
            "since".to_string(),
            PropertySource::Constant(Value::Int64(2024)),
        )])
        .with_output_column(2);

        let chunk = op.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 1);
        assert_eq!(store.edge_count(), 1);

        let edge_id_raw = chunk
            .column(2)
            .and_then(|c| c.get_int64(0))
            .expect("edge ID should be in output column 2");
        let edge_id = EdgeId(edge_id_raw as u64);

        let edge = store.get_edge(edge_id).expect("edge should exist");
        assert_eq!(
            edge.properties.get(&PropertyKey::new("since")),
            Some(&Value::Int64(2024))
        );
    }

    /// A `"*"` map assignment with `with_replace(true)` drops properties that
    /// are not in the map and writes the map's entries.
    #[test]
    fn test_set_property_map_replace() {
        use std::collections::BTreeMap;

        let store = create_test_store();

        let node = store.create_node(&["Person"]);
        store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));

        let mut map = BTreeMap::new();
        map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(node_id_chunk(&[node])),
            0,
            vec![(
                "*".to_string(),
                PropertySource::Constant(Value::Map(Arc::new(map))),
            )],
            vec![LogicalType::Int64],
        )
        .with_replace(true);

        op.next().unwrap().unwrap();

        let node_data = store.get_node(node).unwrap();
        // The pre-existing property must be gone.
        assert!(
            node_data
                .properties
                .get(&PropertyKey::new("old_prop"))
                .is_none()
        );
        // The map entry must be present.
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("new_key")),
            Some(&Value::String("new_val".into()))
        );
    }

    /// A `"*"` map assignment without `with_replace(true)` keeps existing
    /// properties and adds the map's entries.
    #[test]
    fn test_set_property_map_merge() {
        use std::collections::BTreeMap;

        let store = create_test_store();

        let node = store.create_node(&["Person"]);
        store.set_node_property(node, "existing", Value::Int64(42));

        let mut map = BTreeMap::new();
        map.insert(PropertyKey::new("added"), Value::String("hello".into()));

        // `with_replace` is deliberately not called here.
        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(node_id_chunk(&[node])),
            0,
            vec![(
                "*".to_string(),
                PropertySource::Constant(Value::Map(Arc::new(map))),
            )],
            vec![LogicalType::Int64],
        );

        op.next().unwrap().unwrap();

        let node_data = store.get_node(node).unwrap();
        // The pre-existing property survives.
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("existing")),
            Some(&Value::Int64(42))
        );
        // The map entry was added.
        assert_eq!(
            node_data.properties.get(&PropertyKey::new("added")),
            Some(&Value::String("hello".into()))
        );
    }

    /// `PropertySource::PropertyAccess` reads a property of the node in one
    /// column and the operator writes it to the node named by another column.
    #[test]
    #[allow(clippy::cast_possible_wrap)] // fresh test-store IDs are expected to be tiny
    fn test_property_source_property_access() {
        let store = create_test_store();

        let source_node = store.create_node(&["Source"]);
        store.set_node_property(source_node, "name", Value::String("Alix".into()));

        let target_node = store.create_node(&["Target"]);

        // Column 0: the node to read from, column 1: the ID of the node to write to.
        let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_node_id(source_node);
        builder
            .column_mut(1)
            .unwrap()
            .push_int64(target_node.0 as i64);
        builder.advance_row();

        let mut op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            MockInput::boxed(builder.finish()),
            // The entity to update lives in column 1.
            1,
            vec![(
                "copied_name".to_string(),
                PropertySource::PropertyAccess {
                    column: 0,
                    property: "name".to_string(),
                },
            )],
            vec![LogicalType::Node, LogicalType::Int64],
        );

        op.next().unwrap().unwrap();

        let target_data = store.get_node(target_node).unwrap();
        assert_eq!(
            target_data.properties.get(&PropertyKey::new("copied_name")),
            Some(&Value::String("Alix".into()))
        );
    }

    /// A validator attached via `with_validator` lets allowed properties
    /// through and turns a rejected one into a `ConstraintViolation`.
    #[test]
    fn test_create_node_with_constraint_validator() {
        let store = create_test_store();

        /// Rejects the node property key `"forbidden"` and accepts everything else.
        struct RejectForbiddenKeyValidator;

        impl ConstraintValidator for RejectForbiddenKeyValidator {
            fn validate_node_property(
                &self,
                _labels: &[String],
                key: &str,
                _value: &Value,
            ) -> Result<(), OperatorError> {
                if key == "forbidden" {
                    return Err(OperatorError::ConstraintViolation(
                        "property 'forbidden' is not allowed".to_string(),
                    ));
                }
                Ok(())
            }

            fn validate_node_complete(
                &self,
                _labels: &[String],
                _properties: &[(String, Value)],
            ) -> Result<(), OperatorError> {
                Ok(())
            }

            fn check_unique_node_property(
                &self,
                _labels: &[String],
                _key: &str,
                _value: &Value,
            ) -> Result<(), OperatorError> {
                Ok(())
            }

            fn validate_edge_property(
                &self,
                _edge_type: &str,
                _key: &str,
                _value: &Value,
            ) -> Result<(), OperatorError> {
                Ok(())
            }

            fn validate_edge_complete(
                &self,
                _edge_type: &str,
                _properties: &[(String, Value)],
            ) -> Result<(), OperatorError> {
                Ok(())
            }
        }

        // Allowed property: creation succeeds.
        let mut op = CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Thing".to_string()],
            vec![(
                "name".to_string(),
                PropertySource::Constant(Value::String("ok".into())),
            )],
            vec![LogicalType::Int64],
            0,
        )
        .with_validator(Arc::new(RejectForbiddenKeyValidator));

        assert!(op.next().is_ok());
        assert_eq!(store.node_count(), 1);

        // Rejected property: the validator's error is surfaced by `next`.
        let mut op = CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Thing".to_string()],
            vec![(
                "forbidden".to_string(),
                PropertySource::Constant(Value::Int64(1)),
            )],
            vec![LogicalType::Int64],
            0,
        )
        .with_validator(Arc::new(RejectForbiddenKeyValidator));

        let err = op.next().unwrap_err();
        assert!(matches!(err, OperatorError::ConstraintViolation(_)));
    }

    /// After `reset`, a standalone `CreateNodeOperator` runs again and creates
    /// a second node.
    #[test]
    fn test_create_node_reset_allows_re_execution() {
        let store = create_test_store();

        let mut op = CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Person".to_string()],
            vec![],
            vec![LogicalType::Int64],
            0,
        );

        // First run: one chunk, then exhausted.
        assert!(op.next().unwrap().is_some());
        assert!(op.next().unwrap().is_none());

        // After a reset the operator produces a chunk again.
        op.reset();
        assert!(op.next().unwrap().is_some());

        assert_eq!(store.node_count(), 2);
    }

    // ---------------------------------------------------------------------
    // Operator metadata: `name` and `into_any`
    // ---------------------------------------------------------------------

    /// Every mutation operator reports its expected static name.
    #[test]
    fn test_operator_names() {
        let store = create_test_store();

        let op = CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec![],
            vec![],
            vec![LogicalType::Int64],
            0,
        );
        assert_eq!(op.name(), "CreateNode");

        let op = CreateEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            1,
            "R".to_string(),
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "CreateEdge");

        let op = DeleteNodeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
            false,
        );
        assert_eq!(op.name(), "DeleteNode");

        let op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "DeleteEdge");

        let op = AddLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["L".to_string()],
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "AddLabel");

        let op = RemoveLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["L".to_string()],
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "RemoveLabel");

        let op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![],
            vec![LogicalType::Int64],
        );
        assert_eq!(op.name(), "SetProperty");
    }

    /// `into_any` on a boxed `CreateNodeOperator` downcasts back to the concrete type.
    #[test]
    fn test_create_node_into_any() {
        let store = create_test_store();
        let op = CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Person".to_string()],
            vec![],
            vec![LogicalType::Int64],
            0,
        );
        let any = Box::new(op).into_any();
        assert!(any.downcast::<CreateNodeOperator>().is_ok());
    }

    /// `into_any` on a boxed `CreateEdgeOperator` downcasts back to the concrete type.
    #[test]
    fn test_create_edge_into_any() {
        let store = create_test_store();
        let op = CreateEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            1,
            "KNOWS".to_string(),
            vec![LogicalType::Int64],
        );
        let any = Box::new(op).into_any();
        assert!(any.downcast::<CreateEdgeOperator>().is_ok());
    }

    /// `into_any` on a boxed `DeleteNodeOperator` downcasts back to the concrete type.
    #[test]
    fn test_delete_node_into_any() {
        let store = create_test_store();
        let op = DeleteNodeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
            false,
        );
        let any = Box::new(op).into_any();
        assert!(any.downcast::<DeleteNodeOperator>().is_ok());
    }

    /// `into_any` on a boxed `DeleteEdgeOperator` downcasts back to the concrete type.
    #[test]
    fn test_delete_edge_into_any() {
        let store = create_test_store();
        let op = DeleteEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
        );
        let any = Box::new(op).into_any();
        assert!(any.downcast::<DeleteEdgeOperator>().is_ok());
    }

    /// `into_any` on a boxed `AddLabelOperator` downcasts back to the concrete type.
    #[test]
    fn test_add_label_into_any() {
        let store = create_test_store();
        let op = AddLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["Label".to_string()],
            vec![LogicalType::Int64],
        );
        let any = Box::new(op).into_any();
        assert!(any.downcast::<AddLabelOperator>().is_ok());
    }

    /// `into_any` on a boxed `RemoveLabelOperator` downcasts back to the concrete type.
    #[test]
    fn test_remove_label_into_any() {
        let store = create_test_store();
        let op = RemoveLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["Label".to_string()],
            vec![LogicalType::Int64],
        );
        let any = Box::new(op).into_any();
        assert!(any.downcast::<RemoveLabelOperator>().is_ok());
    }

    /// `into_any` on a boxed `SetPropertyOperator` downcasts back to the concrete type.
    #[test]
    fn test_set_property_into_any() {
        let store = create_test_store();
        let op = SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![],
            vec![LogicalType::Int64],
        );
        let any = Box::new(op).into_any();
        assert!(any.downcast::<SetPropertyOperator>().is_ok());
    }

    // ---------------------------------------------------------------------
    // ConstraintValidator default methods
    // ---------------------------------------------------------------------

    /// Validator that implements only the required trait methods (all
    /// accepting), so the trait's provided default methods can be exercised.
    struct MinimalValidator;

    impl ConstraintValidator for MinimalValidator {
        fn validate_node_property(
            &self,
            _labels: &[String],
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_node_complete(
            &self,
            _labels: &[String],
            _properties: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn check_unique_node_property(
            &self,
            _labels: &[String],
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_property(
            &self,
            _edge_type: &str,
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_complete(
            &self,
            _edge_type: &str,
            _properties: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }
    }

    /// The default `validate_node_labels_allowed` accepts any labels.
    #[test]
    fn test_constraint_validator_default_node_labels_allowed() {
        let v = MinimalValidator;
        assert!(
            v.validate_node_labels_allowed(&["Person".to_string(), "Actor".to_string()])
                .is_ok()
        );
    }

    /// The default `validate_edge_type_allowed` accepts any edge type.
    #[test]
    fn test_constraint_validator_default_edge_type_allowed() {
        let v = MinimalValidator;
        assert!(v.validate_edge_type_allowed("KNOWS").is_ok());
    }

    /// The default `validate_edge_endpoints` accepts any endpoint labels.
    #[test]
    fn test_constraint_validator_default_edge_endpoints() {
        let v = MinimalValidator;
        assert!(
            v.validate_edge_endpoints("KNOWS", &["Person".to_string()], &["Person".to_string()])
                .is_ok()
        );
    }

    /// The default `inject_defaults` leaves the property list untouched.
    #[test]
    fn test_constraint_validator_default_inject_defaults() {
        let v = MinimalValidator;
        let mut props = vec![("name".to_string(), Value::String("Alix".into()))];
        v.inject_defaults(&["Person".to_string()], &mut props);
        assert_eq!(props.len(), 1);
    }

    // ---------------------------------------------------------------------
    // PropertySource::resolve
    // ---------------------------------------------------------------------

    /// `Column(i)` resolves to the value stored in column `i` of the given row.
    #[test]
    fn test_property_source_column() {
        let store = LpgStore::new().unwrap();
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(42);
        builder.advance_row();
        let chunk = builder.finish();

        let src = PropertySource::Column(0);
        assert_eq!(src.resolve(&chunk, 0, &store), Value::Int64(42));
    }

    /// `Constant(v)` resolves to `v`, even against an empty chunk.
    #[test]
    fn test_property_source_constant() {
        let store = LpgStore::new().unwrap();
        let chunk = DataChunk::empty();

        let src = PropertySource::Constant(Value::String("hello".into()));
        assert_eq!(
            src.resolve(&chunk, 0, &store),
            Value::String("hello".into()),
        );
    }

    /// `Column(i)` with a column index the chunk does not have resolves to `Null`.
    #[test]
    fn test_property_source_column_out_of_bounds() {
        let store = LpgStore::new().unwrap();
        let chunk = DataChunk::empty();

        let src = PropertySource::Column(99);
        assert_eq!(src.resolve(&chunk, 0, &store), Value::Null);
    }

    /// `PropertyAccess` on a column holding a map value looks the property up
    /// in that map.
    #[test]
    fn test_property_source_property_access_from_map() {
        let store = LpgStore::new().unwrap();
        let mut map = std::collections::BTreeMap::new();
        map.insert(PropertyKey::new("age"), Value::Int64(30));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
        builder
            .column_mut(0)
            .unwrap()
            .push_value(Value::Map(Arc::new(map)));
        builder.advance_row();
        let chunk = builder.finish();

        let src = PropertySource::PropertyAccess {
            column: 0,
            property: "age".to_string(),
        };
        assert_eq!(src.resolve(&chunk, 0, &store), Value::Int64(30));
    }

    /// `PropertyAccess` with a column index the chunk does not have resolves to `Null`.
    #[test]
    fn test_property_source_property_access_missing_column() {
        let store = LpgStore::new().unwrap();
        let chunk = DataChunk::empty();

        let src = PropertySource::PropertyAccess {
            column: 99,
            property: "name".to_string(),
        };
        assert_eq!(src.resolve(&chunk, 0, &store), Value::Null);
    }
}