1use std::sync::Arc;
10
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19pub trait ConstraintValidator: Send + Sync {
24 fn validate_node_property(
28 &self,
29 labels: &[String],
30 key: &str,
31 value: &Value,
32 ) -> Result<(), OperatorError>;
33
34 fn validate_node_complete(
38 &self,
39 labels: &[String],
40 properties: &[(String, Value)],
41 ) -> Result<(), OperatorError>;
42
43 fn check_unique_node_property(
47 &self,
48 labels: &[String],
49 key: &str,
50 value: &Value,
51 ) -> Result<(), OperatorError>;
52
53 fn validate_edge_property(
55 &self,
56 edge_type: &str,
57 key: &str,
58 value: &Value,
59 ) -> Result<(), OperatorError>;
60
61 fn validate_edge_complete(
63 &self,
64 edge_type: &str,
65 properties: &[(String, Value)],
66 ) -> Result<(), OperatorError>;
67
68 fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
70 let _ = labels;
71 Ok(())
72 }
73
74 fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
76 let _ = edge_type;
77 Ok(())
78 }
79
80 fn validate_edge_endpoints(
82 &self,
83 edge_type: &str,
84 source_labels: &[String],
85 target_labels: &[String],
86 ) -> Result<(), OperatorError> {
87 let _ = (edge_type, source_labels, target_labels);
88 Ok(())
89 }
90
91 fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
94 let _ = (labels, properties);
95 }
96}
97
98pub struct CreateNodeOperator {
103 store: Arc<dyn GraphStoreMut>,
105 input: Option<Box<dyn Operator>>,
107 labels: Vec<String>,
109 properties: Vec<(String, PropertySource)>,
111 output_schema: Vec<LogicalType>,
113 output_column: usize,
115 executed: bool,
117 viewing_epoch: Option<EpochId>,
119 transaction_id: Option<TransactionId>,
121 validator: Option<Arc<dyn ConstraintValidator>>,
123 write_tracker: Option<SharedWriteTracker>,
125}
126
127#[derive(Debug, Clone)]
129pub enum PropertySource {
130 Column(usize),
132 Constant(Value),
134 PropertyAccess {
136 column: usize,
138 property: String,
140 },
141}
142
143impl PropertySource {
144 pub fn resolve(
146 &self,
147 chunk: &crate::execution::chunk::DataChunk,
148 row: usize,
149 store: &dyn GraphStore,
150 ) -> Value {
151 match self {
152 PropertySource::Column(col_idx) => chunk
153 .column(*col_idx)
154 .and_then(|c| c.get_value(row))
155 .unwrap_or(Value::Null),
156 PropertySource::Constant(v) => v.clone(),
157 PropertySource::PropertyAccess { column, property } => {
158 let Some(col) = chunk.column(*column) else {
159 return Value::Null;
160 };
161 if let Some(node_id) = col.get_node_id(row) {
163 store
164 .get_node(node_id)
165 .and_then(|node| node.get_property(property).cloned())
166 .unwrap_or(Value::Null)
167 } else if let Some(edge_id) = col.get_edge_id(row) {
168 store
169 .get_edge(edge_id)
170 .and_then(|edge| edge.get_property(property).cloned())
171 .unwrap_or(Value::Null)
172 } else if let Some(Value::Map(map)) = col.get_value(row) {
173 let key = PropertyKey::new(property);
174 map.get(&key).cloned().unwrap_or(Value::Null)
175 } else {
176 Value::Null
177 }
178 }
179 }
180 }
181}
182
183impl CreateNodeOperator {
184 pub fn new(
194 store: Arc<dyn GraphStoreMut>,
195 input: Option<Box<dyn Operator>>,
196 labels: Vec<String>,
197 properties: Vec<(String, PropertySource)>,
198 output_schema: Vec<LogicalType>,
199 output_column: usize,
200 ) -> Self {
201 Self {
202 store,
203 input,
204 labels,
205 properties,
206 output_schema,
207 output_column,
208 executed: false,
209 viewing_epoch: None,
210 transaction_id: None,
211 validator: None,
212 write_tracker: None,
213 }
214 }
215
216 pub fn with_transaction_context(
218 mut self,
219 epoch: EpochId,
220 transaction_id: Option<TransactionId>,
221 ) -> Self {
222 self.viewing_epoch = Some(epoch);
223 self.transaction_id = transaction_id;
224 self
225 }
226
227 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
229 self.validator = Some(validator);
230 self
231 }
232
233 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
235 self.write_tracker = Some(tracker);
236 self
237 }
238}
239
240impl CreateNodeOperator {
241 fn validate_and_set_properties(
243 &self,
244 node_id: NodeId,
245 resolved_props: &mut Vec<(String, Value)>,
246 ) -> Result<(), OperatorError> {
247 if let Some(ref validator) = self.validator {
249 validator.validate_node_labels_allowed(&self.labels)?;
250 }
251
252 if let Some(ref validator) = self.validator {
254 validator.inject_defaults(&self.labels, resolved_props);
255 }
256
257 if let Some(ref validator) = self.validator {
259 for (name, value) in resolved_props.iter() {
260 validator.validate_node_property(&self.labels, name, value)?;
261 validator.check_unique_node_property(&self.labels, name, value)?;
262 }
263 validator.validate_node_complete(&self.labels, resolved_props)?;
265 }
266
267 for (name, value) in resolved_props.iter() {
269 self.store.set_node_property(node_id, name, value.clone());
270 }
271 Ok(())
272 }
273}
274
275impl Operator for CreateNodeOperator {
276 fn next(&mut self) -> OperatorResult {
277 let epoch = self
279 .viewing_epoch
280 .unwrap_or_else(|| self.store.current_epoch());
281 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
282
283 if let Some(ref mut input) = self.input {
284 if let Some(chunk) = input.next()? {
286 let mut builder =
287 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
288
289 for row in chunk.selected_indices() {
290 let mut resolved_props: Vec<(String, Value)> = self
292 .properties
293 .iter()
294 .map(|(name, source)| {
295 let value =
296 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
297 (name.clone(), value)
298 })
299 .collect();
300
301 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
303 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
304
305 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
307 tracker.record_node_write(tid, node_id);
308 }
309
310 self.validate_and_set_properties(node_id, &mut resolved_props)?;
312
313 for col_idx in 0..chunk.column_count() {
315 if col_idx < self.output_column
316 && let (Some(src), Some(dst)) =
317 (chunk.column(col_idx), builder.column_mut(col_idx))
318 {
319 if let Some(val) = src.get_value(row) {
320 dst.push_value(val);
321 } else {
322 dst.push_value(Value::Null);
323 }
324 }
325 }
326
327 if let Some(dst) = builder.column_mut(self.output_column) {
329 dst.push_value(Value::Int64(node_id.0 as i64));
330 }
331
332 builder.advance_row();
333 }
334
335 return Ok(Some(builder.finish()));
336 }
337 Ok(None)
338 } else {
339 if self.executed {
341 return Ok(None);
342 }
343 self.executed = true;
344
345 let mut resolved_props: Vec<(String, Value)> = self
347 .properties
348 .iter()
349 .filter_map(|(name, source)| {
350 if let PropertySource::Constant(value) = source {
351 Some((name.clone(), value.clone()))
352 } else {
353 None
354 }
355 })
356 .collect();
357
358 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
360 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
361
362 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
364 tracker.record_node_write(tid, node_id);
365 }
366
367 self.validate_and_set_properties(node_id, &mut resolved_props)?;
369
370 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
372 if let Some(dst) = builder.column_mut(self.output_column) {
373 dst.push_value(Value::Int64(node_id.0 as i64));
374 }
375 builder.advance_row();
376
377 Ok(Some(builder.finish()))
378 }
379 }
380
381 fn reset(&mut self) {
382 if let Some(ref mut input) = self.input {
383 input.reset();
384 }
385 self.executed = false;
386 }
387
388 fn name(&self) -> &'static str {
389 "CreateNode"
390 }
391}
392
393pub struct CreateEdgeOperator {
395 store: Arc<dyn GraphStoreMut>,
397 input: Box<dyn Operator>,
399 from_column: usize,
401 to_column: usize,
403 edge_type: String,
405 properties: Vec<(String, PropertySource)>,
407 output_schema: Vec<LogicalType>,
409 output_column: Option<usize>,
411 viewing_epoch: Option<EpochId>,
413 transaction_id: Option<TransactionId>,
415 validator: Option<Arc<dyn ConstraintValidator>>,
417 write_tracker: Option<SharedWriteTracker>,
419}
420
421impl CreateEdgeOperator {
422 pub fn new(
429 store: Arc<dyn GraphStoreMut>,
430 input: Box<dyn Operator>,
431 from_column: usize,
432 to_column: usize,
433 edge_type: String,
434 output_schema: Vec<LogicalType>,
435 ) -> Self {
436 Self {
437 store,
438 input,
439 from_column,
440 to_column,
441 edge_type,
442 properties: Vec::new(),
443 output_schema,
444 output_column: None,
445 viewing_epoch: None,
446 transaction_id: None,
447 validator: None,
448 write_tracker: None,
449 }
450 }
451
452 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
454 self.properties = properties;
455 self
456 }
457
458 pub fn with_output_column(mut self, column: usize) -> Self {
460 self.output_column = Some(column);
461 self
462 }
463
464 pub fn with_transaction_context(
466 mut self,
467 epoch: EpochId,
468 transaction_id: Option<TransactionId>,
469 ) -> Self {
470 self.viewing_epoch = Some(epoch);
471 self.transaction_id = transaction_id;
472 self
473 }
474
475 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
477 self.validator = Some(validator);
478 self
479 }
480
481 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
483 self.write_tracker = Some(tracker);
484 self
485 }
486}
487
488impl Operator for CreateEdgeOperator {
489 fn next(&mut self) -> OperatorResult {
490 let epoch = self
492 .viewing_epoch
493 .unwrap_or_else(|| self.store.current_epoch());
494 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
495
496 if let Some(chunk) = self.input.next()? {
497 let mut builder =
498 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
499
500 for row in chunk.selected_indices() {
501 let from_id = chunk
503 .column(self.from_column)
504 .and_then(|c| c.get_value(row))
505 .ok_or_else(|| {
506 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
507 })?;
508
509 let to_id = chunk
510 .column(self.to_column)
511 .and_then(|c| c.get_value(row))
512 .ok_or_else(|| {
513 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
514 })?;
515
516 let from_node_id = match from_id {
518 Value::Int64(id) => NodeId(id as u64),
519 _ => {
520 return Err(OperatorError::TypeMismatch {
521 expected: "Int64 (node ID)".to_string(),
522 found: format!("{from_id:?}"),
523 });
524 }
525 };
526
527 let to_node_id = match to_id {
528 Value::Int64(id) => NodeId(id as u64),
529 _ => {
530 return Err(OperatorError::TypeMismatch {
531 expected: "Int64 (node ID)".to_string(),
532 found: format!("{to_id:?}"),
533 });
534 }
535 };
536
537 if let Some(ref validator) = self.validator {
539 validator.validate_edge_type_allowed(&self.edge_type)?;
540
541 let source_labels: Vec<String> = self
543 .store
544 .get_node(from_node_id)
545 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
546 .unwrap_or_default();
547 let target_labels: Vec<String> = self
548 .store
549 .get_node(to_node_id)
550 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
551 .unwrap_or_default();
552 validator.validate_edge_endpoints(
553 &self.edge_type,
554 &source_labels,
555 &target_labels,
556 )?;
557 }
558
559 let resolved_props: Vec<(String, Value)> = self
561 .properties
562 .iter()
563 .map(|(name, source)| {
564 let value =
565 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
566 (name.clone(), value)
567 })
568 .collect();
569
570 if let Some(ref validator) = self.validator {
572 for (name, value) in &resolved_props {
573 validator.validate_edge_property(&self.edge_type, name, value)?;
574 }
575 validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
576 }
577
578 let edge_id = self.store.create_edge_versioned(
580 from_node_id,
581 to_node_id,
582 &self.edge_type,
583 epoch,
584 tx,
585 );
586
587 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
589 tracker.record_edge_write(tid, edge_id);
590 }
591
592 for (name, value) in resolved_props {
594 self.store.set_edge_property(edge_id, &name, value);
595 }
596
597 for col_idx in 0..chunk.column_count() {
599 if let (Some(src), Some(dst)) =
600 (chunk.column(col_idx), builder.column_mut(col_idx))
601 {
602 if let Some(val) = src.get_value(row) {
603 dst.push_value(val);
604 } else {
605 dst.push_value(Value::Null);
606 }
607 }
608 }
609
610 if let Some(out_col) = self.output_column
612 && let Some(dst) = builder.column_mut(out_col)
613 {
614 dst.push_value(Value::Int64(edge_id.0 as i64));
615 }
616
617 builder.advance_row();
618 }
619
620 return Ok(Some(builder.finish()));
621 }
622 Ok(None)
623 }
624
625 fn reset(&mut self) {
626 self.input.reset();
627 }
628
629 fn name(&self) -> &'static str {
630 "CreateEdge"
631 }
632}
633
634pub struct DeleteNodeOperator {
636 store: Arc<dyn GraphStoreMut>,
638 input: Box<dyn Operator>,
640 node_column: usize,
642 output_schema: Vec<LogicalType>,
644 detach: bool,
646 viewing_epoch: Option<EpochId>,
648 transaction_id: Option<TransactionId>,
650 write_tracker: Option<SharedWriteTracker>,
652}
653
654impl DeleteNodeOperator {
655 pub fn new(
657 store: Arc<dyn GraphStoreMut>,
658 input: Box<dyn Operator>,
659 node_column: usize,
660 output_schema: Vec<LogicalType>,
661 detach: bool,
662 ) -> Self {
663 Self {
664 store,
665 input,
666 node_column,
667 output_schema,
668 detach,
669 viewing_epoch: None,
670 transaction_id: None,
671 write_tracker: None,
672 }
673 }
674
675 pub fn with_transaction_context(
677 mut self,
678 epoch: EpochId,
679 transaction_id: Option<TransactionId>,
680 ) -> Self {
681 self.viewing_epoch = Some(epoch);
682 self.transaction_id = transaction_id;
683 self
684 }
685
686 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
688 self.write_tracker = Some(tracker);
689 self
690 }
691}
692
693impl Operator for DeleteNodeOperator {
694 fn next(&mut self) -> OperatorResult {
695 let epoch = self
697 .viewing_epoch
698 .unwrap_or_else(|| self.store.current_epoch());
699 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
700
701 if let Some(chunk) = self.input.next()? {
702 let mut builder =
703 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
704
705 for row in chunk.selected_indices() {
706 let node_val = chunk
707 .column(self.node_column)
708 .and_then(|c| c.get_value(row))
709 .ok_or_else(|| {
710 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
711 })?;
712
713 let node_id = match node_val {
714 Value::Int64(id) => NodeId(id as u64),
715 _ => {
716 return Err(OperatorError::TypeMismatch {
717 expected: "Int64 (node ID)".to_string(),
718 found: format!("{node_val:?}"),
719 });
720 }
721 };
722
723 if self.detach {
724 let outgoing = self
727 .store
728 .edges_from(node_id, crate::graph::Direction::Outgoing);
729 let incoming = self
730 .store
731 .edges_from(node_id, crate::graph::Direction::Incoming);
732 for (_, edge_id) in outgoing.into_iter().chain(incoming) {
733 self.store.delete_edge_versioned(edge_id, epoch, tx);
734 if let (Some(tracker), Some(tid)) =
735 (&self.write_tracker, self.transaction_id)
736 {
737 tracker.record_edge_write(tid, edge_id);
738 }
739 }
740 } else {
741 let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
743 if degree > 0 {
744 return Err(OperatorError::ConstraintViolation(format!(
745 "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
746 degree
747 )));
748 }
749 }
750
751 self.store.delete_node_versioned(node_id, epoch, tx);
753
754 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
756 tracker.record_node_write(tid, node_id);
757 }
758
759 for col_idx in 0..chunk.column_count() {
762 if let (Some(src), Some(dst)) =
763 (chunk.column(col_idx), builder.column_mut(col_idx))
764 {
765 if let Some(val) = src.get_value(row) {
766 dst.push_value(val);
767 } else {
768 dst.push_value(Value::Null);
769 }
770 }
771 }
772 builder.advance_row();
773 }
774
775 return Ok(Some(builder.finish()));
776 }
777 Ok(None)
778 }
779
780 fn reset(&mut self) {
781 self.input.reset();
782 }
783
784 fn name(&self) -> &'static str {
785 "DeleteNode"
786 }
787}
788
789pub struct DeleteEdgeOperator {
791 store: Arc<dyn GraphStoreMut>,
793 input: Box<dyn Operator>,
795 edge_column: usize,
797 output_schema: Vec<LogicalType>,
799 viewing_epoch: Option<EpochId>,
801 transaction_id: Option<TransactionId>,
803 write_tracker: Option<SharedWriteTracker>,
805}
806
807impl DeleteEdgeOperator {
808 pub fn new(
810 store: Arc<dyn GraphStoreMut>,
811 input: Box<dyn Operator>,
812 edge_column: usize,
813 output_schema: Vec<LogicalType>,
814 ) -> Self {
815 Self {
816 store,
817 input,
818 edge_column,
819 output_schema,
820 viewing_epoch: None,
821 transaction_id: None,
822 write_tracker: None,
823 }
824 }
825
826 pub fn with_transaction_context(
828 mut self,
829 epoch: EpochId,
830 transaction_id: Option<TransactionId>,
831 ) -> Self {
832 self.viewing_epoch = Some(epoch);
833 self.transaction_id = transaction_id;
834 self
835 }
836
837 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
839 self.write_tracker = Some(tracker);
840 self
841 }
842}
843
844impl Operator for DeleteEdgeOperator {
845 fn next(&mut self) -> OperatorResult {
846 let epoch = self
848 .viewing_epoch
849 .unwrap_or_else(|| self.store.current_epoch());
850 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
851
852 if let Some(chunk) = self.input.next()? {
853 let mut builder =
854 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
855
856 for row in chunk.selected_indices() {
857 let edge_val = chunk
858 .column(self.edge_column)
859 .and_then(|c| c.get_value(row))
860 .ok_or_else(|| {
861 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
862 })?;
863
864 let edge_id = match edge_val {
865 Value::Int64(id) => EdgeId(id as u64),
866 _ => {
867 return Err(OperatorError::TypeMismatch {
868 expected: "Int64 (edge ID)".to_string(),
869 found: format!("{edge_val:?}"),
870 });
871 }
872 };
873
874 self.store.delete_edge_versioned(edge_id, epoch, tx);
876
877 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
879 tracker.record_edge_write(tid, edge_id);
880 }
881
882 for col_idx in 0..chunk.column_count() {
884 if let (Some(src), Some(dst)) =
885 (chunk.column(col_idx), builder.column_mut(col_idx))
886 {
887 if let Some(val) = src.get_value(row) {
888 dst.push_value(val);
889 } else {
890 dst.push_value(Value::Null);
891 }
892 }
893 }
894 builder.advance_row();
895 }
896
897 return Ok(Some(builder.finish()));
898 }
899 Ok(None)
900 }
901
902 fn reset(&mut self) {
903 self.input.reset();
904 }
905
906 fn name(&self) -> &'static str {
907 "DeleteEdge"
908 }
909}
910
911pub struct AddLabelOperator {
913 store: Arc<dyn GraphStoreMut>,
915 input: Box<dyn Operator>,
917 node_column: usize,
919 labels: Vec<String>,
921 output_schema: Vec<LogicalType>,
923 viewing_epoch: Option<EpochId>,
925 transaction_id: Option<TransactionId>,
927 write_tracker: Option<SharedWriteTracker>,
929}
930
931impl AddLabelOperator {
932 pub fn new(
934 store: Arc<dyn GraphStoreMut>,
935 input: Box<dyn Operator>,
936 node_column: usize,
937 labels: Vec<String>,
938 output_schema: Vec<LogicalType>,
939 ) -> Self {
940 Self {
941 store,
942 input,
943 node_column,
944 labels,
945 output_schema,
946 viewing_epoch: None,
947 transaction_id: None,
948 write_tracker: None,
949 }
950 }
951
952 pub fn with_transaction_context(
954 mut self,
955 epoch: EpochId,
956 transaction_id: Option<TransactionId>,
957 ) -> Self {
958 self.viewing_epoch = Some(epoch);
959 self.transaction_id = transaction_id;
960 self
961 }
962
963 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
965 self.write_tracker = Some(tracker);
966 self
967 }
968}
969
970impl Operator for AddLabelOperator {
971 fn next(&mut self) -> OperatorResult {
972 if let Some(chunk) = self.input.next()? {
973 let mut updated_count = 0;
974
975 for row in chunk.selected_indices() {
976 let node_val = chunk
977 .column(self.node_column)
978 .and_then(|c| c.get_value(row))
979 .ok_or_else(|| {
980 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
981 })?;
982
983 let node_id = match node_val {
984 Value::Int64(id) => NodeId(id as u64),
985 _ => {
986 return Err(OperatorError::TypeMismatch {
987 expected: "Int64 (node ID)".to_string(),
988 found: format!("{node_val:?}"),
989 });
990 }
991 };
992
993 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
995 tracker.record_node_write(tid, node_id);
996 }
997
998 for label in &self.labels {
1000 let added = if let Some(tid) = self.transaction_id {
1001 self.store.add_label_versioned(node_id, label, tid)
1002 } else {
1003 self.store.add_label(node_id, label)
1004 };
1005 if added {
1006 updated_count += 1;
1007 }
1008 }
1009 }
1010
1011 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
1013 if let Some(dst) = builder.column_mut(0) {
1014 dst.push_value(Value::Int64(updated_count));
1015 }
1016 builder.advance_row();
1017
1018 return Ok(Some(builder.finish()));
1019 }
1020 Ok(None)
1021 }
1022
1023 fn reset(&mut self) {
1024 self.input.reset();
1025 }
1026
1027 fn name(&self) -> &'static str {
1028 "AddLabel"
1029 }
1030}
1031
1032pub struct RemoveLabelOperator {
1034 store: Arc<dyn GraphStoreMut>,
1036 input: Box<dyn Operator>,
1038 node_column: usize,
1040 labels: Vec<String>,
1042 output_schema: Vec<LogicalType>,
1044 viewing_epoch: Option<EpochId>,
1046 transaction_id: Option<TransactionId>,
1048 write_tracker: Option<SharedWriteTracker>,
1050}
1051
1052impl RemoveLabelOperator {
1053 pub fn new(
1055 store: Arc<dyn GraphStoreMut>,
1056 input: Box<dyn Operator>,
1057 node_column: usize,
1058 labels: Vec<String>,
1059 output_schema: Vec<LogicalType>,
1060 ) -> Self {
1061 Self {
1062 store,
1063 input,
1064 node_column,
1065 labels,
1066 output_schema,
1067 viewing_epoch: None,
1068 transaction_id: None,
1069 write_tracker: None,
1070 }
1071 }
1072
1073 pub fn with_transaction_context(
1075 mut self,
1076 epoch: EpochId,
1077 transaction_id: Option<TransactionId>,
1078 ) -> Self {
1079 self.viewing_epoch = Some(epoch);
1080 self.transaction_id = transaction_id;
1081 self
1082 }
1083
1084 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1086 self.write_tracker = Some(tracker);
1087 self
1088 }
1089}
1090
1091impl Operator for RemoveLabelOperator {
1092 fn next(&mut self) -> OperatorResult {
1093 if let Some(chunk) = self.input.next()? {
1094 let mut updated_count = 0;
1095
1096 for row in chunk.selected_indices() {
1097 let node_val = chunk
1098 .column(self.node_column)
1099 .and_then(|c| c.get_value(row))
1100 .ok_or_else(|| {
1101 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1102 })?;
1103
1104 let node_id = match node_val {
1105 Value::Int64(id) => NodeId(id as u64),
1106 _ => {
1107 return Err(OperatorError::TypeMismatch {
1108 expected: "Int64 (node ID)".to_string(),
1109 found: format!("{node_val:?}"),
1110 });
1111 }
1112 };
1113
1114 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1116 tracker.record_node_write(tid, node_id);
1117 }
1118
1119 for label in &self.labels {
1121 let removed = if let Some(tid) = self.transaction_id {
1122 self.store.remove_label_versioned(node_id, label, tid)
1123 } else {
1124 self.store.remove_label(node_id, label)
1125 };
1126 if removed {
1127 updated_count += 1;
1128 }
1129 }
1130 }
1131
1132 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
1134 if let Some(dst) = builder.column_mut(0) {
1135 dst.push_value(Value::Int64(updated_count));
1136 }
1137 builder.advance_row();
1138
1139 return Ok(Some(builder.finish()));
1140 }
1141 Ok(None)
1142 }
1143
1144 fn reset(&mut self) {
1145 self.input.reset();
1146 }
1147
1148 fn name(&self) -> &'static str {
1149 "RemoveLabel"
1150 }
1151}
1152
1153pub struct SetPropertyOperator {
1158 store: Arc<dyn GraphStoreMut>,
1160 input: Box<dyn Operator>,
1162 entity_column: usize,
1164 is_edge: bool,
1166 properties: Vec<(String, PropertySource)>,
1168 output_schema: Vec<LogicalType>,
1170 replace: bool,
1172 validator: Option<Arc<dyn ConstraintValidator>>,
1174 labels: Vec<String>,
1176 edge_type_name: Option<String>,
1178 viewing_epoch: Option<EpochId>,
1180 transaction_id: Option<TransactionId>,
1182 write_tracker: Option<SharedWriteTracker>,
1184}
1185
1186impl SetPropertyOperator {
1187 pub fn new_for_node(
1189 store: Arc<dyn GraphStoreMut>,
1190 input: Box<dyn Operator>,
1191 node_column: usize,
1192 properties: Vec<(String, PropertySource)>,
1193 output_schema: Vec<LogicalType>,
1194 ) -> Self {
1195 Self {
1196 store,
1197 input,
1198 entity_column: node_column,
1199 is_edge: false,
1200 properties,
1201 output_schema,
1202 replace: false,
1203 validator: None,
1204 labels: Vec::new(),
1205 edge_type_name: None,
1206 viewing_epoch: None,
1207 transaction_id: None,
1208 write_tracker: None,
1209 }
1210 }
1211
1212 pub fn new_for_edge(
1214 store: Arc<dyn GraphStoreMut>,
1215 input: Box<dyn Operator>,
1216 edge_column: usize,
1217 properties: Vec<(String, PropertySource)>,
1218 output_schema: Vec<LogicalType>,
1219 ) -> Self {
1220 Self {
1221 store,
1222 input,
1223 entity_column: edge_column,
1224 is_edge: true,
1225 properties,
1226 output_schema,
1227 replace: false,
1228 validator: None,
1229 labels: Vec::new(),
1230 edge_type_name: None,
1231 viewing_epoch: None,
1232 transaction_id: None,
1233 write_tracker: None,
1234 }
1235 }
1236
1237 pub fn with_replace(mut self, replace: bool) -> Self {
1239 self.replace = replace;
1240 self
1241 }
1242
1243 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1245 self.validator = Some(validator);
1246 self
1247 }
1248
1249 pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1251 self.labels = labels;
1252 self
1253 }
1254
1255 pub fn with_edge_type(mut self, edge_type: String) -> Self {
1257 self.edge_type_name = Some(edge_type);
1258 self
1259 }
1260
1261 pub fn with_transaction_context(
1266 mut self,
1267 epoch: EpochId,
1268 transaction_id: Option<TransactionId>,
1269 ) -> Self {
1270 self.viewing_epoch = Some(epoch);
1271 self.transaction_id = transaction_id;
1272 self
1273 }
1274
1275 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1277 self.write_tracker = Some(tracker);
1278 self
1279 }
1280}
1281
1282impl Operator for SetPropertyOperator {
1283 fn next(&mut self) -> OperatorResult {
1284 if let Some(chunk) = self.input.next()? {
1285 let mut builder =
1286 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1287
1288 for row in chunk.selected_indices() {
1289 let entity_val = chunk
1290 .column(self.entity_column)
1291 .and_then(|c| c.get_value(row))
1292 .ok_or_else(|| {
1293 OperatorError::ColumnNotFound(format!(
1294 "entity column {}",
1295 self.entity_column
1296 ))
1297 })?;
1298
1299 let entity_id = match entity_val {
1300 Value::Int64(id) => id as u64,
1301 _ => {
1302 return Err(OperatorError::TypeMismatch {
1303 expected: "Int64 (entity ID)".to_string(),
1304 found: format!("{entity_val:?}"),
1305 });
1306 }
1307 };
1308
1309 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1311 if self.is_edge {
1312 tracker.record_edge_write(tid, EdgeId(entity_id));
1313 } else {
1314 tracker.record_node_write(tid, NodeId(entity_id));
1315 }
1316 }
1317
1318 let resolved_props: Vec<(String, Value)> = self
1320 .properties
1321 .iter()
1322 .map(|(name, source)| {
1323 let value =
1324 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1325 (name.clone(), value)
1326 })
1327 .collect();
1328
1329 if let Some(ref validator) = self.validator {
1331 if self.is_edge {
1332 if let Some(ref et) = self.edge_type_name {
1333 for (name, value) in &resolved_props {
1334 validator.validate_edge_property(et, name, value)?;
1335 }
1336 }
1337 } else {
1338 for (name, value) in &resolved_props {
1339 validator.validate_node_property(&self.labels, name, value)?;
1340 validator.check_unique_node_property(&self.labels, name, value)?;
1341 }
1342 }
1343 }
1344
1345 let tx_id = self.transaction_id;
1347 for (prop_name, value) in resolved_props {
1348 if prop_name == "*" {
1349 if let Value::Map(map) = value {
1351 if self.replace {
1352 if self.is_edge {
1354 if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1355 let keys: Vec<String> = edge
1356 .properties
1357 .iter()
1358 .map(|(k, _)| k.as_str().to_string())
1359 .collect();
1360 for key in keys {
1361 if let Some(tid) = tx_id {
1362 self.store.remove_edge_property_versioned(
1363 EdgeId(entity_id),
1364 &key,
1365 tid,
1366 );
1367 } else {
1368 self.store
1369 .remove_edge_property(EdgeId(entity_id), &key);
1370 }
1371 }
1372 }
1373 } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1374 let keys: Vec<String> = node
1375 .properties
1376 .iter()
1377 .map(|(k, _)| k.as_str().to_string())
1378 .collect();
1379 for key in keys {
1380 if let Some(tid) = tx_id {
1381 self.store.remove_node_property_versioned(
1382 NodeId(entity_id),
1383 &key,
1384 tid,
1385 );
1386 } else {
1387 self.store
1388 .remove_node_property(NodeId(entity_id), &key);
1389 }
1390 }
1391 }
1392 }
1393 for (key, val) in map.iter() {
1395 if self.is_edge {
1396 if let Some(tid) = tx_id {
1397 self.store.set_edge_property_versioned(
1398 EdgeId(entity_id),
1399 key.as_str(),
1400 val.clone(),
1401 tid,
1402 );
1403 } else {
1404 self.store.set_edge_property(
1405 EdgeId(entity_id),
1406 key.as_str(),
1407 val.clone(),
1408 );
1409 }
1410 } else if let Some(tid) = tx_id {
1411 self.store.set_node_property_versioned(
1412 NodeId(entity_id),
1413 key.as_str(),
1414 val.clone(),
1415 tid,
1416 );
1417 } else {
1418 self.store.set_node_property(
1419 NodeId(entity_id),
1420 key.as_str(),
1421 val.clone(),
1422 );
1423 }
1424 }
1425 }
1426 } else if self.is_edge {
1427 if let Some(tid) = tx_id {
1428 self.store.set_edge_property_versioned(
1429 EdgeId(entity_id),
1430 &prop_name,
1431 value,
1432 tid,
1433 );
1434 } else {
1435 self.store
1436 .set_edge_property(EdgeId(entity_id), &prop_name, value);
1437 }
1438 } else if let Some(tid) = tx_id {
1439 self.store.set_node_property_versioned(
1440 NodeId(entity_id),
1441 &prop_name,
1442 value,
1443 tid,
1444 );
1445 } else {
1446 self.store
1447 .set_node_property(NodeId(entity_id), &prop_name, value);
1448 }
1449 }
1450
1451 for col_idx in 0..chunk.column_count() {
1453 if let (Some(src), Some(dst)) =
1454 (chunk.column(col_idx), builder.column_mut(col_idx))
1455 {
1456 if let Some(val) = src.get_value(row) {
1457 dst.push_value(val);
1458 } else {
1459 dst.push_value(Value::Null);
1460 }
1461 }
1462 }
1463
1464 builder.advance_row();
1465 }
1466
1467 return Ok(Some(builder.finish()));
1468 }
1469 Ok(None)
1470 }
1471
1472 fn reset(&mut self) {
1473 self.input.reset();
1474 }
1475
1476 fn name(&self) -> &'static str {
1477 "SetProperty"
1478 }
1479}
1480
1481#[cfg(test)]
1482mod tests {
1483 use super::*;
1484 use crate::execution::DataChunk;
1485 use crate::execution::chunk::DataChunkBuilder;
1486 use crate::graph::lpg::LpgStore;
1487
1488 fn create_test_store() -> Arc<dyn GraphStoreMut> {
1491 Arc::new(LpgStore::new().unwrap())
1492 }
1493
1494 struct MockInput {
1495 chunk: Option<DataChunk>,
1496 }
1497
1498 impl MockInput {
1499 fn boxed(chunk: DataChunk) -> Box<Self> {
1500 Box::new(Self { chunk: Some(chunk) })
1501 }
1502 }
1503
1504 impl Operator for MockInput {
1505 fn next(&mut self) -> OperatorResult {
1506 Ok(self.chunk.take())
1507 }
1508 fn reset(&mut self) {}
1509 fn name(&self) -> &'static str {
1510 "MockInput"
1511 }
1512 }
1513
1514 struct EmptyInput;
1515 impl Operator for EmptyInput {
1516 fn next(&mut self) -> OperatorResult {
1517 Ok(None)
1518 }
1519 fn reset(&mut self) {}
1520 fn name(&self) -> &'static str {
1521 "EmptyInput"
1522 }
1523 }
1524
1525 fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1526 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1527 for id in ids {
1528 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1529 builder.advance_row();
1530 }
1531 builder.finish()
1532 }
1533
1534 fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1535 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1536 for id in ids {
1537 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1538 builder.advance_row();
1539 }
1540 builder.finish()
1541 }
1542
/// A `CreateNodeOperator` without an input emits one row, is exhausted on
/// the second call, and leaves exactly one node in the store.
#[test]
fn test_create_node_standalone() {
    let store = create_test_store();

    let labels = vec!["Person".to_string()];
    let name_source = PropertySource::Constant(Value::String("Alix".into()));
    let properties = vec![("name".to_string(), name_source)];
    let mut create = CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        labels,
        properties,
        vec![LogicalType::Int64],
        0,
    );

    // First call produces the single created row.
    let first = create.next().unwrap().unwrap();
    assert_eq!(first.row_count(), 1);

    // Second call reports exhaustion.
    let second = create.next().unwrap();
    assert!(second.is_none());

    assert_eq!(store.node_count(), 1);
}
1569
/// Feeding one row of (source, target) node IDs into `CreateEdgeOperator`
/// results in one edge in the store.
#[test]
fn test_create_edge() {
    let store = create_test_store();

    // Array elements are evaluated left to right, so the source node is
    // created before the target node.
    let endpoints = [store.create_node(&["Person"]), store.create_node(&["Person"])];

    let mut rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
    for (col, endpoint) in endpoints.iter().enumerate() {
        rows.column_mut(col).unwrap().push_int64(endpoint.0 as i64);
    }
    rows.advance_row();

    let input = MockInput::boxed(rows.finish());
    let mut create = CreateEdgeOperator::new(
        Arc::clone(&store),
        input,
        0,
        1,
        "KNOWS".to_string(),
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let _emitted = create.next().unwrap().unwrap();
    assert_eq!(store.edge_count(), 1);
}
1594
/// Deleting the only node through `DeleteNodeOperator` emits one row and
/// empties the store.
#[test]
fn test_delete_node() {
    let store = create_test_store();

    let victim = store.create_node(&["Person"]);
    assert_eq!(store.node_count(), 1);

    let input = MockInput::boxed(node_id_chunk(&[victim]));
    let mut delete = DeleteNodeOperator::new(
        Arc::clone(&store),
        input,
        0,
        vec![LogicalType::Node],
        false,
    );

    let emitted = delete.next().unwrap().unwrap();
    assert_eq!(emitted.row_count(), 1);
    assert_eq!(store.node_count(), 0);
}
1615
/// Deleting the only edge through `DeleteEdgeOperator` emits one row and
/// leaves no edges in the store.
#[test]
fn test_delete_edge() {
    let store = create_test_store();

    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);
    let victim = store.create_edge(source, target, "KNOWS");
    assert_eq!(store.edge_count(), 1);

    let input = MockInput::boxed(edge_id_chunk(&[victim]));
    let mut delete =
        DeleteEdgeOperator::new(Arc::clone(&store), input, 0, vec![LogicalType::Node]);

    let emitted = delete.next().unwrap().unwrap();
    assert_eq!(emitted.row_count(), 1);
    assert_eq!(store.edge_count(), 0);
}
1638
/// With an input that yields nothing, `DeleteEdgeOperator::next` returns
/// `Ok(None)`.
#[test]
fn test_delete_edge_no_input_returns_none() {
    let store = create_test_store();

    let exhausted_input = Box::new(EmptyInput);
    let mut delete = DeleteEdgeOperator::new(
        Arc::clone(&store),
        exhausted_input,
        0,
        vec![LogicalType::Int64],
    );

    let outcome = delete.next().unwrap();
    assert!(outcome.is_none());
}
1652
/// Two edge IDs in one input chunk are both deleted and yield two output
/// rows.
#[test]
fn test_delete_multiple_edges() {
    let store = create_test_store();

    let first = store.create_node(&["N"]);
    let second = store.create_node(&["N"]);
    // One edge in each direction; the array is filled left to right.
    let doomed = [
        store.create_edge(first, second, "R"),
        store.create_edge(second, first, "S"),
    ];
    assert_eq!(store.edge_count(), 2);

    let input = MockInput::boxed(edge_id_chunk(&doomed));
    let mut delete =
        DeleteEdgeOperator::new(Arc::clone(&store), input, 0, vec![LogicalType::Node]);

    let emitted = delete.next().unwrap().unwrap();
    assert_eq!(emitted.row_count(), 2);
    assert_eq!(store.edge_count(), 0);
}
1674
/// Deleting a node that has edges, with the last constructor argument set
/// to `true` (presumably the detach flag, judging by the test name),
/// removes the node and both of its edges.
#[test]
fn test_delete_node_detach() {
    let store = create_test_store();

    let hub = store.create_node(&["Person"]);
    let peer = store.create_node(&["Person"]);
    store.create_edge(hub, peer, "KNOWS");
    store.create_edge(peer, hub, "FOLLOWS");
    assert_eq!(store.edge_count(), 2);

    let input = MockInput::boxed(node_id_chunk(&[hub]));
    let mut delete = DeleteNodeOperator::new(
        Arc::clone(&store),
        input,
        0,
        vec![LogicalType::Node],
        true,
    );

    let emitted = delete.next().unwrap().unwrap();
    assert_eq!(emitted.row_count(), 1);
    // Only the peer node is left, and no edges remain.
    assert_eq!(store.node_count(), 1);
    assert_eq!(store.edge_count(), 0);
}
1700
/// Adding one label reports the value 1 in output column 0 and leaves the
/// node with both the original and the new label.
#[test]
fn test_add_label() {
    let store = create_test_store();

    let node = store.create_node(&["Person"]);

    let input = MockInput::boxed(node_id_chunk(&[node]));
    let mut add = AddLabelOperator::new(
        Arc::clone(&store),
        input,
        0,
        vec!["Employee".to_string()],
        vec![LogicalType::Int64],
    );

    let emitted = add.next().unwrap().unwrap();
    let updated = emitted.column(0).and_then(|col| col.get_int64(0)).unwrap();
    assert_eq!(updated, 1);

    let node_data = store.get_node(node).unwrap();
    let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
    for expected in ["Person", "Employee"] {
        assert!(labels.contains(&expected));
    }
}
1727
/// Adding two labels in one operator reports the value 2 in output column
/// 0 and both labels end up on the node.
#[test]
fn test_add_multiple_labels() {
    let store = create_test_store();

    let node = store.create_node(&["Base"]);

    let new_labels = vec!["LabelA".to_string(), "LabelB".to_string()];
    let input = MockInput::boxed(node_id_chunk(&[node]));
    let mut add = AddLabelOperator::new(
        Arc::clone(&store),
        input,
        0,
        new_labels,
        vec![LogicalType::Int64],
    );

    let emitted = add.next().unwrap().unwrap();
    let updated = emitted.column(0).and_then(|col| col.get_int64(0)).unwrap();
    assert_eq!(updated, 2);

    let node_data = store.get_node(node).unwrap();
    let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
    for expected in ["LabelA", "LabelB"] {
        assert!(labels.contains(&expected));
    }
}
1751
/// With an input that yields nothing, `AddLabelOperator::next` returns
/// `Ok(None)`.
#[test]
fn test_add_label_no_input_returns_none() {
    let store = create_test_store();

    let exhausted_input = Box::new(EmptyInput);
    let mut add = AddLabelOperator::new(
        Arc::clone(&store),
        exhausted_input,
        0,
        vec!["Foo".to_string()],
        vec![LogicalType::Int64],
    );

    let outcome = add.next().unwrap();
    assert!(outcome.is_none());
}
1766
/// Removing an existing label reports the value 1 in output column 0 and
/// leaves the node's other label in place.
#[test]
fn test_remove_label() {
    let store = create_test_store();

    let node = store.create_node(&["Person", "Employee"]);

    let input = MockInput::boxed(node_id_chunk(&[node]));
    let mut remove = RemoveLabelOperator::new(
        Arc::clone(&store),
        input,
        0,
        vec!["Employee".to_string()],
        vec![LogicalType::Int64],
    );

    let emitted = remove.next().unwrap().unwrap();
    let updated = emitted.column(0).and_then(|col| col.get_int64(0)).unwrap();
    assert_eq!(updated, 1);

    let node_data = store.get_node(node).unwrap();
    let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
    let kept = labels.contains(&"Person");
    let removed = !labels.contains(&"Employee");
    assert!(kept);
    assert!(removed);
}
1793
/// Removing a label the node does not carry reports the value 0 in output
/// column 0.
#[test]
fn test_remove_nonexistent_label() {
    let store = create_test_store();

    let node = store.create_node(&["Person"]);

    let input = MockInput::boxed(node_id_chunk(&[node]));
    let mut remove = RemoveLabelOperator::new(
        Arc::clone(&store),
        input,
        0,
        vec!["NonExistent".to_string()],
        vec![LogicalType::Int64],
    );

    let emitted = remove.next().unwrap().unwrap();
    let updated = emitted.column(0).and_then(|col| col.get_int64(0)).unwrap();
    assert_eq!(updated, 0);
}
1812
/// Setting a node property from a constant source stores that value on the
/// node.
#[test]
fn test_set_node_property_constant() {
    let store = create_test_store();

    let node = store.create_node(&["Person"]);

    let name_source = PropertySource::Constant(Value::String("Alix".into()));
    let input = MockInput::boxed(node_id_chunk(&[node]));
    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        input,
        0,
        vec![("name".to_string(), name_source)],
        vec![LogicalType::Int64],
    );

    let emitted = set.next().unwrap().unwrap();
    assert_eq!(emitted.row_count(), 1);

    let node_data = store.get_node(node).unwrap();
    let name_key = PropertyKey::new("name");
    let expected = Value::String("Alix".into());
    assert_eq!(node_data.properties.get(&name_key), Some(&expected));
}
1844
/// Setting a node property from `PropertySource::Column(1)` stores the
/// value found in input column 1 of the same row.
#[test]
fn test_set_node_property_from_column() {
    let store = create_test_store();

    let node = store.create_node(&["Person"]);

    // Column 0 carries the node ID, column 1 the value to assign.
    let schema = [LogicalType::Int64, LogicalType::String];
    let mut rows = DataChunkBuilder::new(&schema);
    rows.column_mut(0).unwrap().push_int64(node.0 as i64);
    let value_column = rows.column_mut(1).unwrap();
    value_column.push_value(Value::String("Gus".into()));
    rows.advance_row();

    let input = MockInput::boxed(rows.finish());
    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        input,
        0,
        vec![("name".to_string(), PropertySource::Column(1))],
        vec![LogicalType::Int64, LogicalType::String],
    );

    let emitted = set.next().unwrap().unwrap();
    assert_eq!(emitted.row_count(), 1);

    let node_data = store.get_node(node).unwrap();
    let name_key = PropertyKey::new("name");
    let expected = Value::String("Gus".into());
    assert_eq!(node_data.properties.get(&name_key), Some(&expected));
}
1879
/// `SetPropertyOperator::new_for_edge` with a constant source stores the
/// value on the edge.
#[test]
fn test_set_edge_property() {
    let store = create_test_store();

    let source = store.create_node(&["N"]);
    let target = store.create_node(&["N"]);
    let edge = store.create_edge(source, target, "KNOWS");

    let weight_source = PropertySource::Constant(Value::Float64(0.75));
    let input = MockInput::boxed(edge_id_chunk(&[edge]));
    let mut set = SetPropertyOperator::new_for_edge(
        Arc::clone(&store),
        input,
        0,
        vec![("weight".to_string(), weight_source)],
        vec![LogicalType::Int64],
    );

    let emitted = set.next().unwrap().unwrap();
    assert_eq!(emitted.row_count(), 1);

    let edge_data = store.get_edge(edge).unwrap();
    let weight_key = PropertyKey::new("weight");
    let expected = Value::Float64(0.75);
    assert_eq!(edge_data.properties.get(&weight_key), Some(&expected));
}
1910
/// One `SetPropertyOperator` with two constant sources stores both
/// properties on the node.
#[test]
fn test_set_multiple_properties() {
    let store = create_test_store();

    let node = store.create_node(&["Person"]);

    let name_source = PropertySource::Constant(Value::String("Alix".into()));
    let age_source = PropertySource::Constant(Value::Int64(30));
    let assignments = vec![
        ("name".to_string(), name_source),
        ("age".to_string(), age_source),
    ];

    let input = MockInput::boxed(node_id_chunk(&[node]));
    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        input,
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    set.next().unwrap().unwrap();

    let node_data = store.get_node(node).unwrap();
    // Checked in the same order as they were assigned: name, then age.
    let expectations = [
        ("name", Value::String("Alix".into())),
        ("age", Value::Int64(30)),
    ];
    for (key, expected) in &expectations {
        let stored = node_data.properties.get(&PropertyKey::new(*key));
        assert_eq!(stored, Some(expected));
    }
}
1950
/// With an input that yields nothing, `SetPropertyOperator::next` returns
/// `Ok(None)`.
#[test]
fn test_set_property_no_input_returns_none() {
    let store = create_test_store();

    let exhausted_input = Box::new(EmptyInput);
    let assignment = ("x".to_string(), PropertySource::Constant(Value::Int64(1)));
    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        exhausted_input,
        0,
        vec![assignment],
        vec![LogicalType::Int64],
    );

    let outcome = set.next().unwrap();
    assert!(outcome.is_none());
}
1965
/// Deleting a node that still has an edge, with the last constructor
/// argument set to `false` (presumably the detach flag, judging by the
/// test name), fails with a `ConstraintViolation` mentioning
/// "connected edge" and keeps both nodes.
#[test]
fn test_delete_node_without_detach_errors_when_edges_exist() {
    let store = create_test_store();

    let connected = store.create_node(&["Person"]);
    let peer = store.create_node(&["Person"]);
    store.create_edge(connected, peer, "KNOWS");

    let input = MockInput::boxed(node_id_chunk(&[connected]));
    let mut delete = DeleteNodeOperator::new(
        Arc::clone(&store),
        input,
        0,
        vec![LogicalType::Int64],
        false,
    );

    let other = delete.next().unwrap_err();
    if let OperatorError::ConstraintViolation(msg) = &other {
        assert!(msg.contains("connected edge"), "unexpected message: {msg}");
    } else {
        panic!("expected ConstraintViolation, got {other:?}");
    }

    assert_eq!(store.node_count(), 2);
}
1994
/// A `CreateNodeOperator` fed by a one-row input emits one row, adds one
/// node to the store, and is exhausted on the second call.
#[test]
fn test_create_node_with_input_operator() {
    let store = create_test_store();

    let existing = store.create_node(&["Seed"]);

    let source_tag = PropertySource::Constant(Value::String("from_input".into()));
    let input = MockInput::boxed(node_id_chunk(&[existing]));
    // `Some(..)` is built at the call site so the box can coerce to the
    // operator trait object expected by the constructor.
    let mut create = CreateNodeOperator::new(
        Arc::clone(&store),
        Some(input),
        vec!["Created".to_string()],
        vec![("source".to_string(), source_tag)],
        vec![LogicalType::Int64, LogicalType::Int64],
        1,
    );

    let emitted = create.next().unwrap().unwrap();
    assert_eq!(emitted.row_count(), 1);

    // The seed node plus the newly created one.
    assert_eq!(store.node_count(), 2);

    let second = create.next().unwrap();
    assert!(second.is_none());
}
2025
/// `CreateEdgeOperator` configured with a constant property and output
/// column 2 writes the new edge's ID into that column and stores the
/// property on the edge.
#[test]
fn test_create_edge_with_properties_and_output_column() {
    let store = create_test_store();

    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);

    let mut rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
    for (col, endpoint) in [source, target].iter().enumerate() {
        rows.column_mut(col).unwrap().push_int64(endpoint.0 as i64);
    }
    rows.advance_row();

    let since_source = PropertySource::Constant(Value::Int64(2024));
    let input = MockInput::boxed(rows.finish());
    let mut create = CreateEdgeOperator::new(
        Arc::clone(&store),
        input,
        0,
        1,
        "KNOWS".to_string(),
        vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
    )
    .with_properties(vec![("since".to_string(), since_source)])
    .with_output_column(2);

    let emitted = create.next().unwrap().unwrap();
    assert_eq!(emitted.row_count(), 1);
    assert_eq!(store.edge_count(), 1);

    // Read the created edge's ID back from the configured output column.
    let id_column = emitted.column(2);
    let edge_id_raw = id_column
        .and_then(|c| c.get_int64(0))
        .expect("edge ID should be in output column 2");
    let edge_id = EdgeId(edge_id_raw as u64);

    let edge = store.get_edge(edge_id).expect("edge should exist");
    let since_key = PropertyKey::new("since");
    let expected = Value::Int64(2024);
    assert_eq!(edge.properties.get(&since_key), Some(&expected));
}
2073
/// Assigning a map through the "*" property with `with_replace(true)`
/// drops the node's previous property and stores the map's entry.
#[test]
fn test_set_property_map_replace() {
    use std::collections::BTreeMap;

    let store = create_test_store();

    let node = store.create_node(&["Person"]);
    store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));

    let replacement = BTreeMap::from([(
        PropertyKey::new("new_key"),
        Value::String("new_val".into()),
    )]);
    let map_source = PropertySource::Constant(Value::Map(Arc::new(replacement)));

    let input = MockInput::boxed(node_id_chunk(&[node]));
    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        input,
        0,
        vec![("*".to_string(), map_source)],
        vec![LogicalType::Int64],
    )
    .with_replace(true);

    set.next().unwrap().unwrap();

    let node_data = store.get_node(node).unwrap();

    // The pre-existing property is gone.
    let old_value = node_data.properties.get(&PropertyKey::new("old_prop"));
    assert!(old_value.is_none());

    // The entry from the map is present.
    let expected = Value::String("new_val".into());
    let new_value = node_data.properties.get(&PropertyKey::new("new_key"));
    assert_eq!(new_value, Some(&expected));
}
2116
/// Assigning a map through the "*" property without calling
/// `with_replace` keeps the node's existing property and adds the map's
/// entry.
#[test]
fn test_set_property_map_merge() {
    use std::collections::BTreeMap;

    let store = create_test_store();

    let node = store.create_node(&["Person"]);
    store.set_node_property(node, "existing", Value::Int64(42));

    let addition: BTreeMap<_, _> =
        std::iter::once((PropertyKey::new("added"), Value::String("hello".into()))).collect();
    let map_source = PropertySource::Constant(Value::Map(Arc::new(addition)));

    let input = MockInput::boxed(node_id_chunk(&[node]));
    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        input,
        0,
        vec![("*".to_string(), map_source)],
        vec![LogicalType::Int64],
    );

    set.next().unwrap().unwrap();

    let node_data = store.get_node(node).unwrap();

    // The pre-existing property is untouched.
    let kept = node_data.properties.get(&PropertyKey::new("existing"));
    assert_eq!(kept, Some(&Value::Int64(42)));

    // The entry from the map was added.
    let expected = Value::String("hello".into());
    let added = node_data.properties.get(&PropertyKey::new("added"));
    assert_eq!(added, Some(&expected));
}
2156
/// `PropertySource::PropertyAccess` reads the "name" property of the node
/// in input column 0 and stores it as "copied_name" on the node whose ID
/// is in column 1.
#[test]
fn test_property_source_property_access() {
    let store = create_test_store();

    let source_node = store.create_node(&["Source"]);
    store.set_node_property(source_node, "name", Value::String("Alix".into()));

    let target_node = store.create_node(&["Target"]);

    // Column 0: node whose property is read; column 1: node that is written.
    let schema = [LogicalType::Node, LogicalType::Int64];
    let mut rows = DataChunkBuilder::new(&schema);
    rows.column_mut(0).unwrap().push_node_id(source_node);
    let target_column = rows.column_mut(1).unwrap();
    target_column.push_int64(target_node.0 as i64);
    rows.advance_row();

    let copy_name = PropertySource::PropertyAccess {
        column: 0,
        property: "name".to_string(),
    };
    let input = MockInput::boxed(rows.finish());
    let mut set = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        input,
        1,
        vec![("copied_name".to_string(), copy_name)],
        vec![LogicalType::Node, LogicalType::Int64],
    );

    set.next().unwrap().unwrap();

    let target_data = store.get_node(target_node).unwrap();
    let copied_key = PropertyKey::new("copied_name");
    let expected = Value::String("Alix".into());
    assert_eq!(target_data.properties.get(&copied_key), Some(&expected));
}
2199
/// A validator attached through `with_validator` lets an allowed property
/// through and turns a rejected one into a `ConstraintViolation` error
/// from `next`.
#[test]
fn test_create_node_with_constraint_validator() {
    let store = create_test_store();

    /// Validator that rejects the node property key "forbidden" and
    /// accepts everything else.
    struct RejectForbiddenKey;

    impl ConstraintValidator for RejectForbiddenKey {
        fn validate_node_property(
            &self,
            _labels: &[String],
            key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            match key {
                "forbidden" => Err(OperatorError::ConstraintViolation(
                    "property 'forbidden' is not allowed".to_string(),
                )),
                _ => Ok(()),
            }
        }

        fn validate_node_complete(
            &self,
            _labels: &[String],
            _properties: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn check_unique_node_property(
            &self,
            _labels: &[String],
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_property(
            &self,
            _edge_type: &str,
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_complete(
            &self,
            _edge_type: &str,
            _properties: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }
    }

    // Builds a standalone create operator for a "Thing" node carrying one
    // constant property, guarded by the validator above.
    let build = |key: &str, value: Value| {
        CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Thing".to_string()],
            vec![(key.to_string(), PropertySource::Constant(value))],
            vec![LogicalType::Int64],
            0,
        )
        .with_validator(Arc::new(RejectForbiddenKey))
    };

    // An allowed property: creation succeeds.
    let mut accepted = build("name", Value::String("ok".into()));
    assert!(accepted.next().is_ok());
    assert_eq!(store.node_count(), 1);

    // The rejected property key: `next` surfaces the violation.
    let mut rejected = build("forbidden", Value::Int64(1));
    let err = rejected.next().unwrap_err();
    assert!(matches!(err, OperatorError::ConstraintViolation(_)));
}
2289
/// After `reset`, a standalone `CreateNodeOperator` produces a chunk again,
/// so two nodes exist in the store at the end.
#[test]
fn test_create_node_reset_allows_re_execution() {
    let store = create_test_store();

    let mut create = CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        vec!["Person".to_string()],
        vec![],
        vec![LogicalType::Int64],
        0,
    );

    // First run: one chunk, then exhaustion.
    let first = create.next().unwrap();
    assert!(first.is_some());
    let drained = create.next().unwrap();
    assert!(drained.is_none());

    // After the reset the operator runs again.
    create.reset();
    let rerun = create.next().unwrap();
    assert!(rerun.is_some());

    assert_eq!(store.node_count(), 2);
}
2315
/// Every mutation operator reports its expected static name.
#[test]
fn test_operator_names() {
    let store = create_test_store();

    // Shorthands for the two arguments every constructor below shares.
    let shared = || Arc::clone(&store);
    let no_input = || Box::new(EmptyInput);

    let create_node = CreateNodeOperator::new(
        shared(),
        None,
        vec![],
        vec![],
        vec![LogicalType::Int64],
        0,
    );
    assert_eq!(create_node.name(), "CreateNode");

    let create_edge = CreateEdgeOperator::new(
        shared(),
        no_input(),
        0,
        1,
        "R".to_string(),
        vec![LogicalType::Int64],
    );
    assert_eq!(create_edge.name(), "CreateEdge");

    let delete_node = DeleteNodeOperator::new(
        shared(),
        no_input(),
        0,
        vec![LogicalType::Int64],
        false,
    );
    assert_eq!(delete_node.name(), "DeleteNode");

    let delete_edge =
        DeleteEdgeOperator::new(shared(), no_input(), 0, vec![LogicalType::Int64]);
    assert_eq!(delete_edge.name(), "DeleteEdge");

    let add_label = AddLabelOperator::new(
        shared(),
        no_input(),
        0,
        vec!["L".to_string()],
        vec![LogicalType::Int64],
    );
    assert_eq!(add_label.name(), "AddLabel");

    let remove_label = RemoveLabelOperator::new(
        shared(),
        no_input(),
        0,
        vec!["L".to_string()],
        vec![LogicalType::Int64],
    );
    assert_eq!(remove_label.name(), "RemoveLabel");

    let set_property = SetPropertyOperator::new_for_node(
        shared(),
        no_input(),
        0,
        vec![],
        vec![LogicalType::Int64],
    );
    assert_eq!(set_property.name(), "SetProperty");
}
2386}