1use std::sync::Arc;
10
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
/// Schema-constraint hook consulted by the mutation operators in this module.
///
/// Operators hold an implementation behind `Arc<dyn ConstraintValidator>`.
/// Every check reports a violation by returning an [`OperatorError`], which
/// the calling operator propagates with `?`; `Ok(())` lets the operator
/// continue.
pub trait ConstraintValidator: Send + Sync {
    /// Checks one property (`key` = `value`) destined for a node carrying
    /// `labels`.
    ///
    /// In this module it is called once per resolved property by
    /// `CreateNodeOperator` and, in node mode, by `SetPropertyOperator`.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the implementor rejects the value.
    fn validate_node_property(
        &self,
        labels: &[String],
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Checks the full property set of a node carrying `labels`.
    ///
    /// `CreateNodeOperator` calls this once per node, after each individual
    /// property has passed [`Self::validate_node_property`] and after
    /// [`Self::inject_defaults`] has run.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the implementor rejects the set
    /// (presumably for whole-node rules such as required properties; the
    /// actual rules are defined by the implementor).
    fn validate_node_complete(
        &self,
        labels: &[String],
        properties: &[(String, Value)],
    ) -> Result<(), OperatorError>;

    /// Checks `key` = `value` against the implementor's uniqueness rules for
    /// nodes carrying `labels`.
    ///
    /// Called right after [`Self::validate_node_property`] for the same
    /// property, both when creating nodes and when setting node properties.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the implementor reports a conflict.
    fn check_unique_node_property(
        &self,
        labels: &[String],
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Checks one property (`key` = `value`) destined for an edge of type
    /// `edge_type`.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the implementor rejects the value.
    fn validate_edge_property(
        &self,
        edge_type: &str,
        key: &str,
        value: &Value,
    ) -> Result<(), OperatorError>;

    /// Checks the full property set of an edge of type `edge_type`.
    ///
    /// `CreateEdgeOperator` calls this once per edge after every property has
    /// passed [`Self::validate_edge_property`].
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the implementor rejects the set.
    fn validate_edge_complete(
        &self,
        edge_type: &str,
        properties: &[(String, Value)],
    ) -> Result<(), OperatorError>;

    /// Decides whether a node may be created with exactly these `labels`.
    ///
    /// The default implementation accepts every label set.
    fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
        // Explicitly discard the argument: the default has no restrictions.
        let _ = labels;
        Ok(())
    }

    /// Decides whether edges of type `edge_type` may be created.
    ///
    /// The default implementation accepts every edge type.
    fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
        // Explicitly discard the argument: the default has no restrictions.
        let _ = edge_type;
        Ok(())
    }

    /// Decides whether an edge of type `edge_type` may connect a node with
    /// `source_labels` to a node with `target_labels`.
    ///
    /// `CreateEdgeOperator` passes the labels it reads from the store for the
    /// two endpoint nodes (an empty list when a node cannot be found).
    /// The default implementation accepts every combination.
    fn validate_edge_endpoints(
        &self,
        edge_type: &str,
        source_labels: &[String],
        target_labels: &[String],
    ) -> Result<(), OperatorError> {
        // Explicitly discard the arguments: the default has no restrictions.
        let _ = (edge_type, source_labels, target_labels);
        Ok(())
    }

    /// Gives the validator a chance to add entries to `properties` for a
    /// node carrying `labels` before the properties are validated.
    ///
    /// The default implementation leaves `properties` untouched.
    fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
        // Explicitly discard the arguments: the default injects nothing.
        let _ = (labels, properties);
    }
}
97
/// Operator that creates nodes in the graph store.
///
/// With an input operator it creates one node per selected input row;
/// without one it creates a single node on the first call to `next`.
pub struct CreateNodeOperator {
    /// Store the nodes are created in.
    store: Arc<dyn GraphStoreMut>,
    /// Optional upstream operator; `None` selects the single-node mode.
    input: Option<Box<dyn Operator>>,
    /// Labels given to every created node.
    labels: Vec<String>,
    /// Property names paired with the source each value is resolved from.
    properties: Vec<(String, PropertySource)>,
    /// Schema used to build the output chunk.
    output_schema: Vec<LogicalType>,
    /// Output column that receives the new node's ID as `Value::Int64`.
    /// With an input, only input columns below this index are copied through.
    output_column: usize,
    /// Set once the input-less mode has produced its node, so later `next`
    /// calls return `None`; cleared again by `reset`.
    executed: bool,
    /// Epoch passed to the versioned create call; when `None` the store's
    /// current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction the writes belong to; when `None` nodes are created under
    /// `TransactionId::SYSTEM` and properties use the unversioned setter.
    transaction_id: Option<TransactionId>,
    /// Optional constraint validator consulted for every created node.
    validator: Option<Arc<dyn ConstraintValidator>>,
    /// Optional write tracker; created nodes are recorded only when a
    /// transaction ID is set as well.
    write_tracker: Option<SharedWriteTracker>,
}
126
/// Describes where a property value comes from when a mutation operator
/// resolves it for a row (see `PropertySource::resolve`).
#[derive(Debug, Clone)]
pub enum PropertySource {
    /// Read the value from the input chunk column with this index.
    Column(usize),
    /// Use this fixed value for every row.
    Constant(Value),
    /// Look up a named property on whatever the input column holds for the
    /// row: a node, an edge, or a map value.
    PropertyAccess {
        /// Index of the input chunk column holding the node, edge or map.
        column: usize,
        /// Name of the property (or map key) to read.
        property: String,
    },
}
142
143impl PropertySource {
144 pub fn resolve(
146 &self,
147 chunk: &crate::execution::chunk::DataChunk,
148 row: usize,
149 store: &dyn GraphStore,
150 ) -> Value {
151 match self {
152 PropertySource::Column(col_idx) => chunk
153 .column(*col_idx)
154 .and_then(|c| c.get_value(row))
155 .unwrap_or(Value::Null),
156 PropertySource::Constant(v) => v.clone(),
157 PropertySource::PropertyAccess { column, property } => {
158 let Some(col) = chunk.column(*column) else {
159 return Value::Null;
160 };
161 if let Some(node_id) = col.get_node_id(row) {
163 store
164 .get_node(node_id)
165 .and_then(|node| node.get_property(property).cloned())
166 .unwrap_or(Value::Null)
167 } else if let Some(edge_id) = col.get_edge_id(row) {
168 store
169 .get_edge(edge_id)
170 .and_then(|edge| edge.get_property(property).cloned())
171 .unwrap_or(Value::Null)
172 } else if let Some(Value::Map(map)) = col.get_value(row) {
173 let key = PropertyKey::new(property);
174 map.get(&key).cloned().unwrap_or(Value::Null)
175 } else {
176 Value::Null
177 }
178 }
179 }
180 }
181}
182
183impl CreateNodeOperator {
184 pub fn new(
194 store: Arc<dyn GraphStoreMut>,
195 input: Option<Box<dyn Operator>>,
196 labels: Vec<String>,
197 properties: Vec<(String, PropertySource)>,
198 output_schema: Vec<LogicalType>,
199 output_column: usize,
200 ) -> Self {
201 Self {
202 store,
203 input,
204 labels,
205 properties,
206 output_schema,
207 output_column,
208 executed: false,
209 viewing_epoch: None,
210 transaction_id: None,
211 validator: None,
212 write_tracker: None,
213 }
214 }
215
216 pub fn with_transaction_context(
218 mut self,
219 epoch: EpochId,
220 transaction_id: Option<TransactionId>,
221 ) -> Self {
222 self.viewing_epoch = Some(epoch);
223 self.transaction_id = transaction_id;
224 self
225 }
226
227 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
229 self.validator = Some(validator);
230 self
231 }
232
233 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
235 self.write_tracker = Some(tracker);
236 self
237 }
238}
239
240impl CreateNodeOperator {
241 fn validate_and_set_properties(
243 &self,
244 node_id: NodeId,
245 resolved_props: &mut Vec<(String, Value)>,
246 ) -> Result<(), OperatorError> {
247 if let Some(ref validator) = self.validator {
249 validator.validate_node_labels_allowed(&self.labels)?;
250 }
251
252 if let Some(ref validator) = self.validator {
254 validator.inject_defaults(&self.labels, resolved_props);
255 }
256
257 if let Some(ref validator) = self.validator {
259 for (name, value) in resolved_props.iter() {
260 validator.validate_node_property(&self.labels, name, value)?;
261 validator.check_unique_node_property(&self.labels, name, value)?;
262 }
263 validator.validate_node_complete(&self.labels, resolved_props)?;
265 }
266
267 if let Some(tid) = self.transaction_id {
269 for (name, value) in resolved_props.iter() {
270 self.store
271 .set_node_property_versioned(node_id, name, value.clone(), tid);
272 }
273 } else {
274 for (name, value) in resolved_props.iter() {
275 self.store.set_node_property(node_id, name, value.clone());
276 }
277 }
278 Ok(())
279 }
280}
281
282impl Operator for CreateNodeOperator {
283 fn next(&mut self) -> OperatorResult {
284 let epoch = self
286 .viewing_epoch
287 .unwrap_or_else(|| self.store.current_epoch());
288 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
289
290 if let Some(ref mut input) = self.input {
291 if let Some(chunk) = input.next()? {
293 let mut builder =
294 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
295
296 for row in chunk.selected_indices() {
297 let mut resolved_props: Vec<(String, Value)> = self
299 .properties
300 .iter()
301 .map(|(name, source)| {
302 let value =
303 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
304 (name.clone(), value)
305 })
306 .collect();
307
308 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
310 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
311
312 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
314 tracker.record_node_write(tid, node_id)?;
315 }
316
317 self.validate_and_set_properties(node_id, &mut resolved_props)?;
319
320 for col_idx in 0..chunk.column_count() {
322 if col_idx < self.output_column
323 && let (Some(src), Some(dst)) =
324 (chunk.column(col_idx), builder.column_mut(col_idx))
325 {
326 if let Some(val) = src.get_value(row) {
327 dst.push_value(val);
328 } else {
329 dst.push_value(Value::Null);
330 }
331 }
332 }
333
334 if let Some(dst) = builder.column_mut(self.output_column) {
336 dst.push_value(Value::Int64(node_id.0 as i64));
337 }
338
339 builder.advance_row();
340 }
341
342 return Ok(Some(builder.finish()));
343 }
344 Ok(None)
345 } else {
346 if self.executed {
348 return Ok(None);
349 }
350 self.executed = true;
351
352 let mut resolved_props: Vec<(String, Value)> = self
354 .properties
355 .iter()
356 .filter_map(|(name, source)| {
357 if let PropertySource::Constant(value) = source {
358 Some((name.clone(), value.clone()))
359 } else {
360 None
361 }
362 })
363 .collect();
364
365 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
367 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
368
369 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
371 tracker.record_node_write(tid, node_id)?;
372 }
373
374 self.validate_and_set_properties(node_id, &mut resolved_props)?;
376
377 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
379 if let Some(dst) = builder.column_mut(self.output_column) {
380 dst.push_value(Value::Int64(node_id.0 as i64));
381 }
382 builder.advance_row();
383
384 Ok(Some(builder.finish()))
385 }
386 }
387
388 fn reset(&mut self) {
389 if let Some(ref mut input) = self.input {
390 input.reset();
391 }
392 self.executed = false;
393 }
394
395 fn name(&self) -> &'static str {
396 "CreateNode"
397 }
398}
399
/// Operator that creates one edge per selected input row.
pub struct CreateEdgeOperator {
    /// Store the edges are created in.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the rows with the endpoint node IDs.
    input: Box<dyn Operator>,
    /// Input column holding the source node ID (expected as `Value::Int64`).
    from_column: usize,
    /// Input column holding the target node ID (expected as `Value::Int64`).
    to_column: usize,
    /// Type given to every created edge.
    edge_type: String,
    /// Property names paired with the source each value is resolved from.
    properties: Vec<(String, PropertySource)>,
    /// Schema used to build the output chunk.
    output_schema: Vec<LogicalType>,
    /// Output column that receives the new edge ID, if one was requested.
    output_column: Option<usize>,
    /// Epoch passed to the versioned create call; when `None` the store's
    /// current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction the writes belong to; when `None` edges are created under
    /// `TransactionId::SYSTEM` and properties use the unversioned setter.
    transaction_id: Option<TransactionId>,
    /// Optional constraint validator consulted before each edge is created.
    validator: Option<Arc<dyn ConstraintValidator>>,
    /// Optional write tracker; created edges are recorded only when a
    /// transaction ID is set as well.
    write_tracker: Option<SharedWriteTracker>,
}
427
428impl CreateEdgeOperator {
429 pub fn new(
436 store: Arc<dyn GraphStoreMut>,
437 input: Box<dyn Operator>,
438 from_column: usize,
439 to_column: usize,
440 edge_type: String,
441 output_schema: Vec<LogicalType>,
442 ) -> Self {
443 Self {
444 store,
445 input,
446 from_column,
447 to_column,
448 edge_type,
449 properties: Vec::new(),
450 output_schema,
451 output_column: None,
452 viewing_epoch: None,
453 transaction_id: None,
454 validator: None,
455 write_tracker: None,
456 }
457 }
458
459 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
461 self.properties = properties;
462 self
463 }
464
465 pub fn with_output_column(mut self, column: usize) -> Self {
467 self.output_column = Some(column);
468 self
469 }
470
471 pub fn with_transaction_context(
473 mut self,
474 epoch: EpochId,
475 transaction_id: Option<TransactionId>,
476 ) -> Self {
477 self.viewing_epoch = Some(epoch);
478 self.transaction_id = transaction_id;
479 self
480 }
481
482 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
484 self.validator = Some(validator);
485 self
486 }
487
488 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
490 self.write_tracker = Some(tracker);
491 self
492 }
493}
494
495impl Operator for CreateEdgeOperator {
496 fn next(&mut self) -> OperatorResult {
497 let epoch = self
499 .viewing_epoch
500 .unwrap_or_else(|| self.store.current_epoch());
501 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
502
503 if let Some(chunk) = self.input.next()? {
504 let mut builder =
505 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
506
507 for row in chunk.selected_indices() {
508 let from_id = chunk
510 .column(self.from_column)
511 .and_then(|c| c.get_value(row))
512 .ok_or_else(|| {
513 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
514 })?;
515
516 let to_id = chunk
517 .column(self.to_column)
518 .and_then(|c| c.get_value(row))
519 .ok_or_else(|| {
520 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
521 })?;
522
523 let from_node_id = match from_id {
525 Value::Int64(id) => NodeId(id as u64),
526 _ => {
527 return Err(OperatorError::TypeMismatch {
528 expected: "Int64 (node ID)".to_string(),
529 found: format!("{from_id:?}"),
530 });
531 }
532 };
533
534 let to_node_id = match to_id {
535 Value::Int64(id) => NodeId(id as u64),
536 _ => {
537 return Err(OperatorError::TypeMismatch {
538 expected: "Int64 (node ID)".to_string(),
539 found: format!("{to_id:?}"),
540 });
541 }
542 };
543
544 if let Some(ref validator) = self.validator {
546 validator.validate_edge_type_allowed(&self.edge_type)?;
547
548 let source_labels: Vec<String> = self
550 .store
551 .get_node(from_node_id)
552 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
553 .unwrap_or_default();
554 let target_labels: Vec<String> = self
555 .store
556 .get_node(to_node_id)
557 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
558 .unwrap_or_default();
559 validator.validate_edge_endpoints(
560 &self.edge_type,
561 &source_labels,
562 &target_labels,
563 )?;
564 }
565
566 let resolved_props: Vec<(String, Value)> = self
568 .properties
569 .iter()
570 .map(|(name, source)| {
571 let value =
572 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
573 (name.clone(), value)
574 })
575 .collect();
576
577 if let Some(ref validator) = self.validator {
579 for (name, value) in &resolved_props {
580 validator.validate_edge_property(&self.edge_type, name, value)?;
581 }
582 validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
583 }
584
585 let edge_id = self.store.create_edge_versioned(
587 from_node_id,
588 to_node_id,
589 &self.edge_type,
590 epoch,
591 tx,
592 );
593
594 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
596 tracker.record_edge_write(tid, edge_id)?;
597 }
598
599 if let Some(tid) = self.transaction_id {
601 for (name, value) in resolved_props {
602 self.store
603 .set_edge_property_versioned(edge_id, &name, value, tid);
604 }
605 } else {
606 for (name, value) in resolved_props {
607 self.store.set_edge_property(edge_id, &name, value);
608 }
609 }
610
611 for col_idx in 0..chunk.column_count() {
613 if let (Some(src), Some(dst)) =
614 (chunk.column(col_idx), builder.column_mut(col_idx))
615 {
616 if let Some(val) = src.get_value(row) {
617 dst.push_value(val);
618 } else {
619 dst.push_value(Value::Null);
620 }
621 }
622 }
623
624 if let Some(out_col) = self.output_column
626 && let Some(dst) = builder.column_mut(out_col)
627 {
628 dst.push_value(Value::Int64(edge_id.0 as i64));
629 }
630
631 builder.advance_row();
632 }
633
634 return Ok(Some(builder.finish()));
635 }
636 Ok(None)
637 }
638
639 fn reset(&mut self) {
640 self.input.reset();
641 }
642
643 fn name(&self) -> &'static str {
644 "CreateEdge"
645 }
646}
647
/// Operator that deletes the node referenced by each selected input row.
pub struct DeleteNodeOperator {
    /// Store the nodes are deleted from.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the rows with the node IDs.
    input: Box<dyn Operator>,
    /// Input column holding the node ID (expected as `Value::Int64`).
    node_column: usize,
    /// Schema used to build the output chunk.
    output_schema: Vec<LogicalType>,
    /// When `true`, the node's outgoing and incoming edges are deleted before
    /// the node itself; when `false`, deleting a node that still has edges
    /// fails with `OperatorError::ConstraintViolation`.
    detach: bool,
    /// Epoch passed to the versioned delete calls; when `None` the store's
    /// current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction the deletes belong to; `TransactionId::SYSTEM` is used
    /// when `None`.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker; deletes are recorded only when a transaction
    /// ID is set as well.
    write_tracker: Option<SharedWriteTracker>,
}
667
668impl DeleteNodeOperator {
669 pub fn new(
671 store: Arc<dyn GraphStoreMut>,
672 input: Box<dyn Operator>,
673 node_column: usize,
674 output_schema: Vec<LogicalType>,
675 detach: bool,
676 ) -> Self {
677 Self {
678 store,
679 input,
680 node_column,
681 output_schema,
682 detach,
683 viewing_epoch: None,
684 transaction_id: None,
685 write_tracker: None,
686 }
687 }
688
689 pub fn with_transaction_context(
691 mut self,
692 epoch: EpochId,
693 transaction_id: Option<TransactionId>,
694 ) -> Self {
695 self.viewing_epoch = Some(epoch);
696 self.transaction_id = transaction_id;
697 self
698 }
699
700 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
702 self.write_tracker = Some(tracker);
703 self
704 }
705}
706
707impl Operator for DeleteNodeOperator {
708 fn next(&mut self) -> OperatorResult {
709 let epoch = self
711 .viewing_epoch
712 .unwrap_or_else(|| self.store.current_epoch());
713 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
714
715 if let Some(chunk) = self.input.next()? {
716 let mut builder =
717 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
718
719 for row in chunk.selected_indices() {
720 let node_val = chunk
721 .column(self.node_column)
722 .and_then(|c| c.get_value(row))
723 .ok_or_else(|| {
724 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
725 })?;
726
727 let node_id = match node_val {
728 Value::Int64(id) => NodeId(id as u64),
729 _ => {
730 return Err(OperatorError::TypeMismatch {
731 expected: "Int64 (node ID)".to_string(),
732 found: format!("{node_val:?}"),
733 });
734 }
735 };
736
737 if self.detach {
738 let outgoing = self
741 .store
742 .edges_from(node_id, crate::graph::Direction::Outgoing);
743 let incoming = self
744 .store
745 .edges_from(node_id, crate::graph::Direction::Incoming);
746 for (_, edge_id) in outgoing.into_iter().chain(incoming) {
747 self.store.delete_edge_versioned(edge_id, epoch, tx);
748 if let (Some(tracker), Some(tid)) =
749 (&self.write_tracker, self.transaction_id)
750 {
751 tracker.record_edge_write(tid, edge_id)?;
752 }
753 }
754 } else {
755 let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
757 if degree > 0 {
758 return Err(OperatorError::ConstraintViolation(format!(
759 "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
760 degree
761 )));
762 }
763 }
764
765 self.store.delete_node_versioned(node_id, epoch, tx);
767
768 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
770 tracker.record_node_write(tid, node_id)?;
771 }
772
773 for col_idx in 0..chunk.column_count() {
776 if let (Some(src), Some(dst)) =
777 (chunk.column(col_idx), builder.column_mut(col_idx))
778 {
779 if let Some(val) = src.get_value(row) {
780 dst.push_value(val);
781 } else {
782 dst.push_value(Value::Null);
783 }
784 }
785 }
786 builder.advance_row();
787 }
788
789 return Ok(Some(builder.finish()));
790 }
791 Ok(None)
792 }
793
794 fn reset(&mut self) {
795 self.input.reset();
796 }
797
798 fn name(&self) -> &'static str {
799 "DeleteNode"
800 }
801}
802
/// Operator that deletes the edge referenced by each selected input row.
pub struct DeleteEdgeOperator {
    /// Store the edges are deleted from.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the rows with the edge IDs.
    input: Box<dyn Operator>,
    /// Input column holding the edge ID (expected as `Value::Int64`).
    edge_column: usize,
    /// Schema used to build the output chunk.
    output_schema: Vec<LogicalType>,
    /// Epoch passed to the versioned delete call; when `None` the store's
    /// current epoch is used.
    viewing_epoch: Option<EpochId>,
    /// Transaction the deletes belong to; `TransactionId::SYSTEM` is used
    /// when `None`.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker; deletes are recorded only when a transaction
    /// ID is set as well.
    write_tracker: Option<SharedWriteTracker>,
}
820
821impl DeleteEdgeOperator {
822 pub fn new(
824 store: Arc<dyn GraphStoreMut>,
825 input: Box<dyn Operator>,
826 edge_column: usize,
827 output_schema: Vec<LogicalType>,
828 ) -> Self {
829 Self {
830 store,
831 input,
832 edge_column,
833 output_schema,
834 viewing_epoch: None,
835 transaction_id: None,
836 write_tracker: None,
837 }
838 }
839
840 pub fn with_transaction_context(
842 mut self,
843 epoch: EpochId,
844 transaction_id: Option<TransactionId>,
845 ) -> Self {
846 self.viewing_epoch = Some(epoch);
847 self.transaction_id = transaction_id;
848 self
849 }
850
851 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
853 self.write_tracker = Some(tracker);
854 self
855 }
856}
857
858impl Operator for DeleteEdgeOperator {
859 fn next(&mut self) -> OperatorResult {
860 let epoch = self
862 .viewing_epoch
863 .unwrap_or_else(|| self.store.current_epoch());
864 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
865
866 if let Some(chunk) = self.input.next()? {
867 let mut builder =
868 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
869
870 for row in chunk.selected_indices() {
871 let edge_val = chunk
872 .column(self.edge_column)
873 .and_then(|c| c.get_value(row))
874 .ok_or_else(|| {
875 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
876 })?;
877
878 let edge_id = match edge_val {
879 Value::Int64(id) => EdgeId(id as u64),
880 _ => {
881 return Err(OperatorError::TypeMismatch {
882 expected: "Int64 (edge ID)".to_string(),
883 found: format!("{edge_val:?}"),
884 });
885 }
886 };
887
888 self.store.delete_edge_versioned(edge_id, epoch, tx);
890
891 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
893 tracker.record_edge_write(tid, edge_id)?;
894 }
895
896 for col_idx in 0..chunk.column_count() {
898 if let (Some(src), Some(dst)) =
899 (chunk.column(col_idx), builder.column_mut(col_idx))
900 {
901 if let Some(val) = src.get_value(row) {
902 dst.push_value(val);
903 } else {
904 dst.push_value(Value::Null);
905 }
906 }
907 }
908 builder.advance_row();
909 }
910
911 return Ok(Some(builder.finish()));
912 }
913 Ok(None)
914 }
915
916 fn reset(&mut self) {
917 self.input.reset();
918 }
919
920 fn name(&self) -> &'static str {
921 "DeleteEdge"
922 }
923}
924
/// Operator that adds a fixed set of labels to the node referenced by each
/// selected input row and reports how many labels were actually added.
pub struct AddLabelOperator {
    /// Store whose nodes are relabelled.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the rows with the node IDs.
    input: Box<dyn Operator>,
    /// Input column holding the node ID (expected as `Value::Int64`).
    node_column: usize,
    /// Labels to add to every referenced node.
    labels: Vec<String>,
    /// Schema of the single-row result chunk; column 0 receives the count.
    output_schema: Vec<LogicalType>,
    /// Stored by `with_transaction_context`; not read by `next` in this file.
    viewing_epoch: Option<EpochId>,
    /// Transaction the writes belong to; selects the versioned label call
    /// when set and the unversioned one otherwise.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker; touched nodes are recorded only when a
    /// transaction ID is set as well.
    write_tracker: Option<SharedWriteTracker>,
}
944
945impl AddLabelOperator {
946 pub fn new(
948 store: Arc<dyn GraphStoreMut>,
949 input: Box<dyn Operator>,
950 node_column: usize,
951 labels: Vec<String>,
952 output_schema: Vec<LogicalType>,
953 ) -> Self {
954 Self {
955 store,
956 input,
957 node_column,
958 labels,
959 output_schema,
960 viewing_epoch: None,
961 transaction_id: None,
962 write_tracker: None,
963 }
964 }
965
966 pub fn with_transaction_context(
968 mut self,
969 epoch: EpochId,
970 transaction_id: Option<TransactionId>,
971 ) -> Self {
972 self.viewing_epoch = Some(epoch);
973 self.transaction_id = transaction_id;
974 self
975 }
976
977 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
979 self.write_tracker = Some(tracker);
980 self
981 }
982}
983
984impl Operator for AddLabelOperator {
985 fn next(&mut self) -> OperatorResult {
986 if let Some(chunk) = self.input.next()? {
987 let mut updated_count = 0;
988
989 for row in chunk.selected_indices() {
990 let node_val = chunk
991 .column(self.node_column)
992 .and_then(|c| c.get_value(row))
993 .ok_or_else(|| {
994 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
995 })?;
996
997 let node_id = match node_val {
998 Value::Int64(id) => NodeId(id as u64),
999 _ => {
1000 return Err(OperatorError::TypeMismatch {
1001 expected: "Int64 (node ID)".to_string(),
1002 found: format!("{node_val:?}"),
1003 });
1004 }
1005 };
1006
1007 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1009 tracker.record_node_write(tid, node_id)?;
1010 }
1011
1012 for label in &self.labels {
1014 let added = if let Some(tid) = self.transaction_id {
1015 self.store.add_label_versioned(node_id, label, tid)
1016 } else {
1017 self.store.add_label(node_id, label)
1018 };
1019 if added {
1020 updated_count += 1;
1021 }
1022 }
1023 }
1024
1025 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
1027 if let Some(dst) = builder.column_mut(0) {
1028 dst.push_value(Value::Int64(updated_count));
1029 }
1030 builder.advance_row();
1031
1032 return Ok(Some(builder.finish()));
1033 }
1034 Ok(None)
1035 }
1036
1037 fn reset(&mut self) {
1038 self.input.reset();
1039 }
1040
1041 fn name(&self) -> &'static str {
1042 "AddLabel"
1043 }
1044}
1045
/// Operator that removes a fixed set of labels from the node referenced by
/// each selected input row and reports how many labels were actually removed.
pub struct RemoveLabelOperator {
    /// Store whose nodes are relabelled.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the rows with the node IDs.
    input: Box<dyn Operator>,
    /// Input column holding the node ID (expected as `Value::Int64`).
    node_column: usize,
    /// Labels to remove from every referenced node.
    labels: Vec<String>,
    /// Schema of the single-row result chunk; column 0 receives the count.
    output_schema: Vec<LogicalType>,
    /// Stored by `with_transaction_context`; not read by `next` in this file.
    viewing_epoch: Option<EpochId>,
    /// Transaction the writes belong to; selects the versioned label call
    /// when set and the unversioned one otherwise.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker; touched nodes are recorded only when a
    /// transaction ID is set as well.
    write_tracker: Option<SharedWriteTracker>,
}
1065
1066impl RemoveLabelOperator {
1067 pub fn new(
1069 store: Arc<dyn GraphStoreMut>,
1070 input: Box<dyn Operator>,
1071 node_column: usize,
1072 labels: Vec<String>,
1073 output_schema: Vec<LogicalType>,
1074 ) -> Self {
1075 Self {
1076 store,
1077 input,
1078 node_column,
1079 labels,
1080 output_schema,
1081 viewing_epoch: None,
1082 transaction_id: None,
1083 write_tracker: None,
1084 }
1085 }
1086
1087 pub fn with_transaction_context(
1089 mut self,
1090 epoch: EpochId,
1091 transaction_id: Option<TransactionId>,
1092 ) -> Self {
1093 self.viewing_epoch = Some(epoch);
1094 self.transaction_id = transaction_id;
1095 self
1096 }
1097
1098 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1100 self.write_tracker = Some(tracker);
1101 self
1102 }
1103}
1104
1105impl Operator for RemoveLabelOperator {
1106 fn next(&mut self) -> OperatorResult {
1107 if let Some(chunk) = self.input.next()? {
1108 let mut updated_count = 0;
1109
1110 for row in chunk.selected_indices() {
1111 let node_val = chunk
1112 .column(self.node_column)
1113 .and_then(|c| c.get_value(row))
1114 .ok_or_else(|| {
1115 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1116 })?;
1117
1118 let node_id = match node_val {
1119 Value::Int64(id) => NodeId(id as u64),
1120 _ => {
1121 return Err(OperatorError::TypeMismatch {
1122 expected: "Int64 (node ID)".to_string(),
1123 found: format!("{node_val:?}"),
1124 });
1125 }
1126 };
1127
1128 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1130 tracker.record_node_write(tid, node_id)?;
1131 }
1132
1133 for label in &self.labels {
1135 let removed = if let Some(tid) = self.transaction_id {
1136 self.store.remove_label_versioned(node_id, label, tid)
1137 } else {
1138 self.store.remove_label(node_id, label)
1139 };
1140 if removed {
1141 updated_count += 1;
1142 }
1143 }
1144 }
1145
1146 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
1148 if let Some(dst) = builder.column_mut(0) {
1149 dst.push_value(Value::Int64(updated_count));
1150 }
1151 builder.advance_row();
1152
1153 return Ok(Some(builder.finish()));
1154 }
1155 Ok(None)
1156 }
1157
1158 fn reset(&mut self) {
1159 self.input.reset();
1160 }
1161
1162 fn name(&self) -> &'static str {
1163 "RemoveLabel"
1164 }
1165}
1166
/// Operator that sets properties on the node or edge referenced by each
/// selected input row.
///
/// A property named `*` whose resolved value is a map is expanded into one
/// write per map entry (optionally after clearing all existing properties,
/// see `replace`).
pub struct SetPropertyOperator {
    /// Store whose entities are updated.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the rows with the entity IDs.
    input: Box<dyn Operator>,
    /// Input column holding the entity ID (expected as `Value::Int64`).
    entity_column: usize,
    /// `true` when the entity column refers to edges, `false` for nodes.
    is_edge: bool,
    /// Property names paired with the source each value is resolved from.
    properties: Vec<(String, PropertySource)>,
    /// Schema used to build the output chunk.
    output_schema: Vec<LogicalType>,
    /// When `true`, expanding a `*` map first removes every existing
    /// property of the entity.
    replace: bool,
    /// Optional constraint validator consulted before properties are written.
    validator: Option<Arc<dyn ConstraintValidator>>,
    /// Labels handed to the validator for node property checks.
    labels: Vec<String>,
    /// Edge type handed to the validator; edge properties are not validated
    /// while this is `None`.
    edge_type_name: Option<String>,
    /// Stored by `with_transaction_context`; not read by `next` in this file.
    viewing_epoch: Option<EpochId>,
    /// Transaction the writes belong to; selects the versioned store calls
    /// when set and the unversioned ones otherwise.
    transaction_id: Option<TransactionId>,
    /// Optional write tracker; touched entities are recorded only when a
    /// transaction ID is set as well.
    write_tracker: Option<SharedWriteTracker>,
}
1199
1200impl SetPropertyOperator {
1201 pub fn new_for_node(
1203 store: Arc<dyn GraphStoreMut>,
1204 input: Box<dyn Operator>,
1205 node_column: usize,
1206 properties: Vec<(String, PropertySource)>,
1207 output_schema: Vec<LogicalType>,
1208 ) -> Self {
1209 Self {
1210 store,
1211 input,
1212 entity_column: node_column,
1213 is_edge: false,
1214 properties,
1215 output_schema,
1216 replace: false,
1217 validator: None,
1218 labels: Vec::new(),
1219 edge_type_name: None,
1220 viewing_epoch: None,
1221 transaction_id: None,
1222 write_tracker: None,
1223 }
1224 }
1225
1226 pub fn new_for_edge(
1228 store: Arc<dyn GraphStoreMut>,
1229 input: Box<dyn Operator>,
1230 edge_column: usize,
1231 properties: Vec<(String, PropertySource)>,
1232 output_schema: Vec<LogicalType>,
1233 ) -> Self {
1234 Self {
1235 store,
1236 input,
1237 entity_column: edge_column,
1238 is_edge: true,
1239 properties,
1240 output_schema,
1241 replace: false,
1242 validator: None,
1243 labels: Vec::new(),
1244 edge_type_name: None,
1245 viewing_epoch: None,
1246 transaction_id: None,
1247 write_tracker: None,
1248 }
1249 }
1250
1251 pub fn with_replace(mut self, replace: bool) -> Self {
1253 self.replace = replace;
1254 self
1255 }
1256
1257 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1259 self.validator = Some(validator);
1260 self
1261 }
1262
1263 pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1265 self.labels = labels;
1266 self
1267 }
1268
1269 pub fn with_edge_type(mut self, edge_type: String) -> Self {
1271 self.edge_type_name = Some(edge_type);
1272 self
1273 }
1274
1275 pub fn with_transaction_context(
1280 mut self,
1281 epoch: EpochId,
1282 transaction_id: Option<TransactionId>,
1283 ) -> Self {
1284 self.viewing_epoch = Some(epoch);
1285 self.transaction_id = transaction_id;
1286 self
1287 }
1288
1289 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1291 self.write_tracker = Some(tracker);
1292 self
1293 }
1294}
1295
1296impl Operator for SetPropertyOperator {
1297 fn next(&mut self) -> OperatorResult {
1298 if let Some(chunk) = self.input.next()? {
1299 let mut builder =
1300 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1301
1302 for row in chunk.selected_indices() {
1303 let entity_val = chunk
1304 .column(self.entity_column)
1305 .and_then(|c| c.get_value(row))
1306 .ok_or_else(|| {
1307 OperatorError::ColumnNotFound(format!(
1308 "entity column {}",
1309 self.entity_column
1310 ))
1311 })?;
1312
1313 let entity_id = match entity_val {
1314 Value::Int64(id) => id as u64,
1315 _ => {
1316 return Err(OperatorError::TypeMismatch {
1317 expected: "Int64 (entity ID)".to_string(),
1318 found: format!("{entity_val:?}"),
1319 });
1320 }
1321 };
1322
1323 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1325 if self.is_edge {
1326 tracker.record_edge_write(tid, EdgeId(entity_id))?;
1327 } else {
1328 tracker.record_node_write(tid, NodeId(entity_id))?;
1329 }
1330 }
1331
1332 let resolved_props: Vec<(String, Value)> = self
1334 .properties
1335 .iter()
1336 .map(|(name, source)| {
1337 let value =
1338 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1339 (name.clone(), value)
1340 })
1341 .collect();
1342
1343 if let Some(ref validator) = self.validator {
1345 if self.is_edge {
1346 if let Some(ref et) = self.edge_type_name {
1347 for (name, value) in &resolved_props {
1348 validator.validate_edge_property(et, name, value)?;
1349 }
1350 }
1351 } else {
1352 for (name, value) in &resolved_props {
1353 validator.validate_node_property(&self.labels, name, value)?;
1354 validator.check_unique_node_property(&self.labels, name, value)?;
1355 }
1356 }
1357 }
1358
1359 let tx_id = self.transaction_id;
1361 for (prop_name, value) in resolved_props {
1362 if prop_name == "*" {
1363 if let Value::Map(map) = value {
1365 if self.replace {
1366 if self.is_edge {
1368 if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1369 let keys: Vec<String> = edge
1370 .properties
1371 .iter()
1372 .map(|(k, _)| k.as_str().to_string())
1373 .collect();
1374 for key in keys {
1375 if let Some(tid) = tx_id {
1376 self.store.remove_edge_property_versioned(
1377 EdgeId(entity_id),
1378 &key,
1379 tid,
1380 );
1381 } else {
1382 self.store
1383 .remove_edge_property(EdgeId(entity_id), &key);
1384 }
1385 }
1386 }
1387 } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1388 let keys: Vec<String> = node
1389 .properties
1390 .iter()
1391 .map(|(k, _)| k.as_str().to_string())
1392 .collect();
1393 for key in keys {
1394 if let Some(tid) = tx_id {
1395 self.store.remove_node_property_versioned(
1396 NodeId(entity_id),
1397 &key,
1398 tid,
1399 );
1400 } else {
1401 self.store
1402 .remove_node_property(NodeId(entity_id), &key);
1403 }
1404 }
1405 }
1406 }
1407 for (key, val) in map.iter() {
1409 if self.is_edge {
1410 if let Some(tid) = tx_id {
1411 self.store.set_edge_property_versioned(
1412 EdgeId(entity_id),
1413 key.as_str(),
1414 val.clone(),
1415 tid,
1416 );
1417 } else {
1418 self.store.set_edge_property(
1419 EdgeId(entity_id),
1420 key.as_str(),
1421 val.clone(),
1422 );
1423 }
1424 } else if let Some(tid) = tx_id {
1425 self.store.set_node_property_versioned(
1426 NodeId(entity_id),
1427 key.as_str(),
1428 val.clone(),
1429 tid,
1430 );
1431 } else {
1432 self.store.set_node_property(
1433 NodeId(entity_id),
1434 key.as_str(),
1435 val.clone(),
1436 );
1437 }
1438 }
1439 }
1440 } else if self.is_edge {
1441 if let Some(tid) = tx_id {
1442 self.store.set_edge_property_versioned(
1443 EdgeId(entity_id),
1444 &prop_name,
1445 value,
1446 tid,
1447 );
1448 } else {
1449 self.store
1450 .set_edge_property(EdgeId(entity_id), &prop_name, value);
1451 }
1452 } else if let Some(tid) = tx_id {
1453 self.store.set_node_property_versioned(
1454 NodeId(entity_id),
1455 &prop_name,
1456 value,
1457 tid,
1458 );
1459 } else {
1460 self.store
1461 .set_node_property(NodeId(entity_id), &prop_name, value);
1462 }
1463 }
1464
1465 for col_idx in 0..chunk.column_count() {
1467 if let (Some(src), Some(dst)) =
1468 (chunk.column(col_idx), builder.column_mut(col_idx))
1469 {
1470 if let Some(val) = src.get_value(row) {
1471 dst.push_value(val);
1472 } else {
1473 dst.push_value(Value::Null);
1474 }
1475 }
1476 }
1477
1478 builder.advance_row();
1479 }
1480
1481 return Ok(Some(builder.finish()));
1482 }
1483 Ok(None)
1484 }
1485
/// Resets this operator by delegating to its input operator's `reset`.
fn reset(&mut self) {
    self.input.reset();
}
1489
/// Returns the static display name of this operator: `"SetProperty"`.
fn name(&self) -> &'static str {
    "SetProperty"
}
1493}
1494
1495#[cfg(test)]
1496mod tests {
1497 use super::*;
1498 use crate::execution::DataChunk;
1499 use crate::execution::chunk::DataChunkBuilder;
1500 use crate::graph::lpg::LpgStore;
1501
1502 fn create_test_store() -> Arc<dyn GraphStoreMut> {
1505 Arc::new(LpgStore::new().unwrap())
1506 }
1507
1508 struct MockInput {
1509 chunk: Option<DataChunk>,
1510 }
1511
1512 impl MockInput {
1513 fn boxed(chunk: DataChunk) -> Box<Self> {
1514 Box::new(Self { chunk: Some(chunk) })
1515 }
1516 }
1517
1518 impl Operator for MockInput {
1519 fn next(&mut self) -> OperatorResult {
1520 Ok(self.chunk.take())
1521 }
1522 fn reset(&mut self) {}
1523 fn name(&self) -> &'static str {
1524 "MockInput"
1525 }
1526 }
1527
/// Test input operator that never produces a chunk.
struct EmptyInput;
impl Operator for EmptyInput {
    /// Always reports exhaustion by returning `Ok(None)`.
    fn next(&mut self) -> OperatorResult {
        Ok(None)
    }
    /// No-op: there is no state to reset.
    fn reset(&mut self) {}
    /// Returns the static name `"EmptyInput"`.
    fn name(&self) -> &'static str {
        "EmptyInput"
    }
}
1538
1539 fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1540 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1541 for id in ids {
1542 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1543 builder.advance_row();
1544 }
1545 builder.finish()
1546 }
1547
1548 fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1549 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1550 for id in ids {
1551 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1552 builder.advance_row();
1553 }
1554 builder.finish()
1555 }
1556
/// A `CreateNodeOperator` without an input yields one row, then reports
/// exhaustion, and leaves exactly one node in the store.
#[test]
fn test_create_node_standalone() {
    let store = create_test_store();

    let labels = vec![String::from("Person")];
    let properties = vec![(
        String::from("name"),
        PropertySource::Constant(Value::String("Alix".into())),
    )];
    let mut create = CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        labels,
        properties,
        vec![LogicalType::Int64],
        0,
    );

    // First call produces the single output row.
    let first = create.next().unwrap().unwrap();
    assert_eq!(first.row_count(), 1);

    // Second call has nothing left to produce.
    let second = create.next().unwrap();
    assert!(second.is_none());

    assert_eq!(store.node_count(), 1);
}
1583
/// Feeding one (source, target) row into `CreateEdgeOperator` leaves one
/// edge in the store.
#[test]
fn test_create_edge() {
    let store = create_test_store();

    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);

    // One input row: column 0 holds the source ID, column 1 the target ID.
    let mut endpoints = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
    endpoints.column_mut(0).unwrap().push_int64(source.0 as i64);
    endpoints.column_mut(1).unwrap().push_int64(target.0 as i64);
    endpoints.advance_row();
    let input = MockInput::boxed(endpoints.finish());

    let mut create = CreateEdgeOperator::new(
        Arc::clone(&store),
        input,
        0,
        1,
        String::from("KNOWS"),
        vec![LogicalType::Int64, LogicalType::Int64],
    );

    let _produced = create.next().unwrap().unwrap();
    assert_eq!(store.edge_count(), 1);
}
1608
/// Deleting an unconnected node (no detach) yields one row and empties the
/// store.
#[test]
fn test_delete_node() {
    let store = create_test_store();

    let victim = store.create_node(&["Person"]);
    assert_eq!(store.node_count(), 1);

    let input = MockInput::boxed(node_id_chunk(&[victim]));
    let mut delete = DeleteNodeOperator::new(
        Arc::clone(&store),
        input,
        0,
        vec![LogicalType::Node],
        false,
    );

    let output = delete.next().unwrap().unwrap();
    assert_eq!(output.row_count(), 1);
    assert_eq!(store.node_count(), 0);
}
1629
/// Deleting a single edge yields one row and leaves no edges in the store.
#[test]
fn test_delete_edge() {
    let store = create_test_store();

    let from = store.create_node(&["Person"]);
    let to = store.create_node(&["Person"]);
    let knows = store.create_edge(from, to, "KNOWS");
    assert_eq!(store.edge_count(), 1);

    let input = MockInput::boxed(edge_id_chunk(&[knows]));
    let mut delete = DeleteEdgeOperator::new(
        Arc::clone(&store),
        input,
        0,
        vec![LogicalType::Node],
    );

    let output = delete.next().unwrap().unwrap();
    assert_eq!(output.row_count(), 1);
    assert_eq!(store.edge_count(), 0);
}
1652
/// With an exhausted input, `DeleteEdgeOperator` returns `None`.
#[test]
fn test_delete_edge_no_input_returns_none() {
    let store = create_test_store();

    let mut delete = DeleteEdgeOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![LogicalType::Int64],
    );

    let outcome = delete.next().unwrap();
    assert!(outcome.is_none());
}
1666
/// Two edge IDs in one input chunk are both deleted, and the output has two
/// rows.
#[test]
fn test_delete_multiple_edges() {
    let store = create_test_store();

    let first_node = store.create_node(&["N"]);
    let second_node = store.create_node(&["N"]);
    let forward = store.create_edge(first_node, second_node, "R");
    let backward = store.create_edge(second_node, first_node, "S");
    assert_eq!(store.edge_count(), 2);

    let input = MockInput::boxed(edge_id_chunk(&[forward, backward]));
    let mut delete = DeleteEdgeOperator::new(
        Arc::clone(&store),
        input,
        0,
        vec![LogicalType::Node],
    );

    let output = delete.next().unwrap().unwrap();
    assert_eq!(output.row_count(), 2);
    assert_eq!(store.edge_count(), 0);
}
1688
/// Deleting a node with the detach flag set removes the node and both of
/// its connected edges.
#[test]
fn test_delete_node_detach() {
    let store = create_test_store();

    let doomed = store.create_node(&["Person"]);
    let survivor = store.create_node(&["Person"]);
    store.create_edge(doomed, survivor, "KNOWS");
    store.create_edge(survivor, doomed, "FOLLOWS");
    assert_eq!(store.edge_count(), 2);

    let detach = true;
    let mut delete = DeleteNodeOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[doomed])),
        0,
        vec![LogicalType::Node],
        detach,
    );

    let output = delete.next().unwrap().unwrap();
    assert_eq!(output.row_count(), 1);
    // Only the other node remains, and no edges are left.
    assert_eq!(store.node_count(), 1);
    assert_eq!(store.edge_count(), 0);
}
1714
/// Adding one label reports an update count of 1 and the node ends up with
/// both the original and the new label.
#[test]
fn test_add_label() {
    let store = create_test_store();

    let person = store.create_node(&["Person"]);

    let mut add_label = AddLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        vec![String::from("Employee")],
        vec![LogicalType::Int64],
    );

    // The first value of output column 0 is the reported update count.
    let output = add_label.next().unwrap().unwrap();
    let count_column = output.column(0).unwrap();
    let updated = count_column.get_int64(0).unwrap();
    assert_eq!(updated, 1);

    let stored = store.get_node(person).unwrap();
    let labels: Vec<&str> = stored.labels.iter().map(|l| l.as_ref()).collect();
    assert!(labels.contains(&"Person"));
    assert!(labels.contains(&"Employee"));
}
1741
/// Adding two labels to one node reports an update count of 2 and both
/// labels are present afterwards.
#[test]
fn test_add_multiple_labels() {
    let store = create_test_store();

    let base = store.create_node(&["Base"]);

    let new_labels = vec![String::from("LabelA"), String::from("LabelB")];
    let mut add_labels = AddLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[base])),
        0,
        new_labels,
        vec![LogicalType::Int64],
    );

    let output = add_labels.next().unwrap().unwrap();
    let count_column = output.column(0).unwrap();
    let updated = count_column.get_int64(0).unwrap();
    assert_eq!(updated, 2);

    let stored = store.get_node(base).unwrap();
    let labels: Vec<&str> = stored.labels.iter().map(|l| l.as_ref()).collect();
    assert!(labels.contains(&"LabelA"));
    assert!(labels.contains(&"LabelB"));
}
1765
/// With an exhausted input, `AddLabelOperator` returns `None`.
#[test]
fn test_add_label_no_input_returns_none() {
    let store = create_test_store();

    let mut add_label = AddLabelOperator::new(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        vec![String::from("Foo")],
        vec![LogicalType::Int64],
    );

    let outcome = add_label.next().unwrap();
    assert!(outcome.is_none());
}
1780
/// Removing an existing label reports an update count of 1 and leaves the
/// node's other label in place.
#[test]
fn test_remove_label() {
    let store = create_test_store();

    let employee = store.create_node(&["Person", "Employee"]);

    let mut remove_label = RemoveLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[employee])),
        0,
        vec![String::from("Employee")],
        vec![LogicalType::Int64],
    );

    let output = remove_label.next().unwrap().unwrap();
    let count_column = output.column(0).unwrap();
    let updated = count_column.get_int64(0).unwrap();
    assert_eq!(updated, 1);

    let stored = store.get_node(employee).unwrap();
    let labels: Vec<&str> = stored.labels.iter().map(|l| l.as_ref()).collect();
    assert!(labels.contains(&"Person"));
    assert!(!labels.contains(&"Employee"));
}
1807
/// Removing a label the node does not carry reports an update count of 0.
#[test]
fn test_remove_nonexistent_label() {
    let store = create_test_store();

    let person = store.create_node(&["Person"]);

    let mut remove_label = RemoveLabelOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        vec![String::from("NonExistent")],
        vec![LogicalType::Int64],
    );

    let output = remove_label.next().unwrap().unwrap();
    let count_column = output.column(0).unwrap();
    let updated = count_column.get_int64(0).unwrap();
    assert_eq!(updated, 0);
}
1826
/// Setting a constant property on a node stores that value under the given
/// key.
#[test]
fn test_set_node_property_constant() {
    let store = create_test_store();

    let person = store.create_node(&["Person"]);

    let assignments = vec![(
        String::from("name"),
        PropertySource::Constant(Value::String("Alix".into())),
    )];
    let mut set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    let output = set_property.next().unwrap().unwrap();
    assert_eq!(output.row_count(), 1);

    let stored = store.get_node(person).unwrap();
    let name_key = PropertyKey::new("name");
    assert_eq!(
        stored.properties.get(&name_key),
        Some(&Value::String("Alix".into()))
    );
}
1858
/// A `PropertySource::Column` assignment takes the property value from the
/// referenced input column of the same row.
#[test]
fn test_set_node_property_from_column() {
    let store = create_test_store();

    let person = store.create_node(&["Person"]);

    // Row layout: column 0 = node ID, column 1 = the value to assign.
    let mut rows = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
    rows.column_mut(0).unwrap().push_int64(person.0 as i64);
    rows.column_mut(1)
        .unwrap()
        .push_value(Value::String("Gus".into()));
    rows.advance_row();
    let input = MockInput::boxed(rows.finish());

    let mut set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        input,
        0,
        vec![(String::from("name"), PropertySource::Column(1))],
        vec![LogicalType::Int64, LogicalType::String],
    );

    let output = set_property.next().unwrap().unwrap();
    assert_eq!(output.row_count(), 1);

    let stored = store.get_node(person).unwrap();
    let name_key = PropertyKey::new("name");
    assert_eq!(
        stored.properties.get(&name_key),
        Some(&Value::String("Gus".into()))
    );
}
1893
/// Setting a constant property through `new_for_edge` stores the value on
/// the edge.
#[test]
fn test_set_edge_property() {
    let store = create_test_store();

    let from = store.create_node(&["N"]);
    let to = store.create_node(&["N"]);
    let knows = store.create_edge(from, to, "KNOWS");

    let assignments = vec![(
        String::from("weight"),
        PropertySource::Constant(Value::Float64(0.75)),
    )];
    let mut set_property = SetPropertyOperator::new_for_edge(
        Arc::clone(&store),
        MockInput::boxed(edge_id_chunk(&[knows])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    let output = set_property.next().unwrap().unwrap();
    assert_eq!(output.row_count(), 1);

    let stored = store.get_edge(knows).unwrap();
    let weight_key = PropertyKey::new("weight");
    assert_eq!(
        stored.properties.get(&weight_key),
        Some(&Value::Float64(0.75))
    );
}
1924
/// Two constant assignments in one operator are both applied to the node.
#[test]
fn test_set_multiple_properties() {
    let store = create_test_store();

    let person = store.create_node(&["Person"]);

    let assignments = vec![
        (
            String::from("name"),
            PropertySource::Constant(Value::String("Alix".into())),
        ),
        (
            String::from("age"),
            PropertySource::Constant(Value::Int64(30)),
        ),
    ];
    let mut set_properties = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    set_properties.next().unwrap().unwrap();

    let stored = store.get_node(person).unwrap();
    let expectations = [
        ("name", Value::String("Alix".into())),
        ("age", Value::Int64(30)),
    ];
    for (key, expected) in &expectations {
        let property_key = PropertyKey::new(*key);
        assert_eq!(stored.properties.get(&property_key), Some(expected));
    }
}
1964
/// With an exhausted input, `SetPropertyOperator` returns `None`.
#[test]
fn test_set_property_no_input_returns_none() {
    let store = create_test_store();

    let assignments = vec![(String::from("x"), PropertySource::Constant(Value::Int64(1)))];
    let mut set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        Box::new(EmptyInput),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    let outcome = set_property.next().unwrap();
    assert!(outcome.is_none());
}
1979
/// Deleting a connected node with the detach flag unset fails with a
/// `ConstraintViolation` mentioning the connected edge, and both nodes
/// remain in the store.
#[test]
fn test_delete_node_without_detach_errors_when_edges_exist() {
    let store = create_test_store();

    let connected = store.create_node(&["Person"]);
    let neighbour = store.create_node(&["Person"]);
    store.create_edge(connected, neighbour, "KNOWS");

    let detach = false;
    let mut delete = DeleteNodeOperator::new(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[connected])),
        0,
        vec![LogicalType::Int64],
        detach,
    );

    let failure = delete.next().unwrap_err();
    let msg = match failure {
        OperatorError::ConstraintViolation(msg) => msg,
        other => panic!("expected ConstraintViolation, got {other:?}"),
    };
    assert!(msg.contains("connected edge"), "unexpected message: {msg}");

    assert_eq!(store.node_count(), 2);
}
2008
/// With a one-row input, `CreateNodeOperator` emits one row, adds one node
/// to the store, and is exhausted once the input is.
#[test]
fn test_create_node_with_input_operator() {
    let store = create_test_store();

    let seed = store.create_node(&["Seed"]);

    let properties = vec![(
        String::from("source"),
        PropertySource::Constant(Value::String("from_input".into())),
    )];
    // Two-column output schema, with column index 1 as the output column.
    let mut create = CreateNodeOperator::new(
        Arc::clone(&store),
        Some(MockInput::boxed(node_id_chunk(&[seed]))),
        vec![String::from("Created")],
        properties,
        vec![LogicalType::Int64, LogicalType::Int64],
        1,
    );

    let output = create.next().unwrap().unwrap();
    assert_eq!(output.row_count(), 1);

    // The seed node plus the newly created one.
    assert_eq!(store.node_count(), 2);

    let after_input_drained = create.next().unwrap();
    assert!(after_input_drained.is_none());
}
2039
/// `CreateEdgeOperator` configured with properties and an output column
/// writes the new edge's ID to that column and stores the property on the
/// edge.
#[test]
fn test_create_edge_with_properties_and_output_column() {
    let store = create_test_store();

    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);

    let mut endpoints = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
    endpoints.column_mut(0).unwrap().push_int64(source.0 as i64);
    endpoints.column_mut(1).unwrap().push_int64(target.0 as i64);
    endpoints.advance_row();
    let input = MockInput::boxed(endpoints.finish());

    let edge_properties = vec![(
        String::from("since"),
        PropertySource::Constant(Value::Int64(2024)),
    )];
    let mut create = CreateEdgeOperator::new(
        Arc::clone(&store),
        input,
        0,
        1,
        String::from("KNOWS"),
        vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
    )
    .with_properties(edge_properties)
    .with_output_column(2);

    let output = create.next().unwrap().unwrap();
    assert_eq!(output.row_count(), 1);
    assert_eq!(store.edge_count(), 1);

    // Read the created edge's ID back from output column 2.
    let raw_id = output
        .column(2)
        .and_then(|column| column.get_int64(0))
        .expect("edge ID should be in output column 2");
    let created = EdgeId(raw_id as u64);

    let stored = store.get_edge(created).expect("edge should exist");
    let since_key = PropertyKey::new("since");
    assert_eq!(stored.properties.get(&since_key), Some(&Value::Int64(2024)));
}
2087
/// Assigning a map under the `"*"` key with replace enabled drops the
/// node's previous property and stores the map's entry.
#[test]
fn test_set_property_map_replace() {
    use std::collections::BTreeMap;

    let store = create_test_store();

    let person = store.create_node(&["Person"]);
    store.set_node_property(person, "old_prop", Value::String("should_be_removed".into()));

    let replacement = BTreeMap::from([(
        PropertyKey::new("new_key"),
        Value::String("new_val".into()),
    )]);
    let assignments = vec![(
        String::from("*"),
        PropertySource::Constant(Value::Map(Arc::new(replacement))),
    )];

    let mut set_map = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        assignments,
        vec![LogicalType::Int64],
    )
    .with_replace(true);

    set_map.next().unwrap().unwrap();

    let stored = store.get_node(person).unwrap();

    // The pre-existing property is gone.
    let old_key = PropertyKey::new("old_prop");
    assert!(stored.properties.get(&old_key).is_none());

    // The map's entry is present.
    let new_key = PropertyKey::new("new_key");
    assert_eq!(
        stored.properties.get(&new_key),
        Some(&Value::String("new_val".into()))
    );
}
2130
/// Assigning a map under the `"*"` key without enabling replace keeps the
/// node's existing property and adds the map's entry.
#[test]
fn test_set_property_map_merge() {
    use std::collections::BTreeMap;

    let store = create_test_store();

    let person = store.create_node(&["Person"]);
    store.set_node_property(person, "existing", Value::Int64(42));

    let additions = BTreeMap::from([(
        PropertyKey::new("added"),
        Value::String("hello".into()),
    )]);
    let assignments = vec![(
        String::from("*"),
        PropertySource::Constant(Value::Map(Arc::new(additions))),
    )];

    // `with_replace` is deliberately not called here.
    let mut set_map = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        MockInput::boxed(node_id_chunk(&[person])),
        0,
        assignments,
        vec![LogicalType::Int64],
    );

    set_map.next().unwrap().unwrap();

    let stored = store.get_node(person).unwrap();

    // The pre-existing property is untouched.
    let existing_key = PropertyKey::new("existing");
    assert_eq!(stored.properties.get(&existing_key), Some(&Value::Int64(42)));

    // The map's entry was added alongside it.
    let added_key = PropertyKey::new("added");
    assert_eq!(
        stored.properties.get(&added_key),
        Some(&Value::String("hello".into()))
    );
}
2170
/// A `PropertySource::PropertyAccess` assignment copies a property from the
/// node referenced in one column onto the node targeted by another column.
#[test]
fn test_property_source_property_access() {
    let store = create_test_store();

    let origin = store.create_node(&["Source"]);
    store.set_node_property(origin, "name", Value::String("Alix".into()));

    let destination = store.create_node(&["Target"]);

    // Row layout: column 0 = node to read from, column 1 = node to write to.
    let mut rows = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
    rows.column_mut(0).unwrap().push_node_id(origin);
    rows.column_mut(1)
        .unwrap()
        .push_int64(destination.0 as i64);
    rows.advance_row();
    let input = MockInput::boxed(rows.finish());

    let copy_name = PropertySource::PropertyAccess {
        column: 0,
        property: String::from("name"),
    };
    let mut set_property = SetPropertyOperator::new_for_node(
        Arc::clone(&store),
        input,
        1,
        vec![(String::from("copied_name"), copy_name)],
        vec![LogicalType::Node, LogicalType::Int64],
    );

    set_property.next().unwrap().unwrap();

    let stored = store.get_node(destination).unwrap();
    let copied_key = PropertyKey::new("copied_name");
    assert_eq!(
        stored.properties.get(&copied_key),
        Some(&Value::String("Alix".into()))
    );
}
2213
/// A validator attached via `with_validator` lets an allowed property
/// through and turns a rejected one into a `ConstraintViolation` error.
#[test]
fn test_create_node_with_constraint_validator() {
    /// Validator that rejects the node property key `"forbidden"` and accepts
    /// everything else.
    struct RejectForbiddenValidator;

    impl ConstraintValidator for RejectForbiddenValidator {
        fn validate_node_property(
            &self,
            _labels: &[String],
            key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            match key {
                "forbidden" => Err(OperatorError::ConstraintViolation(
                    "property 'forbidden' is not allowed".to_string(),
                )),
                _ => Ok(()),
            }
        }

        fn validate_node_complete(
            &self,
            _labels: &[String],
            _properties: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn check_unique_node_property(
            &self,
            _labels: &[String],
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_property(
            &self,
            _edge_type: &str,
            _key: &str,
            _value: &Value,
        ) -> Result<(), OperatorError> {
            Ok(())
        }

        fn validate_edge_complete(
            &self,
            _edge_type: &str,
            _properties: &[(String, Value)],
        ) -> Result<(), OperatorError> {
            Ok(())
        }
    }

    let store = create_test_store();

    // Builds a validated standalone create operator for one constant property.
    let build = |key: &str, value: Value| {
        CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec!["Thing".to_string()],
            vec![(key.to_string(), PropertySource::Constant(value))],
            vec![LogicalType::Int64],
            0,
        )
        .with_validator(Arc::new(RejectForbiddenValidator))
    };

    // An allowed property passes validation and the node is created.
    let mut accepted = build("name", Value::String("ok".into()));
    assert!(accepted.next().is_ok());
    assert_eq!(store.node_count(), 1);

    // The forbidden property is rejected with a constraint violation.
    let mut rejected = build("forbidden", Value::Int64(1));
    let err = rejected.next().unwrap_err();
    assert!(matches!(err, OperatorError::ConstraintViolation(_)));
}
2303
/// After `reset`, a standalone `CreateNodeOperator` produces a chunk again,
/// so two nodes exist in total.
#[test]
fn test_create_node_reset_allows_re_execution() {
    let store = create_test_store();

    let mut create = CreateNodeOperator::new(
        Arc::clone(&store),
        None,
        vec![String::from("Person")],
        vec![],
        vec![LogicalType::Int64],
        0,
    );

    // First run: one chunk, then exhausted.
    let first_run = create.next().unwrap();
    assert!(first_run.is_some());
    let exhausted = create.next().unwrap();
    assert!(exhausted.is_none());

    // Resetting makes the operator produce a chunk again.
    create.reset();
    let second_run = create.next().unwrap();
    assert!(second_run.is_some());

    assert_eq!(store.node_count(), 2);
}
2329
/// Every mutation operator reports its expected static name.
#[test]
fn test_operator_names() {
    let store = create_test_store();

    assert_eq!(
        CreateNodeOperator::new(
            Arc::clone(&store),
            None,
            vec![],
            vec![],
            vec![LogicalType::Int64],
            0,
        )
        .name(),
        "CreateNode"
    );

    assert_eq!(
        CreateEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            1,
            "R".to_string(),
            vec![LogicalType::Int64],
        )
        .name(),
        "CreateEdge"
    );

    assert_eq!(
        DeleteNodeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
            false,
        )
        .name(),
        "DeleteNode"
    );

    assert_eq!(
        DeleteEdgeOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![LogicalType::Int64],
        )
        .name(),
        "DeleteEdge"
    );

    assert_eq!(
        AddLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["L".to_string()],
            vec![LogicalType::Int64],
        )
        .name(),
        "AddLabel"
    );

    assert_eq!(
        RemoveLabelOperator::new(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec!["L".to_string()],
            vec![LogicalType::Int64],
        )
        .name(),
        "RemoveLabel"
    );

    assert_eq!(
        SetPropertyOperator::new_for_node(
            Arc::clone(&store),
            Box::new(EmptyInput),
            0,
            vec![],
            vec![LogicalType::Int64],
        )
        .name(),
        "SetProperty"
    );
}
2400}