1use std::sync::Arc;
10
11use grafeo_common::types::{
12 EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
13};
14
15use super::{Operator, OperatorError, OperatorResult, SharedWriteTracker};
16use crate::execution::chunk::DataChunkBuilder;
17use crate::graph::{GraphStore, GraphStoreMut};
18
19pub trait ConstraintValidator: Send + Sync {
24 fn validate_node_property(
28 &self,
29 labels: &[String],
30 key: &str,
31 value: &Value,
32 ) -> Result<(), OperatorError>;
33
34 fn validate_node_complete(
38 &self,
39 labels: &[String],
40 properties: &[(String, Value)],
41 ) -> Result<(), OperatorError>;
42
43 fn check_unique_node_property(
47 &self,
48 labels: &[String],
49 key: &str,
50 value: &Value,
51 ) -> Result<(), OperatorError>;
52
53 fn validate_edge_property(
55 &self,
56 edge_type: &str,
57 key: &str,
58 value: &Value,
59 ) -> Result<(), OperatorError>;
60
61 fn validate_edge_complete(
63 &self,
64 edge_type: &str,
65 properties: &[(String, Value)],
66 ) -> Result<(), OperatorError>;
67
68 fn validate_node_labels_allowed(&self, labels: &[String]) -> Result<(), OperatorError> {
70 let _ = labels;
71 Ok(())
72 }
73
74 fn validate_edge_type_allowed(&self, edge_type: &str) -> Result<(), OperatorError> {
76 let _ = edge_type;
77 Ok(())
78 }
79
80 fn validate_edge_endpoints(
82 &self,
83 edge_type: &str,
84 source_labels: &[String],
85 target_labels: &[String],
86 ) -> Result<(), OperatorError> {
87 let _ = (edge_type, source_labels, target_labels);
88 Ok(())
89 }
90
91 fn inject_defaults(&self, labels: &[String], properties: &mut Vec<(String, Value)>) {
94 let _ = (labels, properties);
95 }
96}
97
98pub struct CreateNodeOperator {
103 store: Arc<dyn GraphStoreMut>,
105 input: Option<Box<dyn Operator>>,
107 labels: Vec<String>,
109 properties: Vec<(String, PropertySource)>,
111 output_schema: Vec<LogicalType>,
113 output_column: usize,
115 executed: bool,
117 viewing_epoch: Option<EpochId>,
119 transaction_id: Option<TransactionId>,
121 validator: Option<Arc<dyn ConstraintValidator>>,
123 write_tracker: Option<SharedWriteTracker>,
125}
126
127#[derive(Debug, Clone)]
129pub enum PropertySource {
130 Column(usize),
132 Constant(Value),
134 PropertyAccess {
136 column: usize,
138 property: String,
140 },
141}
142
143impl PropertySource {
144 pub fn resolve(
146 &self,
147 chunk: &crate::execution::chunk::DataChunk,
148 row: usize,
149 store: &dyn GraphStore,
150 ) -> Value {
151 match self {
152 PropertySource::Column(col_idx) => chunk
153 .column(*col_idx)
154 .and_then(|c| c.get_value(row))
155 .unwrap_or(Value::Null),
156 PropertySource::Constant(v) => v.clone(),
157 PropertySource::PropertyAccess { column, property } => {
158 let Some(col) = chunk.column(*column) else {
159 return Value::Null;
160 };
161 if let Some(node_id) = col.get_node_id(row) {
163 store
164 .get_node(node_id)
165 .and_then(|node| node.get_property(property).cloned())
166 .unwrap_or(Value::Null)
167 } else if let Some(edge_id) = col.get_edge_id(row) {
168 store
169 .get_edge(edge_id)
170 .and_then(|edge| edge.get_property(property).cloned())
171 .unwrap_or(Value::Null)
172 } else if let Some(Value::Map(map)) = col.get_value(row) {
173 let key = PropertyKey::new(property);
174 map.get(&key).cloned().unwrap_or(Value::Null)
175 } else {
176 Value::Null
177 }
178 }
179 }
180 }
181}
182
183impl CreateNodeOperator {
184 pub fn new(
194 store: Arc<dyn GraphStoreMut>,
195 input: Option<Box<dyn Operator>>,
196 labels: Vec<String>,
197 properties: Vec<(String, PropertySource)>,
198 output_schema: Vec<LogicalType>,
199 output_column: usize,
200 ) -> Self {
201 Self {
202 store,
203 input,
204 labels,
205 properties,
206 output_schema,
207 output_column,
208 executed: false,
209 viewing_epoch: None,
210 transaction_id: None,
211 validator: None,
212 write_tracker: None,
213 }
214 }
215
216 pub fn with_transaction_context(
218 mut self,
219 epoch: EpochId,
220 transaction_id: Option<TransactionId>,
221 ) -> Self {
222 self.viewing_epoch = Some(epoch);
223 self.transaction_id = transaction_id;
224 self
225 }
226
227 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
229 self.validator = Some(validator);
230 self
231 }
232
233 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
235 self.write_tracker = Some(tracker);
236 self
237 }
238}
239
240impl CreateNodeOperator {
241 fn validate_and_set_properties(
243 &self,
244 node_id: NodeId,
245 resolved_props: &mut Vec<(String, Value)>,
246 ) -> Result<(), OperatorError> {
247 if let Some(ref validator) = self.validator {
249 validator.validate_node_labels_allowed(&self.labels)?;
250 }
251
252 if let Some(ref validator) = self.validator {
254 validator.inject_defaults(&self.labels, resolved_props);
255 }
256
257 if let Some(ref validator) = self.validator {
259 for (name, value) in resolved_props.iter() {
260 validator.validate_node_property(&self.labels, name, value)?;
261 validator.check_unique_node_property(&self.labels, name, value)?;
262 }
263 validator.validate_node_complete(&self.labels, resolved_props)?;
265 }
266
267 if let Some(tid) = self.transaction_id {
269 for (name, value) in resolved_props.iter() {
270 self.store
271 .set_node_property_versioned(node_id, name, value.clone(), tid);
272 }
273 } else {
274 for (name, value) in resolved_props.iter() {
275 self.store.set_node_property(node_id, name, value.clone());
276 }
277 }
278 Ok(())
279 }
280}
281
282impl Operator for CreateNodeOperator {
283 fn next(&mut self) -> OperatorResult {
284 let epoch = self
286 .viewing_epoch
287 .unwrap_or_else(|| self.store.current_epoch());
288 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
289
290 if let Some(ref mut input) = self.input {
291 if let Some(chunk) = input.next()? {
293 let mut builder =
294 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
295
296 for row in chunk.selected_indices() {
297 let mut resolved_props: Vec<(String, Value)> = self
299 .properties
300 .iter()
301 .map(|(name, source)| {
302 let value =
303 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
304 (name.clone(), value)
305 })
306 .collect();
307
308 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
310 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
311
312 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
314 tracker.record_node_write(tid, node_id)?;
315 }
316
317 self.validate_and_set_properties(node_id, &mut resolved_props)?;
319
320 for col_idx in 0..chunk.column_count() {
322 if col_idx < self.output_column
323 && let (Some(src), Some(dst)) =
324 (chunk.column(col_idx), builder.column_mut(col_idx))
325 {
326 if let Some(val) = src.get_value(row) {
327 dst.push_value(val);
328 } else {
329 dst.push_value(Value::Null);
330 }
331 }
332 }
333
334 if let Some(dst) = builder.column_mut(self.output_column) {
336 dst.push_value(Value::Int64(node_id.0 as i64));
337 }
338
339 builder.advance_row();
340 }
341
342 return Ok(Some(builder.finish()));
343 }
344 Ok(None)
345 } else {
346 if self.executed {
348 return Ok(None);
349 }
350 self.executed = true;
351
352 let mut resolved_props: Vec<(String, Value)> = self
354 .properties
355 .iter()
356 .filter_map(|(name, source)| {
357 if let PropertySource::Constant(value) = source {
358 Some((name.clone(), value.clone()))
359 } else {
360 None
361 }
362 })
363 .collect();
364
365 let label_refs: Vec<&str> = self.labels.iter().map(String::as_str).collect();
367 let node_id = self.store.create_node_versioned(&label_refs, epoch, tx);
368
369 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
371 tracker.record_node_write(tid, node_id)?;
372 }
373
374 self.validate_and_set_properties(node_id, &mut resolved_props)?;
376
377 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 1);
379 if let Some(dst) = builder.column_mut(self.output_column) {
380 dst.push_value(Value::Int64(node_id.0 as i64));
381 }
382 builder.advance_row();
383
384 Ok(Some(builder.finish()))
385 }
386 }
387
388 fn reset(&mut self) {
389 if let Some(ref mut input) = self.input {
390 input.reset();
391 }
392 self.executed = false;
393 }
394
395 fn name(&self) -> &'static str {
396 "CreateNode"
397 }
398}
399
400pub struct CreateEdgeOperator {
402 store: Arc<dyn GraphStoreMut>,
404 input: Box<dyn Operator>,
406 from_column: usize,
408 to_column: usize,
410 edge_type: String,
412 properties: Vec<(String, PropertySource)>,
414 output_schema: Vec<LogicalType>,
416 output_column: Option<usize>,
418 viewing_epoch: Option<EpochId>,
420 transaction_id: Option<TransactionId>,
422 validator: Option<Arc<dyn ConstraintValidator>>,
424 write_tracker: Option<SharedWriteTracker>,
426}
427
428impl CreateEdgeOperator {
429 pub fn new(
436 store: Arc<dyn GraphStoreMut>,
437 input: Box<dyn Operator>,
438 from_column: usize,
439 to_column: usize,
440 edge_type: String,
441 output_schema: Vec<LogicalType>,
442 ) -> Self {
443 Self {
444 store,
445 input,
446 from_column,
447 to_column,
448 edge_type,
449 properties: Vec::new(),
450 output_schema,
451 output_column: None,
452 viewing_epoch: None,
453 transaction_id: None,
454 validator: None,
455 write_tracker: None,
456 }
457 }
458
459 pub fn with_properties(mut self, properties: Vec<(String, PropertySource)>) -> Self {
461 self.properties = properties;
462 self
463 }
464
465 pub fn with_output_column(mut self, column: usize) -> Self {
467 self.output_column = Some(column);
468 self
469 }
470
471 pub fn with_transaction_context(
473 mut self,
474 epoch: EpochId,
475 transaction_id: Option<TransactionId>,
476 ) -> Self {
477 self.viewing_epoch = Some(epoch);
478 self.transaction_id = transaction_id;
479 self
480 }
481
482 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
484 self.validator = Some(validator);
485 self
486 }
487
488 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
490 self.write_tracker = Some(tracker);
491 self
492 }
493}
494
495impl Operator for CreateEdgeOperator {
496 fn next(&mut self) -> OperatorResult {
497 let epoch = self
499 .viewing_epoch
500 .unwrap_or_else(|| self.store.current_epoch());
501 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
502
503 if let Some(chunk) = self.input.next()? {
504 let mut builder =
505 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
506
507 for row in chunk.selected_indices() {
508 let from_id = chunk
510 .column(self.from_column)
511 .and_then(|c| c.get_value(row))
512 .ok_or_else(|| {
513 OperatorError::ColumnNotFound(format!("from column {}", self.from_column))
514 })?;
515
516 let to_id = chunk
517 .column(self.to_column)
518 .and_then(|c| c.get_value(row))
519 .ok_or_else(|| {
520 OperatorError::ColumnNotFound(format!("to column {}", self.to_column))
521 })?;
522
523 let from_node_id = match from_id {
525 Value::Int64(id) => NodeId(id as u64),
526 _ => {
527 return Err(OperatorError::TypeMismatch {
528 expected: "Int64 (node ID)".to_string(),
529 found: format!("{from_id:?}"),
530 });
531 }
532 };
533
534 let to_node_id = match to_id {
535 Value::Int64(id) => NodeId(id as u64),
536 _ => {
537 return Err(OperatorError::TypeMismatch {
538 expected: "Int64 (node ID)".to_string(),
539 found: format!("{to_id:?}"),
540 });
541 }
542 };
543
544 if let Some(ref validator) = self.validator {
546 validator.validate_edge_type_allowed(&self.edge_type)?;
547
548 let source_labels: Vec<String> = self
550 .store
551 .get_node(from_node_id)
552 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
553 .unwrap_or_default();
554 let target_labels: Vec<String> = self
555 .store
556 .get_node(to_node_id)
557 .map(|n| n.labels.iter().map(|l| l.to_string()).collect())
558 .unwrap_or_default();
559 validator.validate_edge_endpoints(
560 &self.edge_type,
561 &source_labels,
562 &target_labels,
563 )?;
564 }
565
566 let resolved_props: Vec<(String, Value)> = self
568 .properties
569 .iter()
570 .map(|(name, source)| {
571 let value =
572 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
573 (name.clone(), value)
574 })
575 .collect();
576
577 if let Some(ref validator) = self.validator {
579 for (name, value) in &resolved_props {
580 validator.validate_edge_property(&self.edge_type, name, value)?;
581 }
582 validator.validate_edge_complete(&self.edge_type, &resolved_props)?;
583 }
584
585 let edge_id = self.store.create_edge_versioned(
587 from_node_id,
588 to_node_id,
589 &self.edge_type,
590 epoch,
591 tx,
592 );
593
594 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
596 tracker.record_edge_write(tid, edge_id)?;
597 }
598
599 if let Some(tid) = self.transaction_id {
601 for (name, value) in resolved_props {
602 self.store
603 .set_edge_property_versioned(edge_id, &name, value, tid);
604 }
605 } else {
606 for (name, value) in resolved_props {
607 self.store.set_edge_property(edge_id, &name, value);
608 }
609 }
610
611 for col_idx in 0..chunk.column_count() {
613 if let (Some(src), Some(dst)) =
614 (chunk.column(col_idx), builder.column_mut(col_idx))
615 {
616 if let Some(val) = src.get_value(row) {
617 dst.push_value(val);
618 } else {
619 dst.push_value(Value::Null);
620 }
621 }
622 }
623
624 if let Some(out_col) = self.output_column
626 && let Some(dst) = builder.column_mut(out_col)
627 {
628 dst.push_value(Value::Int64(edge_id.0 as i64));
629 }
630
631 builder.advance_row();
632 }
633
634 return Ok(Some(builder.finish()));
635 }
636 Ok(None)
637 }
638
639 fn reset(&mut self) {
640 self.input.reset();
641 }
642
643 fn name(&self) -> &'static str {
644 "CreateEdge"
645 }
646}
647
648pub struct DeleteNodeOperator {
650 store: Arc<dyn GraphStoreMut>,
652 input: Box<dyn Operator>,
654 node_column: usize,
656 output_schema: Vec<LogicalType>,
658 detach: bool,
660 viewing_epoch: Option<EpochId>,
662 transaction_id: Option<TransactionId>,
664 write_tracker: Option<SharedWriteTracker>,
666}
667
668impl DeleteNodeOperator {
669 pub fn new(
671 store: Arc<dyn GraphStoreMut>,
672 input: Box<dyn Operator>,
673 node_column: usize,
674 output_schema: Vec<LogicalType>,
675 detach: bool,
676 ) -> Self {
677 Self {
678 store,
679 input,
680 node_column,
681 output_schema,
682 detach,
683 viewing_epoch: None,
684 transaction_id: None,
685 write_tracker: None,
686 }
687 }
688
689 pub fn with_transaction_context(
691 mut self,
692 epoch: EpochId,
693 transaction_id: Option<TransactionId>,
694 ) -> Self {
695 self.viewing_epoch = Some(epoch);
696 self.transaction_id = transaction_id;
697 self
698 }
699
700 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
702 self.write_tracker = Some(tracker);
703 self
704 }
705}
706
707impl Operator for DeleteNodeOperator {
708 fn next(&mut self) -> OperatorResult {
709 let epoch = self
711 .viewing_epoch
712 .unwrap_or_else(|| self.store.current_epoch());
713 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
714
715 if let Some(chunk) = self.input.next()? {
716 let mut builder =
717 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
718
719 for row in chunk.selected_indices() {
720 let node_val = chunk
721 .column(self.node_column)
722 .and_then(|c| c.get_value(row))
723 .ok_or_else(|| {
724 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
725 })?;
726
727 let node_id = match node_val {
728 Value::Int64(id) => NodeId(id as u64),
729 _ => {
730 return Err(OperatorError::TypeMismatch {
731 expected: "Int64 (node ID)".to_string(),
732 found: format!("{node_val:?}"),
733 });
734 }
735 };
736
737 if self.detach {
738 let outgoing = self
741 .store
742 .edges_from(node_id, crate::graph::Direction::Outgoing);
743 let incoming = self
744 .store
745 .edges_from(node_id, crate::graph::Direction::Incoming);
746 for (_, edge_id) in outgoing.into_iter().chain(incoming) {
747 self.store.delete_edge_versioned(edge_id, epoch, tx);
748 if let (Some(tracker), Some(tid)) =
749 (&self.write_tracker, self.transaction_id)
750 {
751 tracker.record_edge_write(tid, edge_id)?;
752 }
753 }
754 } else {
755 let degree = self.store.out_degree(node_id) + self.store.in_degree(node_id);
757 if degree > 0 {
758 return Err(OperatorError::ConstraintViolation(format!(
759 "Cannot delete node with {} connected edge(s). Use DETACH DELETE.",
760 degree
761 )));
762 }
763 }
764
765 self.store.delete_node_versioned(node_id, epoch, tx);
767
768 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
770 tracker.record_node_write(tid, node_id)?;
771 }
772
773 for col_idx in 0..chunk.column_count() {
776 if let (Some(src), Some(dst)) =
777 (chunk.column(col_idx), builder.column_mut(col_idx))
778 {
779 if let Some(val) = src.get_value(row) {
780 dst.push_value(val);
781 } else {
782 dst.push_value(Value::Null);
783 }
784 }
785 }
786 builder.advance_row();
787 }
788
789 return Ok(Some(builder.finish()));
790 }
791 Ok(None)
792 }
793
794 fn reset(&mut self) {
795 self.input.reset();
796 }
797
798 fn name(&self) -> &'static str {
799 "DeleteNode"
800 }
801}
802
803pub struct DeleteEdgeOperator {
805 store: Arc<dyn GraphStoreMut>,
807 input: Box<dyn Operator>,
809 edge_column: usize,
811 output_schema: Vec<LogicalType>,
813 viewing_epoch: Option<EpochId>,
815 transaction_id: Option<TransactionId>,
817 write_tracker: Option<SharedWriteTracker>,
819}
820
821impl DeleteEdgeOperator {
822 pub fn new(
824 store: Arc<dyn GraphStoreMut>,
825 input: Box<dyn Operator>,
826 edge_column: usize,
827 output_schema: Vec<LogicalType>,
828 ) -> Self {
829 Self {
830 store,
831 input,
832 edge_column,
833 output_schema,
834 viewing_epoch: None,
835 transaction_id: None,
836 write_tracker: None,
837 }
838 }
839
840 pub fn with_transaction_context(
842 mut self,
843 epoch: EpochId,
844 transaction_id: Option<TransactionId>,
845 ) -> Self {
846 self.viewing_epoch = Some(epoch);
847 self.transaction_id = transaction_id;
848 self
849 }
850
851 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
853 self.write_tracker = Some(tracker);
854 self
855 }
856}
857
858impl Operator for DeleteEdgeOperator {
859 fn next(&mut self) -> OperatorResult {
860 let epoch = self
862 .viewing_epoch
863 .unwrap_or_else(|| self.store.current_epoch());
864 let tx = self.transaction_id.unwrap_or(TransactionId::SYSTEM);
865
866 if let Some(chunk) = self.input.next()? {
867 let mut builder =
868 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
869
870 for row in chunk.selected_indices() {
871 let edge_val = chunk
872 .column(self.edge_column)
873 .and_then(|c| c.get_value(row))
874 .ok_or_else(|| {
875 OperatorError::ColumnNotFound(format!("edge column {}", self.edge_column))
876 })?;
877
878 let edge_id = match edge_val {
879 Value::Int64(id) => EdgeId(id as u64),
880 _ => {
881 return Err(OperatorError::TypeMismatch {
882 expected: "Int64 (edge ID)".to_string(),
883 found: format!("{edge_val:?}"),
884 });
885 }
886 };
887
888 self.store.delete_edge_versioned(edge_id, epoch, tx);
890
891 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
893 tracker.record_edge_write(tid, edge_id)?;
894 }
895
896 for col_idx in 0..chunk.column_count() {
898 if let (Some(src), Some(dst)) =
899 (chunk.column(col_idx), builder.column_mut(col_idx))
900 {
901 if let Some(val) = src.get_value(row) {
902 dst.push_value(val);
903 } else {
904 dst.push_value(Value::Null);
905 }
906 }
907 }
908 builder.advance_row();
909 }
910
911 return Ok(Some(builder.finish()));
912 }
913 Ok(None)
914 }
915
916 fn reset(&mut self) {
917 self.input.reset();
918 }
919
920 fn name(&self) -> &'static str {
921 "DeleteEdge"
922 }
923}
924
925pub struct AddLabelOperator {
927 store: Arc<dyn GraphStoreMut>,
929 input: Box<dyn Operator>,
931 node_column: usize,
933 labels: Vec<String>,
935 output_schema: Vec<LogicalType>,
937 count_column: usize,
939 viewing_epoch: Option<EpochId>,
941 transaction_id: Option<TransactionId>,
943 write_tracker: Option<SharedWriteTracker>,
945}
946
947impl AddLabelOperator {
948 pub fn new(
950 store: Arc<dyn GraphStoreMut>,
951 input: Box<dyn Operator>,
952 node_column: usize,
953 labels: Vec<String>,
954 output_schema: Vec<LogicalType>,
955 ) -> Self {
956 let count_column = output_schema.len() - 1;
957 Self {
958 store,
959 input,
960 node_column,
961 labels,
962 count_column,
963 output_schema,
964 viewing_epoch: None,
965 transaction_id: None,
966 write_tracker: None,
967 }
968 }
969
970 pub fn with_transaction_context(
972 mut self,
973 epoch: EpochId,
974 transaction_id: Option<TransactionId>,
975 ) -> Self {
976 self.viewing_epoch = Some(epoch);
977 self.transaction_id = transaction_id;
978 self
979 }
980
981 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
983 self.write_tracker = Some(tracker);
984 self
985 }
986}
987
988impl Operator for AddLabelOperator {
989 fn next(&mut self) -> OperatorResult {
990 if let Some(chunk) = self.input.next()? {
991 let mut builder =
992 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
993
994 for row in chunk.selected_indices() {
995 let node_val = chunk
996 .column(self.node_column)
997 .and_then(|c| c.get_value(row))
998 .ok_or_else(|| {
999 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1000 })?;
1001
1002 let node_id = match node_val {
1003 Value::Int64(id) => NodeId(id as u64),
1004 _ => {
1005 return Err(OperatorError::TypeMismatch {
1006 expected: "Int64 (node ID)".to_string(),
1007 found: format!("{node_val:?}"),
1008 });
1009 }
1010 };
1011
1012 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1014 tracker.record_node_write(tid, node_id)?;
1015 }
1016
1017 let mut row_count: i64 = 0;
1019 for label in &self.labels {
1020 let added = if let Some(tid) = self.transaction_id {
1021 self.store.add_label_versioned(node_id, label, tid)
1022 } else {
1023 self.store.add_label(node_id, label)
1024 };
1025 if added {
1026 row_count += 1;
1027 }
1028 }
1029
1030 for col_idx in 0..chunk.column_count() {
1032 if let (Some(src), Some(dst)) =
1033 (chunk.column(col_idx), builder.column_mut(col_idx))
1034 {
1035 if let Some(val) = src.get_value(row) {
1036 dst.push_value(val);
1037 } else {
1038 dst.push_value(Value::Null);
1039 }
1040 }
1041 }
1042 if let Some(dst) = builder.column_mut(self.count_column) {
1044 dst.push_value(Value::Int64(row_count));
1045 }
1046
1047 builder.advance_row();
1048 }
1049
1050 return Ok(Some(builder.finish()));
1051 }
1052 Ok(None)
1053 }
1054
1055 fn reset(&mut self) {
1056 self.input.reset();
1057 }
1058
1059 fn name(&self) -> &'static str {
1060 "AddLabel"
1061 }
1062}
1063
1064pub struct RemoveLabelOperator {
1066 store: Arc<dyn GraphStoreMut>,
1068 input: Box<dyn Operator>,
1070 node_column: usize,
1072 labels: Vec<String>,
1074 output_schema: Vec<LogicalType>,
1076 count_column: usize,
1078 viewing_epoch: Option<EpochId>,
1080 transaction_id: Option<TransactionId>,
1082 write_tracker: Option<SharedWriteTracker>,
1084}
1085
1086impl RemoveLabelOperator {
1087 pub fn new(
1089 store: Arc<dyn GraphStoreMut>,
1090 input: Box<dyn Operator>,
1091 node_column: usize,
1092 labels: Vec<String>,
1093 output_schema: Vec<LogicalType>,
1094 ) -> Self {
1095 let count_column = output_schema.len() - 1;
1096 Self {
1097 store,
1098 input,
1099 node_column,
1100 labels,
1101 count_column,
1102 output_schema,
1103 viewing_epoch: None,
1104 transaction_id: None,
1105 write_tracker: None,
1106 }
1107 }
1108
1109 pub fn with_transaction_context(
1111 mut self,
1112 epoch: EpochId,
1113 transaction_id: Option<TransactionId>,
1114 ) -> Self {
1115 self.viewing_epoch = Some(epoch);
1116 self.transaction_id = transaction_id;
1117 self
1118 }
1119
1120 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1122 self.write_tracker = Some(tracker);
1123 self
1124 }
1125}
1126
1127impl Operator for RemoveLabelOperator {
1128 fn next(&mut self) -> OperatorResult {
1129 if let Some(chunk) = self.input.next()? {
1130 let mut builder =
1131 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1132
1133 for row in chunk.selected_indices() {
1134 let node_val = chunk
1135 .column(self.node_column)
1136 .and_then(|c| c.get_value(row))
1137 .ok_or_else(|| {
1138 OperatorError::ColumnNotFound(format!("node column {}", self.node_column))
1139 })?;
1140
1141 let node_id = match node_val {
1142 Value::Int64(id) => NodeId(id as u64),
1143 _ => {
1144 return Err(OperatorError::TypeMismatch {
1145 expected: "Int64 (node ID)".to_string(),
1146 found: format!("{node_val:?}"),
1147 });
1148 }
1149 };
1150
1151 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1153 tracker.record_node_write(tid, node_id)?;
1154 }
1155
1156 let mut row_count: i64 = 0;
1158 for label in &self.labels {
1159 let removed = if let Some(tid) = self.transaction_id {
1160 self.store.remove_label_versioned(node_id, label, tid)
1161 } else {
1162 self.store.remove_label(node_id, label)
1163 };
1164 if removed {
1165 row_count += 1;
1166 }
1167 }
1168
1169 for col_idx in 0..chunk.column_count() {
1171 if let (Some(src), Some(dst)) =
1172 (chunk.column(col_idx), builder.column_mut(col_idx))
1173 {
1174 if let Some(val) = src.get_value(row) {
1175 dst.push_value(val);
1176 } else {
1177 dst.push_value(Value::Null);
1178 }
1179 }
1180 }
1181 if let Some(dst) = builder.column_mut(self.count_column) {
1183 dst.push_value(Value::Int64(row_count));
1184 }
1185
1186 builder.advance_row();
1187 }
1188
1189 return Ok(Some(builder.finish()));
1190 }
1191 Ok(None)
1192 }
1193
1194 fn reset(&mut self) {
1195 self.input.reset();
1196 }
1197
1198 fn name(&self) -> &'static str {
1199 "RemoveLabel"
1200 }
1201}
1202
1203pub struct SetPropertyOperator {
1208 store: Arc<dyn GraphStoreMut>,
1210 input: Box<dyn Operator>,
1212 entity_column: usize,
1214 is_edge: bool,
1216 properties: Vec<(String, PropertySource)>,
1218 output_schema: Vec<LogicalType>,
1220 replace: bool,
1222 validator: Option<Arc<dyn ConstraintValidator>>,
1224 labels: Vec<String>,
1226 edge_type_name: Option<String>,
1228 viewing_epoch: Option<EpochId>,
1230 transaction_id: Option<TransactionId>,
1232 write_tracker: Option<SharedWriteTracker>,
1234}
1235
1236impl SetPropertyOperator {
1237 pub fn new_for_node(
1239 store: Arc<dyn GraphStoreMut>,
1240 input: Box<dyn Operator>,
1241 node_column: usize,
1242 properties: Vec<(String, PropertySource)>,
1243 output_schema: Vec<LogicalType>,
1244 ) -> Self {
1245 Self {
1246 store,
1247 input,
1248 entity_column: node_column,
1249 is_edge: false,
1250 properties,
1251 output_schema,
1252 replace: false,
1253 validator: None,
1254 labels: Vec::new(),
1255 edge_type_name: None,
1256 viewing_epoch: None,
1257 transaction_id: None,
1258 write_tracker: None,
1259 }
1260 }
1261
1262 pub fn new_for_edge(
1264 store: Arc<dyn GraphStoreMut>,
1265 input: Box<dyn Operator>,
1266 edge_column: usize,
1267 properties: Vec<(String, PropertySource)>,
1268 output_schema: Vec<LogicalType>,
1269 ) -> Self {
1270 Self {
1271 store,
1272 input,
1273 entity_column: edge_column,
1274 is_edge: true,
1275 properties,
1276 output_schema,
1277 replace: false,
1278 validator: None,
1279 labels: Vec::new(),
1280 edge_type_name: None,
1281 viewing_epoch: None,
1282 transaction_id: None,
1283 write_tracker: None,
1284 }
1285 }
1286
1287 pub fn with_replace(mut self, replace: bool) -> Self {
1289 self.replace = replace;
1290 self
1291 }
1292
1293 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
1295 self.validator = Some(validator);
1296 self
1297 }
1298
1299 pub fn with_labels(mut self, labels: Vec<String>) -> Self {
1301 self.labels = labels;
1302 self
1303 }
1304
1305 pub fn with_edge_type(mut self, edge_type: String) -> Self {
1307 self.edge_type_name = Some(edge_type);
1308 self
1309 }
1310
1311 pub fn with_transaction_context(
1316 mut self,
1317 epoch: EpochId,
1318 transaction_id: Option<TransactionId>,
1319 ) -> Self {
1320 self.viewing_epoch = Some(epoch);
1321 self.transaction_id = transaction_id;
1322 self
1323 }
1324
1325 pub fn with_write_tracker(mut self, tracker: SharedWriteTracker) -> Self {
1327 self.write_tracker = Some(tracker);
1328 self
1329 }
1330}
1331
1332impl Operator for SetPropertyOperator {
1333 fn next(&mut self) -> OperatorResult {
1334 if let Some(chunk) = self.input.next()? {
1335 let mut builder =
1336 DataChunkBuilder::with_capacity(&self.output_schema, chunk.row_count());
1337
1338 for row in chunk.selected_indices() {
1339 let entity_val = chunk
1340 .column(self.entity_column)
1341 .and_then(|c| c.get_value(row))
1342 .ok_or_else(|| {
1343 OperatorError::ColumnNotFound(format!(
1344 "entity column {}",
1345 self.entity_column
1346 ))
1347 })?;
1348
1349 let entity_id = match entity_val {
1350 Value::Int64(id) => id as u64,
1351 _ => {
1352 return Err(OperatorError::TypeMismatch {
1353 expected: "Int64 (entity ID)".to_string(),
1354 found: format!("{entity_val:?}"),
1355 });
1356 }
1357 };
1358
1359 if let (Some(tracker), Some(tid)) = (&self.write_tracker, self.transaction_id) {
1361 if self.is_edge {
1362 tracker.record_edge_write(tid, EdgeId(entity_id))?;
1363 } else {
1364 tracker.record_node_write(tid, NodeId(entity_id))?;
1365 }
1366 }
1367
1368 let resolved_props: Vec<(String, Value)> = self
1370 .properties
1371 .iter()
1372 .map(|(name, source)| {
1373 let value =
1374 source.resolve(&chunk, row, self.store.as_ref() as &dyn GraphStore);
1375 (name.clone(), value)
1376 })
1377 .collect();
1378
1379 if let Some(ref validator) = self.validator {
1381 if self.is_edge {
1382 if let Some(ref et) = self.edge_type_name {
1383 for (name, value) in &resolved_props {
1384 validator.validate_edge_property(et, name, value)?;
1385 }
1386 }
1387 } else {
1388 for (name, value) in &resolved_props {
1389 validator.validate_node_property(&self.labels, name, value)?;
1390 validator.check_unique_node_property(&self.labels, name, value)?;
1391 }
1392 }
1393 }
1394
1395 let tx_id = self.transaction_id;
1397 for (prop_name, value) in resolved_props {
1398 if prop_name == "*" {
1399 if let Value::Map(map) = value {
1401 if self.replace {
1402 if self.is_edge {
1404 if let Some(edge) = self.store.get_edge(EdgeId(entity_id)) {
1405 let keys: Vec<String> = edge
1406 .properties
1407 .iter()
1408 .map(|(k, _)| k.as_str().to_string())
1409 .collect();
1410 for key in keys {
1411 if let Some(tid) = tx_id {
1412 self.store.remove_edge_property_versioned(
1413 EdgeId(entity_id),
1414 &key,
1415 tid,
1416 );
1417 } else {
1418 self.store
1419 .remove_edge_property(EdgeId(entity_id), &key);
1420 }
1421 }
1422 }
1423 } else if let Some(node) = self.store.get_node(NodeId(entity_id)) {
1424 let keys: Vec<String> = node
1425 .properties
1426 .iter()
1427 .map(|(k, _)| k.as_str().to_string())
1428 .collect();
1429 for key in keys {
1430 if let Some(tid) = tx_id {
1431 self.store.remove_node_property_versioned(
1432 NodeId(entity_id),
1433 &key,
1434 tid,
1435 );
1436 } else {
1437 self.store
1438 .remove_node_property(NodeId(entity_id), &key);
1439 }
1440 }
1441 }
1442 }
1443 for (key, val) in map.iter() {
1445 if self.is_edge {
1446 if let Some(tid) = tx_id {
1447 self.store.set_edge_property_versioned(
1448 EdgeId(entity_id),
1449 key.as_str(),
1450 val.clone(),
1451 tid,
1452 );
1453 } else {
1454 self.store.set_edge_property(
1455 EdgeId(entity_id),
1456 key.as_str(),
1457 val.clone(),
1458 );
1459 }
1460 } else if let Some(tid) = tx_id {
1461 self.store.set_node_property_versioned(
1462 NodeId(entity_id),
1463 key.as_str(),
1464 val.clone(),
1465 tid,
1466 );
1467 } else {
1468 self.store.set_node_property(
1469 NodeId(entity_id),
1470 key.as_str(),
1471 val.clone(),
1472 );
1473 }
1474 }
1475 }
1476 } else if self.is_edge {
1477 if let Some(tid) = tx_id {
1478 self.store.set_edge_property_versioned(
1479 EdgeId(entity_id),
1480 &prop_name,
1481 value,
1482 tid,
1483 );
1484 } else {
1485 self.store
1486 .set_edge_property(EdgeId(entity_id), &prop_name, value);
1487 }
1488 } else if let Some(tid) = tx_id {
1489 self.store.set_node_property_versioned(
1490 NodeId(entity_id),
1491 &prop_name,
1492 value,
1493 tid,
1494 );
1495 } else {
1496 self.store
1497 .set_node_property(NodeId(entity_id), &prop_name, value);
1498 }
1499 }
1500
1501 for col_idx in 0..chunk.column_count() {
1503 if let (Some(src), Some(dst)) =
1504 (chunk.column(col_idx), builder.column_mut(col_idx))
1505 {
1506 if let Some(val) = src.get_value(row) {
1507 dst.push_value(val);
1508 } else {
1509 dst.push_value(Value::Null);
1510 }
1511 }
1512 }
1513
1514 builder.advance_row();
1515 }
1516
1517 return Ok(Some(builder.finish()));
1518 }
1519 Ok(None)
1520 }
1521
1522 fn reset(&mut self) {
1523 self.input.reset();
1524 }
1525
1526 fn name(&self) -> &'static str {
1527 "SetProperty"
1528 }
1529}
1530
1531#[cfg(test)]
1532mod tests {
1533 use super::*;
1534 use crate::execution::DataChunk;
1535 use crate::execution::chunk::DataChunkBuilder;
1536 use crate::graph::lpg::LpgStore;
1537
1538 fn create_test_store() -> Arc<dyn GraphStoreMut> {
1541 Arc::new(LpgStore::new().unwrap())
1542 }
1543
1544 struct MockInput {
1545 chunk: Option<DataChunk>,
1546 }
1547
1548 impl MockInput {
1549 fn boxed(chunk: DataChunk) -> Box<Self> {
1550 Box::new(Self { chunk: Some(chunk) })
1551 }
1552 }
1553
1554 impl Operator for MockInput {
1555 fn next(&mut self) -> OperatorResult {
1556 Ok(self.chunk.take())
1557 }
1558 fn reset(&mut self) {}
1559 fn name(&self) -> &'static str {
1560 "MockInput"
1561 }
1562 }
1563
1564 struct EmptyInput;
1565 impl Operator for EmptyInput {
1566 fn next(&mut self) -> OperatorResult {
1567 Ok(None)
1568 }
1569 fn reset(&mut self) {}
1570 fn name(&self) -> &'static str {
1571 "EmptyInput"
1572 }
1573 }
1574
1575 fn node_id_chunk(ids: &[NodeId]) -> DataChunk {
1576 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1577 for id in ids {
1578 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1579 builder.advance_row();
1580 }
1581 builder.finish()
1582 }
1583
1584 fn edge_id_chunk(ids: &[EdgeId]) -> DataChunk {
1585 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
1586 for id in ids {
1587 builder.column_mut(0).unwrap().push_int64(id.0 as i64);
1588 builder.advance_row();
1589 }
1590 builder.finish()
1591 }
1592
1593 #[test]
1596 fn test_create_node_standalone() {
1597 let store = create_test_store();
1598
1599 let mut op = CreateNodeOperator::new(
1600 Arc::clone(&store),
1601 None,
1602 vec!["Person".to_string()],
1603 vec![(
1604 "name".to_string(),
1605 PropertySource::Constant(Value::String("Alix".into())),
1606 )],
1607 vec![LogicalType::Int64],
1608 0,
1609 );
1610
1611 let chunk = op.next().unwrap().unwrap();
1612 assert_eq!(chunk.row_count(), 1);
1613
1614 assert!(op.next().unwrap().is_none());
1616
1617 assert_eq!(store.node_count(), 1);
1618 }
1619
1620 #[test]
1621 fn test_create_edge() {
1622 let store = create_test_store();
1623
1624 let node1 = store.create_node(&["Person"]);
1625 let node2 = store.create_node(&["Person"]);
1626
1627 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
1628 builder.column_mut(0).unwrap().push_int64(node1.0 as i64);
1629 builder.column_mut(1).unwrap().push_int64(node2.0 as i64);
1630 builder.advance_row();
1631
1632 let mut op = CreateEdgeOperator::new(
1633 Arc::clone(&store),
1634 MockInput::boxed(builder.finish()),
1635 0,
1636 1,
1637 "KNOWS".to_string(),
1638 vec![LogicalType::Int64, LogicalType::Int64],
1639 );
1640
1641 let _chunk = op.next().unwrap().unwrap();
1642 assert_eq!(store.edge_count(), 1);
1643 }
1644
1645 #[test]
1646 fn test_delete_node() {
1647 let store = create_test_store();
1648
1649 let node_id = store.create_node(&["Person"]);
1650 assert_eq!(store.node_count(), 1);
1651
1652 let mut op = DeleteNodeOperator::new(
1653 Arc::clone(&store),
1654 MockInput::boxed(node_id_chunk(&[node_id])),
1655 0,
1656 vec![LogicalType::Node],
1657 false,
1658 );
1659
1660 let chunk = op.next().unwrap().unwrap();
1661 assert_eq!(chunk.row_count(), 1);
1663 assert_eq!(store.node_count(), 0);
1664 }
1665
1666 #[test]
1669 fn test_delete_edge() {
1670 let store = create_test_store();
1671
1672 let n1 = store.create_node(&["Person"]);
1673 let n2 = store.create_node(&["Person"]);
1674 let eid = store.create_edge(n1, n2, "KNOWS");
1675 assert_eq!(store.edge_count(), 1);
1676
1677 let mut op = DeleteEdgeOperator::new(
1678 Arc::clone(&store),
1679 MockInput::boxed(edge_id_chunk(&[eid])),
1680 0,
1681 vec![LogicalType::Node],
1682 );
1683
1684 let chunk = op.next().unwrap().unwrap();
1685 assert_eq!(chunk.row_count(), 1);
1686 assert_eq!(store.edge_count(), 0);
1687 }
1688
1689 #[test]
1690 fn test_delete_edge_no_input_returns_none() {
1691 let store = create_test_store();
1692
1693 let mut op = DeleteEdgeOperator::new(
1694 Arc::clone(&store),
1695 Box::new(EmptyInput),
1696 0,
1697 vec![LogicalType::Int64],
1698 );
1699
1700 assert!(op.next().unwrap().is_none());
1701 }
1702
1703 #[test]
1704 fn test_delete_multiple_edges() {
1705 let store = create_test_store();
1706
1707 let n1 = store.create_node(&["N"]);
1708 let n2 = store.create_node(&["N"]);
1709 let e1 = store.create_edge(n1, n2, "R");
1710 let e2 = store.create_edge(n2, n1, "S");
1711 assert_eq!(store.edge_count(), 2);
1712
1713 let mut op = DeleteEdgeOperator::new(
1714 Arc::clone(&store),
1715 MockInput::boxed(edge_id_chunk(&[e1, e2])),
1716 0,
1717 vec![LogicalType::Node],
1718 );
1719
1720 let chunk = op.next().unwrap().unwrap();
1721 assert_eq!(chunk.row_count(), 2);
1722 assert_eq!(store.edge_count(), 0);
1723 }
1724
1725 #[test]
1728 fn test_delete_node_detach() {
1729 let store = create_test_store();
1730
1731 let n1 = store.create_node(&["Person"]);
1732 let n2 = store.create_node(&["Person"]);
1733 store.create_edge(n1, n2, "KNOWS");
1734 store.create_edge(n2, n1, "FOLLOWS");
1735 assert_eq!(store.edge_count(), 2);
1736
1737 let mut op = DeleteNodeOperator::new(
1738 Arc::clone(&store),
1739 MockInput::boxed(node_id_chunk(&[n1])),
1740 0,
1741 vec![LogicalType::Node],
1742 true, );
1744
1745 let chunk = op.next().unwrap().unwrap();
1746 assert_eq!(chunk.row_count(), 1);
1747 assert_eq!(store.node_count(), 1);
1748 assert_eq!(store.edge_count(), 0); }
1750
1751 #[test]
1754 fn test_add_label() {
1755 let store = create_test_store();
1756
1757 let node = store.create_node(&["Person"]);
1758
1759 let mut op = AddLabelOperator::new(
1760 Arc::clone(&store),
1761 MockInput::boxed(node_id_chunk(&[node])),
1762 0,
1763 vec!["Employee".to_string()],
1764 vec![LogicalType::Int64, LogicalType::Int64],
1765 );
1766
1767 let chunk = op.next().unwrap().unwrap();
1768 let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1769 assert_eq!(updated, 1);
1770
1771 let node_data = store.get_node(node).unwrap();
1773 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1774 assert!(labels.contains(&"Person"));
1775 assert!(labels.contains(&"Employee"));
1776 }
1777
1778 #[test]
1779 fn test_add_multiple_labels() {
1780 let store = create_test_store();
1781
1782 let node = store.create_node(&["Base"]);
1783
1784 let mut op = AddLabelOperator::new(
1785 Arc::clone(&store),
1786 MockInput::boxed(node_id_chunk(&[node])),
1787 0,
1788 vec!["LabelA".to_string(), "LabelB".to_string()],
1789 vec![LogicalType::Int64, LogicalType::Int64],
1790 );
1791
1792 let chunk = op.next().unwrap().unwrap();
1793 let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1794 assert_eq!(updated, 2); let node_data = store.get_node(node).unwrap();
1797 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1798 assert!(labels.contains(&"LabelA"));
1799 assert!(labels.contains(&"LabelB"));
1800 }
1801
1802 #[test]
1803 fn test_add_label_no_input_returns_none() {
1804 let store = create_test_store();
1805
1806 let mut op = AddLabelOperator::new(
1807 Arc::clone(&store),
1808 Box::new(EmptyInput),
1809 0,
1810 vec!["Foo".to_string()],
1811 vec![LogicalType::Int64, LogicalType::Int64],
1812 );
1813
1814 assert!(op.next().unwrap().is_none());
1815 }
1816
1817 #[test]
1820 fn test_remove_label() {
1821 let store = create_test_store();
1822
1823 let node = store.create_node(&["Person", "Employee"]);
1824
1825 let mut op = RemoveLabelOperator::new(
1826 Arc::clone(&store),
1827 MockInput::boxed(node_id_chunk(&[node])),
1828 0,
1829 vec!["Employee".to_string()],
1830 vec![LogicalType::Int64, LogicalType::Int64],
1831 );
1832
1833 let chunk = op.next().unwrap().unwrap();
1834 let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1835 assert_eq!(updated, 1);
1836
1837 let node_data = store.get_node(node).unwrap();
1839 let labels: Vec<&str> = node_data.labels.iter().map(|l| l.as_ref()).collect();
1840 assert!(labels.contains(&"Person"));
1841 assert!(!labels.contains(&"Employee"));
1842 }
1843
1844 #[test]
1845 fn test_remove_nonexistent_label() {
1846 let store = create_test_store();
1847
1848 let node = store.create_node(&["Person"]);
1849
1850 let mut op = RemoveLabelOperator::new(
1851 Arc::clone(&store),
1852 MockInput::boxed(node_id_chunk(&[node])),
1853 0,
1854 vec!["NonExistent".to_string()],
1855 vec![LogicalType::Int64, LogicalType::Int64],
1856 );
1857
1858 let chunk = op.next().unwrap().unwrap();
1859 let updated = chunk.column(1).unwrap().get_int64(0).unwrap();
1860 assert_eq!(updated, 0); }
1862
1863 #[test]
1866 fn test_set_node_property_constant() {
1867 let store = create_test_store();
1868
1869 let node = store.create_node(&["Person"]);
1870
1871 let mut op = SetPropertyOperator::new_for_node(
1872 Arc::clone(&store),
1873 MockInput::boxed(node_id_chunk(&[node])),
1874 0,
1875 vec![(
1876 "name".to_string(),
1877 PropertySource::Constant(Value::String("Alix".into())),
1878 )],
1879 vec![LogicalType::Int64],
1880 );
1881
1882 let chunk = op.next().unwrap().unwrap();
1883 assert_eq!(chunk.row_count(), 1);
1884
1885 let node_data = store.get_node(node).unwrap();
1887 assert_eq!(
1888 node_data
1889 .properties
1890 .get(&grafeo_common::types::PropertyKey::new("name")),
1891 Some(&Value::String("Alix".into()))
1892 );
1893 }
1894
1895 #[test]
1896 fn test_set_node_property_from_column() {
1897 let store = create_test_store();
1898
1899 let node = store.create_node(&["Person"]);
1900
1901 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
1903 builder.column_mut(0).unwrap().push_int64(node.0 as i64);
1904 builder
1905 .column_mut(1)
1906 .unwrap()
1907 .push_value(Value::String("Gus".into()));
1908 builder.advance_row();
1909
1910 let mut op = SetPropertyOperator::new_for_node(
1911 Arc::clone(&store),
1912 MockInput::boxed(builder.finish()),
1913 0,
1914 vec![("name".to_string(), PropertySource::Column(1))],
1915 vec![LogicalType::Int64, LogicalType::String],
1916 );
1917
1918 let chunk = op.next().unwrap().unwrap();
1919 assert_eq!(chunk.row_count(), 1);
1920
1921 let node_data = store.get_node(node).unwrap();
1922 assert_eq!(
1923 node_data
1924 .properties
1925 .get(&grafeo_common::types::PropertyKey::new("name")),
1926 Some(&Value::String("Gus".into()))
1927 );
1928 }
1929
1930 #[test]
1931 fn test_set_edge_property() {
1932 let store = create_test_store();
1933
1934 let n1 = store.create_node(&["N"]);
1935 let n2 = store.create_node(&["N"]);
1936 let eid = store.create_edge(n1, n2, "KNOWS");
1937
1938 let mut op = SetPropertyOperator::new_for_edge(
1939 Arc::clone(&store),
1940 MockInput::boxed(edge_id_chunk(&[eid])),
1941 0,
1942 vec![(
1943 "weight".to_string(),
1944 PropertySource::Constant(Value::Float64(0.75)),
1945 )],
1946 vec![LogicalType::Int64],
1947 );
1948
1949 let chunk = op.next().unwrap().unwrap();
1950 assert_eq!(chunk.row_count(), 1);
1951
1952 let edge_data = store.get_edge(eid).unwrap();
1953 assert_eq!(
1954 edge_data
1955 .properties
1956 .get(&grafeo_common::types::PropertyKey::new("weight")),
1957 Some(&Value::Float64(0.75))
1958 );
1959 }
1960
1961 #[test]
1962 fn test_set_multiple_properties() {
1963 let store = create_test_store();
1964
1965 let node = store.create_node(&["Person"]);
1966
1967 let mut op = SetPropertyOperator::new_for_node(
1968 Arc::clone(&store),
1969 MockInput::boxed(node_id_chunk(&[node])),
1970 0,
1971 vec![
1972 (
1973 "name".to_string(),
1974 PropertySource::Constant(Value::String("Alix".into())),
1975 ),
1976 (
1977 "age".to_string(),
1978 PropertySource::Constant(Value::Int64(30)),
1979 ),
1980 ],
1981 vec![LogicalType::Int64],
1982 );
1983
1984 op.next().unwrap().unwrap();
1985
1986 let node_data = store.get_node(node).unwrap();
1987 assert_eq!(
1988 node_data
1989 .properties
1990 .get(&grafeo_common::types::PropertyKey::new("name")),
1991 Some(&Value::String("Alix".into()))
1992 );
1993 assert_eq!(
1994 node_data
1995 .properties
1996 .get(&grafeo_common::types::PropertyKey::new("age")),
1997 Some(&Value::Int64(30))
1998 );
1999 }
2000
2001 #[test]
2002 fn test_set_property_no_input_returns_none() {
2003 let store = create_test_store();
2004
2005 let mut op = SetPropertyOperator::new_for_node(
2006 Arc::clone(&store),
2007 Box::new(EmptyInput),
2008 0,
2009 vec![("x".to_string(), PropertySource::Constant(Value::Int64(1)))],
2010 vec![LogicalType::Int64],
2011 );
2012
2013 assert!(op.next().unwrap().is_none());
2014 }
2015
2016 #[test]
2019 fn test_delete_node_without_detach_errors_when_edges_exist() {
2020 let store = create_test_store();
2021
2022 let n1 = store.create_node(&["Person"]);
2023 let n2 = store.create_node(&["Person"]);
2024 store.create_edge(n1, n2, "KNOWS");
2025
2026 let mut op = DeleteNodeOperator::new(
2027 Arc::clone(&store),
2028 MockInput::boxed(node_id_chunk(&[n1])),
2029 0,
2030 vec![LogicalType::Int64],
2031 false, );
2033
2034 let err = op.next().unwrap_err();
2035 match err {
2036 OperatorError::ConstraintViolation(msg) => {
2037 assert!(msg.contains("connected edge"), "unexpected message: {msg}");
2038 }
2039 other => panic!("expected ConstraintViolation, got {other:?}"),
2040 }
2041 assert_eq!(store.node_count(), 2);
2043 }
2044
2045 #[test]
2048 fn test_create_node_with_input_operator() {
2049 let store = create_test_store();
2050
2051 let existing = store.create_node(&["Seed"]);
2053
2054 let mut op = CreateNodeOperator::new(
2055 Arc::clone(&store),
2056 Some(MockInput::boxed(node_id_chunk(&[existing]))),
2057 vec!["Created".to_string()],
2058 vec![(
2059 "source".to_string(),
2060 PropertySource::Constant(Value::String("from_input".into())),
2061 )],
2062 vec![LogicalType::Int64, LogicalType::Int64], 1, );
2065
2066 let chunk = op.next().unwrap().unwrap();
2067 assert_eq!(chunk.row_count(), 1);
2068
2069 assert_eq!(store.node_count(), 2);
2071
2072 assert!(op.next().unwrap().is_none());
2074 }
2075
2076 #[test]
2079 fn test_create_edge_with_properties_and_output_column() {
2080 let store = create_test_store();
2081
2082 let n1 = store.create_node(&["Person"]);
2083 let n2 = store.create_node(&["Person"]);
2084
2085 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::Int64]);
2086 builder.column_mut(0).unwrap().push_int64(n1.0 as i64);
2087 builder.column_mut(1).unwrap().push_int64(n2.0 as i64);
2088 builder.advance_row();
2089
2090 let mut op = CreateEdgeOperator::new(
2091 Arc::clone(&store),
2092 MockInput::boxed(builder.finish()),
2093 0,
2094 1,
2095 "KNOWS".to_string(),
2096 vec![LogicalType::Int64, LogicalType::Int64, LogicalType::Int64],
2097 )
2098 .with_properties(vec![(
2099 "since".to_string(),
2100 PropertySource::Constant(Value::Int64(2024)),
2101 )])
2102 .with_output_column(2);
2103
2104 let chunk = op.next().unwrap().unwrap();
2105 assert_eq!(chunk.row_count(), 1);
2106 assert_eq!(store.edge_count(), 1);
2107
2108 let edge_id_raw = chunk
2110 .column(2)
2111 .and_then(|c| c.get_int64(0))
2112 .expect("edge ID should be in output column 2");
2113 let edge_id = EdgeId(edge_id_raw as u64);
2114
2115 let edge = store.get_edge(edge_id).expect("edge should exist");
2117 assert_eq!(
2118 edge.properties
2119 .get(&grafeo_common::types::PropertyKey::new("since")),
2120 Some(&Value::Int64(2024))
2121 );
2122 }
2123
2124 #[test]
2127 fn test_set_property_map_replace() {
2128 use std::collections::BTreeMap;
2129
2130 let store = create_test_store();
2131
2132 let node = store.create_node(&["Person"]);
2133 store.set_node_property(node, "old_prop", Value::String("should_be_removed".into()));
2134
2135 let mut map = BTreeMap::new();
2136 map.insert(PropertyKey::new("new_key"), Value::String("new_val".into()));
2137
2138 let mut op = SetPropertyOperator::new_for_node(
2139 Arc::clone(&store),
2140 MockInput::boxed(node_id_chunk(&[node])),
2141 0,
2142 vec![(
2143 "*".to_string(),
2144 PropertySource::Constant(Value::Map(Arc::new(map))),
2145 )],
2146 vec![LogicalType::Int64],
2147 )
2148 .with_replace(true);
2149
2150 op.next().unwrap().unwrap();
2151
2152 let node_data = store.get_node(node).unwrap();
2153 assert!(
2155 node_data
2156 .properties
2157 .get(&PropertyKey::new("old_prop"))
2158 .is_none()
2159 );
2160 assert_eq!(
2162 node_data.properties.get(&PropertyKey::new("new_key")),
2163 Some(&Value::String("new_val".into()))
2164 );
2165 }
2166
2167 #[test]
2170 fn test_set_property_map_merge() {
2171 use std::collections::BTreeMap;
2172
2173 let store = create_test_store();
2174
2175 let node = store.create_node(&["Person"]);
2176 store.set_node_property(node, "existing", Value::Int64(42));
2177
2178 let mut map = BTreeMap::new();
2179 map.insert(PropertyKey::new("added"), Value::String("hello".into()));
2180
2181 let mut op = SetPropertyOperator::new_for_node(
2182 Arc::clone(&store),
2183 MockInput::boxed(node_id_chunk(&[node])),
2184 0,
2185 vec![(
2186 "*".to_string(),
2187 PropertySource::Constant(Value::Map(Arc::new(map))),
2188 )],
2189 vec![LogicalType::Int64],
2190 ); op.next().unwrap().unwrap();
2193
2194 let node_data = store.get_node(node).unwrap();
2195 assert_eq!(
2197 node_data.properties.get(&PropertyKey::new("existing")),
2198 Some(&Value::Int64(42))
2199 );
2200 assert_eq!(
2202 node_data.properties.get(&PropertyKey::new("added")),
2203 Some(&Value::String("hello".into()))
2204 );
2205 }
2206
2207 #[test]
2210 fn test_property_source_property_access() {
2211 let store = create_test_store();
2212
2213 let source_node = store.create_node(&["Source"]);
2214 store.set_node_property(source_node, "name", Value::String("Alix".into()));
2215
2216 let target_node = store.create_node(&["Target"]);
2217
2218 let mut builder = DataChunkBuilder::new(&[LogicalType::Node, LogicalType::Int64]);
2220 builder.column_mut(0).unwrap().push_node_id(source_node);
2221 builder
2222 .column_mut(1)
2223 .unwrap()
2224 .push_int64(target_node.0 as i64);
2225 builder.advance_row();
2226
2227 let mut op = SetPropertyOperator::new_for_node(
2228 Arc::clone(&store),
2229 MockInput::boxed(builder.finish()),
2230 1, vec![(
2232 "copied_name".to_string(),
2233 PropertySource::PropertyAccess {
2234 column: 0,
2235 property: "name".to_string(),
2236 },
2237 )],
2238 vec![LogicalType::Node, LogicalType::Int64],
2239 );
2240
2241 op.next().unwrap().unwrap();
2242
2243 let target_data = store.get_node(target_node).unwrap();
2244 assert_eq!(
2245 target_data.properties.get(&PropertyKey::new("copied_name")),
2246 Some(&Value::String("Alix".into()))
2247 );
2248 }
2249
2250 #[test]
2253 fn test_create_node_with_constraint_validator() {
2254 let store = create_test_store();
2255
2256 struct RejectAgeValidator;
2257 impl ConstraintValidator for RejectAgeValidator {
2258 fn validate_node_property(
2259 &self,
2260 _labels: &[String],
2261 key: &str,
2262 _value: &Value,
2263 ) -> Result<(), OperatorError> {
2264 if key == "forbidden" {
2265 return Err(OperatorError::ConstraintViolation(
2266 "property 'forbidden' is not allowed".to_string(),
2267 ));
2268 }
2269 Ok(())
2270 }
2271 fn validate_node_complete(
2272 &self,
2273 _labels: &[String],
2274 _properties: &[(String, Value)],
2275 ) -> Result<(), OperatorError> {
2276 Ok(())
2277 }
2278 fn check_unique_node_property(
2279 &self,
2280 _labels: &[String],
2281 _key: &str,
2282 _value: &Value,
2283 ) -> Result<(), OperatorError> {
2284 Ok(())
2285 }
2286 fn validate_edge_property(
2287 &self,
2288 _edge_type: &str,
2289 _key: &str,
2290 _value: &Value,
2291 ) -> Result<(), OperatorError> {
2292 Ok(())
2293 }
2294 fn validate_edge_complete(
2295 &self,
2296 _edge_type: &str,
2297 _properties: &[(String, Value)],
2298 ) -> Result<(), OperatorError> {
2299 Ok(())
2300 }
2301 }
2302
2303 let mut op = CreateNodeOperator::new(
2305 Arc::clone(&store),
2306 None,
2307 vec!["Thing".to_string()],
2308 vec![(
2309 "name".to_string(),
2310 PropertySource::Constant(Value::String("ok".into())),
2311 )],
2312 vec![LogicalType::Int64],
2313 0,
2314 )
2315 .with_validator(Arc::new(RejectAgeValidator));
2316
2317 assert!(op.next().is_ok());
2318 assert_eq!(store.node_count(), 1);
2319
2320 let mut op = CreateNodeOperator::new(
2322 Arc::clone(&store),
2323 None,
2324 vec!["Thing".to_string()],
2325 vec![(
2326 "forbidden".to_string(),
2327 PropertySource::Constant(Value::Int64(1)),
2328 )],
2329 vec![LogicalType::Int64],
2330 0,
2331 )
2332 .with_validator(Arc::new(RejectAgeValidator));
2333
2334 let err = op.next().unwrap_err();
2335 assert!(matches!(err, OperatorError::ConstraintViolation(_)));
2336 }
2339
2340 #[test]
2343 fn test_create_node_reset_allows_re_execution() {
2344 let store = create_test_store();
2345
2346 let mut op = CreateNodeOperator::new(
2347 Arc::clone(&store),
2348 None,
2349 vec!["Person".to_string()],
2350 vec![],
2351 vec![LogicalType::Int64],
2352 0,
2353 );
2354
2355 assert!(op.next().unwrap().is_some());
2357 assert!(op.next().unwrap().is_none());
2358
2359 op.reset();
2361 assert!(op.next().unwrap().is_some());
2362
2363 assert_eq!(store.node_count(), 2);
2364 }
2365
2366 #[test]
2369 fn test_operator_names() {
2370 let store = create_test_store();
2371
2372 let op = CreateNodeOperator::new(
2373 Arc::clone(&store),
2374 None,
2375 vec![],
2376 vec![],
2377 vec![LogicalType::Int64],
2378 0,
2379 );
2380 assert_eq!(op.name(), "CreateNode");
2381
2382 let op = CreateEdgeOperator::new(
2383 Arc::clone(&store),
2384 Box::new(EmptyInput),
2385 0,
2386 1,
2387 "R".to_string(),
2388 vec![LogicalType::Int64],
2389 );
2390 assert_eq!(op.name(), "CreateEdge");
2391
2392 let op = DeleteNodeOperator::new(
2393 Arc::clone(&store),
2394 Box::new(EmptyInput),
2395 0,
2396 vec![LogicalType::Int64],
2397 false,
2398 );
2399 assert_eq!(op.name(), "DeleteNode");
2400
2401 let op = DeleteEdgeOperator::new(
2402 Arc::clone(&store),
2403 Box::new(EmptyInput),
2404 0,
2405 vec![LogicalType::Int64],
2406 );
2407 assert_eq!(op.name(), "DeleteEdge");
2408
2409 let op = AddLabelOperator::new(
2410 Arc::clone(&store),
2411 Box::new(EmptyInput),
2412 0,
2413 vec!["L".to_string()],
2414 vec![LogicalType::Int64],
2415 );
2416 assert_eq!(op.name(), "AddLabel");
2417
2418 let op = RemoveLabelOperator::new(
2419 Arc::clone(&store),
2420 Box::new(EmptyInput),
2421 0,
2422 vec!["L".to_string()],
2423 vec![LogicalType::Int64],
2424 );
2425 assert_eq!(op.name(), "RemoveLabel");
2426
2427 let op = SetPropertyOperator::new_for_node(
2428 Arc::clone(&store),
2429 Box::new(EmptyInput),
2430 0,
2431 vec![],
2432 vec![LogicalType::Int64],
2433 );
2434 assert_eq!(op.name(), "SetProperty");
2435 }
2436}