1use crate::{
2 metadata::{io::topic::KVSchemaType, operator::transforms::validate_transforms_steps},
3 util::{
4 config_error::{ConfigError, INDENT},
5 sdf_types_map::SdfTypesMap,
6 validation_error::ValidationError,
7 validation_failure::ValidationFailure,
8 },
9 wit::{
10 dataflow::{IoRef, IoType, ScheduleConfig, Topic},
11 operator::TransformOperator,
12 },
13};
14
/// All validation errors collected for a single source or sink reference.
///
/// Built by `IoRef::validate_source` and `IoRef::validate_sink`, which return
/// it as the `Err` value when at least one error was recorded.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct IoRefValidationFailure {
    /// Id of the validated reference (copied from `IoRef::id`).
    pub name: String,
    /// Errors found while validating the reference, in the order they were detected.
    pub errors: Vec<IoRefValidationError>,
}
20
21impl ConfigError for IoRefValidationFailure {
22 fn readable(&self, indents: usize) -> String {
23 self.errors
24 .iter()
25 .map(|e| e.readable(indents))
26 .collect::<Vec<String>>()
27 .join("")
28 }
29}
30
/// A single problem found while validating a source or sink reference.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub enum IoRefValidationError {
    /// A source resolved to no target at all.
    NoTarget,
    /// The referenced id (carried in the variant) could not be found among the
    /// known topics or schedules.
    InvalidRef(String),
    /// The first operator of a transforms block takes no input.
    // NOTE(review): nothing in the code visible here constructs this variant;
    // `IoRef::sink_type` reports the same condition through
    // `InvalidTransformsBlock` instead.
    MissingTransformsInput,
    /// The transforms block as a whole is inconsistent (e.g. its first/last
    /// step types do not line up with the service or the topic).
    InvalidTransformsBlock(Vec<ValidationError>),
    /// One or more individual operators in the transforms block are invalid.
    InvalidOperator(Vec<ValidationError>),
}
39
40impl ConfigError for IoRefValidationError {
41 fn readable(&self, indents: usize) -> String {
42 let indent = INDENT.repeat(indents);
43
44 match self {
45 Self::NoTarget => {
46 format!("{}Cannot have a source with no target\n", indent)
47 }
48 Self::InvalidRef(id) => {
49 format!("{}Referenced topic `{}` not found\n", indent, id)
50 }
51 Self::MissingTransformsInput => {
52 format!(
53 "{}The first operator in a transforms block must take an input\n",
54 indent
55 )
56 }
57 Self::InvalidTransformsBlock(errors) => {
58 let mut res = format!("{}Transforms block is invalid:\n", indent);
59
60 for error in errors {
61 res.push_str(&error.readable(indents + 1));
62 }
63
64 res
65 }
66 Self::InvalidOperator(errors) => {
67 let mut res = format!("{}Invalid operator(s):\n", indent);
68
69 for error in errors {
70 res.push_str(&error.readable(indents + 1));
71 }
72
73 res
74 }
75 }
76 }
77}
78
79impl IoRef {
80 pub fn schema_type(
81 &self,
82 topics: &[(String, Topic)],
83 ) -> Result<Option<KVSchemaType>, IoRefValidationError> {
84 match self.type_ {
85 IoType::NoTarget => Ok(None),
86 IoType::Topic => {
87 let topic = topics.iter().find(|(id, _)| id == &self.id);
88
89 match topic {
90 Some((_name, topic)) => Ok(Some(topic.type_())),
91 None => Err(IoRefValidationError::InvalidRef(self.id.clone())),
92 }
93 }
94 IoType::Schedule => Ok(Some(KVSchemaType::timestamp())),
95 }
96 }
97
98 pub fn source_type(
99 &self,
100 topics: &[(String, Topic)],
101 ) -> Result<KVSchemaType, IoRefValidationError> {
102 if let Some(last_step) = self.steps.last() {
103 get_transform_chain_output_from_last_step(last_step)
104 .map_err(|e| IoRefValidationError::InvalidTransformsBlock(vec![e]))
105 } else {
106 match self.schema_type(topics) {
107 Ok(None) => Err(IoRefValidationError::NoTarget),
108 Err(e) => Err(e),
109 Ok(Some(valid_type)) => Ok(valid_type),
110 }
111 }
112 }
113
114 pub fn sink_type(
115 &self,
116 topics: &[(String, Topic)],
117 ) -> Result<Option<KVSchemaType>, IoRefValidationError> {
118 if let Some(first_step) = self.steps.first() {
119 if let Some(input_type) = first_step.input_type() {
120 Ok(Some(input_type))
121 } else {
122 Err(IoRefValidationError::InvalidTransformsBlock(vec![
123 ValidationError::new(
124 "The first operator in a transforms block must take an input",
125 ),
126 ]))
127 }
128 } else {
129 match self.schema_type(topics) {
130 Ok(None) => Ok(None),
131 Err(e) => Err(e),
132 valid_type => valid_type,
133 }
134 }
135 }
136
137 fn validate_schedule_defined(
138 &self,
139 schedules: Option<&[ScheduleConfig]>,
140 ) -> Result<(), IoRefValidationError> {
141 if self.type_ == IoType::Schedule {
142 let schedules = schedules.unwrap_or_default();
143 if schedules.iter().any(|s| s.name == self.id) {
144 Ok(())
145 } else {
146 Err(IoRefValidationError::InvalidRef(self.id.clone()))
147 }
148 } else {
149 Ok(())
150 }
151 }
152
153 pub fn validate_source(
154 &self,
155 types: &SdfTypesMap,
156 topics: &[(String, Topic)],
157 schedules: Option<&[ScheduleConfig]>,
158 ) -> Result<(), IoRefValidationFailure> {
159 let mut failure = IoRefValidationFailure {
160 name: self.id.clone(),
161 errors: vec![],
162 };
163
164 if let Err(e) = self.source_type(topics) {
165 failure.errors.push(e);
166 }
167
168 if let Ok(Some(topic_type)) = self.schema_type(topics) {
169 if !self.steps.is_empty() {
170 if let Err(e) = self.validate_source_or_sink_steps(
171 types,
172 &topic_type,
173 format!("Topic `{}`", self.id),
174 ) {
175 failure
176 .errors
177 .push(IoRefValidationError::InvalidOperator(e.errors))
178 }
179 }
180 }
181
182 if let Err(err) = self.validate_schedule_defined(schedules) {
183 failure.errors.push(err);
184 }
185
186 if failure.errors.is_empty() {
187 Ok(())
188 } else {
189 Err(failure)
190 }
191 }
192
193 pub fn validate_sink(
194 &self,
195 types: &SdfTypesMap,
196 topics: &[(String, Topic)],
197 service_output_type: &KVSchemaType,
198 ) -> Result<(), IoRefValidationFailure> {
199 let mut failure = IoRefValidationFailure {
200 name: self.id.clone(),
201 errors: vec![],
202 };
203 let mut transforms_errors = vec![];
204
205 match self.sink_type(topics) {
206 Err(e) => failure.errors.push(e),
207 Ok(Some(sink_ty)) => {
208 if sink_ty.value.name.replace('-', "_")
209 != service_output_type.value.name.replace('-', "_")
210 {
211 transforms_errors.push(ValidationError::new(&format!(
212 "service output type `{}` does not match sink input type `{}`",
213 service_output_type.value.name, sink_ty.value.name
214 )));
215 }
216
217 if let (Some(sink_key), Some(service_key)) = (sink_ty.key, &service_output_type.key)
218 {
219 if sink_key != *service_key {
220 transforms_errors.push(
221 ValidationError::new(&format!(
222 "sink transforms input key type `{}` does not match service output key type `{}`",
223 sink_key.name,
224 service_key.name
225 ))
226 );
227 }
228 }
229 }
230 Ok(None) => {}
231 }
232
233 if let Some(last_step) = self.steps.last() {
234 if let Err(e) = self.validate_source_or_sink_steps(
235 types,
236 service_output_type,
237 "service".to_string(),
238 ) {
239 failure
240 .errors
241 .push(IoRefValidationError::InvalidOperator(e.errors));
242 }
243
244 match get_transform_chain_output_from_last_step(last_step) {
245 Ok(output_ty) => match self.schema_type(topics) {
246 Ok(Some(topic_type)) => {
247 if topic_type.value != output_ty.value {
248 transforms_errors.push(ValidationError::new(&format!(
249 "transforms steps final output type `{}` does not match topic type `{}`",
250 output_ty.value.name,
251 topic_type.value.name
252 )));
253 }
254
255 if let Some(topic_key) = topic_type.key {
256 if let Some(output_key) = output_ty.key {
257 if topic_key != output_key {
258 transforms_errors.push(ValidationError::new(&format!(
259 "sink `{}` has transforms steps but final output key type `{}` does not match topic key type `{}`",
260 self.id,
261 output_key.name,
262 topic_key.name
263 )));
264 }
265 }
266 }
267 }
268 Ok(None) => transforms_errors.push(ValidationError::new(
269 "sink cannot have transforms steps without a target",
270 )),
271 _ => {}
272 },
273 Err(e) => {
274 transforms_errors.push(e);
275 }
276 }
277 }
278
279 if !transforms_errors.is_empty() {
280 failure
281 .errors
282 .push(IoRefValidationError::InvalidTransformsBlock(
283 transforms_errors,
284 ));
285 }
286
287 if failure.errors.is_empty() {
288 Ok(())
289 } else {
290 Err(failure)
291 }
292 }
293
294 pub fn validate_source_or_sink_steps(
295 &self,
296 types: &SdfTypesMap,
297 expected_input_type: &KVSchemaType,
298 input_provider_description: String,
299 ) -> Result<(), ValidationFailure> {
300 let mut errors = ValidationFailure::new();
301
302 if let Err(transforms_errors) = validate_transforms_steps(
303 &self.steps,
304 types,
305 expected_input_type.clone(),
306 input_provider_description,
307 ) {
308 errors.concat(&transforms_errors);
309 };
310
311 if errors.any() {
312 Err(errors)
313 } else {
314 Ok(())
315 }
316 }
317}
318
319fn get_transform_chain_output_from_last_step(
322 last_step: &TransformOperator,
323) -> Result<KVSchemaType, ValidationError> {
324 match last_step {
325 TransformOperator::Filter(_) => {
327 if let Some(input_type) = last_step.input_type() {
328 Ok(input_type)
329 } else {
330 Err(ValidationError::new(
331 "Last transforms step is invalid. Filter operator should have an input type",
332 ))
333 }
334 }
335 _ => {
336 if let Some(output_type) = last_step.output_type() {
337 Ok(output_type)
338 } else {
339 Err(ValidationError::new(
340 "Last transforms step is invalid. Expected an operator with an output type",
341 ))
342 }
343 }
344 }
345}
#[cfg(test)]
mod test {
    use crate::{
        metadata::dataflow::io_ref::IoRefValidationError,
        util::{
            config_error::{ConfigError, INDENT},
            sdf_types_map::SdfTypesMap,
            validation_error::ValidationError,
        },
        wit::{
            dataflow::{IoRef, IoType, Topic, TransformOperator},
            io::{SchemaSerDe, TopicSchema, TypeRef},
            metadata::{NamedParameter, OutputType, Parameter, ParameterKind},
            operator::StepInvocation,
        },
    };

    // ---- fixture builders -------------------------------------------------

    /// A type reference with the given name.
    fn type_ref(name: &str) -> TypeRef {
        TypeRef {
            name: name.to_string(),
        }
    }

    /// A required value parameter called `input` of the given type.
    fn value_input(type_name: &str) -> NamedParameter {
        NamedParameter {
            name: "input".to_string(),
            type_: type_ref(type_name),
            optional: false,
            kind: ParameterKind::Value,
        }
    }

    /// An operator output of the given type.
    fn output_of(type_name: &str) -> Option<Parameter> {
        Some(Parameter {
            type_: type_ref(type_name).into(),
            ..Default::default()
        })
    }

    /// A step invocation with a single `input` parameter and the given output.
    fn step(uses: &str, input_type: &str, output: Option<Parameter>) -> StepInvocation {
        StepInvocation {
            uses: uses.to_string(),
            inputs: vec![value_input(input_type)],
            output,
            ..Default::default()
        }
    }

    /// A topic without a key whose value has the given type.
    fn keyless_topic(name: &str, value_type: &str) -> (String, Topic) {
        (
            name.to_string(),
            Topic {
                name: name.to_string(),
                schema: TopicSchema {
                    key: None,
                    value: SchemaSerDe {
                        converter: None,
                        type_: type_ref(value_type),
                    },
                },
                consumer: None,
                producer: None,
                profile: None,
            },
        )
    }

    /// The two topics shared by most tests: `my-topic: u8`, `my-other-topic: u16`.
    fn topics() -> Vec<(String, Topic)> {
        vec![
            keyless_topic("my-topic", "u8"),
            keyless_topic("my-other-topic", "u16"),
        ]
    }

    /// An `IoRef` of the given kind, id and steps.
    fn make_ref(type_: IoType, id: &str, steps: Vec<TransformOperator>) -> IoRef {
        IoRef {
            id: id.to_string(),
            type_,
            steps,
        }
    }

    /// Key/value pair without a key; convert with `.into()` at the use site.
    fn keyless(value_type: &str) -> (Option<TypeRef>, TypeRef) {
        (None, type_ref(value_type))
    }

    /// Key/value pair `bytes -> string`; convert with `.into()` at the use site.
    fn bytes_keyed_string() -> (Option<TypeRef>, TypeRef) {
        (Some(type_ref("bytes")), type_ref("string"))
    }

    /// One rendered message line nested a single level below a heading.
    fn nested_line(message: &str) -> String {
        format!("{}{}\n", INDENT.repeat(1), message)
    }

    const MISSING_OUTPUT: &str =
        "Last transforms step is invalid. Expected an operator with an output type";

    // ---- schema_type ------------------------------------------------------

    #[test]
    fn test_topic_type_returns_none_for_no_target() {
        let subject = make_ref(IoType::NoTarget, "no-target", vec![]);

        assert_eq!(subject.schema_type(&[]), Ok(None));
    }

    #[test]
    fn test_topic_type_returns_error_when_topic_not_found() {
        let subject = make_ref(IoType::Topic, "my-topic", vec![]);

        assert_eq!(
            subject.schema_type(&[]),
            Err(IoRefValidationError::InvalidRef("my-topic".to_string()))
        );
    }

    #[test]
    fn test_topic_type_returns_topic_type() {
        let subject = make_ref(IoType::Topic, "my-topic", vec![]);
        let known_topics = vec![keyless_topic("my-topic", "string")];

        assert_eq!(
            subject.schema_type(&known_topics),
            Ok(Some(keyless("string").into()))
        );
    }

    // ---- validate_source_or_sink_steps -------------------------------------

    #[test]
    fn test_validate_source_or_sink_steps_validates_steps_as_a_transforms() {
        let types = SdfTypesMap::default();
        let subject = make_ref(
            IoType::Topic,
            "my-topic",
            vec![TransformOperator::Map(StepInvocation {
                uses: "my-function".to_string(),
                output: None,
                ..Default::default()
            })],
        );

        let expected_input_type = bytes_keyed_string().into();

        let failure = subject
            .validate_source_or_sink_steps(
                &types,
                &expected_input_type,
                "topic `my-topic`".to_string(),
            )
            .expect_err("should error for invalid step");

        assert!(failure.errors.contains(&ValidationError::new(
            "map type function `my-function` should have exactly 1 input type, found 0"
        )));
    }

    // ---- validate_source ---------------------------------------------------

    #[test]
    fn test_validate_source_rejects_source_when_topic_not_found() {
        let types = SdfTypesMap::default();
        let source = make_ref(IoType::Topic, "my-source-topic", vec![]);

        let failure = source
            .validate_source(&types, &[], None)
            .expect_err("should error for missing sources");

        assert!(failure.errors.contains(&IoRefValidationError::InvalidRef(
            "my-source-topic".to_string()
        )));
    }

    #[test]
    fn test_validate_source_rejects_source_when_last_step_has_no_output() {
        let known_topics = topics();
        let types = SdfTypesMap::default();
        let source = make_ref(
            IoType::Topic,
            "my-other-topic",
            vec![TransformOperator::Map(step("my-function", "u16", None))],
        );

        let failure = source
            .validate_source(&types, &known_topics, None)
            .expect_err("should error for invalid step");

        assert!(failure
            .errors
            .contains(&IoRefValidationError::InvalidTransformsBlock(vec![
                ValidationError::new(MISSING_OUTPUT)
            ])));

        let expected_fragment = format!(
            "Transforms block is invalid:\n{}",
            nested_line(MISSING_OUTPUT)
        );
        assert!(failure.readable(0).contains(expected_fragment.as_str()));
    }

    #[test]
    fn test_validate_source_rejects_source_when_topic_is_no_target() {
        let types = SdfTypesMap::default();
        let source = make_ref(IoType::NoTarget, "", vec![]);

        let failure = source
            .validate_source(&types, &[], None)
            .expect_err("should error for missing sources");

        assert!(failure.errors.contains(&IoRefValidationError::NoTarget));
        assert_eq!(
            failure.readable(0),
            "Cannot have a source with no target\n"
        );
    }

    #[test]
    fn test_validate_source_validates_source_transforms() {
        let types = SdfTypesMap::default();
        let source = make_ref(
            IoType::Topic,
            "my-topic",
            vec![TransformOperator::Map(step("my-function", "u8", None))],
        );
        let known_topics = topics();

        let failure = source
            .validate_source(&types, &known_topics, None)
            .expect_err("should error for invalid step");

        assert!(failure
            .errors
            .contains(&IoRefValidationError::InvalidOperator(vec![
                ValidationError::new("map type function `my-function` requires an output type")
            ])));

        let expected = format!(
            "Transforms block is invalid:\n{}Invalid operator(s):\n{}",
            nested_line(MISSING_OUTPUT),
            nested_line("map type function `my-function` requires an output type"),
        );
        assert_eq!(failure.readable(0), expected);
    }

    #[test]
    fn test_validate_source_accepts_valid_sources() {
        let types = SdfTypesMap::default();
        let subject = make_ref(
            IoType::Topic,
            "my-topic",
            vec![
                TransformOperator::Map(StepInvocation {
                    uses: "my-function".to_string(),
                    inputs: vec![value_input("u8")],
                    output: Some(Parameter {
                        type_: OutputType::Ref(type_ref("u16")),
                        ..Default::default()
                    }),
                    ..Default::default()
                }),
                TransformOperator::Map(step("my-other-function", "u16", output_of("u8"))),
            ],
        );

        subject
            .validate_source(&types, &topics(), None)
            .expect("should validate");
    }

    // ---- validate_sink -----------------------------------------------------

    #[test]
    fn test_validate_sink_rejects_sink_when_topic_not_found() {
        let types = SdfTypesMap::default();
        let sink = make_ref(IoType::Topic, "my-sink-topic", vec![]);

        let failure = sink
            .validate_sink(&types, &[], &bytes_keyed_string().into())
            .expect_err("should error for missing sources");

        assert!(failure.errors.contains(&IoRefValidationError::InvalidRef(
            "my-sink-topic".to_string()
        )));
        assert_eq!(
            failure.readable(0),
            "Referenced topic `my-sink-topic` not found\n"
        );
    }

    #[test]
    fn test_validate_sink_rejects_sink_when_first_step_has_no_input() {
        let known_topics = topics();
        let types = SdfTypesMap::default();
        let sink = make_ref(
            IoType::Topic,
            "my-other-topic",
            vec![TransformOperator::Map(StepInvocation {
                uses: "my-function".to_string(),
                inputs: vec![],
                ..Default::default()
            })],
        );

        let failure = sink
            .validate_sink(&types, &known_topics, &bytes_keyed_string().into())
            .expect_err("should error for invalid step");

        let missing_input = "The first operator in a transforms block must take an input";

        assert!(failure
            .errors
            .contains(&IoRefValidationError::InvalidTransformsBlock(vec![
                ValidationError::new(missing_input)
            ])));

        let expected_fragment = format!(
            "Transforms block is invalid:\n{}",
            nested_line(missing_input)
        );
        assert!(failure.readable(0).contains(expected_fragment.as_str()));
    }

    #[test]
    fn test_validate_sink_validates_sink_transforms() {
        let types = SdfTypesMap::default();
        let sink = make_ref(
            IoType::Topic,
            "my-topic",
            vec![
                TransformOperator::Map(step("my-function", "string", output_of("string"))),
                TransformOperator::Map(step("my-other-function", "u8", output_of("string"))),
            ],
        );
        let known_topics = topics();

        let failure = sink
            .validate_sink(&types, &known_topics, &bytes_keyed_string().into())
            .expect_err("should error for invalid step");

        let mismatch = ValidationError::new("Function `my-other-function` input type was expected to match `string` type provided by function `my-function`, but `u8` was found.");

        assert!(failure.errors.iter().any(|e| matches!(
            e,
            IoRefValidationError::InvalidOperator(operator_errors)
                if operator_errors.contains(&mismatch)
        )));
    }

    #[test]
    fn test_validate_sink_validates_last_transforms_step_matches_topic() {
        let types = SdfTypesMap::default();
        let sink = make_ref(
            IoType::Topic,
            "my-topic",
            vec![TransformOperator::Map(step(
                "my-function",
                "string",
                output_of("string"),
            ))],
        );
        let known_topics = topics();

        let failure = sink
            .validate_sink(&types, &known_topics, &bytes_keyed_string().into())
            .expect_err("should error for invalid step");

        let mismatch =
            "transforms steps final output type `string` does not match topic type `u8`";

        assert!(failure
            .errors
            .contains(&IoRefValidationError::InvalidTransformsBlock(vec![
                ValidationError::new(mismatch)
            ])));

        let expected_fragment =
            format!("Transforms block is invalid:\n{}", nested_line(mismatch));
        assert!(failure.readable(0).contains(expected_fragment.as_str()));
    }

    #[test]
    fn test_validate_sink_rejects_transforms_without_output() {
        let types = SdfTypesMap::default();
        let sink = make_ref(
            IoType::Topic,
            "my-topic",
            vec![TransformOperator::Map(step("my-function", "string", None))],
        );
        let known_topics = topics();

        let failure = sink
            .validate_sink(&types, &known_topics, &bytes_keyed_string().into())
            .expect_err("should error for invalid step");

        let missing_output = ValidationError::new(MISSING_OUTPUT);

        assert!(failure.errors.iter().any(|e| matches!(
            e,
            IoRefValidationError::InvalidTransformsBlock(block_errors)
                if block_errors.contains(&missing_output)
        )));

        let expected_fragment = format!(
            "Transforms block is invalid:\n{}",
            nested_line(MISSING_OUTPUT)
        );
        assert!(failure.readable(0).contains(expected_fragment.as_str()));
    }

    #[test]
    fn test_validate_sink_rejects_transforms_without_a_sink_target() {
        let types = SdfTypesMap::default();
        let sink = make_ref(
            IoType::NoTarget,
            "my-topic",
            vec![TransformOperator::Map(step(
                "my-function",
                "u8",
                output_of("u16"),
            ))],
        );
        let known_topics = topics();

        let failure = sink
            .validate_sink(&types, &known_topics, &bytes_keyed_string().into())
            .expect_err("should error for invalid step");

        let no_target =
            ValidationError::new("sink cannot have transforms steps without a target");

        assert!(failure.errors.iter().any(|e| matches!(
            e,
            IoRefValidationError::InvalidTransformsBlock(block_errors)
                if block_errors.contains(&no_target)
        )));
    }

    #[test]
    fn test_validate_sink_accepts_valid_sinks() {
        let types = SdfTypesMap::default();
        let subject = make_ref(
            IoType::Topic,
            "my-topic",
            vec![
                TransformOperator::Map(step("my-function", "u8", output_of("u16"))),
                TransformOperator::Map(step("my-other-function", "u16", output_of("u8"))),
            ],
        );

        subject
            .validate_sink(&types, &topics(), &keyless("u8").into())
            .expect("should validate");
    }

    // ---- get_transform_chain_output_from_last_step ---------------------------

    #[test]
    fn test_get_transform_chain_output_from_last_step_if_filter() {
        let last_step: crate::wit::operator::TransformOperator =
            TransformOperator::Filter(step("my-filter", "u8", output_of("bool")));

        let chain_output = super::get_transform_chain_output_from_last_step(&last_step)
            .expect("should return output type");

        assert_eq!(chain_output, keyless("u8").into());
    }

    #[test]
    fn test_get_transform_chain_output_from_last_step_if_map() {
        let last_step: crate::wit::operator::TransformOperator =
            TransformOperator::Map(step("my-map", "u8", output_of("u16")));

        let chain_output = super::get_transform_chain_output_from_last_step(&last_step)
            .expect("should return output type");

        assert_eq!(chain_output, keyless("u16").into());
    }

    // ---- source_type -------------------------------------------------------

    #[test]
    fn test_source_type_returns_type_when_last_step_is_filter() {
        let source = make_ref(
            IoType::Topic,
            "my-source",
            vec![TransformOperator::Filter(step(
                "my-filter",
                "u8",
                output_of("bool"),
            ))],
        );
        let known_topics = topics();

        let produced = source
            .source_type(&known_topics)
            .expect("should return output type");

        assert_eq!(produced, keyless("u8").into());
    }

    #[test]
    fn test_source_type_returns_type_when_last_step_is_map() {
        let source = make_ref(
            IoType::Topic,
            "my-source",
            vec![TransformOperator::Map(step(
                "my-map",
                "u8",
                output_of("u16"),
            ))],
        );
        let known_topics = topics();

        let produced = source
            .source_type(&known_topics)
            .expect("should return output type");

        assert_eq!(produced, keyless("u16").into());
    }

    #[test]
    fn test_validate_schedule_undefined() {
        let source = make_ref(IoType::Schedule, "my-schedule", vec![]);

        let error = source
            .validate_schedule_defined(None)
            .expect_err("should error for undefined schedule");

        assert_eq!(
            error,
            IoRefValidationError::InvalidRef("my-schedule".to_string())
        );
    }

    #[test]
    fn test_source_type_returns_topic_type_with_no_steps() {
        let source = make_ref(IoType::Topic, "my-topic", vec![]);
        let known_topics = topics();

        let produced = source
            .source_type(&known_topics)
            .expect("should return output type");

        assert_eq!(produced, keyless("u8").into());
    }
}