1use std::sync::Arc;
10
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19pub trait ConstraintValidator: Send + Sync {
24 fn validate_node_property(
32 &self,
33 labels: &[String],
34 key: &str,
35 value: &Value,
36 ) -> Result<(), OperatorError>;
37
38 fn validate_node_complete(
46 &self,
47 labels: &[String],
48 properties: &[(String, Value)],
49 ) -> Result<(), OperatorError>;
50
51 fn check_unique_node_property(
57 &self,
58 labels: &[String],
59 key: &str,
60 value: &Value,
61 ) -> Result<(), OperatorError>;
62
63 fn validate_edge_property(
69 &self,
70 edge_type: &str,
71 key: &str,
72 value: &Value,
73 ) -> Result<(), OperatorError>;
74
75 fn validate_edge_complete(
81 &self,
82 edge_type: &str,
83 properties: &[(String, Value)],
84 ) -> Result<(), OperatorError>;
85
86 fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
92 let _ = labels;
93 Ok(())
94 }
95
96 fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
102 let _ = edge_type;
103 Ok(())
104 }
105
106 fn validate_edge_endpoints(
112 &self,
113 edge_type: &str,
114 source_labels: &[String],
115 target_labels: &[String],
116 ) -> Result<(), OperatorError> {
117 let _ = (edge_type, source_labels, target_labels);
118 Ok(())
119 }
120
121 fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
124 let _ = (labels, properties);
125 }
126}
127
128pub struct CreateNodeOperator {
133 store: Arc<dyn GraphStoreMut>,
135 input: Option<Box<dyn Operator>>,
137 labels: Vec<String>,
139 properties: Vec<(String, PropertySource)>,
141 output_schema: Vec<LogicalType>,
143 output_column: usize,
145 executed: bool,
147 viewing_epoch: Option<EpochId>,
149 transaction_id: Option<TransactionId>,
151 validator: Option<Arc<dyn ConstraintValidator>>,
153 write_tracker: Option<SharedWriteTracker>,
155}
156
157#[derive(Debug, Clone)]
159#[non_exhaustive]
160pub enum PropertySource {
161 Column(usize),
163 Constant(Value),
165 PropertyAccess {
167 column: usize,
169 property: String,
171 },
172}
173
174impl PropertySource {
175 pub fn resolve(
177 &self,
178 chunk: &crate::execution::chunk::DataChunk,
179 row: usize,
180 store: &dyn GraphStore,
181 ) -> Value {
182 match self {
183 PropertySource::Column(col_idx) => chunk
184 .column(*col_idx)
185 .and_then(|c| c.get_value(row))
186 .unwrap_or(Value::Null),
187 PropertySource::Constant(v) => v.clone(),
188 PropertySource::PropertyAccess { column, property } => {
189 let Some(col) = chunk.column(*column) else {
190 return Value::Null;
191 };
192 if let Some(node_id) = col.get_node_id(row) {
194 store
195 .get_node(node_id)
196 .and_then(|node| node.get_property(property).cloned())
197 .unwrap_or(Value::Null)
198 } else if let Some(edge_id) = col.get_edge_id(row) {
199 store
200 .get_edge(edge_id)
201 .and_then(|edge| edge.get_property(property).cloned())
202 .unwrap_or(Value::Null)
203 } else if let Some(Value::Map(map)) = col.get_value(row) {
204 let key = PropertyKey::new(property);
205 map.get(&key).cloned().unwrap_or(Value::Null)
206 } else {
207 Value::Null
208 }
209 }
210 }
211 }
212}
213
214impl CreateNodeOperator {
215 pub fn new(
225 store: Arc<dyn GraphStoreMut>,
226 input: Option<Box<dyn Operator>>,
227 labels: Vec<String>,
228 properties: Vec<(String, PropertySource)>,
229 output_schema: Vec<LogicalType>,
230 output_column: usize,
231 ) -> Self {
232 Self {
233 store,
234 input,
235 labels,
236 properties,
237 output_schema,
238 output_column,
239 executed: false,
240 viewing_epoch: None,
241 transaction_id: None,
242 validator: None,
243 write_tracker: None,
244 }
245 }
246
247 pub fn with_transaction_context(
249 mut self,
250 epoch: EpochId,
251 transaction_id: Option<TransactionId>,
252 ) -> Self {
253 self.viewing_epoch = Some(epoch);
254 self.transaction_id = transaction_id;
255 self
256 }
257
258 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
260 self.validator = Some(validator);
261 self
262 }
263
264 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
266 self.write_tracker = Some(tracker);
267 self
268 }
269}
270
271impl CreateNodeOperator {
272 fn validate_and_set_properties(
274 &self,
275 node_id: NodeId,
276 resolved_props: &mut Vec<(String, Value)>,
277 ) -> Result<(), OperatorError> {
278 if let Some(ref validator) = self.validator {
280 validator.validate_node_labels_allowed(&self.labels)?;
281 }
282
283 if let Some(ref validator) = self.validator {
285 validator.inject_defaults(&self.labels, resolved_props);
286 }
287
288 if let Some(ref validator) = self.validator {
290 for (name, value) in resolved_props.iter() {
291 validator.validate_node_property(&self.labels, name, value)?;
292 validator.check_unique_node_property(&self.labels, name, value)?;
293 }
294 validator.validate_node_complete(&self.labels, resolved_props)?;
296 }
297
298 if let Some(tid) = self.transaction_id {
300 for (name, value) in resolved_props.iter() {
301 self.store
302 .set_node_property_versioned(node_id, name, value.clone(), tid);
303 }
304 } else {
305 for (name, value) in resolved_props.iter() {
306 self.store.set_node_property(node_id, name, value.clone());
307 }
308 }
309 Ok(())
310 }
311}
312
313impl Operator for CreateNodeOperator {
314 fn next(&mut self) -> OperatorResult {
315 let epoch = self
317 .viewing_epoch
318 .unwrap_or_else(|| self.store.current_epoch());
319 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
320
321 if let Some(ref mut input) = self.input {
322 if let Some(chunk) = input.next()? {
324 let mut builder =
325 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
326
327 for row in chunk.selected_indices() {
328 let mut resolved_props: Vec<(String, Value)> = self
330 .properties
331 .iter()
332 .map(|(name, source)| {
333 let value =
334 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
335 (name.clone(), value)
336 })
337 .collect();
338
339 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
341 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
342
343 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
345 tracker.record_node_write(tid, node_id)?;
346 }
347
348 self.validate_and_set_properties(node_id, &mut resolved_props)?;
350
351 for col_idx in 0..chunk.column_count() {
353 if col_idx < self.output_column
354 && let (Some(src), Some(dst)) =
355 (chunk.column(col_idx), builder.column_mut(col_idx))
356 {
357 if let Some(val) = src.get_value(row) {
358 dst.push_value(val);
359 } else {
360 dst.push_value(Value::Null);
361 }
362 }
363 }
364
365 if let Some(dst) = builder.column_mut(self.output_column) {
367 dst.push_value(Value::Int64(node_id.0 as i64));
368 }
369
370 builder.advance_row();
371 }
372
373 return Ok(Some(builder.finish()));
374 }
375 Ok(None)
376 } else {
377 if self.executed {
379 return Ok(None);
380 }
381 self.executed = true;
382
383 let mut resolved_props: Vec<(String, Value)> = self
385 .properties
386 .iter()
387 .filter_map(|(name, source)| {
388 if let PropertySource::Constant(value) = source {
389 Some((name.clone(), value.clone()))
390 } else {
391 None
392 }
393 })
394 .collect();
395
396 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
398 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
399
400 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
402 tracker.record_node_write(tid, node_id)?;
403 }
404
405 self.validate_and_set_properties(node_id, &mut resolved_props)?;
407
408 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
410 if let Some(dst) = builder.column_mut(self.output_column) {
411 dst.push_value(Value::Int64(node_id.0 as i64));
412 }
413 builder.advance_row();
414
415 Ok(Some(builder.finish()))
416 }
417 }
418
419 fn reset(&mut self) {
420 if let Some(ref mut input) = self.input {
421 input.reset();
422 }
423 self.executed = false;
424 }
425
426 fn name(&self) -> &'static str {
427 "CreateNode"
428 }
429}
430
431pub struct CreateEdgeOperator {
433 store: Arc<dyn GraphStoreMut>,
435 input: Box<dyn Operator>,
437 from_column: usize,
439 to_column: usize,
441 edge_type: String,
443 properties: Vec<(String, PropertySource)>,
445 output_schema: Vec<LogicalType>,
447 output_column: Option<usize>,
449 viewing_epoch: Option<EpochId>,
451 transaction_id: Option<TransactionId>,
453 validator: Option<Arc<dyn ConstraintValidator>>,
455 write_tracker: Option<SharedWriteTracker>,
457}
458
459impl CreateEdgeOperator {
460 pub fn new(
467 store: Arc<dyn GraphStoreMut>,
468 input: Box<dyn Operator>,
469 from_column: usize,
470 to_column: usize,
471 edge_type: String,
472 output_schema: Vec<LogicalType>,
473 ) -> Self {
474 Self {
475 store,
476 input,
477 from_column,
478 to_column,
479 edge_type,
480 properties: Vec::new(),
481 output_schema,
482 output_column: None,
483 viewing_epoch: None,
484 transaction_id: None,
485 validator: None,
486 write_tracker: None,
487 }
488 }
489
490 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
492 self.properties = properties;
493 self
494 }
495
496 pub fn with_output_column(mut self, column: usize) -> Self {
498 self.output_column = Some(column);
499 self
500 }
501
502 pub fn with_transaction_context(
504 mut self,
505 epoch: EpochId,
506 transaction_id: Option<TransactionId>,
507 ) -> Self {
508 self.viewing_epoch = Some(epoch);
509 self.transaction_id = transaction_id;
510 self
511 }
512
513 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
515 self.validator = Some(validator);
516 self
517 }
518
519 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
521 self.write_tracker = Some(tracker);
522 self
523 }
524}
525
526impl Operator for CreateEdgeOperator {
527 fn next(&mut self) -> OperatorResult {
528 let epoch = self
530 .viewing_epoch
531 .unwrap_or_else(|| self.store.current_epoch());
532 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
533
534 if let Some(chunk) = self.input.next()? {
535 let mut builder =
536 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
537
538 for row in chunk.selected_indices() {
539 let from_id = chunk
541 .column(self.from_column)
542 .and_then(|c| c.get_value(row))
543 .ok_or_else(|| {
544 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
545 })?;
546
547 let to_id = chunk
548 .column(self.to_column)
549 .and_then(|c| c.get_value(row))
550 .ok_or_else(|| {
551 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
552 })?;
553
554 let from_node_id = match from_id {
556 Value::Int64(id) => NodeId(id as u64),
557 _ => {
558 return Err(OperatorError::TypeMismatch {
559 expected: "Int64 (node ID)".to_string(),
560 found: format!("{from_id:?}"),
561 });
562 }
563 };
564
565 let to_node_id = match to_id {
566 Value::Int64(id) => NodeId(id as u64),
567 _ => {
568 return Err(OperatorError::TypeMismatch {
569 expected: "Int64 (node ID)".to_string(),
570 found: format!("{to_id:?}"),
571 });
572 }
573 };
574
575 if let Some(ref validator) = self.validator {
577 validator.validate_edge_type_allowed(&self.edge_type)?;
578
579 let source_labels: Vec<String> = self
581 .store
582 .get_node(from_node_id)
583 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
584 .unwrap_or_default();
585 let target_labels: Vec<String> = self
586 .store
587 .get_node(to_node_id)
588 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
589 .unwrap_or_default();
590 validator.validate_edge_endpoints(
591 &self.edge_type,
592 &source_labels,
593 &target_labels,
594 )?;
595 }
596
597 let resolved_props: Vec<(String, Value)> = self
599 .properties
600 .iter()
601 .map(|(name, source)| {
602 let value =
603 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
604 (name.clone(), value)
605 })
606 .collect();
607
608 if let Some(ref validator) = self.validator {
610 for (name, value) in &resolved_props {
611 validator.validate_edge_property(&self.edge_type, name, value)?;
612 }
613 validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
614 }
615
616 let edge_id = self.store.create_edge_versioned(
618 from_node_id,
619 to_node_id,
620 &self.edge_type,
621 epoch,
622 tx,
623 );
624
625 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
627 tracker.record_edge_write(tid, edge_id)?;
628 }
629
630 if let Some(tid) = self.transaction_id {
632 for (name, value) in resolved_props {
633 self.store
634 .set_edge_property_versioned(edge_id, &name, value, tid);
635 }
636 } else {
637 for (name, value) in resolved_props {
638 self.store.set_edge_property(edge_id, &name, value);
639 }
640 }
641
642 for col_idx in 0..chunk.column_count() {
644 if let (Some(src), Some(dst)) =
645 (chunk.column(col_idx), builder.column_mut(col_idx))
646 {
647 if let Some(val) = src.get_value(row) {
648 dst.push_value(val);
649 } else {
650 dst.push_value(Value::Null);
651 }
652 }
653 }
654
655 if let Some(out_col) = self.output_column
657 && let Some(dst) = builder.column_mut(out_col)
658 {
659 dst.push_value(Value::Int64(edge_id.0 as i64));
660 }
661
662 builder.advance_row();
663 }
664
665 return Ok(Some(builder.finish()));
666 }
667 Ok(None)
668 }
669
670 fn reset(&mut self) {
671 self.input.reset();
672 }
673
674 fn name(&self) -> &'static str {
675 "CreateEdge"
676 }
677}
678
679pub struct DeleteNodeOperator {
681 store: Arc<dyn GraphStoreMut>,
683 input: Box<dyn Operator>,
685 node_column: usize,
687 output_schema: Vec<LogicalType>,
689 detach: bool,
691 viewing_epoch: Option<EpochId>,
693 transaction_id: Option<TransactionId>,
695 write_tracker: Option<SharedWriteTracker>,
697}
698
699impl DeleteNodeOperator {
700 pub fn new(
702 store: Arc<dyn GraphStoreMut>,
703 input: Box<dyn Operator>,
704 node_column: usize,
705 output_schema: Vec<LogicalType>,
706 detach: bool,
707 ) -> Self {
708 Self {
709 store,
710 input,
711 node_column,
712 output_schema,
713 detach,
714 viewing_epoch: None,
715 transaction_id: None,
716 write_tracker: None,
717 }
718 }
719
720 pub fn with_transaction_context(
722 mut self,
723 epoch: EpochId,
724 transaction_id: Option<TransactionId>,
725 ) -> Self {
726 self.viewing_epoch = Some(epoch);
727 self.transaction_id = transaction_id;
728 self
729 }
730
731 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
733 self.write_tracker = Some(tracker);
734 self
735 }
736}
737
738impl Operator for DeleteNodeOperator {
739 fn next(&mut self) -> OperatorResult {
740 let epoch = self
742 .viewing_epoch
743 .unwrap_or_else(|| self.store.current_epoch());
744 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
745
746 if let Some(chunk) = self.input.next()? {
747 let mut builder =
748 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
749
750 for row in chunk.selected_indices() {
751 let node_val = chunk
752 .column(self.node_column)
753 .and_then(|c| c.get_value(row))
754 .ok_or_else(|| {
755 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
756 })?;
757
758 let node_id = match node_val {
759 Value::Int64(id) => NodeId(id as u64),
760 _ => {
761 return Err(OperatorError::TypeMismatch {
762 expected: "Int64 (node ID)".to_string(),
763 found: format!("{node_val:?}"),
764 });
765 }
766 };
767
768 if self.detach {
769 let outgoing = self
772 .store
773 .edges_from(node_id, crate::graph::Direction::Outgoing);
774 let incoming = self
775 .store
776 .edges_from(node_id, crate::graph::Direction::Incoming);
777 for (_, edge_id) in outgoing.into_iter().chain(incoming) {
778 self.store.delete_edge_versioned(edge_id, epoch, tx);
779 if let (Some(tracker), Some(tid)) =
780 (&self.write_tracker, self.transaction_id)
781 {
782 tracker.record_edge_write(tid, edge_id)?;
783 }
784 }
785 } else {
786 let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
788 if degree > 0 {
789 return Err(OperatorError::ConstraintViolation(format!(
790 "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
791 degree
792 )));
793 }
794 }
795
796 self.store.delete_node_versioned(node_id, epoch, tx);
798
799 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
801 tracker.record_node_write(tid, node_id)?;
802 }
803
804 for col_idx in 0..chunk.column_count() {
807 if let (Some(src), Some(dst)) =
808 (chunk.column(col_idx), builder.column_mut(col_idx))
809 {
810 if let Some(val) = src.get_value(row) {
811 dst.push_value(val);
812 } else {
813 dst.push_value(Value::Null);
814 }
815 }
816 }
817 builder.advance_row();
818 }
819
820 return Ok(Some(builder.finish()));
821 }
822 Ok(None)
823 }
824
825 fn reset(&mut self) {
826 self.input.reset();
827 }
828
829 fn name(&self) -> &'static str {
830 "DeleteNode"
831 }
832}
833
834pub struct DeleteEdgeOperator {
836 store: Arc<dyn GraphStoreMut>,
838 input: Box<dyn Operator>,
840 edge_column: usize,
842 output_schema: Vec<LogicalType>,
844 viewing_epoch: Option<EpochId>,
846 transaction_id: Option<TransactionId>,
848 write_tracker: Option<SharedWriteTracker>,
850}
851
852impl DeleteEdgeOperator {
853 pub fn new(
855 store: Arc<dyn GraphStoreMut>,
856 input: Box<dyn Operator>,
857 edge_column: usize,
858 output_schema: Vec<LogicalType>,
859 ) -> Self {
860 Self {
861 store,
862 input,
863 edge_column,
864 output_schema,
865 viewing_epoch: None,
866 transaction_id: None,
867 write_tracker: None,
868 }
869 }
870
871 pub fn with_transaction_context(
873 mut self,
874 epoch: EpochId,
875 transaction_id: Option<TransactionId>,
876 ) -> Self {
877 self.viewing_epoch = Some(epoch);
878 self.transaction_id = transaction_id;
879 self
880 }
881
882 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
884 self.write_tracker = Some(tracker);
885 self
886 }
887}
888
889impl Operator for DeleteEdgeOperator {
890 fn next(&mut self) -> OperatorResult {
891 let epoch = self
893 .viewing_epoch
894 .unwrap_or_else(|| self.store.current_epoch());
895 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
896
897 if let Some(chunk) = self.input.next()? {
898 let mut builder =
899 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
900
901 for row in chunk.selected_indices() {
902 let edge_val = chunk
903 .column(self.edge_column)
904 .and_then(|c| c.get_value(row))
905 .ok_or_else(|| {
906 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
907 })?;
908
909 let edge_id = match edge_val {
910 Value::Int64(id) => EdgeId(id as u64),
911 _ => {
912 return Err(OperatorError::TypeMismatch {
913 expected: "Int64 (edge ID)".to_string(),
914 found: format!("{edge_val:?}"),
915 });
916 }
917 };
918
919 self.store.delete_edge_versioned(edge_id, epoch, tx);
921
922 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
924 tracker.record_edge_write(tid, edge_id)?;
925 }
926
927 for col_idx in 0..chunk.column_count() {
929 if let (Some(src), Some(dst)) =
930 (chunk.column(col_idx), builder.column_mut(col_idx))
931 {
932 if let Some(val) = src.get_value(row) {
933 dst.push_value(val);
934 } else {
935 dst.push_value(Value::Null);
936 }
937 }
938 }
939 builder.advance_row();
940 }
941
942 return Ok(Some(builder.finish()));
943 }
944 Ok(None)
945 }
946
947 fn reset(&mut self) {
948 self.input.reset();
949 }
950
951 fn name(&self) -> &'static str {
952 "DeleteEdge"
953 }
954}
955
956pub struct AddLabelOperator {
958 store: Arc<dyn GraphStoreMut>,
960 input: Box<dyn Operator>,
962 node_column: usize,
964 labels: Vec<String>,
966 output_schema: Vec<LogicalType>,
968 count_column: usize,
970 viewing_epoch: Option<EpochId>,
972 transaction_id: Option<TransactionId>,
974 write_tracker: Option<SharedWriteTracker>,
976}
977
978impl AddLabelOperator {
979 pub fn new(
981 store: Arc<dyn GraphStoreMut>,
982 input: Box<dyn Operator>,
983 node_column: usize,
984 labels: Vec<String>,
985 output_schema: Vec<LogicalType>,
986 ) -> Self {
987 let count_column = output_schema.len() - 1;
988 Self {
989 store,
990 input,
991 node_column,
992 labels,
993 count_column,
994 output_schema,
995 viewing_epoch: None,
996 transaction_id: None,
997 write_tracker: None,
998 }
999 }
1000
1001 pub fn with_transaction_context(
1003 mut self,
1004 epoch: EpochId,
1005 transaction_id: Option<TransactionId>,
1006 ) -> Self {
1007 self.viewing_epoch = Some(epoch);
1008 self.transaction_id = transaction_id;
1009 self
1010 }
1011
1012 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1014 self.write_tracker = Some(tracker);
1015 self
1016 }
1017}
1018
1019impl Operator for AddLabelOperator {
1020 fn next(&mut self) -> OperatorResult {
1021 if let Some(chunk) = self.input.next()? {
1022 let mut builder =
1023 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1024
1025 for row in chunk.selected_indices() {
1026 let node_val = chunk
1027 .column(self.node_column)
1028 .and_then(|c| c.get_value(row))
1029 .ok_or_else(|| {
1030 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1031 })?;
1032
1033 let node_id = match node_val {
1034 Value::Int64(id) => NodeId(id as u64),
1035 _ => {
1036 return Err(OperatorError::TypeMismatch {
1037 expected: "Int64 (node ID)".to_string(),
1038 found: format!("{node_val:?}"),
1039 });
1040 }
1041 };
1042
1043 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1045 tracker.record_node_write(tid, node_id)?;
1046 }
1047
1048 let mut row_count: i64 = 0;
1050 for label in &self.labels {
1051 let added = if let Some(tid) = self.transaction_id {
1052 self.store.add_label_versioned(node_id, label, tid)
1053 } else {
1054 self.store.add_label(node_id, label)
1055 };
1056 if added {
1057 row_count += 1;
1058 }
1059 }
1060
1061 for col_idx in 0..chunk.column_count() {
1063 if let (Some(src), Some(dst)) =
1064 (chunk.column(col_idx), builder.column_mut(col_idx))
1065 {
1066 if let Some(val) = src.get_value(row) {
1067 dst.push_value(val);
1068 } else {
1069 dst.push_value(Value::Null);
1070 }
1071 }
1072 }
1073 if let Some(dst) = builder.column_mut(self.count_column) {
1075 dst.push_value(Value::Int64(row_count));
1076 }
1077
1078 builder.advance_row();
1079 }
1080
1081 return Ok(Some(builder.finish()));
1082 }
1083 Ok(None)
1084 }
1085
1086 fn reset(&mut self) {
1087 self.input.reset();
1088 }
1089
1090 fn name(&self) -> &'static str {
1091 "AddLabel"
1092 }
1093}
1094
1095pub struct RemoveLabelOperator {
1097 store: Arc<dyn GraphStoreMut>,
1099 input: Box<dyn Operator>,
1101 node_column: usize,
1103 labels: Vec<String>,
1105 output_schema: Vec<LogicalType>,
1107 count_column: usize,
1109 viewing_epoch: Option<EpochId>,
1111 transaction_id: Option<TransactionId>,
1113 write_tracker: Option<SharedWriteTracker>,
1115}
1116
1117impl RemoveLabelOperator {
1118 pub fn new(
1120 store: Arc<dyn GraphStoreMut>,
1121 input: Box<dyn Operator>,
1122 node_column: usize,
1123 labels: Vec<String>,
1124 output_schema: Vec<LogicalType>,
1125 ) -> Self {
1126 let count_column = output_schema.len() - 1;
1127 Self {
1128 store,
1129 input,
1130 node_column,
1131 labels,
1132 count_column,
1133 output_schema,
1134 viewing_epoch: None,
1135 transaction_id: None,
1136 write_tracker: None,
1137 }
1138 }
1139
1140 pub fn with_transaction_context(
1142 mut self,
1143 epoch: EpochId,
1144 transaction_id: Option<TransactionId>,
1145 ) -> Self {
1146 self.viewing_epoch = Some(epoch);
1147 self.transaction_id = transaction_id;
1148 self
1149 }
1150
1151 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1153 self.write_tracker = Some(tracker);
1154 self
1155 }
1156}
1157
1158impl Operator for RemoveLabelOperator {
1159 fn next(&mut self) -> OperatorResult {
1160 if let Some(chunk) = self.input.next()? {
1161 let mut builder =
1162 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1163
1164 for row in chunk.selected_indices() {
1165 let node_val = chunk
1166 .column(self.node_column)
1167 .and_then(|c| c.get_value(row))
1168 .ok_or_else(|| {
1169 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1170 })?;
1171
1172 let node_id = match node_val {
1173 Value::Int64(id) => NodeId(id as u64),
1174 _ => {
1175 return Err(OperatorError::TypeMismatch {
1176 expected: "Int64 (node ID)".to_string(),
1177 found: format!("{node_val:?}"),
1178 });
1179 }
1180 };
1181
1182 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1184 tracker.record_node_write(tid, node_id)?;
1185 }
1186
1187 let mut row_count: i64 = 0;
1189 for label in &self.labels {
1190 let removed = if let Some(tid) = self.transaction_id {
1191 self.store.remove_label_versioned(node_id, label, tid)
1192 } else {
1193 self.store.remove_label(node_id, label)
1194 };
1195 if removed {
1196 row_count += 1;
1197 }
1198 }
1199
1200 for col_idx in 0..chunk.column_count() {
1202 if let (Some(src), Some(dst)) =
1203 (chunk.column(col_idx), builder.column_mut(col_idx))
1204 {
1205 if let Some(val) = src.get_value(row) {
1206 dst.push_value(val);
1207 } else {
1208 dst.push_value(Value::Null);
1209 }
1210 }
1211 }
1212 if let Some(dst) = builder.column_mut(self.count_column) {
1214 dst.push_value(Value::Int64(row_count));
1215 }
1216
1217 builder.advance_row();
1218 }
1219
1220 return Ok(Some(builder.finish()));
1221 }
1222 Ok(None)
1223 }
1224
1225 fn reset(&mut self) {
1226 self.input.reset();
1227 }
1228
1229 fn name(&self) -> &'static str {
1230 "RemoveLabel"
1231 }
1232}
1233
1234pub struct SetPropertyOperator {
1239 store: Arc<dyn GraphStoreMut>,
1241 input: Box<dyn Operator>,
1243 entity_column: usize,
1245 is_edge: bool,
1247 properties: Vec<(String, PropertySource)>,
1249 output_schema: Vec<LogicalType>,
1251 replace: bool,
1253 validator: Option<Arc<dyn ConstraintValidator>>,
1255 labels: Vec<String>,
1257 edge_type_name: Option<String>,
1259 viewing_epoch: Option<EpochId>,
1261 transaction_id: Option<TransactionId>,
1263 write_tracker: Option<SharedWriteTracker>,
1265}
1266
1267impl SetPropertyOperator {
1268 pub fn new_for_node(
1270 store: Arc<dyn GraphStoreMut>,
1271 input: Box<dyn Operator>,
1272 node_column: usize,
1273 properties: Vec<(String, PropertySource)>,
1274 output_schema: Vec<LogicalType>,
1275 ) -> Self {
1276 Self {
1277 store,
1278 input,
1279 entity_column: node_column,
1280 is_edge: false,
1281 properties,
1282 output_schema,
1283 replace: false,
1284 validator: None,
1285 labels: Vec::new(),
1286 edge_type_name: None,
1287 viewing_epoch: None,
1288 transaction_id: None,
1289 write_tracker: None,
1290 }
1291 }
1292
1293 pub fn new_for_edge(
1295 store: Arc<dyn GraphStoreMut>,
1296 input: Box<dyn Operator>,
1297 edge_column: usize,
1298 properties: Vec<(String, PropertySource)>,
1299 output_schema: Vec<LogicalType>,
1300 ) -> Self {
1301 Self {
1302 store,
1303 input,
1304 entity_column: edge_column,
1305 is_edge: true,
1306 properties,
1307 output_schema,
1308 replace: false,
1309 validator: None,
1310 labels: Vec::new(),
1311 edge_type_name: None,
1312 viewing_epoch: None,
1313 transaction_id: None,
1314 write_tracker: None,
1315 }
1316 }
1317
1318 pub fn with_replace(mut self, replace: bool) -> Self {
1320 self.replace = replace;
1321 self
1322 }
1323
1324 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1326 self.validator = Some(validator);
1327 self
1328 }
1329
1330 pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1332 self.labels = labels;
1333 self
1334 }
1335
1336 pub fn with_edge_type(mut self, edge_type: String) -> Self {
1338 self.edge_type_name = Some(edge_type);
1339 self
1340 }
1341
1342 pub fn with_transaction_context(
1347 mut self,
1348 epoch: EpochId,
1349 transaction_id: Option<TransactionId>,
1350 ) -> Self {
1351 self.viewing_epoch = Some(epoch);
1352 self.transaction_id = transaction_id;
1353 self
1354 }
1355
1356 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1358 self.write_tracker = Some(tracker);
1359 self
1360 }
1361}
1362
1363impl Operator for SetPropertyOperator {
1364 fn next(&mut self) -> OperatorResult {
1365 if let Some(chunk) = self.input.next()? {
1366 let mut builder =
1367 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1368
1369 for row in chunk.selected_indices() {
1370 let entity_val = chunk
1371 .column(self.entity_column)
1372 .and_then(|c| c.get_value(row))
1373 .ok_or_else(|| {
1374 OperatorError::ColumnNotFound(format!(
1375 "entity column {}",
1376 self.entity_column
1377 ))
1378 })?;
1379
1380 let entity_id = match entity_val {
1381 Value::Int64(id) => id as u64,
1382 _ => {
1383 return Err(OperatorError::TypeMismatch {
1384 expected: "Int64 (entity ID)".to_string(),
1385 found: format!("{entity_val:?}"),
1386 });
1387 }
1388 };
1389
1390 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1392 if self.is_edge {
1393 tracker.record_edge_write(tid, EdgeId(entity_id))?;
1394 } else {
1395 tracker.record_node_write(tid, NodeId(entity_id))?;
1396 }
1397 }
1398
1399 let resolved_props: Vec<(String, Value)> = self
1401 .properties
1402 .iter()
1403 .map(|(name, source)| {
1404 let value =
1405 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1406 (name.clone(), value)
1407 })
1408 .collect();
1409
1410 if let Some(ref validator) = self.validator {
1412 if self.is_edge {
1413 if let Some(ref et) = self.edge_type_name {
1414 for (name, value) in &resolved_props {
1415 validator.validate_edge_property(et, name, value)?;
1416 }
1417 }
1418 } else {
1419 for (name, value) in &resolved_props {
1420 validator.validate_node_property(&self.labels, name, value)?;
1421 validator.check_unique_node_property(&self.labels, name, value)?;
1422 }
1423 }
1424 }
1425
1426 let tx_id = self.transaction_id;
1428 for (prop_name, value) in resolved_props {
1429 if prop_name == "*" {
1430 if let Value::Map(map) = value {
1432 if self.replace {
1433 if self.is_edge {
1435 if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1436 let keys: Vec<String> = edge
1437 .properties
1438 .iter()
1439 .map(|(k, _)| k.as_str().to_string())
1440 .collect();
1441 for key in keys {
1442 if let Some(tid) = tx_id {
1443 self.store.remove_edge_property_versioned(
1444 EdgeId(entity_id),
1445 &key,
1446 tid,
1447 );
1448 } else {
1449 self.store
1450 .remove_edge_property(EdgeId(entity_id), &key);
1451 }
1452 }
1453 }
1454 } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1455 let keys: Vec<String> = node
1456 .properties
1457 .iter()
1458 .map(|(k, _)| k.as_str().to_string())
1459 .collect();
1460 for key in keys {
1461 if let Some(tid) = tx_id {
1462 self.store.remove_node_property_versioned(
1463 NodeId(entity_id),
1464 &key,
1465 tid,
1466 );
1467 } else {
1468 self.store
1469 .remove_node_property(NodeId(entity_id), &key);
1470 }
1471 }
1472 }
1473 }
1474 for (key, val) in map.iter() {
1476 if val.is_null() {
1477 if self.is_edge {
1479 if let Some(tid) = tx_id {
1480 self.store.remove_edge_property_versioned(
1481 EdgeId(entity_id),
1482 key.as_str(),
1483 tid,
1484 );
1485 } else {
1486 self.store.remove_edge_property(
1487 EdgeId(entity_id),
1488 key.as_str(),
1489 );
1490 }
1491 } else if let Some(tid) = tx_id {
1492 self.store.remove_node_property_versioned(
1493 NodeId(entity_id),
1494 key.as_str(),
1495 tid,
1496 );
1497 } else {
1498 self.store
1499 .remove_node_property(NodeId(entity_id), key.as_str());
1500 }
1501 } else if self.is_edge {
1502 if let Some(tid) = tx_id {
1503 self.store.set_edge_property_versioned(
1504 EdgeId(entity_id),
1505 key.as_str(),
1506 val.clone(),
1507 tid,
1508 );
1509 } else {
1510 self.store.set_edge_property(
1511 EdgeId(entity_id),
1512 key.as_str(),
1513 val.clone(),
1514 );
1515 }
1516 } else if let Some(tid) = tx_id {
1517 self.store.set_node_property_versioned(
1518 NodeId(entity_id),
1519 key.as_str(),
1520 val.clone(),
1521 tid,
1522 );
1523 } else {
1524 self.store.set_node_property(
1525 NodeId(entity_id),
1526 key.as_str(),
1527 val.clone(),
1528 );
1529 }
1530 }
1531 }
1532 } else if self.is_edge {
1533 if let Some(tid) = tx_id {
1534 self.store.set_edge_property_versioned(
1535 EdgeId(entity_id),
1536 &prop_name,
1537 value,
1538 tid,
1539 );
1540 } else {
1541 self.store
1542 .set_edge_property(EdgeId(entity_id), &prop_name, value);
1543 }
1544 } else if let Some(tid) = tx_id {
1545 self.store.set_node_property_versioned(
1546 NodeId(entity_id),
1547 &prop_name,
1548 value,
1549 tid,
1550 );
1551 } else {
1552 self.store
1553 .set_node_property(NodeId(entity_id), &prop_name, value);
1554 }
1555 }
1556
1557 for col_idx in 0..chunk.column_count() {
1559 if let (Some(src), Some(dst)) =
1560 (chunk.column(col_idx), builder.column_mut(col_idx))
1561 {
1562 if let Some(val) = src.get_value(row) {
1563 dst.push_value(val);
1564 } else {
1565 dst.push_value(Value::Null);
1566 }
1567 }
1568 }
1569
1570 builder.advance_row();
1571 }
1572
1573 return Ok(Some(builder.finish()));
1574 }
1575 Ok(None)
1576 }
1577
1578 fn reset(&mut self) {
1579 self.input.reset();
1580 }
1581
1582 fn name(&self) -> &'static str {
1583 "SetProperty"
1584 }
1585}
1586
1587#[cfg(all(test, feature = "lpg"))]
1588mod tests {
1589 use super::*;
1590 use crate::execution::DataChunk;
1591 use crate::execution::chunk::DataChunkBuilder;
1592 use crate::graph::lpg::LpgStore;
1593
1594 fn create_test_store() -> Arc<dyn GraphStoreMut> {
1597 Arc::new(LpgStore::new().unwrap())
1598 }
1599
1600 struct MockInput {
1601 chunk: Option<DataChunk>,
1602 }
1603
1604 impl MockInput {
1605 fn boxed(chunk: DataChunk) -> Box<Self> {
1606 Box::new(Self { chunk: Some(chunk) })
1607 }
1608 }
1609
1610 impl Operator for MockInput {
1611 fn next(&mut self) -> OperatorResult {
1612 Ok(self.chunk.take())
1613 }
1614 fn reset(&mut self) {}
1615 fn name(&self) -> &'static str {
1616 "MockInput"
1617 }
1618 }
1619
1620 struct EmptyInput;
1621 impl Operator for EmptyInput {
1622 fn next(&mut self) -> OperatorResult {
1623 Ok(None)
1624 }
1625 fn reset(&mut self) {}
1626 fn name(&self) -> &'static str {
1627 "EmptyInput"
1628 }
1629 }
1630
1631 fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1632 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1633 for id in ids {
1634 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1635 builder.advance_row();
1636 }
1637 builder.finish()
1638 }
1639
1640 fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1641 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1642 for id in ids {
1643 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1644 builder.advance_row();
1645 }
1646 builder.finish()
1647 }
1648
1649 #[test]
1652 fn test_create_node_standalone() {
1653 let store = create_test_store();
1654
1655 let mut op = CreateNodeOperator::new(
1656 Arc::clone(&store),
1657 None,
1658 vec!["Person".to_string()],
1659 vec![(
1660 "name".to_string(),
1661 PropertySource::Constant(Value::String("Alix".into())),
1662 )],
1663 vec![LogicalType::Int64],
1664 0,
1665 );
1666
1667 let chunk = op.next().unwrap().unwrap();
1668 assert_eq!(chunk.row_count(), 1);
1669
1670 assert!(op.next().unwrap().is_none());
1672
1673 assert_eq!(store.node_count(), 1);
1674 }
1675
1676 #[test]
1677 fn test_create_edge() {
1678 let store = create_test_store();
1679
1680 let node1 = store.create_node(&["Person"]);
1681 let node2 = store.create_node(&["Person"]);
1682
1683 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1684 builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1685 builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1686 builder.advance_row();
1687
1688 let mut op = CreateEdgeOperator::new(
1689 Arc::clone(&store),
1690 MockInput::boxed(builder.finish()),
1691 0,
1692 1,
1693 "KNOWS".to_string(),
1694 vec![LogicalType::Int64, LogicalType::Int64],
1695 );
1696
1697 let _chunk = op.next().unwrap().unwrap();
1698 assert_eq!(store.edge_count(), 1);
1699 }
1700
1701 #[test]
1702 fn test_delete_node() {
1703 let store = create_test_store();
1704
1705 let node_id = store.create_node(&["Person"]);
1706 assert_eq!(store.node_count(), 1);
1707
1708 let mut op = DeleteNodeOperator::new(
1709 Arc::clone(&store),
1710 MockInput::boxed(node_id_chunk(&[node_id])),
1711 0,
1712 vec![LogicalType::Node],
1713 false,
1714 );
1715
1716 let chunk = op.next().unwrap().unwrap();
1717 assert_eq!(chunk.row_count(), 1);
1719 assert_eq!(store.node_count(), 0);
1720 }
1721
1722 #[test]
1725 fn test_delete_edge() {
1726 let store = create_test_store();
1727
1728 let n1 = store.create_node(&["Person"]);
1729 let n2 = store.create_node(&["Person"]);
1730 let eid = store.create_edge(n1, n2, "KNOWS");
1731 assert_eq!(store.edge_count(), 1);
1732
1733 let mut op = DeleteEdgeOperator::new(
1734 Arc::clone(&store),
1735 MockInput::boxed(edge_id_chunk(&[eid])),
1736 0,
1737 vec![LogicalType::Node],
1738 );
1739
1740 let chunk = op.next().unwrap().unwrap();
1741 assert_eq!(chunk.row_count(), 1);
1742 assert_eq!(store.edge_count(), 0);
1743 }
1744
1745 #[test]
1746 fn test_delete_edge_no_input_returns_none() {
1747 let store = create_test_store();
1748
1749 let mut op = DeleteEdgeOperator::new(
1750 Arc::clone(&store),
1751 Box::new(EmptyInput),
1752 0,
1753 vec![LogicalType::Int64],
1754 );
1755
1756 assert!(op.next().unwrap().is_none());
1757 }
1758
1759 #[test]
1760 fn test_delete_multiple_edges() {
1761 let store = create_test_store();
1762
1763 let n1 = store.create_node(&["N"]);
1764 let n2 = store.create_node(&["N"]);
1765 let e1 = store.create_edge(n1, n2, "R");
1766 let e2 = store.create_edge(n2, n1, "S");
1767 assert_eq!(store.edge_count(), 2);
1768
1769 let mut op = DeleteEdgeOperator::new(
1770 Arc::clone(&store),
1771 MockInput::boxed(edge_id_chunk(&[e1, e2])),
1772 0,
1773 vec![LogicalType::Node],
1774 );
1775
1776 let chunk = op.next().unwrap().unwrap();
1777 assert_eq!(chunk.row_count(), 2);
1778 assert_eq!(store.edge_count(), 0);
1779 }
1780
1781 #[test]
1784 fn test_delete_node_detach() {
1785 let store = create_test_store();
1786
1787 let n1 = store.create_node(&["Person"]);
1788 let n2 = store.create_node(&["Person"]);
1789 store.create_edge(n1, n2, "KNOWS");
1790 store.create_edge(n2, n1, "FOLLOWS");
1791 assert_eq!(store.edge_count(), 2);
1792
1793 let mut op = DeleteNodeOperator::new(
1794 Arc::clone(&store),
1795 MockInput::boxed(node_id_chunk(&[n1])),
1796 0,
1797 vec![LogicalType::Node],
1798 true, );
1800
1801 let chunk = op.next().unwrap().unwrap();
1802 assert_eq!(chunk.row_count(), 1);
1803 assert_eq!(store.node_count(), 1);
1804 assert_eq!(store.edge_count(), 0); }
1806
1807 #[test]
1810 fn test_add_label() {
1811 let store = create_test_store();
1812
1813 let node = store.create_node(&["Person"]);
1814
1815 let mut op = AddLabelOperator::new(
1816 Arc::clone(&store),
1817 MockInput::boxed(node_id_chunk(&[node])),
1818 0,
1819 vec!["Employee".to_string()],
1820 vec![LogicalType::Int64, LogicalType::Int64],
1821 );
1822
1823 let chunk = op.next().unwrap().unwrap();
1824 let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1825 assert_eq!(updated, 1);
1826
1827 let node_data = store.get_node(node).unwrap();
1829 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1830 assert!(labels.contains(&"Person"));
1831 assert!(labels.contains(&"Employee"));
1832 }
1833
1834 #[test]
1835 fn test_add_multiple_labels() {
1836 let store = create_test_store();
1837
1838 let node = store.create_node(&["Base"]);
1839
1840 let mut op = AddLabelOperator::new(
1841 Arc::clone(&store),
1842 MockInput::boxed(node_id_chunk(&[node])),
1843 0,
1844 vec!["LabelA".to_string(), "LabelB".to_string()],
1845 vec![LogicalType::Int64, LogicalType::Int64],
1846 );
1847
1848 let chunk = op.next().unwrap().unwrap();
1849 let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1850 assert_eq!(updated, 2); let node_data = store.get_node(node).unwrap();
1853 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1854 assert!(labels.contains(&"LabelA"));
1855 assert!(labels.contains(&"LabelB"));
1856 }
1857
1858 #[test]
1859 fn test_add_label_no_input_returns_none() {
1860 let store = create_test_store();
1861
1862 let mut op = AddLabelOperator::new(
1863 Arc::clone(&store),
1864 Box::new(EmptyInput),
1865 0,
1866 vec!["Foo".to_string()],
1867 vec![LogicalType::Int64, LogicalType::Int64],
1868 );
1869
1870 assert!(op.next().unwrap().is_none());
1871 }
1872
1873 #[test]
1876 fn test_remove_label() {
1877 let store = create_test_store();
1878
1879 let node = store.create_node(&["Person", "Employee"]);
1880
1881 let mut op = RemoveLabelOperator::new(
1882 Arc::clone(&store),
1883 MockInput::boxed(node_id_chunk(&[node])),
1884 0,
1885 vec!["Employee".to_string()],
1886 vec![LogicalType::Int64, LogicalType::Int64],
1887 );
1888
1889 let chunk = op.next().unwrap().unwrap();
1890 let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1891 assert_eq!(updated, 1);
1892
1893 let node_data = store.get_node(node).unwrap();
1895 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1896 assert!(labels.contains(&"Person"));
1897 assert!(!labels.contains(&"Employee"));
1898 }
1899
1900 #[test]
1901 fn test_remove_nonexistent_label() {
1902 let store = create_test_store();
1903
1904 let node = store.create_node(&["Person"]);
1905
1906 let mut op = RemoveLabelOperator::new(
1907 Arc::clone(&store),
1908 MockInput::boxed(node_id_chunk(&[node])),
1909 0,
1910 vec!["NonExistent".to_string()],
1911 vec![LogicalType::Int64, LogicalType::Int64],
1912 );
1913
1914 let chunk = op.next().unwrap().unwrap();
1915 let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1916 assert_eq!(updated, 0); }
1918
1919 #[test]
1922 fn test_set_node_property_constant() {
1923 let store = create_test_store();
1924
1925 let node = store.create_node(&["Person"]);
1926
1927 let mut op = SetPropertyOperator::new_for_node(
1928 Arc::clone(&store),
1929 MockInput::boxed(node_id_chunk(&[node])),
1930 0,
1931 vec![(
1932 "name".to_string(),
1933 PropertySource::Constant(Value::String("Alix".into())),
1934 )],
1935 vec![LogicalType::Int64],
1936 );
1937
1938 let chunk = op.next().unwrap().unwrap();
1939 assert_eq!(chunk.row_count(), 1);
1940
1941 let node_data = store.get_node(node).unwrap();
1943 assert_eq!(
1944 node_data
1945 .properties
1946 .get(&grafeo_common::types::PropertyKey::new("name")),
1947 Some(&Value::String("Alix".into()))
1948 );
1949 }
1950
1951 #[test]
1952 fn test_set_node_property_from_column() {
1953 let store = create_test_store();
1954
1955 let node = store.create_node(&["Person"]);
1956
1957 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1959 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1960 builder
1961 .column_mut(1)
1962 .unwrap()
1963 .push_value(Value::String("Gus".into()));
1964 builder.advance_row();
1965
1966 let mut op = SetPropertyOperator::new_for_node(
1967 Arc::clone(&store),
1968 MockInput::boxed(builder.finish()),
1969 0,
1970 vec![("name".to_string(), PropertySource::Column(1))],
1971 vec![LogicalType::Int64, LogicalType::String],
1972 );
1973
1974 let chunk = op.next().unwrap().unwrap();
1975 assert_eq!(chunk.row_count(), 1);
1976
1977 let node_data = store.get_node(node).unwrap();
1978 assert_eq!(
1979 node_data
1980 .properties
1981 .get(&grafeo_common::types::PropertyKey::new("name")),
1982 Some(&Value::String("Gus".into()))
1983 );
1984 }
1985
1986 #[test]
1987 fn test_set_edge_property() {
1988 let store = create_test_store();
1989
1990 let n1 = store.create_node(&["N"]);
1991 let n2 = store.create_node(&["N"]);
1992 let eid = store.create_edge(n1, n2, "KNOWS");
1993
1994 let mut op = SetPropertyOperator::new_for_edge(
1995 Arc::clone(&store),
1996 MockInput::boxed(edge_id_chunk(&[eid])),
1997 0,
1998 vec![(
1999 "weight".to_string(),
2000 PropertySource::Constant(Value::Float64(0.75)),
2001 )],
2002 vec![LogicalType::Int64],
2003 );
2004
2005 let chunk = op.next().unwrap().unwrap();
2006 assert_eq!(chunk.row_count(), 1);
2007
2008 let edge_data = store.get_edge(eid).unwrap();
2009 assert_eq!(
2010 edge_data
2011 .properties
2012 .get(&grafeo_common::types::PropertyKey::new("weight")),
2013 Some(&Value::Float64(0.75))
2014 );
2015 }
2016
2017 #[test]
2018 fn test_set_multiple_properties() {
2019 let store = create_test_store();
2020
2021 let node = store.create_node(&["Person"]);
2022
2023 let mut op = SetPropertyOperator::new_for_node(
2024 Arc::clone(&store),
2025 MockInput::boxed(node_id_chunk(&[node])),
2026 0,
2027 vec![
2028 (
2029 "name".to_string(),
2030 PropertySource::Constant(Value::String("Alix".into())),
2031 ),
2032 (
2033 "age".to_string(),
2034 PropertySource::Constant(Value::Int64(30)),
2035 ),
2036 ],
2037 vec![LogicalType::Int64],
2038 );
2039
2040 op.next().unwrap().unwrap();
2041
2042 let node_data = store.get_node(node).unwrap();
2043 assert_eq!(
2044 node_data
2045 .properties
2046 .get(&grafeo_common::types::PropertyKey::new("name")),
2047 Some(&Value::String("Alix".into()))
2048 );
2049 assert_eq!(
2050 node_data
2051 .properties
2052 .get(&grafeo_common::types::PropertyKey::new("age")),
2053 Some(&Value::Int64(30))
2054 );
2055 }
2056
2057 #[test]
2058 fn test_set_property_no_input_returns_none() {
2059 let store = create_test_store();
2060
2061 let mut op = SetPropertyOperator::new_for_node(
2062 Arc::clone(&store),
2063 Box::new(EmptyInput),
2064 0,
2065 vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
2066 vec![LogicalType::Int64],
2067 );
2068
2069 assert!(op.next().unwrap().is_none());
2070 }
2071
2072 #[test]
2075 fn test_delete_node_without_detach_errors_when_edges_exist() {
2076 let store = create_test_store();
2077
2078 let n1 = store.create_node(&["Person"]);
2079 let n2 = store.create_node(&["Person"]);
2080 store.create_edge(n1, n2, "KNOWS");
2081
2082 let mut op = DeleteNodeOperator::new(
2083 Arc::clone(&store),
2084 MockInput::boxed(node_id_chunk(&[n1])),
2085 0,
2086 vec![LogicalType::Int64],
2087 false, );
2089
2090 let err = op.next().unwrap_err();
2091 match err {
2092 OperatorError::ConstraintViolation(msg) => {
2093 assert!(msg.contains("connected edge"), "unexpected message: {msg}");
2094 }
2095 other => panic!("expected ConstraintViolation, got {other:?}"),
2096 }
2097 assert_eq!(store.node_count(), 2);
2099 }
2100
2101 #[test]
2104 fn test_create_node_with_input_operator() {
2105 let store = create_test_store();
2106
2107 let existing = store.create_node(&["Seed"]);
2109
2110 let mut op = CreateNodeOperator::new(
2111 Arc::clone(&store),
2112 Some(MockInput::boxed(node_id_chunk(&[existing]))),
2113 vec!["Created".to_string()],
2114 vec![(
2115 "source".to_string(),
2116 PropertySource::Constant(Value::String("from_input".into())),
2117 )],
2118 vec![LogicalType::Int64, LogicalType::Int64], 1, );
2121
2122 let chunk = op.next().unwrap().unwrap();
2123 assert_eq!(chunk.row_count(), 1);
2124
2125 assert_eq!(store.node_count(), 2);
2127
2128 assert!(op.next().unwrap().is_none());
2130 }
2131
2132 #[test]
2135 fn test_create_edge_with_properties_and_output_column() {
2136 let store = create_test_store();
2137
2138 let n1 = store.create_node(&["Person"]);
2139 let n2 = store.create_node(&["Person"]);
2140
2141 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
2142 builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
2143 builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
2144 builder.advance_row();
2145
2146 let mut op = CreateEdgeOperator::new(
2147 Arc::clone(&store),
2148 MockInput::boxed(builder.finish()),
2149 0,
2150 1,
2151 "KNOWS".to_string(),
2152 vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
2153 )
2154 .with_properties(vec![(
2155 "since".to_string(),
2156 PropertySource::Constant(Value::Int64(2024)),
2157 )])
2158 .with_output_column(2);
2159
2160 let chunk = op.next().unwrap().unwrap();
2161 assert_eq!(chunk.row_count(), 1);
2162 assert_eq!(store.edge_count(), 1);
2163
2164 let edge_id_raw = chunk
2166 .column(2)
2167 .and_then(|c| c.get_int64(0))
2168 .expect("edge ID should be in output column 2");
2169 let edge_id = EdgeId(edge_id_raw as u64);
2170
2171 let edge = store.get_edge(edge_id).expect("edge should exist");
2173 assert_eq!(
2174 edge.properties
2175 .get(&grafeo_common::types::PropertyKey::new("since")),
2176 Some(&Value::Int64(2024))
2177 );
2178 }
2179
2180 #[test]
2183 fn test_set_property_map_replace() {
2184 use std::collections::BTreeMap;
2185
2186 let store = create_test_store();
2187
2188 let node = store.create_node(&["Person"]);
2189 store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
2190
2191 let mut map = BTreeMap::new();
2192 map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
2193
2194 let mut op = SetPropertyOperator::new_for_node(
2195 Arc::clone(&store),
2196 MockInput::boxed(node_id_chunk(&[node])),
2197 0,
2198 vec![(
2199 "*".to_string(),
2200 PropertySource::Constant(Value::Map(Arc::new(map))),
2201 )],
2202 vec![LogicalType::Int64],
2203 )
2204 .with_replace(true);
2205
2206 op.next().unwrap().unwrap();
2207
2208 let node_data = store.get_node(node).unwrap();
2209 assert!(
2211 node_data
2212 .properties
2213 .get(&PropertyKey::new("old_prop"))
2214 .is_none()
2215 );
2216 assert_eq!(
2218 node_data.properties.get(&PropertyKey::new("new_key")),
2219 Some(&Value::String("new_val".into()))
2220 );
2221 }
2222
2223 #[test]
2226 fn test_set_property_map_merge() {
2227 use std::collections::BTreeMap;
2228
2229 let store = create_test_store();
2230
2231 let node = store.create_node(&["Person"]);
2232 store.set_node_property(node, "existing", Value::Int64(42));
2233
2234 let mut map = BTreeMap::new();
2235 map.insert(PropertyKey::new("added"), Value::String("hello".into()));
2236
2237 let mut op = SetPropertyOperator::new_for_node(
2238 Arc::clone(&store),
2239 MockInput::boxed(node_id_chunk(&[node])),
2240 0,
2241 vec![(
2242 "*".to_string(),
2243 PropertySource::Constant(Value::Map(Arc::new(map))),
2244 )],
2245 vec![LogicalType::Int64],
2246 ); op.next().unwrap().unwrap();
2249
2250 let node_data = store.get_node(node).unwrap();
2251 assert_eq!(
2253 node_data.properties.get(&PropertyKey::new("existing")),
2254 Some(&Value::Int64(42))
2255 );
2256 assert_eq!(
2258 node_data.properties.get(&PropertyKey::new("added")),
2259 Some(&Value::String("hello".into()))
2260 );
2261 }
2262
2263 #[test]
2266 fn test_property_source_property_access() {
2267 let store = create_test_store();
2268
2269 let source_node = store.create_node(&["Source"]);
2270 store.set_node_property(source_node, "name", Value::String("Alix".into()));
2271
2272 let target_node = store.create_node(&["Target"]);
2273
2274 let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
2276 builder.column_mut(0).unwrap().push_node_id(source_node);
2277 builder
2278 .column_mut(1)
2279 .unwrap()
2280 .push_int64(target_node.0 as i64);
2281 builder.advance_row();
2282
2283 let mut op = SetPropertyOperator::new_for_node(
2284 Arc::clone(&store),
2285 MockInput::boxed(builder.finish()),
2286 1, vec![(
2288 "copied_name".to_string(),
2289 PropertySource::PropertyAccess {
2290 column: 0,
2291 property: "name".to_string(),
2292 },
2293 )],
2294 vec![LogicalType::Node, LogicalType::Int64],
2295 );
2296
2297 op.next().unwrap().unwrap();
2298
2299 let target_data = store.get_node(target_node).unwrap();
2300 assert_eq!(
2301 target_data.properties.get(&PropertyKey::new("copied_name")),
2302 Some(&Value::String("Alix".into()))
2303 );
2304 }
2305
2306 #[test]
2309 fn test_create_node_with_constraint_validator() {
2310 let store = create_test_store();
2311
2312 struct RejectAgeValidator;
2313 impl ConstraintValidator for RejectAgeValidator {
2314 fn validate_node_property(
2315 &self,
2316 _labels: &[String],
2317 key: &str,
2318 _value: &Value,
2319 ) -> Result<(), OperatorError> {
2320 if key == "forbidden" {
2321 return Err(OperatorError::ConstraintViolation(
2322 "property 'forbidden' is not allowed".to_string(),
2323 ));
2324 }
2325 Ok(())
2326 }
2327 fn validate_node_complete(
2328 &self,
2329 _labels: &[String],
2330 _properties: &[(String, Value)],
2331 ) -> Result<(), OperatorError> {
2332 Ok(())
2333 }
2334 fn check_unique_node_property(
2335 &self,
2336 _labels: &[String],
2337 _key: &str,
2338 _value: &Value,
2339 ) -> Result<(), OperatorError> {
2340 Ok(())
2341 }
2342 fn validate_edge_property(
2343 &self,
2344 _edge_type: &str,
2345 _key: &str,
2346 _value: &Value,
2347 ) -> Result<(), OperatorError> {
2348 Ok(())
2349 }
2350 fn validate_edge_complete(
2351 &self,
2352 _edge_type: &str,
2353 _properties: &[(String, Value)],
2354 ) -> Result<(), OperatorError> {
2355 Ok(())
2356 }
2357 }
2358
2359 let mut op = CreateNodeOperator::new(
2361 Arc::clone(&store),
2362 None,
2363 vec!["Thing".to_string()],
2364 vec![(
2365 "name".to_string(),
2366 PropertySource::Constant(Value::String("ok".into())),
2367 )],
2368 vec![LogicalType::Int64],
2369 0,
2370 )
2371 .with_validator(Arc::new(RejectAgeValidator));
2372
2373 assert!(op.next().is_ok());
2374 assert_eq!(store.node_count(), 1);
2375
2376 let mut op = CreateNodeOperator::new(
2378 Arc::clone(&store),
2379 None,
2380 vec!["Thing".to_string()],
2381 vec![(
2382 "forbidden".to_string(),
2383 PropertySource::Constant(Value::Int64(1)),
2384 )],
2385 vec![LogicalType::Int64],
2386 0,
2387 )
2388 .with_validator(Arc::new(RejectAgeValidator));
2389
2390 let err = op.next().unwrap_err();
2391 assert!(matches!(err, OperatorError::ConstraintViolation(_)));
2392 }
2395
2396 #[test]
2399 fn test_create_node_reset_allows_re_execution() {
2400 let store = create_test_store();
2401
2402 let mut op = CreateNodeOperator::new(
2403 Arc::clone(&store),
2404 None,
2405 vec!["Person".to_string()],
2406 vec![],
2407 vec![LogicalType::Int64],
2408 0,
2409 );
2410
2411 assert!(op.next().unwrap().is_some());
2413 assert!(op.next().unwrap().is_none());
2414
2415 op.reset();
2417 assert!(op.next().unwrap().is_some());
2418
2419 assert_eq!(store.node_count(), 2);
2420 }
2421
2422 #[test]
2425 fn test_operator_names() {
2426 let store = create_test_store();
2427
2428 let op = CreateNodeOperator::new(
2429 Arc::clone(&store),
2430 None,
2431 vec![],
2432 vec![],
2433 vec![LogicalType::Int64],
2434 0,
2435 );
2436 assert_eq!(op.name(), "CreateNode");
2437
2438 let op = CreateEdgeOperator::new(
2439 Arc::clone(&store),
2440 Box::new(EmptyInput),
2441 0,
2442 1,
2443 "R".to_string(),
2444 vec![LogicalType::Int64],
2445 );
2446 assert_eq!(op.name(), "CreateEdge");
2447
2448 let op = DeleteNodeOperator::new(
2449 Arc::clone(&store),
2450 Box::new(EmptyInput),
2451 0,
2452 vec![LogicalType::Int64],
2453 false,
2454 );
2455 assert_eq!(op.name(), "DeleteNode");
2456
2457 let op = DeleteEdgeOperator::new(
2458 Arc::clone(&store),
2459 Box::new(EmptyInput),
2460 0,
2461 vec![LogicalType::Int64],
2462 );
2463 assert_eq!(op.name(), "DeleteEdge");
2464
2465 let op = AddLabelOperator::new(
2466 Arc::clone(&store),
2467 Box::new(EmptyInput),
2468 0,
2469 vec!["L".to_string()],
2470 vec![LogicalType::Int64],
2471 );
2472 assert_eq!(op.name(), "AddLabel");
2473
2474 let op = RemoveLabelOperator::new(
2475 Arc::clone(&store),
2476 Box::new(EmptyInput),
2477 0,
2478 vec!["L".to_string()],
2479 vec![LogicalType::Int64],
2480 );
2481 assert_eq!(op.name(), "RemoveLabel");
2482
2483 let op = SetPropertyOperator::new_for_node(
2484 Arc::clone(&store),
2485 Box::new(EmptyInput),
2486 0,
2487 vec![],
2488 vec![LogicalType::Int64],
2489 );
2490 assert_eq!(op.name(), "SetProperty");
2491 }
2492}