1use std::sync::Arc;
10
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19pub trait ConstraintValidator: Send + Sync {
24 fn validate_node_property(
32 &self,
33 labels: &[String],
34 key: &str,
35 value: &Value,
36 ) -> Result<(), OperatorError>;
37
38 fn validate_node_complete(
46 &self,
47 labels: &[String],
48 properties: &[(String, Value)],
49 ) -> Result<(), OperatorError>;
50
51 fn check_unique_node_property(
57 &self,
58 labels: &[String],
59 key: &str,
60 value: &Value,
61 ) -> Result<(), OperatorError>;
62
63 fn validate_edge_property(
69 &self,
70 edge_type: &str,
71 key: &str,
72 value: &Value,
73 ) -> Result<(), OperatorError>;
74
75 fn validate_edge_complete(
81 &self,
82 edge_type: &str,
83 properties: &[(String, Value)],
84 ) -> Result<(), OperatorError>;
85
86 fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
92 let _ = labels;
93 Ok(())
94 }
95
96 fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
102 let _ = edge_type;
103 Ok(())
104 }
105
106 fn validate_edge_endpoints(
112 &self,
113 edge_type: &str,
114 source_labels: &[String],
115 target_labels: &[String],
116 ) -> Result<(), OperatorError> {
117 let _ = (edge_type, source_labels, target_labels);
118 Ok(())
119 }
120
121 fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
124 let _ = (labels, properties);
125 }
126}
127
128pub struct CreateNodeOperator {
133 store: Arc<dyn GraphStoreMut>,
135 input: Option<Box<dyn Operator>>,
137 labels: Vec<String>,
139 properties: Vec<(String, PropertySource)>,
141 output_schema: Vec<LogicalType>,
143 output_column: usize,
145 executed: bool,
147 viewing_epoch: Option<EpochId>,
149 transaction_id: Option<TransactionId>,
151 validator: Option<Arc<dyn ConstraintValidator>>,
153 write_tracker: Option<SharedWriteTracker>,
155}
156
157#[derive(Debug, Clone)]
159pub enum PropertySource {
160 Column(usize),
162 Constant(Value),
164 PropertyAccess {
166 column: usize,
168 property: String,
170 },
171}
172
173impl PropertySource {
174 pub fn resolve(
176 &self,
177 chunk: &crate::execution::chunk::DataChunk,
178 row: usize,
179 store: &dyn GraphStore,
180 ) -> Value {
181 match self {
182 PropertySource::Column(col_idx) => chunk
183 .column(*col_idx)
184 .and_then(|c| c.get_value(row))
185 .unwrap_or(Value::Null),
186 PropertySource::Constant(v) => v.clone(),
187 PropertySource::PropertyAccess { column, property } => {
188 let Some(col) = chunk.column(*column) else {
189 return Value::Null;
190 };
191 if let Some(node_id) = col.get_node_id(row) {
193 store
194 .get_node(node_id)
195 .and_then(|node| node.get_property(property).cloned())
196 .unwrap_or(Value::Null)
197 } else if let Some(edge_id) = col.get_edge_id(row) {
198 store
199 .get_edge(edge_id)
200 .and_then(|edge| edge.get_property(property).cloned())
201 .unwrap_or(Value::Null)
202 } else if let Some(Value::Map(map)) = col.get_value(row) {
203 let key = PropertyKey::new(property);
204 map.get(&key).cloned().unwrap_or(Value::Null)
205 } else {
206 Value::Null
207 }
208 }
209 }
210 }
211}
212
213impl CreateNodeOperator {
214 pub fn new(
224 store: Arc<dyn GraphStoreMut>,
225 input: Option<Box<dyn Operator>>,
226 labels: Vec<String>,
227 properties: Vec<(String, PropertySource)>,
228 output_schema: Vec<LogicalType>,
229 output_column: usize,
230 ) -> Self {
231 Self {
232 store,
233 input,
234 labels,
235 properties,
236 output_schema,
237 output_column,
238 executed: false,
239 viewing_epoch: None,
240 transaction_id: None,
241 validator: None,
242 write_tracker: None,
243 }
244 }
245
246 pub fn with_transaction_context(
248 mut self,
249 epoch: EpochId,
250 transaction_id: Option<TransactionId>,
251 ) -> Self {
252 self.viewing_epoch = Some(epoch);
253 self.transaction_id = transaction_id;
254 self
255 }
256
257 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
259 self.validator = Some(validator);
260 self
261 }
262
263 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
265 self.write_tracker = Some(tracker);
266 self
267 }
268}
269
270impl CreateNodeOperator {
271 fn validate_and_set_properties(
273 &self,
274 node_id: NodeId,
275 resolved_props: &mut Vec<(String, Value)>,
276 ) -> Result<(), OperatorError> {
277 if let Some(ref validator) = self.validator {
279 validator.validate_node_labels_allowed(&self.labels)?;
280 }
281
282 if let Some(ref validator) = self.validator {
284 validator.inject_defaults(&self.labels, resolved_props);
285 }
286
287 if let Some(ref validator) = self.validator {
289 for (name, value) in resolved_props.iter() {
290 validator.validate_node_property(&self.labels, name, value)?;
291 validator.check_unique_node_property(&self.labels, name, value)?;
292 }
293 validator.validate_node_complete(&self.labels, resolved_props)?;
295 }
296
297 if let Some(tid) = self.transaction_id {
299 for (name, value) in resolved_props.iter() {
300 self.store
301 .set_node_property_versioned(node_id, name, value.clone(), tid);
302 }
303 } else {
304 for (name, value) in resolved_props.iter() {
305 self.store.set_node_property(node_id, name, value.clone());
306 }
307 }
308 Ok(())
309 }
310}
311
312impl Operator for CreateNodeOperator {
313 fn next(&mut self) -> OperatorResult {
314 let epoch = self
316 .viewing_epoch
317 .unwrap_or_else(|| self.store.current_epoch());
318 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
319
320 if let Some(ref mut input) = self.input {
321 if let Some(chunk) = input.next()? {
323 let mut builder =
324 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
325
326 for row in chunk.selected_indices() {
327 let mut resolved_props: Vec<(String, Value)> = self
329 .properties
330 .iter()
331 .map(|(name, source)| {
332 let value =
333 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
334 (name.clone(), value)
335 })
336 .collect();
337
338 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
340 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
341
342 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
344 tracker.record_node_write(tid, node_id)?;
345 }
346
347 self.validate_and_set_properties(node_id, &mut resolved_props)?;
349
350 for col_idx in 0..chunk.column_count() {
352 if col_idx < self.output_column
353 && let (Some(src), Some(dst)) =
354 (chunk.column(col_idx), builder.column_mut(col_idx))
355 {
356 if let Some(val) = src.get_value(row) {
357 dst.push_value(val);
358 } else {
359 dst.push_value(Value::Null);
360 }
361 }
362 }
363
364 if let Some(dst) = builder.column_mut(self.output_column) {
366 dst.push_value(Value::Int64(node_id.0 as i64));
367 }
368
369 builder.advance_row();
370 }
371
372 return Ok(Some(builder.finish()));
373 }
374 Ok(None)
375 } else {
376 if self.executed {
378 return Ok(None);
379 }
380 self.executed = true;
381
382 let mut resolved_props: Vec<(String, Value)> = self
384 .properties
385 .iter()
386 .filter_map(|(name, source)| {
387 if let PropertySource::Constant(value) = source {
388 Some((name.clone(), value.clone()))
389 } else {
390 None
391 }
392 })
393 .collect();
394
395 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
397 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
398
399 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
401 tracker.record_node_write(tid, node_id)?;
402 }
403
404 self.validate_and_set_properties(node_id, &mut resolved_props)?;
406
407 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
409 if let Some(dst) = builder.column_mut(self.output_column) {
410 dst.push_value(Value::Int64(node_id.0 as i64));
411 }
412 builder.advance_row();
413
414 Ok(Some(builder.finish()))
415 }
416 }
417
418 fn reset(&mut self) {
419 if let Some(ref mut input) = self.input {
420 input.reset();
421 }
422 self.executed = false;
423 }
424
425 fn name(&self) -> &'static str {
426 "CreateNode"
427 }
428}
429
430pub struct CreateEdgeOperator {
432 store: Arc<dyn GraphStoreMut>,
434 input: Box<dyn Operator>,
436 from_column: usize,
438 to_column: usize,
440 edge_type: String,
442 properties: Vec<(String, PropertySource)>,
444 output_schema: Vec<LogicalType>,
446 output_column: Option<usize>,
448 viewing_epoch: Option<EpochId>,
450 transaction_id: Option<TransactionId>,
452 validator: Option<Arc<dyn ConstraintValidator>>,
454 write_tracker: Option<SharedWriteTracker>,
456}
457
458impl CreateEdgeOperator {
459 pub fn new(
466 store: Arc<dyn GraphStoreMut>,
467 input: Box<dyn Operator>,
468 from_column: usize,
469 to_column: usize,
470 edge_type: String,
471 output_schema: Vec<LogicalType>,
472 ) -> Self {
473 Self {
474 store,
475 input,
476 from_column,
477 to_column,
478 edge_type,
479 properties: Vec::new(),
480 output_schema,
481 output_column: None,
482 viewing_epoch: None,
483 transaction_id: None,
484 validator: None,
485 write_tracker: None,
486 }
487 }
488
489 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
491 self.properties = properties;
492 self
493 }
494
495 pub fn with_output_column(mut self, column: usize) -> Self {
497 self.output_column = Some(column);
498 self
499 }
500
501 pub fn with_transaction_context(
503 mut self,
504 epoch: EpochId,
505 transaction_id: Option<TransactionId>,
506 ) -> Self {
507 self.viewing_epoch = Some(epoch);
508 self.transaction_id = transaction_id;
509 self
510 }
511
512 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
514 self.validator = Some(validator);
515 self
516 }
517
518 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
520 self.write_tracker = Some(tracker);
521 self
522 }
523}
524
525impl Operator for CreateEdgeOperator {
526 fn next(&mut self) -> OperatorResult {
527 let epoch = self
529 .viewing_epoch
530 .unwrap_or_else(|| self.store.current_epoch());
531 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
532
533 if let Some(chunk) = self.input.next()? {
534 let mut builder =
535 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
536
537 for row in chunk.selected_indices() {
538 let from_id = chunk
540 .column(self.from_column)
541 .and_then(|c| c.get_value(row))
542 .ok_or_else(|| {
543 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
544 })?;
545
546 let to_id = chunk
547 .column(self.to_column)
548 .and_then(|c| c.get_value(row))
549 .ok_or_else(|| {
550 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
551 })?;
552
553 let from_node_id = match from_id {
555 Value::Int64(id) => NodeId(id as u64),
556 _ => {
557 return Err(OperatorError::TypeMismatch {
558 expected: "Int64 (node ID)".to_string(),
559 found: format!("{from_id:?}"),
560 });
561 }
562 };
563
564 let to_node_id = match to_id {
565 Value::Int64(id) => NodeId(id as u64),
566 _ => {
567 return Err(OperatorError::TypeMismatch {
568 expected: "Int64 (node ID)".to_string(),
569 found: format!("{to_id:?}"),
570 });
571 }
572 };
573
574 if let Some(ref validator) = self.validator {
576 validator.validate_edge_type_allowed(&self.edge_type)?;
577
578 let source_labels: Vec<String> = self
580 .store
581 .get_node(from_node_id)
582 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
583 .unwrap_or_default();
584 let target_labels: Vec<String> = self
585 .store
586 .get_node(to_node_id)
587 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
588 .unwrap_or_default();
589 validator.validate_edge_endpoints(
590 &self.edge_type,
591 &source_labels,
592 &target_labels,
593 )?;
594 }
595
596 let resolved_props: Vec<(String, Value)> = self
598 .properties
599 .iter()
600 .map(|(name, source)| {
601 let value =
602 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
603 (name.clone(), value)
604 })
605 .collect();
606
607 if let Some(ref validator) = self.validator {
609 for (name, value) in &resolved_props {
610 validator.validate_edge_property(&self.edge_type, name, value)?;
611 }
612 validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
613 }
614
615 let edge_id = self.store.create_edge_versioned(
617 from_node_id,
618 to_node_id,
619 &self.edge_type,
620 epoch,
621 tx,
622 );
623
624 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
626 tracker.record_edge_write(tid, edge_id)?;
627 }
628
629 if let Some(tid) = self.transaction_id {
631 for (name, value) in resolved_props {
632 self.store
633 .set_edge_property_versioned(edge_id, &name, value, tid);
634 }
635 } else {
636 for (name, value) in resolved_props {
637 self.store.set_edge_property(edge_id, &name, value);
638 }
639 }
640
641 for col_idx in 0..chunk.column_count() {
643 if let (Some(src), Some(dst)) =
644 (chunk.column(col_idx), builder.column_mut(col_idx))
645 {
646 if let Some(val) = src.get_value(row) {
647 dst.push_value(val);
648 } else {
649 dst.push_value(Value::Null);
650 }
651 }
652 }
653
654 if let Some(out_col) = self.output_column
656 && let Some(dst) = builder.column_mut(out_col)
657 {
658 dst.push_value(Value::Int64(edge_id.0 as i64));
659 }
660
661 builder.advance_row();
662 }
663
664 return Ok(Some(builder.finish()));
665 }
666 Ok(None)
667 }
668
669 fn reset(&mut self) {
670 self.input.reset();
671 }
672
673 fn name(&self) -> &'static str {
674 "CreateEdge"
675 }
676}
677
678pub struct DeleteNodeOperator {
680 store: Arc<dyn GraphStoreMut>,
682 input: Box<dyn Operator>,
684 node_column: usize,
686 output_schema: Vec<LogicalType>,
688 detach: bool,
690 viewing_epoch: Option<EpochId>,
692 transaction_id: Option<TransactionId>,
694 write_tracker: Option<SharedWriteTracker>,
696}
697
698impl DeleteNodeOperator {
699 pub fn new(
701 store: Arc<dyn GraphStoreMut>,
702 input: Box<dyn Operator>,
703 node_column: usize,
704 output_schema: Vec<LogicalType>,
705 detach: bool,
706 ) -> Self {
707 Self {
708 store,
709 input,
710 node_column,
711 output_schema,
712 detach,
713 viewing_epoch: None,
714 transaction_id: None,
715 write_tracker: None,
716 }
717 }
718
719 pub fn with_transaction_context(
721 mut self,
722 epoch: EpochId,
723 transaction_id: Option<TransactionId>,
724 ) -> Self {
725 self.viewing_epoch = Some(epoch);
726 self.transaction_id = transaction_id;
727 self
728 }
729
730 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
732 self.write_tracker = Some(tracker);
733 self
734 }
735}
736
737impl Operator for DeleteNodeOperator {
738 fn next(&mut self) -> OperatorResult {
739 let epoch = self
741 .viewing_epoch
742 .unwrap_or_else(|| self.store.current_epoch());
743 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
744
745 if let Some(chunk) = self.input.next()? {
746 let mut builder =
747 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
748
749 for row in chunk.selected_indices() {
750 let node_val = chunk
751 .column(self.node_column)
752 .and_then(|c| c.get_value(row))
753 .ok_or_else(|| {
754 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
755 })?;
756
757 let node_id = match node_val {
758 Value::Int64(id) => NodeId(id as u64),
759 _ => {
760 return Err(OperatorError::TypeMismatch {
761 expected: "Int64 (node ID)".to_string(),
762 found: format!("{node_val:?}"),
763 });
764 }
765 };
766
767 if self.detach {
768 let outgoing = self
771 .store
772 .edges_from(node_id, crate::graph::Direction::Outgoing);
773 let incoming = self
774 .store
775 .edges_from(node_id, crate::graph::Direction::Incoming);
776 for (_, edge_id) in outgoing.into_iter().chain(incoming) {
777 self.store.delete_edge_versioned(edge_id, epoch, tx);
778 if let (Some(tracker), Some(tid)) =
779 (&self.write_tracker, self.transaction_id)
780 {
781 tracker.record_edge_write(tid, edge_id)?;
782 }
783 }
784 } else {
785 let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
787 if degree > 0 {
788 return Err(OperatorError::ConstraintViolation(format!(
789 "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
790 degree
791 )));
792 }
793 }
794
795 self.store.delete_node_versioned(node_id, epoch, tx);
797
798 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
800 tracker.record_node_write(tid, node_id)?;
801 }
802
803 for col_idx in 0..chunk.column_count() {
806 if let (Some(src), Some(dst)) =
807 (chunk.column(col_idx), builder.column_mut(col_idx))
808 {
809 if let Some(val) = src.get_value(row) {
810 dst.push_value(val);
811 } else {
812 dst.push_value(Value::Null);
813 }
814 }
815 }
816 builder.advance_row();
817 }
818
819 return Ok(Some(builder.finish()));
820 }
821 Ok(None)
822 }
823
824 fn reset(&mut self) {
825 self.input.reset();
826 }
827
828 fn name(&self) -> &'static str {
829 "DeleteNode"
830 }
831}
832
833pub struct DeleteEdgeOperator {
835 store: Arc<dyn GraphStoreMut>,
837 input: Box<dyn Operator>,
839 edge_column: usize,
841 output_schema: Vec<LogicalType>,
843 viewing_epoch: Option<EpochId>,
845 transaction_id: Option<TransactionId>,
847 write_tracker: Option<SharedWriteTracker>,
849}
850
851impl DeleteEdgeOperator {
852 pub fn new(
854 store: Arc<dyn GraphStoreMut>,
855 input: Box<dyn Operator>,
856 edge_column: usize,
857 output_schema: Vec<LogicalType>,
858 ) -> Self {
859 Self {
860 store,
861 input,
862 edge_column,
863 output_schema,
864 viewing_epoch: None,
865 transaction_id: None,
866 write_tracker: None,
867 }
868 }
869
870 pub fn with_transaction_context(
872 mut self,
873 epoch: EpochId,
874 transaction_id: Option<TransactionId>,
875 ) -> Self {
876 self.viewing_epoch = Some(epoch);
877 self.transaction_id = transaction_id;
878 self
879 }
880
881 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
883 self.write_tracker = Some(tracker);
884 self
885 }
886}
887
888impl Operator for DeleteEdgeOperator {
889 fn next(&mut self) -> OperatorResult {
890 let epoch = self
892 .viewing_epoch
893 .unwrap_or_else(|| self.store.current_epoch());
894 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
895
896 if let Some(chunk) = self.input.next()? {
897 let mut builder =
898 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
899
900 for row in chunk.selected_indices() {
901 let edge_val = chunk
902 .column(self.edge_column)
903 .and_then(|c| c.get_value(row))
904 .ok_or_else(|| {
905 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
906 })?;
907
908 let edge_id = match edge_val {
909 Value::Int64(id) => EdgeId(id as u64),
910 _ => {
911 return Err(OperatorError::TypeMismatch {
912 expected: "Int64 (edge ID)".to_string(),
913 found: format!("{edge_val:?}"),
914 });
915 }
916 };
917
918 self.store.delete_edge_versioned(edge_id, epoch, tx);
920
921 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
923 tracker.record_edge_write(tid, edge_id)?;
924 }
925
926 for col_idx in 0..chunk.column_count() {
928 if let (Some(src), Some(dst)) =
929 (chunk.column(col_idx), builder.column_mut(col_idx))
930 {
931 if let Some(val) = src.get_value(row) {
932 dst.push_value(val);
933 } else {
934 dst.push_value(Value::Null);
935 }
936 }
937 }
938 builder.advance_row();
939 }
940
941 return Ok(Some(builder.finish()));
942 }
943 Ok(None)
944 }
945
946 fn reset(&mut self) {
947 self.input.reset();
948 }
949
950 fn name(&self) -> &'static str {
951 "DeleteEdge"
952 }
953}
954
955pub struct AddLabelOperator {
957 store: Arc<dyn GraphStoreMut>,
959 input: Box<dyn Operator>,
961 node_column: usize,
963 labels: Vec<String>,
965 output_schema: Vec<LogicalType>,
967 count_column: usize,
969 viewing_epoch: Option<EpochId>,
971 transaction_id: Option<TransactionId>,
973 write_tracker: Option<SharedWriteTracker>,
975}
976
977impl AddLabelOperator {
978 pub fn new(
980 store: Arc<dyn GraphStoreMut>,
981 input: Box<dyn Operator>,
982 node_column: usize,
983 labels: Vec<String>,
984 output_schema: Vec<LogicalType>,
985 ) -> Self {
986 let count_column = output_schema.len() - 1;
987 Self {
988 store,
989 input,
990 node_column,
991 labels,
992 count_column,
993 output_schema,
994 viewing_epoch: None,
995 transaction_id: None,
996 write_tracker: None,
997 }
998 }
999
1000 pub fn with_transaction_context(
1002 mut self,
1003 epoch: EpochId,
1004 transaction_id: Option<TransactionId>,
1005 ) -> Self {
1006 self.viewing_epoch = Some(epoch);
1007 self.transaction_id = transaction_id;
1008 self
1009 }
1010
1011 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1013 self.write_tracker = Some(tracker);
1014 self
1015 }
1016}
1017
1018impl Operator for AddLabelOperator {
1019 fn next(&mut self) -> OperatorResult {
1020 if let Some(chunk) = self.input.next()? {
1021 let mut builder =
1022 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1023
1024 for row in chunk.selected_indices() {
1025 let node_val = chunk
1026 .column(self.node_column)
1027 .and_then(|c| c.get_value(row))
1028 .ok_or_else(|| {
1029 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1030 })?;
1031
1032 let node_id = match node_val {
1033 Value::Int64(id) => NodeId(id as u64),
1034 _ => {
1035 return Err(OperatorError::TypeMismatch {
1036 expected: "Int64 (node ID)".to_string(),
1037 found: format!("{node_val:?}"),
1038 });
1039 }
1040 };
1041
1042 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1044 tracker.record_node_write(tid, node_id)?;
1045 }
1046
1047 let mut row_count: i64 = 0;
1049 for label in &self.labels {
1050 let added = if let Some(tid) = self.transaction_id {
1051 self.store.add_label_versioned(node_id, label, tid)
1052 } else {
1053 self.store.add_label(node_id, label)
1054 };
1055 if added {
1056 row_count += 1;
1057 }
1058 }
1059
1060 for col_idx in 0..chunk.column_count() {
1062 if let (Some(src), Some(dst)) =
1063 (chunk.column(col_idx), builder.column_mut(col_idx))
1064 {
1065 if let Some(val) = src.get_value(row) {
1066 dst.push_value(val);
1067 } else {
1068 dst.push_value(Value::Null);
1069 }
1070 }
1071 }
1072 if let Some(dst) = builder.column_mut(self.count_column) {
1074 dst.push_value(Value::Int64(row_count));
1075 }
1076
1077 builder.advance_row();
1078 }
1079
1080 return Ok(Some(builder.finish()));
1081 }
1082 Ok(None)
1083 }
1084
1085 fn reset(&mut self) {
1086 self.input.reset();
1087 }
1088
1089 fn name(&self) -> &'static str {
1090 "AddLabel"
1091 }
1092}
1093
1094pub struct RemoveLabelOperator {
1096 store: Arc<dyn GraphStoreMut>,
1098 input: Box<dyn Operator>,
1100 node_column: usize,
1102 labels: Vec<String>,
1104 output_schema: Vec<LogicalType>,
1106 count_column: usize,
1108 viewing_epoch: Option<EpochId>,
1110 transaction_id: Option<TransactionId>,
1112 write_tracker: Option<SharedWriteTracker>,
1114}
1115
1116impl RemoveLabelOperator {
1117 pub fn new(
1119 store: Arc<dyn GraphStoreMut>,
1120 input: Box<dyn Operator>,
1121 node_column: usize,
1122 labels: Vec<String>,
1123 output_schema: Vec<LogicalType>,
1124 ) -> Self {
1125 let count_column = output_schema.len() - 1;
1126 Self {
1127 store,
1128 input,
1129 node_column,
1130 labels,
1131 count_column,
1132 output_schema,
1133 viewing_epoch: None,
1134 transaction_id: None,
1135 write_tracker: None,
1136 }
1137 }
1138
1139 pub fn with_transaction_context(
1141 mut self,
1142 epoch: EpochId,
1143 transaction_id: Option<TransactionId>,
1144 ) -> Self {
1145 self.viewing_epoch = Some(epoch);
1146 self.transaction_id = transaction_id;
1147 self
1148 }
1149
1150 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1152 self.write_tracker = Some(tracker);
1153 self
1154 }
1155}
1156
1157impl Operator for RemoveLabelOperator {
1158 fn next(&mut self) -> OperatorResult {
1159 if let Some(chunk) = self.input.next()? {
1160 let mut builder =
1161 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1162
1163 for row in chunk.selected_indices() {
1164 let node_val = chunk
1165 .column(self.node_column)
1166 .and_then(|c| c.get_value(row))
1167 .ok_or_else(|| {
1168 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1169 })?;
1170
1171 let node_id = match node_val {
1172 Value::Int64(id) => NodeId(id as u64),
1173 _ => {
1174 return Err(OperatorError::TypeMismatch {
1175 expected: "Int64 (node ID)".to_string(),
1176 found: format!("{node_val:?}"),
1177 });
1178 }
1179 };
1180
1181 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1183 tracker.record_node_write(tid, node_id)?;
1184 }
1185
1186 let mut row_count: i64 = 0;
1188 for label in &self.labels {
1189 let removed = if let Some(tid) = self.transaction_id {
1190 self.store.remove_label_versioned(node_id, label, tid)
1191 } else {
1192 self.store.remove_label(node_id, label)
1193 };
1194 if removed {
1195 row_count += 1;
1196 }
1197 }
1198
1199 for col_idx in 0..chunk.column_count() {
1201 if let (Some(src), Some(dst)) =
1202 (chunk.column(col_idx), builder.column_mut(col_idx))
1203 {
1204 if let Some(val) = src.get_value(row) {
1205 dst.push_value(val);
1206 } else {
1207 dst.push_value(Value::Null);
1208 }
1209 }
1210 }
1211 if let Some(dst) = builder.column_mut(self.count_column) {
1213 dst.push_value(Value::Int64(row_count));
1214 }
1215
1216 builder.advance_row();
1217 }
1218
1219 return Ok(Some(builder.finish()));
1220 }
1221 Ok(None)
1222 }
1223
1224 fn reset(&mut self) {
1225 self.input.reset();
1226 }
1227
1228 fn name(&self) -> &'static str {
1229 "RemoveLabel"
1230 }
1231}
1232
1233pub struct SetPropertyOperator {
1238 store: Arc<dyn GraphStoreMut>,
1240 input: Box<dyn Operator>,
1242 entity_column: usize,
1244 is_edge: bool,
1246 properties: Vec<(String, PropertySource)>,
1248 output_schema: Vec<LogicalType>,
1250 replace: bool,
1252 validator: Option<Arc<dyn ConstraintValidator>>,
1254 labels: Vec<String>,
1256 edge_type_name: Option<String>,
1258 viewing_epoch: Option<EpochId>,
1260 transaction_id: Option<TransactionId>,
1262 write_tracker: Option<SharedWriteTracker>,
1264}
1265
1266impl SetPropertyOperator {
1267 pub fn new_for_node(
1269 store: Arc<dyn GraphStoreMut>,
1270 input: Box<dyn Operator>,
1271 node_column: usize,
1272 properties: Vec<(String, PropertySource)>,
1273 output_schema: Vec<LogicalType>,
1274 ) -> Self {
1275 Self {
1276 store,
1277 input,
1278 entity_column: node_column,
1279 is_edge: false,
1280 properties,
1281 output_schema,
1282 replace: false,
1283 validator: None,
1284 labels: Vec::new(),
1285 edge_type_name: None,
1286 viewing_epoch: None,
1287 transaction_id: None,
1288 write_tracker: None,
1289 }
1290 }
1291
1292 pub fn new_for_edge(
1294 store: Arc<dyn GraphStoreMut>,
1295 input: Box<dyn Operator>,
1296 edge_column: usize,
1297 properties: Vec<(String, PropertySource)>,
1298 output_schema: Vec<LogicalType>,
1299 ) -> Self {
1300 Self {
1301 store,
1302 input,
1303 entity_column: edge_column,
1304 is_edge: true,
1305 properties,
1306 output_schema,
1307 replace: false,
1308 validator: None,
1309 labels: Vec::new(),
1310 edge_type_name: None,
1311 viewing_epoch: None,
1312 transaction_id: None,
1313 write_tracker: None,
1314 }
1315 }
1316
1317 pub fn with_replace(mut self, replace: bool) -> Self {
1319 self.replace = replace;
1320 self
1321 }
1322
1323 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1325 self.validator = Some(validator);
1326 self
1327 }
1328
1329 pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1331 self.labels = labels;
1332 self
1333 }
1334
1335 pub fn with_edge_type(mut self, edge_type: String) -> Self {
1337 self.edge_type_name = Some(edge_type);
1338 self
1339 }
1340
1341 pub fn with_transaction_context(
1346 mut self,
1347 epoch: EpochId,
1348 transaction_id: Option<TransactionId>,
1349 ) -> Self {
1350 self.viewing_epoch = Some(epoch);
1351 self.transaction_id = transaction_id;
1352 self
1353 }
1354
1355 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1357 self.write_tracker = Some(tracker);
1358 self
1359 }
1360}
1361
1362impl Operator for SetPropertyOperator {
1363 fn next(&mut self) -> OperatorResult {
1364 if let Some(chunk) = self.input.next()? {
1365 let mut builder =
1366 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1367
1368 for row in chunk.selected_indices() {
1369 let entity_val = chunk
1370 .column(self.entity_column)
1371 .and_then(|c| c.get_value(row))
1372 .ok_or_else(|| {
1373 OperatorError::ColumnNotFound(format!(
1374 "entity column {}",
1375 self.entity_column
1376 ))
1377 })?;
1378
1379 let entity_id = match entity_val {
1380 Value::Int64(id) => id as u64,
1381 _ => {
1382 return Err(OperatorError::TypeMismatch {
1383 expected: "Int64 (entity ID)".to_string(),
1384 found: format!("{entity_val:?}"),
1385 });
1386 }
1387 };
1388
1389 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1391 if self.is_edge {
1392 tracker.record_edge_write(tid, EdgeId(entity_id))?;
1393 } else {
1394 tracker.record_node_write(tid, NodeId(entity_id))?;
1395 }
1396 }
1397
1398 let resolved_props: Vec<(String, Value)> = self
1400 .properties
1401 .iter()
1402 .map(|(name, source)| {
1403 let value =
1404 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1405 (name.clone(), value)
1406 })
1407 .collect();
1408
1409 if let Some(ref validator) = self.validator {
1411 if self.is_edge {
1412 if let Some(ref et) = self.edge_type_name {
1413 for (name, value) in &resolved_props {
1414 validator.validate_edge_property(et, name, value)?;
1415 }
1416 }
1417 } else {
1418 for (name, value) in &resolved_props {
1419 validator.validate_node_property(&self.labels, name, value)?;
1420 validator.check_unique_node_property(&self.labels, name, value)?;
1421 }
1422 }
1423 }
1424
1425 let tx_id = self.transaction_id;
1427 for (prop_name, value) in resolved_props {
1428 if prop_name == "*" {
1429 if let Value::Map(map) = value {
1431 if self.replace {
1432 if self.is_edge {
1434 if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1435 let keys: Vec<String> = edge
1436 .properties
1437 .iter()
1438 .map(|(k, _)| k.as_str().to_string())
1439 .collect();
1440 for key in keys {
1441 if let Some(tid) = tx_id {
1442 self.store.remove_edge_property_versioned(
1443 EdgeId(entity_id),
1444 &key,
1445 tid,
1446 );
1447 } else {
1448 self.store
1449 .remove_edge_property(EdgeId(entity_id), &key);
1450 }
1451 }
1452 }
1453 } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1454 let keys: Vec<String> = node
1455 .properties
1456 .iter()
1457 .map(|(k, _)| k.as_str().to_string())
1458 .collect();
1459 for key in keys {
1460 if let Some(tid) = tx_id {
1461 self.store.remove_node_property_versioned(
1462 NodeId(entity_id),
1463 &key,
1464 tid,
1465 );
1466 } else {
1467 self.store
1468 .remove_node_property(NodeId(entity_id), &key);
1469 }
1470 }
1471 }
1472 }
1473 for (key, val) in map.iter() {
1475 if val.is_null() {
1476 if self.is_edge {
1478 if let Some(tid) = tx_id {
1479 self.store.remove_edge_property_versioned(
1480 EdgeId(entity_id),
1481 key.as_str(),
1482 tid,
1483 );
1484 } else {
1485 self.store.remove_edge_property(
1486 EdgeId(entity_id),
1487 key.as_str(),
1488 );
1489 }
1490 } else if let Some(tid) = tx_id {
1491 self.store.remove_node_property_versioned(
1492 NodeId(entity_id),
1493 key.as_str(),
1494 tid,
1495 );
1496 } else {
1497 self.store
1498 .remove_node_property(NodeId(entity_id), key.as_str());
1499 }
1500 } else if self.is_edge {
1501 if let Some(tid) = tx_id {
1502 self.store.set_edge_property_versioned(
1503 EdgeId(entity_id),
1504 key.as_str(),
1505 val.clone(),
1506 tid,
1507 );
1508 } else {
1509 self.store.set_edge_property(
1510 EdgeId(entity_id),
1511 key.as_str(),
1512 val.clone(),
1513 );
1514 }
1515 } else if let Some(tid) = tx_id {
1516 self.store.set_node_property_versioned(
1517 NodeId(entity_id),
1518 key.as_str(),
1519 val.clone(),
1520 tid,
1521 );
1522 } else {
1523 self.store.set_node_property(
1524 NodeId(entity_id),
1525 key.as_str(),
1526 val.clone(),
1527 );
1528 }
1529 }
1530 }
1531 } else if self.is_edge {
1532 if let Some(tid) = tx_id {
1533 self.store.set_edge_property_versioned(
1534 EdgeId(entity_id),
1535 &prop_name,
1536 value,
1537 tid,
1538 );
1539 } else {
1540 self.store
1541 .set_edge_property(EdgeId(entity_id), &prop_name, value);
1542 }
1543 } else if let Some(tid) = tx_id {
1544 self.store.set_node_property_versioned(
1545 NodeId(entity_id),
1546 &prop_name,
1547 value,
1548 tid,
1549 );
1550 } else {
1551 self.store
1552 .set_node_property(NodeId(entity_id), &prop_name, value);
1553 }
1554 }
1555
1556 for col_idx in 0..chunk.column_count() {
1558 if let (Some(src), Some(dst)) =
1559 (chunk.column(col_idx), builder.column_mut(col_idx))
1560 {
1561 if let Some(val) = src.get_value(row) {
1562 dst.push_value(val);
1563 } else {
1564 dst.push_value(Value::Null);
1565 }
1566 }
1567 }
1568
1569 builder.advance_row();
1570 }
1571
1572 return Ok(Some(builder.finish()));
1573 }
1574 Ok(None)
1575 }
1576
1577 fn reset(&mut self) {
1578 self.input.reset();
1579 }
1580
1581 fn name(&self) -> &'static str {
1582 "SetProperty"
1583 }
1584}
1585
1586#[cfg(test)]
1587mod tests {
1588 use super::*;
1589 use crate::execution::DataChunk;
1590 use crate::execution::chunk::DataChunkBuilder;
1591 use crate::graph::lpg::LpgStore;
1592
1593 fn create_test_store() -> Arc<dyn GraphStoreMut> {
1596 Arc::new(LpgStore::new().unwrap())
1597 }
1598
1599 struct MockInput {
1600 chunk: Option<DataChunk>,
1601 }
1602
1603 impl MockInput {
1604 fn boxed(chunk: DataChunk) -> Box<Self> {
1605 Box::new(Self { chunk: Some(chunk) })
1606 }
1607 }
1608
1609 impl Operator for MockInput {
1610 fn next(&mut self) -> OperatorResult {
1611 Ok(self.chunk.take())
1612 }
1613 fn reset(&mut self) {}
1614 fn name(&self) -> &'static str {
1615 "MockInput"
1616 }
1617 }
1618
1619 struct EmptyInput;
1620 impl Operator for EmptyInput {
1621 fn next(&mut self) -> OperatorResult {
1622 Ok(None)
1623 }
1624 fn reset(&mut self) {}
1625 fn name(&self) -> &'static str {
1626 "EmptyInput"
1627 }
1628 }
1629
1630 fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1631 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1632 for id in ids {
1633 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1634 builder.advance_row();
1635 }
1636 builder.finish()
1637 }
1638
1639 fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1640 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1641 for id in ids {
1642 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1643 builder.advance_row();
1644 }
1645 builder.finish()
1646 }
1647
/// A `CreateNodeOperator` without an input emits a single-row chunk, is
/// exhausted on the next call, and leaves exactly one node in the store.
#[test]
fn test_create_node_standalone() {
    let graph = create_test_store();

    let name_property = (
        "name".to_owned(),
        PropertySource::Constant(Value::String("Alix".into())),
    );
    let mut create_op = CreateNodeOperator::new(
        Arc::clone(&graph),
        None,
        vec!["Person".to_owned()],
        vec![name_property],
        vec![LogicalType::Int64],
        0,
    );

    // First pull produces the chunk for the created node.
    let first = create_op.next().unwrap().unwrap();
    assert_eq!(first.row_count(), 1);

    // Second pull yields nothing.
    let second = create_op.next().unwrap();
    assert!(second.is_none());

    assert_eq!(graph.node_count(), 1);
}
1674
/// A `CreateEdgeOperator` fed one row of (source id, target id) creates one edge.
#[test]
fn test_create_edge() {
    let graph = create_test_store();

    let source = graph.create_node(&["Person"]);
    let target = graph.create_node(&["Person"]);

    // One input row: column 0 holds the source id, column 1 the target id.
    let mut rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
    rows.column_mut(0).unwrap().push_int64(source.0 as i64);
    rows.column_mut(1).unwrap().push_int64(target.0 as i64);
    rows.advance_row();
    let input_chunk = rows.finish();

    let mut create_op = CreateEdgeOperator::new(
        Arc::clone(&graph),
        MockInput::boxed(input_chunk),
        0,
        1,
        "KNOWS".to_owned(),
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let _produced = create_op.next().unwrap().unwrap();
    assert_eq!(graph.edge_count(), 1);
}
1699
/// Deleting the only node (detach disabled, no edges) emits one row and
/// leaves the store without nodes.
#[test]
fn test_delete_node() {
    let graph = create_test_store();

    let victim = graph.create_node(&["Person"]);
    assert_eq!(graph.node_count(), 1);

    let input = MockInput::boxed(node_id_chunk(&[victim]));
    let detach = false;
    let mut delete_op = DeleteNodeOperator::new(
        Arc::clone(&graph),
        input,
        0,
        vec![LogicalType::Node],
        detach,
    );

    let produced = delete_op.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);
    assert_eq!(graph.node_count(), 0);
}
1720
/// Deleting a single edge emits one row and leaves the store without edges.
#[test]
fn test_delete_edge() {
    let graph = create_test_store();

    let from = graph.create_node(&["Person"]);
    let to = graph.create_node(&["Person"]);
    let edge = graph.create_edge(from, to, "KNOWS");
    assert_eq!(graph.edge_count(), 1);

    let input = MockInput::boxed(edge_id_chunk(&[edge]));
    let mut delete_op =
        DeleteEdgeOperator::new(Arc::clone(&graph), input, 0, vec![LogicalType::Node]);

    let produced = delete_op.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);
    assert_eq!(graph.edge_count(), 0);
}
1743
/// With an input that never yields a chunk, `DeleteEdgeOperator` yields `None`.
#[test]
fn test_delete_edge_no_input_returns_none() {
    let graph = create_test_store();

    let mut delete_op = DeleteEdgeOperator::new(
        Arc::clone(&graph),
        Box::new(EmptyInput),
        0,
        vec![LogicalType::Int64],
    );

    let outcome = delete_op.next().unwrap();
    assert!(outcome.is_none());
}
1757
/// Two edge ids in one input chunk are both deleted and reported as two rows.
#[test]
fn test_delete_multiple_edges() {
    let graph = create_test_store();

    let left = graph.create_node(&["N"]);
    let right = graph.create_node(&["N"]);
    let forward = graph.create_edge(left, right, "R");
    let backward = graph.create_edge(right, left, "S");
    assert_eq!(graph.edge_count(), 2);

    let input = MockInput::boxed(edge_id_chunk(&[forward, backward]));
    let mut delete_op =
        DeleteEdgeOperator::new(Arc::clone(&graph), input, 0, vec![LogicalType::Node]);

    let produced = delete_op.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 2);
    assert_eq!(graph.edge_count(), 0);
}
1779
/// With detach enabled, deleting a node that has edges in both directions
/// removes the node and leaves no edges behind.
#[test]
fn test_delete_node_detach() {
    let graph = create_test_store();

    let hub = graph.create_node(&["Person"]);
    let peer = graph.create_node(&["Person"]);
    graph.create_edge(hub, peer, "KNOWS");
    graph.create_edge(peer, hub, "FOLLOWS");
    assert_eq!(graph.edge_count(), 2);

    let input = MockInput::boxed(node_id_chunk(&[hub]));
    let detach = true;
    let mut delete_op = DeleteNodeOperator::new(
        Arc::clone(&graph),
        input,
        0,
        vec![LogicalType::Node],
        detach,
    );

    let produced = delete_op.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);
    // Only the peer node remains and both edges are gone.
    assert_eq!(graph.node_count(), 1);
    assert_eq!(graph.edge_count(), 0);
}
1805
/// Adding one label reports a count of 1 in output column 1 and the node ends
/// up carrying both its original and the new label.
#[test]
fn test_add_label() {
    let graph = create_test_store();

    let node = graph.create_node(&["Person"]);

    let mut add_op = AddLabelOperator::new(
        Arc::clone(&graph),
        MockInput::boxed(node_id_chunk(&[node])),
        0,
        vec![String::from("Employee")],
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let produced = add_op.next().unwrap().unwrap();
    let updated_count = produced.column(1).unwrap().get_int64(0).unwrap();
    assert_eq!(updated_count, 1);

    let stored = graph.get_node(node).unwrap();
    let labels: Vec<&str> = stored.labels.iter().map(|label| label.as_ref()).collect();
    for expected in ["Person", "Employee"] {
        assert!(labels.contains(&expected));
    }
}
1832
/// Adding two labels at once reports a count of 2 and both labels are present
/// on the node afterwards.
#[test]
fn test_add_multiple_labels() {
    let graph = create_test_store();

    let node = graph.create_node(&["Base"]);

    let new_labels = vec![String::from("LabelA"), String::from("LabelB")];
    let mut add_op = AddLabelOperator::new(
        Arc::clone(&graph),
        MockInput::boxed(node_id_chunk(&[node])),
        0,
        new_labels,
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let produced = add_op.next().unwrap().unwrap();
    let updated_count = produced.column(1).unwrap().get_int64(0).unwrap();
    assert_eq!(updated_count, 2);

    let stored = graph.get_node(node).unwrap();
    let labels: Vec<&str> = stored.labels.iter().map(|label| label.as_ref()).collect();
    for expected in ["LabelA", "LabelB"] {
        assert!(labels.contains(&expected));
    }
}
1856
/// With an input that never yields a chunk, `AddLabelOperator` yields `None`.
#[test]
fn test_add_label_no_input_returns_none() {
    let graph = create_test_store();

    let mut add_op = AddLabelOperator::new(
        Arc::clone(&graph),
        Box::new(EmptyInput),
        0,
        vec![String::from("Foo")],
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let outcome = add_op.next().unwrap();
    assert!(outcome.is_none());
}
1871
/// Removing an existing label reports a count of 1; the other label stays on
/// the node and the removed one is gone.
#[test]
fn test_remove_label() {
    let graph = create_test_store();

    let node = graph.create_node(&["Person", "Employee"]);

    let mut remove_op = RemoveLabelOperator::new(
        Arc::clone(&graph),
        MockInput::boxed(node_id_chunk(&[node])),
        0,
        vec![String::from("Employee")],
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let produced = remove_op.next().unwrap().unwrap();
    let updated_count = produced.column(1).unwrap().get_int64(0).unwrap();
    assert_eq!(updated_count, 1);

    let stored = graph.get_node(node).unwrap();
    let remaining: Vec<&str> = stored.labels.iter().map(|label| label.as_ref()).collect();
    let still_person = remaining.contains(&"Person");
    let still_employee = remaining.contains(&"Employee");
    assert!(still_person);
    assert!(!still_employee);
}
1898
/// Removing a label the node does not carry reports a count of 0.
#[test]
fn test_remove_nonexistent_label() {
    let graph = create_test_store();

    let node = graph.create_node(&["Person"]);

    let mut remove_op = RemoveLabelOperator::new(
        Arc::clone(&graph),
        MockInput::boxed(node_id_chunk(&[node])),
        0,
        vec![String::from("NonExistent")],
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let produced = remove_op.next().unwrap().unwrap();
    let updated_count = produced.column(1).unwrap().get_int64(0).unwrap();
    assert_eq!(updated_count, 0);
}
1917
/// Setting a constant-valued property on a node emits one row and stores the
/// value on that node.
#[test]
fn test_set_node_property_constant() {
    let graph = create_test_store();

    let node = graph.create_node(&["Person"]);

    let assignment = (
        "name".to_owned(),
        PropertySource::Constant(Value::String("Alix".into())),
    );
    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&graph),
        MockInput::boxed(node_id_chunk(&[node])),
        0,
        vec![assignment],
        vec![LogicalType::Int64],
    );

    let produced = set_op.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);

    let stored = graph.get_node(node).unwrap();
    let name_key = PropertyKey::new("name");
    let expected = Value::String("Alix".into());
    assert_eq!(stored.properties.get(&name_key), Some(&expected));
}
1949
/// A `PropertySource::Column` assignment copies the value from the named
/// input column onto the node.
#[test]
fn test_set_node_property_from_column() {
    let graph = create_test_store();

    let node = graph.create_node(&["Person"]);

    // Column 0 holds the node id, column 1 the value to assign.
    let mut rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
    rows.column_mut(0).unwrap().push_int64(node.0 as i64);
    let value_column = rows.column_mut(1).unwrap();
    value_column.push_value(Value::String("Gus".into()));
    rows.advance_row();

    let assignment = ("name".to_owned(), PropertySource::Column(1));
    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&graph),
        MockInput::boxed(rows.finish()),
        0,
        vec![assignment],
        vec![LogicalType::Int64, LogicalType::String],
    );

    let produced = set_op.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);

    let stored = graph.get_node(node).unwrap();
    let name_key = PropertyKey::new("name");
    let expected = Value::String("Gus".into());
    assert_eq!(stored.properties.get(&name_key), Some(&expected));
}
1984
/// The edge variant of `SetPropertyOperator` stores a constant value on the
/// edge identified by the input column.
#[test]
fn test_set_edge_property() {
    let graph = create_test_store();

    let from = graph.create_node(&["N"]);
    let to = graph.create_node(&["N"]);
    let edge = graph.create_edge(from, to, "KNOWS");

    let assignment = (
        "weight".to_owned(),
        PropertySource::Constant(Value::Float64(0.75)),
    );
    let mut set_op = SetPropertyOperator::new_for_edge(
        Arc::clone(&graph),
        MockInput::boxed(edge_id_chunk(&[edge])),
        0,
        vec![assignment],
        vec![LogicalType::Int64],
    );

    let produced = set_op.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);

    let stored = graph.get_edge(edge).unwrap();
    let weight_key = PropertyKey::new("weight");
    let expected = Value::Float64(0.75);
    assert_eq!(stored.properties.get(&weight_key), Some(&expected));
}
2015
/// Two constant assignments in one operator are both stored on the node.
#[test]
fn test_set_multiple_properties() {
    let graph = create_test_store();

    let node = graph.create_node(&["Person"]);

    let assignments = vec![
        (
            "name".to_owned(),
            PropertySource::Constant(Value::String("Alix".into())),
        ),
        (
            "age".to_owned(),
            PropertySource::Constant(Value::Int64(30)),
        ),
    ];
    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&graph),
        MockInput::boxed(node_id_chunk(&[node])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    set_op.next().unwrap().unwrap();

    let stored = graph.get_node(node).unwrap();
    let properties = &stored.properties;

    let expected_name = Value::String("Alix".into());
    assert_eq!(
        properties.get(&PropertyKey::new("name")),
        Some(&expected_name)
    );

    let expected_age = Value::Int64(30);
    assert_eq!(
        properties.get(&PropertyKey::new("age")),
        Some(&expected_age)
    );
}
2055
/// With an input that never yields a chunk, `SetPropertyOperator` yields `None`.
#[test]
fn test_set_property_no_input_returns_none() {
    let graph = create_test_store();

    let assignment = ("x".to_owned(), PropertySource::Constant(Value::Int64(1)));
    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&graph),
        Box::new(EmptyInput),
        0,
        vec![assignment],
        vec![LogicalType::Int64],
    );

    let outcome = set_op.next().unwrap();
    assert!(outcome.is_none());
}
2070
/// Without detach, deleting a node that still has an edge fails with a
/// `ConstraintViolation` mentioning the connected edge, and no node is removed.
#[test]
fn test_delete_node_without_detach_errors_when_edges_exist() {
    let graph = create_test_store();

    let connected = graph.create_node(&["Person"]);
    let neighbour = graph.create_node(&["Person"]);
    graph.create_edge(connected, neighbour, "KNOWS");

    let detach = false;
    let mut delete_op = DeleteNodeOperator::new(
        Arc::clone(&graph),
        MockInput::boxed(node_id_chunk(&[connected])),
        0,
        vec![LogicalType::Int64],
        detach,
    );

    match delete_op.next().unwrap_err() {
        OperatorError::ConstraintViolation(msg) => {
            assert!(msg.contains("connected edge"), "unexpected message: {msg}");
        }
        other => panic!("expected ConstraintViolation, got {other:?}"),
    }

    // Both nodes are still present after the failed delete.
    assert_eq!(graph.node_count(), 2);
}
2099
/// With a one-row input, `CreateNodeOperator` emits one row, adds one node to
/// the store, and is exhausted once the input is drained.
#[test]
fn test_create_node_with_input_operator() {
    let graph = create_test_store();

    let seed = graph.create_node(&["Seed"]);

    let source_property = (
        "source".to_owned(),
        PropertySource::Constant(Value::String("from_input".into())),
    );
    let upstream = MockInput::boxed(node_id_chunk(&[seed]));
    let mut create_op = CreateNodeOperator::new(
        Arc::clone(&graph),
        Some(upstream),
        vec!["Created".to_owned()],
        vec![source_property],
        vec![LogicalType::Int64, LogicalType::Int64],
        1,
    );

    let produced = create_op.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);

    // The seed node plus the newly created one.
    assert_eq!(graph.node_count(), 2);

    let drained = create_op.next().unwrap();
    assert!(drained.is_none());
}
2130
/// A `CreateEdgeOperator` configured with properties and an output column
/// writes the new edge's id to that column and stores the property on the edge.
#[test]
fn test_create_edge_with_properties_and_output_column() {
    let graph = create_test_store();

    let source = graph.create_node(&["Person"]);
    let target = graph.create_node(&["Person"]);

    // One input row: column 0 holds the source id, column 1 the target id.
    let mut rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
    rows.column_mut(0).unwrap().push_int64(source.0 as i64);
    rows.column_mut(1).unwrap().push_int64(target.0 as i64);
    rows.advance_row();

    let since_property = (
        "since".to_owned(),
        PropertySource::Constant(Value::Int64(2024)),
    );
    let base_op = CreateEdgeOperator::new(
        Arc::clone(&graph),
        MockInput::boxed(rows.finish()),
        0,
        1,
        "KNOWS".to_owned(),
        vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
    );
    let mut create_op = base_op
        .with_properties(vec![since_property])
        .with_output_column(2);

    let produced = create_op.next().unwrap().unwrap();
    assert_eq!(produced.row_count(), 1);
    assert_eq!(graph.edge_count(), 1);

    // Read the created edge's id back from output column 2.
    let raw_edge_id = produced
        .column(2)
        .and_then(|column| column.get_int64(0))
        .expect("edge ID should be in output column 2");
    let created = graph
        .get_edge(EdgeId(raw_edge_id as u64))
        .expect("edge should exist");

    let since_key = PropertyKey::new("since");
    let expected = Value::Int64(2024);
    assert_eq!(created.properties.get(&since_key), Some(&expected));
}
2178
/// With replace enabled, a `"*"` map assignment drops the node's existing
/// property and stores the map's entry.
#[test]
fn test_set_property_map_replace() {
    use std::collections::BTreeMap;

    let graph = create_test_store();

    let node = graph.create_node(&["Person"]);
    graph.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));

    let replacement = BTreeMap::from([(
        PropertyKey::new("new_key"),
        Value::String("new_val".into()),
    )]);
    let assignment = (
        "*".to_owned(),
        PropertySource::Constant(Value::Map(Arc::new(replacement))),
    );

    let base_op = SetPropertyOperator::new_for_node(
        Arc::clone(&graph),
        MockInput::boxed(node_id_chunk(&[node])),
        0,
        vec![assignment],
        vec![LogicalType::Int64],
    );
    let mut set_op = base_op.with_replace(true);

    set_op.next().unwrap().unwrap();

    let stored = graph.get_node(node).unwrap();
    let properties = &stored.properties;

    // The pre-existing property is gone.
    let old_value = properties.get(&PropertyKey::new("old_prop"));
    assert!(old_value.is_none());

    // The entry from the map is present.
    let expected = Value::String("new_val".into());
    assert_eq!(properties.get(&PropertyKey::new("new_key")), Some(&expected));
}
2221
/// Without replace, a `"*"` map assignment keeps the node's existing property
/// and adds the map's entry alongside it.
#[test]
fn test_set_property_map_merge() {
    use std::collections::BTreeMap;

    let graph = create_test_store();

    let node = graph.create_node(&["Person"]);
    graph.set_node_property(node, "existing", Value::Int64(42));

    let mut additions = BTreeMap::new();
    additions.insert(PropertyKey::new("added"), Value::String("hello".into()));
    let assignment = (
        "*".to_owned(),
        PropertySource::Constant(Value::Map(Arc::new(additions))),
    );

    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&graph),
        MockInput::boxed(node_id_chunk(&[node])),
        0,
        vec![assignment],
        vec![LogicalType::Int64],
    );

    set_op.next().unwrap().unwrap();

    let stored = graph.get_node(node).unwrap();
    let properties = &stored.properties;

    // The pre-existing property is untouched.
    let expected_existing = Value::Int64(42);
    assert_eq!(
        properties.get(&PropertyKey::new("existing")),
        Some(&expected_existing)
    );

    // The entry from the map was added.
    let expected_added = Value::String("hello".into());
    assert_eq!(
        properties.get(&PropertyKey::new("added")),
        Some(&expected_added)
    );
}
2261
/// A `PropertySource::PropertyAccess` assignment reads a property from the
/// node in one column and writes it onto the node identified by another.
#[test]
fn test_property_source_property_access() {
    let graph = create_test_store();

    let donor = graph.create_node(&["Source"]);
    graph.set_node_property(donor, "name", Value::String("Alix".into()));

    let recipient = graph.create_node(&["Target"]);

    // Column 0 carries the donor as a node id; column 1 the recipient's raw id.
    let mut rows = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
    rows.column_mut(0).unwrap().push_node_id(donor);
    let recipient_column = rows.column_mut(1).unwrap();
    recipient_column.push_int64(recipient.0 as i64);
    rows.advance_row();

    let copy_name = (
        "copied_name".to_owned(),
        PropertySource::PropertyAccess {
            column: 0,
            property: "name".to_owned(),
        },
    );
    // The entity column is 1, so the recipient receives the property.
    let mut set_op = SetPropertyOperator::new_for_node(
        Arc::clone(&graph),
        MockInput::boxed(rows.finish()),
        1,
        vec![copy_name],
        vec![LogicalType::Node, LogicalType::Int64],
    );

    set_op.next().unwrap().unwrap();

    let stored = graph.get_node(recipient).unwrap();
    let copied_key = PropertyKey::new("copied_name");
    let expected = Value::String("Alix".into());
    assert_eq!(stored.properties.get(&copied_key), Some(&expected));
}
2304
/// A validator attached to `CreateNodeOperator` lets an accepted property
/// through and turns a rejected one into a `ConstraintViolation` error.
#[test]
fn test_create_node_with_constraint_validator() {
    let store = create_test_store();

    /// Validator that rejects any node property whose key is `"forbidden"`
    /// and accepts everything else.
    struct RejectForbiddenKey;

    impl ConstraintValidator for RejectForbiddenKey {
        fn validate_node_property(
            &self,
            _labels: &[String],
            key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            match key {
                "forbidden" => Err(OperatorError::ConstraintViolation(
                    "property 'forbidden' is not allowed".to_string(),
                )),
                _ => Ok(()),
            }
        }

        fn validate_node_complete(
            &self,
            _labels: &[String],
            _properties: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn check_unique_node_property(
            &self,
            _labels: &[String],
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_property(
            &self,
            _edge_type: &str,
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_complete(
            &self,
            _edge_type: &str,
            _properties: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }
    }

    // Builds a standalone create operator for a "Thing" node with a single
    // constant property and the rejecting validator attached.
    let make_op = |key: &str, value: Value| {
        CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Thing".to_string()],
            vec![(key.to_string(), PropertySource::Constant(value))],
            vec![LogicalType::Int64],
            0,
        )
        .with_validator(Arc::new(RejectForbiddenKey))
    };

    // An accepted property: creation succeeds.
    let mut accepted = make_op("name", Value::String("ok".into()));
    assert!(accepted.next().is_ok());
    assert_eq!(store.node_count(), 1);

    // The forbidden key: the validator's error is returned.
    let mut rejected = make_op("forbidden", Value::Int64(1));
    let err = rejected.next().unwrap_err();
    assert!(matches!(err, OperatorError::ConstraintViolation(_)));
}
2394
/// After `reset`, a standalone `CreateNodeOperator` produces a chunk again,
/// so the store ends up with two nodes.
#[test]
fn test_create_node_reset_allows_re_execution() {
    let graph = create_test_store();

    let mut create_op = CreateNodeOperator::new(
        Arc::clone(&graph),
        None,
        vec!["Person".to_owned()],
        Vec::new(),
        vec![LogicalType::Int64],
        0,
    );

    // First run: one chunk, then exhausted.
    let first = create_op.next().unwrap();
    assert!(first.is_some());
    let exhausted = create_op.next().unwrap();
    assert!(exhausted.is_none());

    // After a reset the operator produces a chunk again.
    create_op.reset();
    let after_reset = create_op.next().unwrap();
    assert!(after_reset.is_some());

    assert_eq!(graph.node_count(), 2);
}
2420
/// Every mutation operator reports its expected static name.
#[test]
fn test_operator_names() {
    let graph = create_test_store();

    let create_node = CreateNodeOperator::new(
        Arc::clone(&graph),
        None,
        Vec::new(),
        Vec::new(),
        vec![LogicalType::Int64],
        0,
    );
    assert_eq!(create_node.name(), "CreateNode");

    let create_edge = CreateEdgeOperator::new(
        Arc::clone(&graph),
        Box::new(EmptyInput),
        0,
        1,
        "R".to_owned(),
        vec![LogicalType::Int64],
    );
    assert_eq!(create_edge.name(), "CreateEdge");

    let delete_node = DeleteNodeOperator::new(
        Arc::clone(&graph),
        Box::new(EmptyInput),
        0,
        vec![LogicalType::Int64],
        false,
    );
    assert_eq!(delete_node.name(), "DeleteNode");

    let delete_edge = DeleteEdgeOperator::new(
        Arc::clone(&graph),
        Box::new(EmptyInput),
        0,
        vec![LogicalType::Int64],
    );
    assert_eq!(delete_edge.name(), "DeleteEdge");

    let add_label = AddLabelOperator::new(
        Arc::clone(&graph),
        Box::new(EmptyInput),
        0,
        vec!["L".to_owned()],
        vec![LogicalType::Int64],
    );
    assert_eq!(add_label.name(), "AddLabel");

    let remove_label = RemoveLabelOperator::new(
        Arc::clone(&graph),
        Box::new(EmptyInput),
        0,
        vec!["L".to_owned()],
        vec![LogicalType::Int64],
    );
    assert_eq!(remove_label.name(), "RemoveLabel");

    let set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&graph),
        Box::new(EmptyInput),
        0,
        Vec::new(),
        vec![LogicalType::Int64],
    );
    assert_eq!(set_property.name(), "SetProperty");
}
2491}