1use std::sync::Arc;
10
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19pub trait ConstraintValidator: Send + Sync {
24 fn validate_node_property(
32 &self,
33 labels: &[String],
34 key: &str,
35 value: &Value,
36 ) -> Result<(), OperatorError>;
37
38 fn validate_node_complete(
46 &self,
47 labels: &[String],
48 properties: &[(String, Value)],
49 ) -> Result<(), OperatorError>;
50
51 fn check_unique_node_property(
57 &self,
58 labels: &[String],
59 key: &str,
60 value: &Value,
61 ) -> Result<(), OperatorError>;
62
63 fn validate_edge_property(
69 &self,
70 edge_type: &str,
71 key: &str,
72 value: &Value,
73 ) -> Result<(), OperatorError>;
74
75 fn validate_edge_complete(
81 &self,
82 edge_type: &str,
83 properties: &[(String, Value)],
84 ) -> Result<(), OperatorError>;
85
86 fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
92 let _ = labels;
93 Ok(())
94 }
95
96 fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
102 let _ = edge_type;
103 Ok(())
104 }
105
106 fn validate_edge_endpoints(
112 &self,
113 edge_type: &str,
114 source_labels: &[String],
115 target_labels: &[String],
116 ) -> Result<(), OperatorError> {
117 let _ = (edge_type, source_labels, target_labels);
118 Ok(())
119 }
120
121 fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
124 let _ = (labels, properties);
125 }
126}
127
128pub struct CreateNodeOperator {
133 store: Arc<dyn GraphStoreMut>,
135 input: Option<Box<dyn Operator>>,
137 labels: Vec<String>,
139 properties: Vec<(String, PropertySource)>,
141 output_schema: Vec<LogicalType>,
143 output_column: usize,
145 executed: bool,
147 viewing_epoch: Option<EpochId>,
149 transaction_id: Option<TransactionId>,
151 validator: Option<Arc<dyn ConstraintValidator>>,
153 write_tracker: Option<SharedWriteTracker>,
155}
156
157#[derive(Debug, Clone)]
159#[non_exhaustive]
160pub enum PropertySource {
161 Column(usize),
163 Constant(Value),
165 PropertyAccess {
167 column: usize,
169 property: String,
171 },
172}
173
174impl PropertySource {
175 pub fn resolve(
177 &self,
178 chunk: &crate::execution::chunk::DataChunk,
179 row: usize,
180 store: &dyn GraphStore,
181 ) -> Value {
182 match self {
183 PropertySource::Column(col_idx) => chunk
184 .column(*col_idx)
185 .and_then(|c| c.get_value(row))
186 .unwrap_or(Value::Null),
187 PropertySource::Constant(v) => v.clone(),
188 PropertySource::PropertyAccess { column, property } => {
189 let Some(col) = chunk.column(*column) else {
190 return Value::Null;
191 };
192 if let Some(node_id) = col.get_node_id(row) {
194 store
195 .get_node(node_id)
196 .and_then(|node| node.get_property(property).cloned())
197 .unwrap_or(Value::Null)
198 } else if let Some(edge_id) = col.get_edge_id(row) {
199 store
200 .get_edge(edge_id)
201 .and_then(|edge| edge.get_property(property).cloned())
202 .unwrap_or(Value::Null)
203 } else if let Some(Value::Map(map)) = col.get_value(row) {
204 let key = PropertyKey::new(property);
205 map.get(&key).cloned().unwrap_or(Value::Null)
206 } else {
207 Value::Null
208 }
209 }
210 }
211 }
212}
213
214impl CreateNodeOperator {
215 pub fn new(
225 store: Arc<dyn GraphStoreMut>,
226 input: Option<Box<dyn Operator>>,
227 labels: Vec<String>,
228 properties: Vec<(String, PropertySource)>,
229 output_schema: Vec<LogicalType>,
230 output_column: usize,
231 ) -> Self {
232 Self {
233 store,
234 input,
235 labels,
236 properties,
237 output_schema,
238 output_column,
239 executed: false,
240 viewing_epoch: None,
241 transaction_id: None,
242 validator: None,
243 write_tracker: None,
244 }
245 }
246
247 pub fn with_transaction_context(
249 mut self,
250 epoch: EpochId,
251 transaction_id: Option<TransactionId>,
252 ) -> Self {
253 self.viewing_epoch = Some(epoch);
254 self.transaction_id = transaction_id;
255 self
256 }
257
258 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
260 self.validator = Some(validator);
261 self
262 }
263
264 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
266 self.write_tracker = Some(tracker);
267 self
268 }
269}
270
271impl CreateNodeOperator {
272 fn validate_and_set_properties(
274 &self,
275 node_id: NodeId,
276 resolved_props: &mut Vec<(String, Value)>,
277 ) -> Result<(), OperatorError> {
278 if let Some(ref validator) = self.validator {
280 validator.validate_node_labels_allowed(&self.labels)?;
281 }
282
283 if let Some(ref validator) = self.validator {
285 validator.inject_defaults(&self.labels, resolved_props);
286 }
287
288 if let Some(ref validator) = self.validator {
290 for (name, value) in resolved_props.iter() {
291 validator.validate_node_property(&self.labels, name, value)?;
292 validator.check_unique_node_property(&self.labels, name, value)?;
293 }
294 validator.validate_node_complete(&self.labels, resolved_props)?;
296 }
297
298 if let Some(tid) = self.transaction_id {
300 for (name, value) in resolved_props.iter() {
301 self.store
302 .set_node_property_versioned(node_id, name, value.clone(), tid);
303 }
304 } else {
305 for (name, value) in resolved_props.iter() {
306 self.store.set_node_property(node_id, name, value.clone());
307 }
308 }
309 Ok(())
310 }
311}
312
313impl Operator for CreateNodeOperator {
314 fn next(&mut self) -> OperatorResult {
315 let epoch = self
317 .viewing_epoch
318 .unwrap_or_else(|| self.store.current_epoch());
319 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
320
321 if let Some(ref mut input) = self.input {
322 if let Some(chunk) = input.next()? {
324 let mut builder =
325 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
326
327 for row in chunk.selected_indices() {
328 let mut resolved_props: Vec<(String, Value)> = self
330 .properties
331 .iter()
332 .map(|(name, source)| {
333 let value =
334 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
335 (name.clone(), value)
336 })
337 .collect();
338
339 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
341 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
342
343 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
345 tracker.record_node_write(tid, node_id)?;
346 }
347
348 self.validate_and_set_properties(node_id, &mut resolved_props)?;
350
351 for col_idx in 0..chunk.column_count() {
353 if col_idx < self.output_column
354 && let (Some(src), Some(dst)) =
355 (chunk.column(col_idx), builder.column_mut(col_idx))
356 {
357 if let Some(val) = src.get_value(row) {
358 dst.push_value(val);
359 } else {
360 dst.push_value(Value::Null);
361 }
362 }
363 }
364
365 if let Some(dst) = builder.column_mut(self.output_column) {
367 #[allow(clippy::cast_possible_wrap)]
369 #[allow(clippy::cast_possible_wrap)]
371 dst.push_value(Value::Int64(node_id.0 as i64));
372 }
373
374 builder.advance_row();
375 }
376
377 return Ok(Some(builder.finish()));
378 }
379 Ok(None)
380 } else {
381 if self.executed {
383 return Ok(None);
384 }
385 self.executed = true;
386
387 let mut resolved_props: Vec<(String, Value)> = self
389 .properties
390 .iter()
391 .filter_map(|(name, source)| {
392 if let PropertySource::Constant(value) = source {
393 Some((name.clone(), value.clone()))
394 } else {
395 None
396 }
397 })
398 .collect();
399
400 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
402 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
403
404 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
406 tracker.record_node_write(tid, node_id)?;
407 }
408
409 self.validate_and_set_properties(node_id, &mut resolved_props)?;
411
412 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
414 if let Some(dst) = builder.column_mut(self.output_column) {
415 #[allow(clippy::cast_possible_wrap)]
417 dst.push_value(Value::Int64(node_id.0 as i64));
418 }
419 builder.advance_row();
420
421 Ok(Some(builder.finish()))
422 }
423 }
424
425 fn reset(&mut self) {
426 if let Some(ref mut input) = self.input {
427 input.reset();
428 }
429 self.executed = false;
430 }
431
432 fn name(&self) -> &'static str {
433 "CreateNode"
434 }
435
436 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
437 self
438 }
439}
440
441pub struct CreateEdgeOperator {
443 store: Arc<dyn GraphStoreMut>,
445 input: Box<dyn Operator>,
447 from_column: usize,
449 to_column: usize,
451 edge_type: String,
453 properties: Vec<(String, PropertySource)>,
455 output_schema: Vec<LogicalType>,
457 output_column: Option<usize>,
459 viewing_epoch: Option<EpochId>,
461 transaction_id: Option<TransactionId>,
463 validator: Option<Arc<dyn ConstraintValidator>>,
465 write_tracker: Option<SharedWriteTracker>,
467}
468
469impl CreateEdgeOperator {
470 pub fn new(
477 store: Arc<dyn GraphStoreMut>,
478 input: Box<dyn Operator>,
479 from_column: usize,
480 to_column: usize,
481 edge_type: String,
482 output_schema: Vec<LogicalType>,
483 ) -> Self {
484 Self {
485 store,
486 input,
487 from_column,
488 to_column,
489 edge_type,
490 properties: Vec::new(),
491 output_schema,
492 output_column: None,
493 viewing_epoch: None,
494 transaction_id: None,
495 validator: None,
496 write_tracker: None,
497 }
498 }
499
500 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
502 self.properties = properties;
503 self
504 }
505
506 pub fn with_output_column(mut self, column: usize) -> Self {
508 self.output_column = Some(column);
509 self
510 }
511
512 pub fn with_transaction_context(
514 mut self,
515 epoch: EpochId,
516 transaction_id: Option<TransactionId>,
517 ) -> Self {
518 self.viewing_epoch = Some(epoch);
519 self.transaction_id = transaction_id;
520 self
521 }
522
523 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
525 self.validator = Some(validator);
526 self
527 }
528
529 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
531 self.write_tracker = Some(tracker);
532 self
533 }
534}
535
536impl Operator for CreateEdgeOperator {
537 fn next(&mut self) -> OperatorResult {
538 let epoch = self
540 .viewing_epoch
541 .unwrap_or_else(|| self.store.current_epoch());
542 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
543
544 if let Some(chunk) = self.input.next()? {
545 let mut builder =
546 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
547
548 for row in chunk.selected_indices() {
549 let from_id = chunk
551 .column(self.from_column)
552 .and_then(|c| c.get_value(row))
553 .ok_or_else(|| {
554 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
555 })?;
556
557 let to_id = chunk
558 .column(self.to_column)
559 .and_then(|c| c.get_value(row))
560 .ok_or_else(|| {
561 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
562 })?;
563
564 let from_node_id = match from_id {
566 #[allow(clippy::cast_sign_loss)]
568 Value::Int64(id) => NodeId(id as u64),
569 _ => {
570 return Err(OperatorError::TypeMismatch {
571 expected: "Int64 (node ID)".to_string(),
572 found: format!("{from_id:?}"),
573 });
574 }
575 };
576
577 let to_node_id = match to_id {
578 #[allow(clippy::cast_sign_loss)]
580 Value::Int64(id) => NodeId(id as u64),
581 _ => {
582 return Err(OperatorError::TypeMismatch {
583 expected: "Int64 (node ID)".to_string(),
584 found: format!("{to_id:?}"),
585 });
586 }
587 };
588
589 if let Some(ref validator) = self.validator {
591 validator.validate_edge_type_allowed(&self.edge_type)?;
592
593 let source_labels: Vec<String> = self
595 .store
596 .get_node(from_node_id)
597 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
598 .unwrap_or_default();
599 let target_labels: Vec<String> = self
600 .store
601 .get_node(to_node_id)
602 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
603 .unwrap_or_default();
604 validator.validate_edge_endpoints(
605 &self.edge_type,
606 &source_labels,
607 &target_labels,
608 )?;
609 }
610
611 let resolved_props: Vec<(String, Value)> = self
613 .properties
614 .iter()
615 .map(|(name, source)| {
616 let value =
617 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
618 (name.clone(), value)
619 })
620 .collect();
621
622 if let Some(ref validator) = self.validator {
624 for (name, value) in &resolved_props {
625 validator.validate_edge_property(&self.edge_type, name, value)?;
626 }
627 validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
628 }
629
630 let edge_id = self.store.create_edge_versioned(
632 from_node_id,
633 to_node_id,
634 &self.edge_type,
635 epoch,
636 tx,
637 );
638
639 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
641 tracker.record_edge_write(tid, edge_id)?;
642 }
643
644 if let Some(tid) = self.transaction_id {
646 for (name, value) in resolved_props {
647 self.store
648 .set_edge_property_versioned(edge_id, &name, value, tid);
649 }
650 } else {
651 for (name, value) in resolved_props {
652 self.store.set_edge_property(edge_id, &name, value);
653 }
654 }
655
656 for col_idx in 0..chunk.column_count() {
658 if let (Some(src), Some(dst)) =
659 (chunk.column(col_idx), builder.column_mut(col_idx))
660 {
661 if let Some(val) = src.get_value(row) {
662 dst.push_value(val);
663 } else {
664 dst.push_value(Value::Null);
665 }
666 }
667 }
668
669 if let Some(out_col) = self.output_column
671 && let Some(dst) = builder.column_mut(out_col)
672 {
673 #[allow(clippy::cast_possible_wrap)]
675 dst.push_value(Value::Int64(edge_id.0 as i64));
676 }
677
678 builder.advance_row();
679 }
680
681 return Ok(Some(builder.finish()));
682 }
683 Ok(None)
684 }
685
686 fn reset(&mut self) {
687 self.input.reset();
688 }
689
690 fn name(&self) -> &'static str {
691 "CreateEdge"
692 }
693
694 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
695 self
696 }
697}
698
699pub struct DeleteNodeOperator {
701 store: Arc<dyn GraphStoreMut>,
703 input: Box<dyn Operator>,
705 node_column: usize,
707 output_schema: Vec<LogicalType>,
709 detach: bool,
711 viewing_epoch: Option<EpochId>,
713 transaction_id: Option<TransactionId>,
715 write_tracker: Option<SharedWriteTracker>,
717}
718
719impl DeleteNodeOperator {
720 pub fn new(
722 store: Arc<dyn GraphStoreMut>,
723 input: Box<dyn Operator>,
724 node_column: usize,
725 output_schema: Vec<LogicalType>,
726 detach: bool,
727 ) -> Self {
728 Self {
729 store,
730 input,
731 node_column,
732 output_schema,
733 detach,
734 viewing_epoch: None,
735 transaction_id: None,
736 write_tracker: None,
737 }
738 }
739
740 pub fn with_transaction_context(
742 mut self,
743 epoch: EpochId,
744 transaction_id: Option<TransactionId>,
745 ) -> Self {
746 self.viewing_epoch = Some(epoch);
747 self.transaction_id = transaction_id;
748 self
749 }
750
751 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
753 self.write_tracker = Some(tracker);
754 self
755 }
756}
757
758impl Operator for DeleteNodeOperator {
759 fn next(&mut self) -> OperatorResult {
760 let epoch = self
762 .viewing_epoch
763 .unwrap_or_else(|| self.store.current_epoch());
764 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
765
766 if let Some(chunk) = self.input.next()? {
767 let mut builder =
768 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
769
770 for row in chunk.selected_indices() {
771 let node_val = chunk
772 .column(self.node_column)
773 .and_then(|c| c.get_value(row))
774 .ok_or_else(|| {
775 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
776 })?;
777
778 let node_id = match node_val {
779 #[allow(clippy::cast_sign_loss)]
781 Value::Int64(id) => NodeId(id as u64),
782 _ => {
783 return Err(OperatorError::TypeMismatch {
784 expected: "Int64 (node ID)".to_string(),
785 found: format!("{node_val:?}"),
786 });
787 }
788 };
789
790 if self.detach {
791 let outgoing = self
794 .store
795 .edges_from(node_id, crate::graph::Direction::Outgoing);
796 let incoming = self
797 .store
798 .edges_from(node_id, crate::graph::Direction::Incoming);
799 for (_, edge_id) in outgoing.into_iter().chain(incoming) {
800 self.store.delete_edge_versioned(edge_id, epoch, tx);
801 if let (Some(tracker), Some(tid)) =
802 (&self.write_tracker, self.transaction_id)
803 {
804 tracker.record_edge_write(tid, edge_id)?;
805 }
806 }
807 } else {
808 let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
810 if degree > 0 {
811 return Err(OperatorError::ConstraintViolation(format!(
812 "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
813 degree
814 )));
815 }
816 }
817
818 self.store.delete_node_versioned(node_id, epoch, tx);
820
821 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
823 tracker.record_node_write(tid, node_id)?;
824 }
825
826 for col_idx in 0..chunk.column_count() {
829 if let (Some(src), Some(dst)) =
830 (chunk.column(col_idx), builder.column_mut(col_idx))
831 {
832 if let Some(val) = src.get_value(row) {
833 dst.push_value(val);
834 } else {
835 dst.push_value(Value::Null);
836 }
837 }
838 }
839 builder.advance_row();
840 }
841
842 return Ok(Some(builder.finish()));
843 }
844 Ok(None)
845 }
846
847 fn reset(&mut self) {
848 self.input.reset();
849 }
850
851 fn name(&self) -> &'static str {
852 "DeleteNode"
853 }
854
855 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
856 self
857 }
858}
859
860pub struct DeleteEdgeOperator {
862 store: Arc<dyn GraphStoreMut>,
864 input: Box<dyn Operator>,
866 edge_column: usize,
868 output_schema: Vec<LogicalType>,
870 viewing_epoch: Option<EpochId>,
872 transaction_id: Option<TransactionId>,
874 write_tracker: Option<SharedWriteTracker>,
876}
877
878impl DeleteEdgeOperator {
879 pub fn new(
881 store: Arc<dyn GraphStoreMut>,
882 input: Box<dyn Operator>,
883 edge_column: usize,
884 output_schema: Vec<LogicalType>,
885 ) -> Self {
886 Self {
887 store,
888 input,
889 edge_column,
890 output_schema,
891 viewing_epoch: None,
892 transaction_id: None,
893 write_tracker: None,
894 }
895 }
896
897 pub fn with_transaction_context(
899 mut self,
900 epoch: EpochId,
901 transaction_id: Option<TransactionId>,
902 ) -> Self {
903 self.viewing_epoch = Some(epoch);
904 self.transaction_id = transaction_id;
905 self
906 }
907
908 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
910 self.write_tracker = Some(tracker);
911 self
912 }
913}
914
915impl Operator for DeleteEdgeOperator {
916 fn next(&mut self) -> OperatorResult {
917 let epoch = self
919 .viewing_epoch
920 .unwrap_or_else(|| self.store.current_epoch());
921 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
922
923 if let Some(chunk) = self.input.next()? {
924 let mut builder =
925 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
926
927 for row in chunk.selected_indices() {
928 let edge_val = chunk
929 .column(self.edge_column)
930 .and_then(|c| c.get_value(row))
931 .ok_or_else(|| {
932 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
933 })?;
934
935 let edge_id = match edge_val {
936 #[allow(clippy::cast_sign_loss)]
938 Value::Int64(id) => EdgeId(id as u64),
939 _ => {
940 return Err(OperatorError::TypeMismatch {
941 expected: "Int64 (edge ID)".to_string(),
942 found: format!("{edge_val:?}"),
943 });
944 }
945 };
946
947 self.store.delete_edge_versioned(edge_id, epoch, tx);
949
950 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
952 tracker.record_edge_write(tid, edge_id)?;
953 }
954
955 for col_idx in 0..chunk.column_count() {
957 if let (Some(src), Some(dst)) =
958 (chunk.column(col_idx), builder.column_mut(col_idx))
959 {
960 if let Some(val) = src.get_value(row) {
961 dst.push_value(val);
962 } else {
963 dst.push_value(Value::Null);
964 }
965 }
966 }
967 builder.advance_row();
968 }
969
970 return Ok(Some(builder.finish()));
971 }
972 Ok(None)
973 }
974
975 fn reset(&mut self) {
976 self.input.reset();
977 }
978
979 fn name(&self) -> &'static str {
980 "DeleteEdge"
981 }
982
983 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
984 self
985 }
986}
987
988pub struct AddLabelOperator {
990 store: Arc<dyn GraphStoreMut>,
992 input: Box<dyn Operator>,
994 node_column: usize,
996 labels: Vec<String>,
998 output_schema: Vec<LogicalType>,
1000 count_column: usize,
1002 viewing_epoch: Option<EpochId>,
1004 transaction_id: Option<TransactionId>,
1006 write_tracker: Option<SharedWriteTracker>,
1008}
1009
1010impl AddLabelOperator {
1011 pub fn new(
1013 store: Arc<dyn GraphStoreMut>,
1014 input: Box<dyn Operator>,
1015 node_column: usize,
1016 labels: Vec<String>,
1017 output_schema: Vec<LogicalType>,
1018 ) -> Self {
1019 let count_column = output_schema.len() - 1;
1020 Self {
1021 store,
1022 input,
1023 node_column,
1024 labels,
1025 count_column,
1026 output_schema,
1027 viewing_epoch: None,
1028 transaction_id: None,
1029 write_tracker: None,
1030 }
1031 }
1032
1033 pub fn with_transaction_context(
1035 mut self,
1036 epoch: EpochId,
1037 transaction_id: Option<TransactionId>,
1038 ) -> Self {
1039 self.viewing_epoch = Some(epoch);
1040 self.transaction_id = transaction_id;
1041 self
1042 }
1043
1044 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1046 self.write_tracker = Some(tracker);
1047 self
1048 }
1049}
1050
1051impl Operator for AddLabelOperator {
1052 fn next(&mut self) -> OperatorResult {
1053 if let Some(chunk) = self.input.next()? {
1054 let mut builder =
1055 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1056
1057 for row in chunk.selected_indices() {
1058 let node_val = chunk
1059 .column(self.node_column)
1060 .and_then(|c| c.get_value(row))
1061 .ok_or_else(|| {
1062 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1063 })?;
1064
1065 let node_id = match node_val {
1066 #[allow(clippy::cast_sign_loss)]
1068 Value::Int64(id) => NodeId(id as u64),
1069 _ => {
1070 return Err(OperatorError::TypeMismatch {
1071 expected: "Int64 (node ID)".to_string(),
1072 found: format!("{node_val:?}"),
1073 });
1074 }
1075 };
1076
1077 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1079 tracker.record_node_write(tid, node_id)?;
1080 }
1081
1082 let mut row_count: i64 = 0;
1084 for label in &self.labels {
1085 let added = if let Some(tid) = self.transaction_id {
1086 self.store.add_label_versioned(node_id, label, tid)
1087 } else {
1088 self.store.add_label(node_id, label)
1089 };
1090 if added {
1091 row_count += 1;
1092 }
1093 }
1094
1095 for col_idx in 0..chunk.column_count() {
1097 if let (Some(src), Some(dst)) =
1098 (chunk.column(col_idx), builder.column_mut(col_idx))
1099 {
1100 if let Some(val) = src.get_value(row) {
1101 dst.push_value(val);
1102 } else {
1103 dst.push_value(Value::Null);
1104 }
1105 }
1106 }
1107 if let Some(dst) = builder.column_mut(self.count_column) {
1109 dst.push_value(Value::Int64(row_count));
1110 }
1111
1112 builder.advance_row();
1113 }
1114
1115 return Ok(Some(builder.finish()));
1116 }
1117 Ok(None)
1118 }
1119
1120 fn reset(&mut self) {
1121 self.input.reset();
1122 }
1123
1124 fn name(&self) -> &'static str {
1125 "AddLabel"
1126 }
1127
1128 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1129 self
1130 }
1131}
1132
1133pub struct RemoveLabelOperator {
1135 store: Arc<dyn GraphStoreMut>,
1137 input: Box<dyn Operator>,
1139 node_column: usize,
1141 labels: Vec<String>,
1143 output_schema: Vec<LogicalType>,
1145 count_column: usize,
1147 viewing_epoch: Option<EpochId>,
1149 transaction_id: Option<TransactionId>,
1151 write_tracker: Option<SharedWriteTracker>,
1153}
1154
1155impl RemoveLabelOperator {
1156 pub fn new(
1158 store: Arc<dyn GraphStoreMut>,
1159 input: Box<dyn Operator>,
1160 node_column: usize,
1161 labels: Vec<String>,
1162 output_schema: Vec<LogicalType>,
1163 ) -> Self {
1164 let count_column = output_schema.len() - 1;
1165 Self {
1166 store,
1167 input,
1168 node_column,
1169 labels,
1170 count_column,
1171 output_schema,
1172 viewing_epoch: None,
1173 transaction_id: None,
1174 write_tracker: None,
1175 }
1176 }
1177
1178 pub fn with_transaction_context(
1180 mut self,
1181 epoch: EpochId,
1182 transaction_id: Option<TransactionId>,
1183 ) -> Self {
1184 self.viewing_epoch = Some(epoch);
1185 self.transaction_id = transaction_id;
1186 self
1187 }
1188
1189 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1191 self.write_tracker = Some(tracker);
1192 self
1193 }
1194}
1195
1196impl Operator for RemoveLabelOperator {
1197 fn next(&mut self) -> OperatorResult {
1198 if let Some(chunk) = self.input.next()? {
1199 let mut builder =
1200 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1201
1202 for row in chunk.selected_indices() {
1203 let node_val = chunk
1204 .column(self.node_column)
1205 .and_then(|c| c.get_value(row))
1206 .ok_or_else(|| {
1207 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1208 })?;
1209
1210 let node_id = match node_val {
1211 #[allow(clippy::cast_sign_loss)]
1213 Value::Int64(id) => NodeId(id as u64),
1214 _ => {
1215 return Err(OperatorError::TypeMismatch {
1216 expected: "Int64 (node ID)".to_string(),
1217 found: format!("{node_val:?}"),
1218 });
1219 }
1220 };
1221
1222 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1224 tracker.record_node_write(tid, node_id)?;
1225 }
1226
1227 let mut row_count: i64 = 0;
1229 for label in &self.labels {
1230 let removed = if let Some(tid) = self.transaction_id {
1231 self.store.remove_label_versioned(node_id, label, tid)
1232 } else {
1233 self.store.remove_label(node_id, label)
1234 };
1235 if removed {
1236 row_count += 1;
1237 }
1238 }
1239
1240 for col_idx in 0..chunk.column_count() {
1242 if let (Some(src), Some(dst)) =
1243 (chunk.column(col_idx), builder.column_mut(col_idx))
1244 {
1245 if let Some(val) = src.get_value(row) {
1246 dst.push_value(val);
1247 } else {
1248 dst.push_value(Value::Null);
1249 }
1250 }
1251 }
1252 if let Some(dst) = builder.column_mut(self.count_column) {
1254 dst.push_value(Value::Int64(row_count));
1255 }
1256
1257 builder.advance_row();
1258 }
1259
1260 return Ok(Some(builder.finish()));
1261 }
1262 Ok(None)
1263 }
1264
1265 fn reset(&mut self) {
1266 self.input.reset();
1267 }
1268
1269 fn name(&self) -> &'static str {
1270 "RemoveLabel"
1271 }
1272
1273 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1274 self
1275 }
1276}
1277
1278pub struct SetPropertyOperator {
1283 store: Arc<dyn GraphStoreMut>,
1285 input: Box<dyn Operator>,
1287 entity_column: usize,
1289 is_edge: bool,
1291 properties: Vec<(String, PropertySource)>,
1293 output_schema: Vec<LogicalType>,
1295 replace: bool,
1297 validator: Option<Arc<dyn ConstraintValidator>>,
1299 labels: Vec<String>,
1301 edge_type_name: Option<String>,
1303 viewing_epoch: Option<EpochId>,
1305 transaction_id: Option<TransactionId>,
1307 write_tracker: Option<SharedWriteTracker>,
1309}
1310
1311impl SetPropertyOperator {
1312 pub fn new_for_node(
1314 store: Arc<dyn GraphStoreMut>,
1315 input: Box<dyn Operator>,
1316 node_column: usize,
1317 properties: Vec<(String, PropertySource)>,
1318 output_schema: Vec<LogicalType>,
1319 ) -> Self {
1320 Self {
1321 store,
1322 input,
1323 entity_column: node_column,
1324 is_edge: false,
1325 properties,
1326 output_schema,
1327 replace: false,
1328 validator: None,
1329 labels: Vec::new(),
1330 edge_type_name: None,
1331 viewing_epoch: None,
1332 transaction_id: None,
1333 write_tracker: None,
1334 }
1335 }
1336
1337 pub fn new_for_edge(
1339 store: Arc<dyn GraphStoreMut>,
1340 input: Box<dyn Operator>,
1341 edge_column: usize,
1342 properties: Vec<(String, PropertySource)>,
1343 output_schema: Vec<LogicalType>,
1344 ) -> Self {
1345 Self {
1346 store,
1347 input,
1348 entity_column: edge_column,
1349 is_edge: true,
1350 properties,
1351 output_schema,
1352 replace: false,
1353 validator: None,
1354 labels: Vec::new(),
1355 edge_type_name: None,
1356 viewing_epoch: None,
1357 transaction_id: None,
1358 write_tracker: None,
1359 }
1360 }
1361
1362 pub fn with_replace(mut self, replace: bool) -> Self {
1364 self.replace = replace;
1365 self
1366 }
1367
1368 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1370 self.validator = Some(validator);
1371 self
1372 }
1373
1374 pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1376 self.labels = labels;
1377 self
1378 }
1379
1380 pub fn with_edge_type(mut self, edge_type: String) -> Self {
1382 self.edge_type_name = Some(edge_type);
1383 self
1384 }
1385
1386 pub fn with_transaction_context(
1391 mut self,
1392 epoch: EpochId,
1393 transaction_id: Option<TransactionId>,
1394 ) -> Self {
1395 self.viewing_epoch = Some(epoch);
1396 self.transaction_id = transaction_id;
1397 self
1398 }
1399
1400 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1402 self.write_tracker = Some(tracker);
1403 self
1404 }
1405}
1406
1407impl Operator for SetPropertyOperator {
1408 fn next(&mut self) -> OperatorResult {
1409 if let Some(chunk) = self.input.next()? {
1410 let mut builder =
1411 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1412
1413 for row in chunk.selected_indices() {
1414 let entity_val = chunk
1415 .column(self.entity_column)
1416 .and_then(|c| c.get_value(row))
1417 .ok_or_else(|| {
1418 OperatorError::ColumnNotFound(format!(
1419 "entity column {}",
1420 self.entity_column
1421 ))
1422 })?;
1423
1424 let entity_id = match entity_val {
1425 #[allow(clippy::cast_sign_loss)]
1427 Value::Int64(id) => id as u64,
1428 _ => {
1429 return Err(OperatorError::TypeMismatch {
1430 expected: "Int64 (entity ID)".to_string(),
1431 found: format!("{entity_val:?}"),
1432 });
1433 }
1434 };
1435
1436 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1438 if self.is_edge {
1439 tracker.record_edge_write(tid, EdgeId(entity_id))?;
1440 } else {
1441 tracker.record_node_write(tid, NodeId(entity_id))?;
1442 }
1443 }
1444
1445 let resolved_props: Vec<(String, Value)> = self
1447 .properties
1448 .iter()
1449 .map(|(name, source)| {
1450 let value =
1451 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1452 (name.clone(), value)
1453 })
1454 .collect();
1455
1456 if let Some(ref validator) = self.validator {
1458 if self.is_edge {
1459 if let Some(ref et) = self.edge_type_name {
1460 for (name, value) in &resolved_props {
1461 validator.validate_edge_property(et, name, value)?;
1462 }
1463 }
1464 } else {
1465 for (name, value) in &resolved_props {
1466 validator.validate_node_property(&self.labels, name, value)?;
1467 validator.check_unique_node_property(&self.labels, name, value)?;
1468 }
1469 }
1470 }
1471
1472 let tx_id = self.transaction_id;
1474 for (prop_name, value) in resolved_props {
1475 if prop_name == "*" {
1476 if let Value::Map(map) = value {
1478 if self.replace {
1479 if self.is_edge {
1481 if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1482 let keys: Vec<String> = edge
1483 .properties
1484 .iter()
1485 .map(|(k, _)| k.as_str().to_string())
1486 .collect();
1487 for key in keys {
1488 if let Some(tid) = tx_id {
1489 self.store.remove_edge_property_versioned(
1490 EdgeId(entity_id),
1491 &key,
1492 tid,
1493 );
1494 } else {
1495 self.store
1496 .remove_edge_property(EdgeId(entity_id), &key);
1497 }
1498 }
1499 }
1500 } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1501 let keys: Vec<String> = node
1502 .properties
1503 .iter()
1504 .map(|(k, _)| k.as_str().to_string())
1505 .collect();
1506 for key in keys {
1507 if let Some(tid) = tx_id {
1508 self.store.remove_node_property_versioned(
1509 NodeId(entity_id),
1510 &key,
1511 tid,
1512 );
1513 } else {
1514 self.store
1515 .remove_node_property(NodeId(entity_id), &key);
1516 }
1517 }
1518 }
1519 }
1520 for (key, val) in map.iter() {
1522 if val.is_null() {
1523 if self.is_edge {
1525 if let Some(tid) = tx_id {
1526 self.store.remove_edge_property_versioned(
1527 EdgeId(entity_id),
1528 key.as_str(),
1529 tid,
1530 );
1531 } else {
1532 self.store.remove_edge_property(
1533 EdgeId(entity_id),
1534 key.as_str(),
1535 );
1536 }
1537 } else if let Some(tid) = tx_id {
1538 self.store.remove_node_property_versioned(
1539 NodeId(entity_id),
1540 key.as_str(),
1541 tid,
1542 );
1543 } else {
1544 self.store
1545 .remove_node_property(NodeId(entity_id), key.as_str());
1546 }
1547 } else if self.is_edge {
1548 if let Some(tid) = tx_id {
1549 self.store.set_edge_property_versioned(
1550 EdgeId(entity_id),
1551 key.as_str(),
1552 val.clone(),
1553 tid,
1554 );
1555 } else {
1556 self.store.set_edge_property(
1557 EdgeId(entity_id),
1558 key.as_str(),
1559 val.clone(),
1560 );
1561 }
1562 } else if let Some(tid) = tx_id {
1563 self.store.set_node_property_versioned(
1564 NodeId(entity_id),
1565 key.as_str(),
1566 val.clone(),
1567 tid,
1568 );
1569 } else {
1570 self.store.set_node_property(
1571 NodeId(entity_id),
1572 key.as_str(),
1573 val.clone(),
1574 );
1575 }
1576 }
1577 }
1578 } else if self.is_edge {
1579 if let Some(tid) = tx_id {
1580 self.store.set_edge_property_versioned(
1581 EdgeId(entity_id),
1582 &prop_name,
1583 value,
1584 tid,
1585 );
1586 } else {
1587 self.store
1588 .set_edge_property(EdgeId(entity_id), &prop_name, value);
1589 }
1590 } else if let Some(tid) = tx_id {
1591 self.store.set_node_property_versioned(
1592 NodeId(entity_id),
1593 &prop_name,
1594 value,
1595 tid,
1596 );
1597 } else {
1598 self.store
1599 .set_node_property(NodeId(entity_id), &prop_name, value);
1600 }
1601 }
1602
1603 for col_idx in 0..chunk.column_count() {
1605 if let (Some(src), Some(dst)) =
1606 (chunk.column(col_idx), builder.column_mut(col_idx))
1607 {
1608 if let Some(val) = src.get_value(row) {
1609 dst.push_value(val);
1610 } else {
1611 dst.push_value(Value::Null);
1612 }
1613 }
1614 }
1615
1616 builder.advance_row();
1617 }
1618
1619 return Ok(Some(builder.finish()));
1620 }
1621 Ok(None)
1622 }
1623
1624 fn reset(&mut self) {
1625 self.input.reset();
1626 }
1627
1628 fn name(&self) -> &'static str {
1629 "SetProperty"
1630 }
1631
1632 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1633 self
1634 }
1635}
1636
1637#[cfg(all(test, feature = "lpg"))]
1638mod tests {
1639 use super::*;
1640 use crate::execution::DataChunk;
1641 use crate::execution::chunk::DataChunkBuilder;
1642 use crate::graph::lpg::LpgStore;
1643
1644 fn create_test_store() -> Arc<dyn GraphStoreMut> {
1647 Arc::new(LpgStore::new().unwrap())
1648 }
1649
1650 struct MockInput {
1651 chunk: Option<DataChunk>,
1652 }
1653
1654 impl MockInput {
1655 fn boxed(chunk: DataChunk) -> Box<Self> {
1656 Box::new(Self { chunk: Some(chunk) })
1657 }
1658 }
1659
1660 impl Operator for MockInput {
1661 fn next(&mut self) -> OperatorResult {
1662 Ok(self.chunk.take())
1663 }
1664 fn reset(&mut self) {}
1665 fn name(&self) -> &'static str {
1666 "MockInput"
1667 }
1668
1669 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1670 self
1671 }
1672 }
1673
1674 struct EmptyInput;
1675 impl Operator for EmptyInput {
1676 fn next(&mut self) -> OperatorResult {
1677 Ok(None)
1678 }
1679 fn reset(&mut self) {}
1680 fn name(&self) -> &'static str {
1681 "EmptyInput"
1682 }
1683
1684 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1685 self
1686 }
1687 }
1688
1689 #[allow(clippy::cast_possible_wrap)]
1691 fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1692 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1693 for id in ids {
1694 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1695 builder.advance_row();
1696 }
1697 builder.finish()
1698 }
1699
1700 #[allow(clippy::cast_possible_wrap)]
1702 fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1703 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1704 for id in ids {
1705 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1706 builder.advance_row();
1707 }
1708 builder.finish()
1709 }
1710
/// A `CreateNodeOperator` with no input emits one row, then `None`, and adds one node.
#[test]
fn test_create_node_standalone() {
    let store = create_test_store();
    let name_prop = (
        "name".to_string(),
        PropertySource::Constant(Value::String("Alix".into())),
    );

    let mut create = CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        vec!["Person".to_string()],
        vec![name_prop],
        vec![LogicalType::Int64],
        0,
    );

    // First pull: a single-row chunk.
    let first = create.next().unwrap().unwrap();
    assert_eq!(first.row_count(), 1);

    // Second pull: nothing left.
    let second = create.next().unwrap();
    assert!(second.is_none());

    assert_eq!(store.node_count(), 1);
}
1737
/// `CreateEdgeOperator` adds one edge for a single input row holding two node IDs.
#[test]
#[allow(clippy::cast_possible_wrap)]
fn test_create_edge() {
    let store = create_test_store();
    let first = store.create_node(&["Person"]);
    let second = store.create_node(&["Person"]);

    // One input row: column 0 holds the first node's ID, column 1 the second's.
    let input = {
        let mut rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
        rows.column_mut(0).unwrap().push_int64(first.0 as i64);
        rows.column_mut(1).unwrap().push_int64(second.0 as i64);
        rows.advance_row();
        rows.finish()
    };

    let mut create = CreateEdgeOperator::new(
        Arc::clone(&store),
        MockInput::boxed(input),
        0,
        1,
        "KNOWS".to_string(),
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let _produced = create.next().unwrap().unwrap();
    assert_eq!(store.edge_count(), 1);
}
1764
/// Deleting an unconnected node without detach emits one row and empties the store.
#[test]
fn test_delete_node() {
    let store = create_test_store();

    let victim = store.create_node(&["Person"]);
    assert_eq!(store.node_count(), 1);

    let input = MockInput::boxed(node_id_chunk(&[victim]));
    let mut delete =
        DeleteNodeOperator::new(Arc::clone(&store), input, 0, vec![LogicalType::Node], false);

    let produced = delete.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);
    assert_eq!(store.node_count(), 0);
}
1785
/// Deleting a single edge emits one row and leaves no edges in the store.
#[test]
fn test_delete_edge() {
    let store = create_test_store();

    let left = store.create_node(&["Person"]);
    let right = store.create_node(&["Person"]);
    let edge = store.create_edge(left, right, "KNOWS");
    assert_eq!(store.edge_count(), 1);

    let input = MockInput::boxed(edge_id_chunk(&[edge]));
    let mut delete =
        DeleteEdgeOperator::new(Arc::clone(&store), input, 0, vec![LogicalType::Node]);

    let produced = delete.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);
    assert_eq!(store.edge_count(), 0);
}
1808
/// With an exhausted input, `DeleteEdgeOperator` yields `None`.
#[test]
fn test_delete_edge_no_input_returns_none() {
    let store = create_test_store();
    let exhausted = Box::new(EmptyInput);

    let mut delete =
        DeleteEdgeOperator::new(Arc::clone(&store), exhausted, 0, vec![LogicalType::Int64]);

    let outcome = delete.next().unwrap();
    assert!(outcome.is_none());
}
1822
/// Two edge IDs in one input chunk are both deleted in a single pull.
#[test]
fn test_delete_multiple_edges() {
    let store = create_test_store();

    let a = store.create_node(&["N"]);
    let b = store.create_node(&["N"]);
    let forward = store.create_edge(a, b, "R");
    let backward = store.create_edge(b, a, "S");
    assert_eq!(store.edge_count(), 2);

    let input = MockInput::boxed(edge_id_chunk(&[forward, backward]));
    let mut delete =
        DeleteEdgeOperator::new(Arc::clone(&store), input, 0, vec![LogicalType::Node]);

    let produced = delete.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 2);
    assert_eq!(store.edge_count(), 0);
}
1844
/// With the detach flag set, deleting a node also leaves the store with no edges.
#[test]
fn test_delete_node_detach() {
    let store = create_test_store();

    let victim = store.create_node(&["Person"]);
    let survivor = store.create_node(&["Person"]);
    store.create_edge(victim, survivor, "KNOWS");
    store.create_edge(survivor, victim, "FOLLOWS");
    assert_eq!(store.edge_count(), 2);

    let detach = true;
    let mut delete = DeleteNodeOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[victim])),
        0,
        vec![LogicalType::Node],
        detach,
    );

    let produced = delete.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);
    // Only the other node remains, and both edges touching the deleted node are gone.
    assert_eq!(store.node_count(), 1);
    assert_eq!(store.edge_count(), 0);
}
1870
/// Adding one label reports a count of 1 and the node carries both old and new labels.
#[test]
fn test_add_label() {
    let store = create_test_store();
    let target = store.create_node(&["Person"]);

    let mut add = AddLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[target])),
        0,
        vec!["Employee".to_string()],
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    // Column 1 of the output holds the reported update count.
    let produced = add.next().unwrap().unwrap();
    let reported = produced.column(1).unwrap().get_int64(0).unwrap();
    assert_eq!(reported, 1);

    let stored = store.get_node(target).unwrap();
    let labels: Vec<&str> = stored.labels.iter().map(|l| l.as_ref()).collect();
    for expected in ["Person", "Employee"] {
        assert!(labels.contains(&expected));
    }
}
1897
/// Adding two labels at once reports a count of 2 and both labels end up on the node.
#[test]
fn test_add_multiple_labels() {
    let store = create_test_store();
    let target = store.create_node(&["Base"]);

    let new_labels = vec!["LabelA".to_string(), "LabelB".to_string()];
    let mut add = AddLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[target])),
        0,
        new_labels,
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let produced = add.next().unwrap().unwrap();
    let reported = produced.column(1).unwrap().get_int64(0).unwrap();
    assert_eq!(reported, 2);

    let stored = store.get_node(target).unwrap();
    let labels: Vec<&str> = stored.labels.iter().map(|l| l.as_ref()).collect();
    for expected in ["LabelA", "LabelB"] {
        assert!(labels.contains(&expected));
    }
}
1921
/// With an exhausted input, `AddLabelOperator` yields `None`.
#[test]
fn test_add_label_no_input_returns_none() {
    let store = create_test_store();
    let exhausted = Box::new(EmptyInput);

    let mut add = AddLabelOperator::new(
        Arc::clone(&store),
        exhausted,
        0,
        vec!["Foo".to_string()],
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let outcome = add.next().unwrap();
    assert!(outcome.is_none());
}
1936
/// Removing an existing label reports a count of 1 and leaves the other label in place.
#[test]
fn test_remove_label() {
    let store = create_test_store();
    let target = store.create_node(&["Person", "Employee"]);

    let mut remove = RemoveLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[target])),
        0,
        vec!["Employee".to_string()],
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let produced = remove.next().unwrap().unwrap();
    let reported = produced.column(1).unwrap().get_int64(0).unwrap();
    assert_eq!(reported, 1);

    let stored = store.get_node(target).unwrap();
    let labels: Vec<&str> = stored.labels.iter().map(|l| l.as_ref()).collect();
    let has = |wanted: &str| labels.contains(&wanted);
    assert!(has("Person"));
    assert!(!has("Employee"));
}
1963
/// Removing a label the node does not have reports a count of 0.
#[test]
fn test_remove_nonexistent_label() {
    let store = create_test_store();
    let target = store.create_node(&["Person"]);

    let mut remove = RemoveLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[target])),
        0,
        vec!["NonExistent".to_string()],
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let produced = remove.next().unwrap().unwrap();
    let reported = produced.column(1).unwrap().get_int64(0).unwrap();
    assert_eq!(reported, 0);
}
1982
/// Setting a constant-valued property on a node stores that value under the given key.
#[test]
fn test_set_node_property_constant() {
    let store = create_test_store();
    let target = store.create_node(&["Person"]);

    let assignment = (
        "name".to_string(),
        PropertySource::Constant(Value::String("Alix".into())),
    );
    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[target])),
        0,
        vec![assignment],
        vec![LogicalType::Int64],
    );

    let produced = set.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);

    let stored = store.get_node(target).unwrap();
    let expected = Value::String("Alix".into());
    assert_eq!(
        stored.properties.get(&PropertyKey::new("name")),
        Some(&expected)
    );
}
2014
/// A `PropertySource::Column` assignment copies the value from the input row onto the node.
#[test]
#[allow(clippy::cast_possible_wrap)]
fn test_set_node_property_from_column() {
    let store = create_test_store();
    let target = store.create_node(&["Person"]);

    // Input row: column 0 = node ID, column 1 = the string to store.
    let input = {
        let mut rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        rows.column_mut(0).unwrap().push_int64(target.0 as i64);
        rows.column_mut(1)
            .unwrap()
            .push_value(Value::String("Gus".into()));
        rows.advance_row();
        rows.finish()
    };

    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(input),
        0,
        vec![("name".to_string(), PropertySource::Column(1))],
        vec![LogicalType::Int64, LogicalType::String],
    );

    let produced = set.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);

    let stored = store.get_node(target).unwrap();
    let expected = Value::String("Gus".into());
    assert_eq!(
        stored.properties.get(&PropertyKey::new("name")),
        Some(&expected)
    );
}
2051
/// Setting a constant-valued property on an edge stores that value under the given key.
#[test]
fn test_set_edge_property() {
    let store = create_test_store();

    let left = store.create_node(&["N"]);
    let right = store.create_node(&["N"]);
    let edge = store.create_edge(left, right, "KNOWS");

    let assignment = (
        "weight".to_string(),
        PropertySource::Constant(Value::Float64(0.75)),
    );
    let mut set = SetPropertyOperator::new_for_edge(
        Arc::clone(&store),
        MockInput::boxed(edge_id_chunk(&[edge])),
        0,
        vec![assignment],
        vec![LogicalType::Int64],
    );

    let produced = set.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);

    let stored = store.get_edge(edge).unwrap();
    let expected = Value::Float64(0.75);
    assert_eq!(
        stored.properties.get(&PropertyKey::new("weight")),
        Some(&expected)
    );
}
2082
/// Two assignments in one operator are both applied to the node.
#[test]
fn test_set_multiple_properties() {
    let store = create_test_store();
    let target = store.create_node(&["Person"]);

    let assignments = vec![
        (
            "name".to_string(),
            PropertySource::Constant(Value::String("Alix".into())),
        ),
        (
            "age".to_string(),
            PropertySource::Constant(Value::Int64(30)),
        ),
    ];
    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[target])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    set.next().unwrap().unwrap();

    let stored = store.get_node(target).unwrap();
    let expectations = [
        ("name", Value::String("Alix".into())),
        ("age", Value::Int64(30)),
    ];
    for (key, expected) in &expectations {
        assert_eq!(stored.properties.get(&PropertyKey::new(key)), Some(expected));
    }
}
2122
/// With an exhausted input, `SetPropertyOperator` yields `None`.
#[test]
fn test_set_property_no_input_returns_none() {
    let store = create_test_store();
    let exhausted = Box::new(EmptyInput);
    let assignment = ("x".to_string(), PropertySource::Constant(Value::Int64(1)));

    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        exhausted,
        0,
        vec![assignment],
        vec![LogicalType::Int64],
    );

    let outcome = set.next().unwrap();
    assert!(outcome.is_none());
}
2137
/// Deleting a node that still has an edge, with detach off, fails with a
/// `ConstraintViolation` and leaves both nodes in the store.
#[test]
fn test_delete_node_without_detach_errors_when_edges_exist() {
    let store = create_test_store();

    let connected = store.create_node(&["Person"]);
    let neighbour = store.create_node(&["Person"]);
    store.create_edge(connected, neighbour, "KNOWS");

    let detach = false;
    let mut delete = DeleteNodeOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[connected])),
        0,
        vec![LogicalType::Int64],
        detach,
    );

    match delete.next().unwrap_err() {
        OperatorError::ConstraintViolation(msg) => {
            assert!(msg.contains("connected edge"), "unexpected message: {msg}");
        }
        other => panic!("expected ConstraintViolation, got {other:?}"),
    }

    // The failed delete must not have removed anything.
    assert_eq!(store.node_count(), 2);
}
2166
/// With a one-row input, `CreateNodeOperator` creates one node and emits one row,
/// then returns `None` once the input is drained.
#[test]
fn test_create_node_with_input_operator() {
    let store = create_test_store();
    let seed = store.create_node(&["Seed"]);

    let upstream = MockInput::boxed(node_id_chunk(&[seed]));
    let source_prop = (
        "source".to_string(),
        PropertySource::Constant(Value::String("from_input".into())),
    );
    let mut create = CreateNodeOperator::new(
        Arc::clone(&store),
        Some(upstream),
        vec!["Created".to_string()],
        vec![source_prop],
        vec![LogicalType::Int64, LogicalType::Int64],
        1,
    );

    let produced = create.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);

    // The seed node plus the newly created one.
    assert_eq!(store.node_count(), 2);

    let drained = create.next().unwrap();
    assert!(drained.is_none());
}
2197
/// An edge created with a property and an output column writes its ID to that column,
/// and the stored edge carries the property.
#[test]
#[allow(clippy::cast_possible_wrap, clippy::cast_sign_loss)]
fn test_create_edge_with_properties_and_output_column() {
    let store = create_test_store();
    let first = store.create_node(&["Person"]);
    let second = store.create_node(&["Person"]);

    let input = {
        let mut rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
        rows.column_mut(0).unwrap().push_int64(first.0 as i64);
        rows.column_mut(1).unwrap().push_int64(second.0 as i64);
        rows.advance_row();
        rows.finish()
    };

    let since_prop = (
        "since".to_string(),
        PropertySource::Constant(Value::Int64(2024)),
    );
    let mut create = CreateEdgeOperator::new(
        Arc::clone(&store),
        MockInput::boxed(input),
        0,
        1,
        "KNOWS".to_string(),
        vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
    )
    .with_properties(vec![since_prop])
    .with_output_column(2);

    let produced = create.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);
    assert_eq!(store.edge_count(), 1);

    // Read the new edge's ID back from output column 2.
    let raw_id = produced
        .column(2)
        .and_then(|c| c.get_int64(0))
        .expect("edge ID should be in output column 2");
    let created = EdgeId(raw_id as u64);

    let stored = store.get_edge(created).expect("edge should exist");
    let expected = Value::Int64(2024);
    assert_eq!(
        stored.properties.get(&PropertyKey::new("since")),
        Some(&expected)
    );
}
2247
/// A `"*"` map assignment with replace enabled drops the old property and keeps only
/// the map's entries.
#[test]
fn test_set_property_map_replace() {
    use std::collections::BTreeMap;

    let store = create_test_store();
    let target = store.create_node(&["Person"]);
    store.set_node_property(target, "old_prop", Value::String("should_be_removed".into()));

    let replacement = BTreeMap::from([(
        PropertyKey::new("new_key"),
        Value::String("new_val".into()),
    )]);
    let assignment = (
        "*".to_string(),
        PropertySource::Constant(Value::Map(Arc::new(replacement))),
    );

    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[target])),
        0,
        vec![assignment],
        vec![LogicalType::Int64],
    )
    .with_replace(true);

    set.next().unwrap().unwrap();

    let stored = store.get_node(target).unwrap();
    // The pre-existing property is gone.
    let old = stored.properties.get(&PropertyKey::new("old_prop"));
    assert!(old.is_none());
    // The property from the map is present.
    let expected = Value::String("new_val".into());
    assert_eq!(
        stored.properties.get(&PropertyKey::new("new_key")),
        Some(&expected)
    );
}
2290
/// A `"*"` map assignment without replace keeps existing properties and adds the
/// map's entries.
#[test]
fn test_set_property_map_merge() {
    use std::collections::BTreeMap;

    let store = create_test_store();
    let target = store.create_node(&["Person"]);
    store.set_node_property(target, "existing", Value::Int64(42));

    let additions = BTreeMap::from([(
        PropertyKey::new("added"),
        Value::String("hello".into()),
    )]);
    let assignment = (
        "*".to_string(),
        PropertySource::Constant(Value::Map(Arc::new(additions))),
    );

    // `with_replace` is not called here, so the operator keeps its default mode.
    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[target])),
        0,
        vec![assignment],
        vec![LogicalType::Int64],
    );

    set.next().unwrap().unwrap();

    let stored = store.get_node(target).unwrap();
    // The pre-existing property survives.
    let kept = Value::Int64(42);
    assert_eq!(
        stored.properties.get(&PropertyKey::new("existing")),
        Some(&kept)
    );
    // The property from the map is added.
    let added = Value::String("hello".into());
    assert_eq!(
        stored.properties.get(&PropertyKey::new("added")),
        Some(&added)
    );
}
2330
/// A `PropertyAccess` source reads a property from the node in one column and the
/// operator writes it onto the node identified by another column.
#[test]
#[allow(clippy::cast_possible_wrap)]
fn test_property_source_property_access() {
    let store = create_test_store();

    let origin = store.create_node(&["Source"]);
    store.set_node_property(origin, "name", Value::String("Alix".into()));
    let destination = store.create_node(&["Target"]);

    // Input row: column 0 = origin node (as a node ID), column 1 = destination ID (as Int64).
    let input = {
        let mut rows = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
        rows.column_mut(0).unwrap().push_node_id(origin);
        rows.column_mut(1)
            .unwrap()
            .push_int64(destination.0 as i64);
        rows.advance_row();
        rows.finish()
    };

    let copy_name = (
        "copied_name".to_string(),
        PropertySource::PropertyAccess {
            column: 0,
            property: "name".to_string(),
        },
    );
    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(input),
        1,
        vec![copy_name],
        vec![LogicalType::Node, LogicalType::Int64],
    );

    set.next().unwrap().unwrap();

    let stored = store.get_node(destination).unwrap();
    let expected = Value::String("Alix".into());
    assert_eq!(
        stored.properties.get(&PropertyKey::new("copied_name")),
        Some(&expected)
    );
}
2375
/// A validator attached to `CreateNodeOperator` lets an allowed property through and
/// turns a rejected property into a `ConstraintViolation` error.
#[test]
fn test_create_node_with_constraint_validator() {
    let store = create_test_store();

    /// Validator that rejects only the node property named `forbidden`.
    struct RejectForbiddenValidator;

    impl ConstraintValidator for RejectForbiddenValidator {
        fn validate_node_property(
            &self,
            _: &[String],
            key: &str,
            _: &Value,
        ) -> Result<(), OperatorError> {
            match key {
                "forbidden" => Err(OperatorError::ConstraintViolation(
                    "property 'forbidden' is not allowed".to_string(),
                )),
                _ => Ok(()),
            }
        }

        fn validate_node_complete(
            &self,
            _: &[String],
            _: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn check_unique_node_property(
            &self,
            _: &[String],
            _: &str,
            _: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_property(
            &self,
            _: &str,
            _: &str,
            _: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_complete(
            &self,
            _: &str,
            _: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }
    }

    // Both operators differ only in the single property they try to set.
    let build = |key: &str, value: Value| {
        CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Thing".to_string()],
            vec![(key.to_string(), PropertySource::Constant(value))],
            vec![LogicalType::Int64],
            0,
        )
        .with_validator(Arc::new(RejectForbiddenValidator))
    };

    // Allowed property: creation succeeds and a node is added.
    let mut accepted = build("name", Value::String("ok".into()));
    assert!(accepted.next().is_ok());
    assert_eq!(store.node_count(), 1);

    // Rejected property: creation fails with a constraint violation.
    let mut rejected = build("forbidden", Value::Int64(1));
    let err = rejected.next().unwrap_err();
    assert!(matches!(err, OperatorError::ConstraintViolation(_)));
}
2465
/// After `reset`, a standalone `CreateNodeOperator` runs again and creates a second node.
#[test]
fn test_create_node_reset_allows_re_execution() {
    let store = create_test_store();

    let mut create = CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        vec!["Person".to_string()],
        vec![],
        vec![LogicalType::Int64],
        0,
    );

    // First run: one chunk, then exhausted.
    let first_run = create.next().unwrap();
    assert!(first_run.is_some());
    let exhausted = create.next().unwrap();
    assert!(exhausted.is_none());

    // After a reset the operator produces a chunk again.
    create.reset();
    let second_run = create.next().unwrap();
    assert!(second_run.is_some());

    assert_eq!(store.node_count(), 2);
}
2491
/// Every mutation operator reports its expected static name.
#[test]
fn test_operator_names() {
    let store = create_test_store();

    let create_node = CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        vec![],
        vec![],
        vec![LogicalType::Int64],
        0,
    );
    assert_eq!(create_node.name(), "CreateNode");

    let create_edge = CreateEdgeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        1,
        "R".to_string(),
        vec![LogicalType::Int64],
    );
    assert_eq!(create_edge.name(), "CreateEdge");

    let delete_node = DeleteNodeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![LogicalType::Int64],
        false,
    );
    assert_eq!(delete_node.name(), "DeleteNode");

    let delete_edge = DeleteEdgeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![LogicalType::Int64],
    );
    assert_eq!(delete_edge.name(), "DeleteEdge");

    let add_label = AddLabelOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec!["L".to_string()],
        vec![LogicalType::Int64],
    );
    assert_eq!(add_label.name(), "AddLabel");

    let remove_label = RemoveLabelOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec!["L".to_string()],
        vec![LogicalType::Int64],
    );
    assert_eq!(remove_label.name(), "RemoveLabel");

    let set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![],
        vec![LogicalType::Int64],
    );
    assert_eq!(set_property.name(), "SetProperty");
}
2562
/// `into_any` on a boxed `CreateNodeOperator` can be downcast back to the concrete type.
#[test]
fn test_create_node_into_any() {
    let store = create_test_store();
    let boxed = Box::new(CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        vec!["Person".to_string()],
        vec![],
        vec![LogicalType::Int64],
        0,
    ));

    let erased = boxed.into_any();
    let recovered = erased.downcast::<CreateNodeOperator>();
    assert!(recovered.is_ok());
}
2579
/// `into_any` on a boxed `CreateEdgeOperator` can be downcast back to the concrete type.
#[test]
fn test_create_edge_into_any() {
    let store = create_test_store();
    let boxed = Box::new(CreateEdgeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        1,
        "KNOWS".to_string(),
        vec![LogicalType::Int64],
    ));

    let erased = boxed.into_any();
    let recovered = erased.downcast::<CreateEdgeOperator>();
    assert!(recovered.is_ok());
}
2594
/// `into_any` on a boxed `DeleteNodeOperator` can be downcast back to the concrete type.
#[test]
fn test_delete_node_into_any() {
    let store = create_test_store();
    let boxed = Box::new(DeleteNodeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![LogicalType::Int64],
        false,
    ));

    let erased = boxed.into_any();
    let recovered = erased.downcast::<DeleteNodeOperator>();
    assert!(recovered.is_ok());
}
2608
/// `into_any` on a boxed `DeleteEdgeOperator` can be downcast back to the concrete type.
#[test]
fn test_delete_edge_into_any() {
    let store = create_test_store();
    let boxed = Box::new(DeleteEdgeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![LogicalType::Int64],
    ));

    let erased = boxed.into_any();
    let recovered = erased.downcast::<DeleteEdgeOperator>();
    assert!(recovered.is_ok());
}
2621
/// `into_any` on a boxed `AddLabelOperator` can be downcast back to the concrete type.
#[test]
fn test_add_label_into_any() {
    let store = create_test_store();
    let boxed = Box::new(AddLabelOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec!["Label".to_string()],
        vec![LogicalType::Int64],
    ));

    let erased = boxed.into_any();
    let recovered = erased.downcast::<AddLabelOperator>();
    assert!(recovered.is_ok());
}
2635
/// `into_any` on a boxed `RemoveLabelOperator` can be downcast back to the concrete type.
#[test]
fn test_remove_label_into_any() {
    let store = create_test_store();
    let boxed = Box::new(RemoveLabelOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec!["Label".to_string()],
        vec![LogicalType::Int64],
    ));

    let erased = boxed.into_any();
    let recovered = erased.downcast::<RemoveLabelOperator>();
    assert!(recovered.is_ok());
}
2649
/// `into_any` on a boxed `SetPropertyOperator` can be downcast back to the concrete type.
#[test]
fn test_set_property_into_any() {
    let store = create_test_store();
    let boxed = Box::new(SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![],
        vec![LogicalType::Int64],
    ));

    let erased = boxed.into_any();
    let recovered = erased.downcast::<SetPropertyOperator>();
    assert!(recovered.is_ok());
}
2663
2664 struct MinimalValidator;
2669
2670 impl ConstraintValidator for MinimalValidator {
2671 fn validate_node_property(
2672 &self,
2673 _labels: &[String],
2674 _key: &str,
2675 _value: &Value,
2676 ) -> Result<(), OperatorError> {
2677 Ok(())
2678 }
2679 fn validate_node_complete(
2680 &self,
2681 _labels: &[String],
2682 _properties: &[(String, Value)],
2683 ) -> Result<(), OperatorError> {
2684 Ok(())
2685 }
2686 fn check_unique_node_property(
2687 &self,
2688 _labels: &[String],
2689 _key: &str,
2690 _value: &Value,
2691 ) -> Result<(), OperatorError> {
2692 Ok(())
2693 }
2694 fn validate_edge_property(
2695 &self,
2696 _edge_type: &str,
2697 _key: &str,
2698 _value: &Value,
2699 ) -> Result<(), OperatorError> {
2700 Ok(())
2701 }
2702 fn validate_edge_complete(
2703 &self,
2704 _edge_type: &str,
2705 _properties: &[(String, Value)],
2706 ) -> Result<(), OperatorError> {
2707 Ok(())
2708 }
2709 }
2710
/// The provided `validate_node_labels_allowed` accepts an arbitrary label set.
#[test]
fn test_constraint_validator_default_node_labels_allowed() {
    let validator = MinimalValidator;
    let labels = ["Person".to_string(), "Actor".to_string()];

    let verdict = validator.validate_node_labels_allowed(&labels);
    assert!(verdict.is_ok());
}
2719
/// The provided `validate_edge_type_allowed` accepts an arbitrary edge type.
#[test]
fn test_constraint_validator_default_edge_type_allowed() {
    let validator = MinimalValidator;

    let verdict = validator.validate_edge_type_allowed("KNOWS");
    assert!(verdict.is_ok());
}
2725
/// The provided `validate_edge_endpoints` accepts arbitrary endpoint labels.
#[test]
fn test_constraint_validator_default_edge_endpoints() {
    let validator = MinimalValidator;
    let source_labels = ["Person".to_string()];
    let target_labels = ["Person".to_string()];

    let verdict = validator.validate_edge_endpoints("KNOWS", &source_labels, &target_labels);
    assert!(verdict.is_ok());
}
2734
/// The provided `inject_defaults` leaves the property list at its original length.
#[test]
fn test_constraint_validator_default_inject_defaults() {
    let validator = MinimalValidator;
    let labels = ["Person".to_string()];
    let mut properties = vec![("name".to_string(), Value::String("Alix".into()))];

    validator.inject_defaults(&labels, &mut properties);

    assert_eq!(properties.len(), 1);
}
2743
/// `PropertySource::Column` resolves to the value stored in that column at the given row.
#[test]
fn test_property_source_column() {
    let store = LpgStore::new().unwrap();
    let chunk = {
        let mut rows = DataChunkBuilder::new(&[LogicalType::Int64]);
        rows.column_mut(0).unwrap().push_int64(42);
        rows.advance_row();
        rows.finish()
    };

    let resolved = PropertySource::Column(0).resolve(&chunk, 0, &store);
    assert_eq!(resolved, Value::Int64(42));
}
2757
/// `PropertySource::Constant` resolves to its own value, even against an empty chunk.
#[test]
fn test_property_source_constant() {
    let store = LpgStore::new().unwrap();
    let chunk = DataChunk::empty();
    let constant = PropertySource::Constant(Value::String("hello".into()));

    let resolved = constant.resolve(&chunk, 0, &store);
    assert_eq!(resolved, Value::String("hello".into()));
}
2769
/// A `Column` source pointing past the chunk's columns resolves to `Value::Null`.
#[test]
fn test_property_source_column_out_of_bounds() {
    let store = LpgStore::new().unwrap();
    let chunk = DataChunk::empty();
    let missing = PropertySource::Column(99);

    let resolved = missing.resolve(&chunk, 0, &store);
    assert_eq!(resolved, Value::Null);
}
2778
/// `PropertyAccess` on a column holding a map value resolves to the entry for that key.
#[test]
fn test_property_source_property_access_from_map() {
    let store = LpgStore::new().unwrap();
    let person = std::collections::BTreeMap::from([(PropertyKey::new("age"), Value::Int64(30))]);

    let chunk = {
        let mut rows = DataChunkBuilder::new(&[LogicalType::Any]);
        rows.column_mut(0)
            .unwrap()
            .push_value(Value::Map(Arc::new(person)));
        rows.advance_row();
        rows.finish()
    };

    let access = PropertySource::PropertyAccess {
        column: 0,
        property: "age".to_string(),
    };
    let resolved = access.resolve(&chunk, 0, &store);
    assert_eq!(resolved, Value::Int64(30));
}
2799
/// `PropertyAccess` on a column that does not exist resolves to `Value::Null`.
#[test]
fn test_property_source_property_access_missing_column() {
    let store = LpgStore::new().unwrap();
    let chunk = DataChunk::empty();
    let access = PropertySource::PropertyAccess {
        column: 99,
        property: "name".to_string(),
    };

    let resolved = access.resolve(&chunk, 0, &store);
    assert_eq!(resolved, Value::Null);
}
2811}