sdf_metadata/metadata/dataflow/
io_ref.rs

1use crate::{
2    metadata::{io::topic::KVSchemaType, operator::transforms::validate_transforms_steps},
3    util::{
4        config_error::{ConfigError, INDENT},
5        sdf_types_map::SdfTypesMap,
6        validation_error::ValidationError,
7        validation_failure::ValidationFailure,
8    },
9    wit::{
10        dataflow::{IoRef, IoType, ScheduleConfig, Topic},
11        operator::TransformOperator,
12    },
13};
14
15#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
16pub struct IoRefValidationFailure {
17    pub name: String,
18    pub errors: Vec<IoRefValidationError>,
19}
20
21impl ConfigError for IoRefValidationFailure {
22    fn readable(&self, indents: usize) -> String {
23        self.errors
24            .iter()
25            .map(|e| e.readable(indents))
26            .collect::<Vec<String>>()
27            .join("")
28    }
29}
30
31#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
32pub enum IoRefValidationError {
33    NoTarget,
34    InvalidRef(String),
35    MissingTransformsInput,
36    InvalidTransformsBlock(Vec<ValidationError>),
37    InvalidOperator(Vec<ValidationError>),
38}
39
40impl ConfigError for IoRefValidationError {
41    fn readable(&self, indents: usize) -> String {
42        let indent = INDENT.repeat(indents);
43
44        match self {
45            Self::NoTarget => {
46                format!("{}Cannot have a source with no target\n", indent)
47            }
48            Self::InvalidRef(id) => {
49                format!("{}Referenced topic `{}` not found\n", indent, id)
50            }
51            Self::MissingTransformsInput => {
52                format!(
53                    "{}The first operator in a transforms block must take an input\n",
54                    indent
55                )
56            }
57            Self::InvalidTransformsBlock(errors) => {
58                let mut res = format!("{}Transforms block is invalid:\n", indent);
59
60                for error in errors {
61                    res.push_str(&error.readable(indents + 1));
62                }
63
64                res
65            }
66            Self::InvalidOperator(errors) => {
67                let mut res = format!("{}Invalid operator(s):\n", indent);
68
69                for error in errors {
70                    res.push_str(&error.readable(indents + 1));
71                }
72
73                res
74            }
75        }
76    }
77}
78
79impl IoRef {
80    pub fn schema_type(
81        &self,
82        topics: &[(String, Topic)],
83    ) -> Result<Option<KVSchemaType>, IoRefValidationError> {
84        match self.type_ {
85            IoType::NoTarget => Ok(None),
86            IoType::Topic => {
87                let topic = topics.iter().find(|(id, _)| id == &self.id);
88
89                match topic {
90                    Some((_name, topic)) => Ok(Some(topic.type_())),
91                    None => Err(IoRefValidationError::InvalidRef(self.id.clone())),
92                }
93            }
94            IoType::Schedule => Ok(Some(KVSchemaType::timestamp())),
95        }
96    }
97
98    pub fn source_type(
99        &self,
100        topics: &[(String, Topic)],
101    ) -> Result<KVSchemaType, IoRefValidationError> {
102        if let Some(last_step) = self.steps.last() {
103            get_transform_chain_output_from_last_step(last_step)
104                .map_err(|e| IoRefValidationError::InvalidTransformsBlock(vec![e]))
105        } else {
106            match self.schema_type(topics) {
107                Ok(None) => Err(IoRefValidationError::NoTarget),
108                Err(e) => Err(e),
109                Ok(Some(valid_type)) => Ok(valid_type),
110            }
111        }
112    }
113
114    pub fn sink_type(
115        &self,
116        topics: &[(String, Topic)],
117    ) -> Result<Option<KVSchemaType>, IoRefValidationError> {
118        if let Some(first_step) = self.steps.first() {
119            if let Some(input_type) = first_step.input_type() {
120                Ok(Some(input_type))
121            } else {
122                Err(IoRefValidationError::InvalidTransformsBlock(vec![
123                    ValidationError::new(
124                        "The first operator in a transforms block must take an input",
125                    ),
126                ]))
127            }
128        } else {
129            match self.schema_type(topics) {
130                Ok(None) => Ok(None),
131                Err(e) => Err(e),
132                valid_type => valid_type,
133            }
134        }
135    }
136
137    fn validate_schedule_defined(
138        &self,
139        schedules: Option<&[ScheduleConfig]>,
140    ) -> Result<(), IoRefValidationError> {
141        if self.type_ == IoType::Schedule {
142            let schedules = schedules.unwrap_or_default();
143            if schedules.iter().any(|s| s.name == self.id) {
144                Ok(())
145            } else {
146                Err(IoRefValidationError::InvalidRef(self.id.clone()))
147            }
148        } else {
149            Ok(())
150        }
151    }
152
153    pub fn validate_source(
154        &self,
155        types: &SdfTypesMap,
156        topics: &[(String, Topic)],
157        schedules: Option<&[ScheduleConfig]>,
158    ) -> Result<(), IoRefValidationFailure> {
159        let mut failure = IoRefValidationFailure {
160            name: self.id.clone(),
161            errors: vec![],
162        };
163
164        if let Err(e) = self.source_type(topics) {
165            failure.errors.push(e);
166        }
167
168        if let Ok(Some(topic_type)) = self.schema_type(topics) {
169            if !self.steps.is_empty() {
170                if let Err(e) = self.validate_source_or_sink_steps(
171                    types,
172                    &topic_type,
173                    format!("Topic `{}`", self.id),
174                ) {
175                    failure
176                        .errors
177                        .push(IoRefValidationError::InvalidOperator(e.errors))
178                }
179            }
180        }
181
182        if let Err(err) = self.validate_schedule_defined(schedules) {
183            failure.errors.push(err);
184        }
185
186        if failure.errors.is_empty() {
187            Ok(())
188        } else {
189            Err(failure)
190        }
191    }
192
193    pub fn validate_sink(
194        &self,
195        types: &SdfTypesMap,
196        topics: &[(String, Topic)],
197        service_output_type: &KVSchemaType,
198    ) -> Result<(), IoRefValidationFailure> {
199        let mut failure = IoRefValidationFailure {
200            name: self.id.clone(),
201            errors: vec![],
202        };
203        let mut transforms_errors = vec![];
204
205        match self.sink_type(topics) {
206            Err(e) => failure.errors.push(e),
207            Ok(Some(sink_ty)) => {
208                if sink_ty.value.name.replace('-', "_")
209                    != service_output_type.value.name.replace('-', "_")
210                {
211                    transforms_errors.push(ValidationError::new(&format!(
212                        "service output type `{}` does not match sink input type `{}`",
213                        service_output_type.value.name, sink_ty.value.name
214                    )));
215                }
216
217                if let (Some(sink_key), Some(service_key)) = (sink_ty.key, &service_output_type.key)
218                {
219                    if sink_key != *service_key {
220                        transforms_errors.push(
221                            ValidationError::new(&format!(
222                                "sink transforms input key type `{}` does not match service output key type `{}`",
223                                sink_key.name,
224                                service_key.name
225                            ))
226                        );
227                    }
228                }
229            }
230            Ok(None) => {}
231        }
232
233        if let Some(last_step) = self.steps.last() {
234            if let Err(e) = self.validate_source_or_sink_steps(
235                types,
236                service_output_type,
237                "service".to_string(),
238            ) {
239                failure
240                    .errors
241                    .push(IoRefValidationError::InvalidOperator(e.errors));
242            }
243
244            match get_transform_chain_output_from_last_step(last_step) {
245                Ok(output_ty) => match self.schema_type(topics) {
246                    Ok(Some(topic_type)) => {
247                        if topic_type.value != output_ty.value {
248                            transforms_errors.push(ValidationError::new(&format!(
249                                    "transforms steps final output type `{}` does not match topic type `{}`",
250                                    output_ty.value.name,
251                                    topic_type.value.name
252                                )));
253                        }
254
255                        if let Some(topic_key) = topic_type.key {
256                            if let Some(output_key) = output_ty.key {
257                                if topic_key != output_key {
258                                    transforms_errors.push(ValidationError::new(&format!(
259                                            "sink `{}` has transforms steps but final output key type `{}` does not match topic key type `{}`",
260                                            self.id,
261                                            output_key.name,
262                                            topic_key.name
263                                        )));
264                                }
265                            }
266                        }
267                    }
268                    Ok(None) => transforms_errors.push(ValidationError::new(
269                        "sink cannot have transforms steps without a target",
270                    )),
271                    _ => {}
272                },
273                Err(e) => {
274                    transforms_errors.push(e);
275                }
276            }
277        }
278
279        if !transforms_errors.is_empty() {
280            failure
281                .errors
282                .push(IoRefValidationError::InvalidTransformsBlock(
283                    transforms_errors,
284                ));
285        }
286
287        if failure.errors.is_empty() {
288            Ok(())
289        } else {
290            Err(failure)
291        }
292    }
293
294    pub fn validate_source_or_sink_steps(
295        &self,
296        types: &SdfTypesMap,
297        expected_input_type: &KVSchemaType,
298        input_provider_description: String,
299    ) -> Result<(), ValidationFailure> {
300        let mut errors = ValidationFailure::new();
301
302        if let Err(transforms_errors) = validate_transforms_steps(
303            &self.steps,
304            types,
305            expected_input_type.clone(),
306            input_provider_description,
307        ) {
308            errors.concat(&transforms_errors);
309        };
310
311        if errors.any() {
312            Err(errors)
313        } else {
314            Ok(())
315        }
316    }
317}
318
319// Helper function to get the output type of the last step in a transform chain
320// If the last step is a filter, its input type is returned
321fn get_transform_chain_output_from_last_step(
322    last_step: &TransformOperator,
323) -> Result<KVSchemaType, ValidationError> {
324    match last_step {
325        // if filter, use input type
326        TransformOperator::Filter(_) => {
327            if let Some(input_type) = last_step.input_type() {
328                Ok(input_type)
329            } else {
330                Err(ValidationError::new(
331                    "Last transforms step is invalid. Filter operator should have an input type",
332                ))
333            }
334        }
335        _ => {
336            if let Some(output_type) = last_step.output_type() {
337                Ok(output_type)
338            } else {
339                Err(ValidationError::new(
340                    "Last transforms step is invalid. Expected an operator with an output type",
341                ))
342            }
343        }
344    }
345}
346#[cfg(test)]
347mod test {
348    use crate::{
349        metadata::dataflow::io_ref::IoRefValidationError,
350        util::{
351            config_error::ConfigError, sdf_types_map::SdfTypesMap,
352            validation_error::ValidationError,
353        },
354        wit::{
355            dataflow::{IoRef, IoType, Topic, TransformOperator},
356            io::{SchemaSerDe, TopicSchema, TypeRef},
357            metadata::{NamedParameter, OutputType, Parameter, ParameterKind},
358            operator::StepInvocation,
359        },
360    };
361
362    fn topics() -> Vec<(String, Topic)> {
363        vec![
364            (
365                "my-topic".to_string(),
366                Topic {
367                    name: "my-topic".to_string(),
368                    schema: TopicSchema {
369                        key: None,
370                        value: SchemaSerDe {
371                            converter: None,
372                            type_: TypeRef {
373                                name: "u8".to_string(),
374                            },
375                        },
376                    },
377                    consumer: None,
378                    producer: None,
379                    profile: None,
380                },
381            ),
382            (
383                "my-other-topic".to_string(),
384                Topic {
385                    name: "my-other-topic".to_string(),
386                    schema: TopicSchema {
387                        key: None,
388                        value: SchemaSerDe {
389                            converter: None,
390                            type_: TypeRef {
391                                name: "u16".to_string(),
392                            },
393                        },
394                    },
395                    consumer: None,
396                    producer: None,
397                    profile: None,
398                },
399            ),
400        ]
401    }
402
403    #[test]
404    fn test_topic_type_returns_none_for_no_target() {
405        let io_ref = IoRef {
406            id: "no-target".to_string(),
407            type_: IoType::NoTarget,
408            steps: vec![],
409        };
410
411        assert_eq!(io_ref.schema_type(&[]), Ok(None));
412    }
413
414    #[test]
415    fn test_topic_type_returns_error_when_topic_not_found() {
416        let io_ref = IoRef {
417            id: "my-topic".to_string(),
418            type_: IoType::Topic,
419            steps: vec![],
420        };
421
422        assert_eq!(
423            io_ref.schema_type(&[]),
424            Err(IoRefValidationError::InvalidRef("my-topic".to_string()))
425        );
426    }
427
428    #[test]
429    fn test_topic_type_returns_topic_type() {
430        let io_ref = IoRef {
431            id: "my-topic".to_string(),
432            type_: IoType::Topic,
433            steps: vec![],
434        };
435
436        let topics = vec![(
437            "my-topic".to_string(),
438            Topic {
439                name: "my-topic".to_string(),
440                schema: TopicSchema {
441                    key: None,
442                    value: SchemaSerDe {
443                        type_: TypeRef {
444                            name: "string".to_string(),
445                        },
446                        converter: None,
447                    },
448                },
449                consumer: None,
450                producer: None,
451                profile: None,
452            },
453        )];
454
455        assert_eq!(
456            io_ref.schema_type(&topics),
457            Ok(Some(
458                (
459                    None,
460                    TypeRef {
461                        name: "string".to_string()
462                    }
463                )
464                    .into()
465            ))
466        );
467    }
468
469    #[test]
470    fn test_validate_source_or_sink_steps_validates_steps_as_a_transforms() {
471        let types = SdfTypesMap::default();
472        let io_ref = IoRef {
473            id: "my-topic".to_string(),
474            type_: IoType::Topic,
475            steps: vec![TransformOperator::Map(StepInvocation {
476                uses: "my-function".to_string(),
477                output: None,
478                ..Default::default()
479            })],
480        };
481
482        let expected_input_type = (
483            Some(TypeRef {
484                name: "bytes".to_string(),
485            }),
486            TypeRef {
487                name: "string".to_string(),
488            },
489        )
490            .into();
491
492        let res = io_ref
493            .validate_source_or_sink_steps(
494                &types,
495                &expected_input_type,
496                "topic `my-topic`".to_string(),
497            )
498            .expect_err("should error for invalid step");
499
500        assert!(res.errors.contains(&ValidationError::new(
501            "map type function `my-function` should have exactly 1 input type, found 0"
502        )));
503    }
504
505    #[test]
506    fn test_validate_source_rejects_source_when_topic_not_found() {
507        let types = SdfTypesMap::default();
508        let source = IoRef {
509            type_: IoType::Topic,
510            id: "my-source-topic".to_string(),
511            steps: vec![],
512        };
513
514        let res = source
515            .validate_source(&types, &[], None)
516            .expect_err("should error for missing sources");
517
518        assert!(res.errors.contains(&IoRefValidationError::InvalidRef(
519            "my-source-topic".to_string()
520        )));
521    }
522
523    #[test]
524    fn test_validate_source_rejects_source_when_last_step_has_no_output() {
525        let topics = topics();
526        let types = SdfTypesMap::default();
527
528        let source = IoRef {
529            type_: IoType::Topic,
530            id: "my-other-topic".to_string(),
531            steps: vec![TransformOperator::Map(StepInvocation {
532                uses: "my-function".to_string(),
533                inputs: vec![NamedParameter {
534                    name: "input".to_string(),
535                    type_: TypeRef {
536                        name: "u16".to_string(),
537                    },
538                    optional: false,
539                    kind: ParameterKind::Value,
540                }],
541                output: None,
542                ..Default::default()
543            })],
544        };
545
546        let res = source
547            .validate_source(&types, &topics, None)
548            .expect_err("should error for invalid step");
549
550        assert!(res
551            .errors
552            .contains(&IoRefValidationError::InvalidTransformsBlock(vec![
553                ValidationError::new(
554                    "Last transforms step is invalid. Expected an operator with an output type"
555                )
556            ])));
557
558        assert!(res.readable(0).contains(
559            r#"Transforms block is invalid:
560    Last transforms step is invalid. Expected an operator with an output type
561"#
562        ));
563    }
564
565    #[test]
566    fn test_validate_source_rejects_source_when_topic_is_no_target() {
567        let types = SdfTypesMap::default();
568        let source = IoRef {
569            type_: IoType::NoTarget,
570            id: "".to_string(),
571            steps: vec![],
572        };
573
574        let res = source
575            .validate_source(&types, &[], None)
576            .expect_err("should error for missing sources");
577
578        assert!(res.errors.contains(&IoRefValidationError::NoTarget));
579
580        assert_eq!(
581            res.readable(0),
582            r#"Cannot have a source with no target
583"#
584        );
585    }
586
587    #[test]
588    fn test_validate_source_validates_source_transforms() {
589        let types = SdfTypesMap::default();
590        let source = IoRef {
591            id: "my-topic".to_string(),
592            type_: IoType::Topic,
593            steps: vec![TransformOperator::Map(StepInvocation {
594                uses: "my-function".to_string(),
595                inputs: vec![NamedParameter {
596                    name: "input".to_string(),
597                    type_: TypeRef {
598                        name: "u8".to_string(),
599                    },
600                    optional: false,
601                    kind: ParameterKind::Value,
602                }],
603                output: None,
604                ..Default::default()
605            })],
606        };
607        let topics = topics();
608
609        let res = source
610            .validate_source(&types, &topics, None)
611            .expect_err("should error for invalid step");
612
613        assert!(res
614            .errors
615            .contains(&IoRefValidationError::InvalidOperator(vec![
616                ValidationError::new("map type function `my-function` requires an output type")
617            ])));
618
619        assert_eq!(
620            res.readable(0),
621            r#"Transforms block is invalid:
622    Last transforms step is invalid. Expected an operator with an output type
623Invalid operator(s):
624    map type function `my-function` requires an output type
625"#
626        );
627    }
628
629    #[test]
630    fn test_validate_source_accepts_valid_sources() {
631        let types = SdfTypesMap::default();
632        let io_ref = IoRef {
633            id: "my-topic".to_string(),
634            type_: IoType::Topic,
635            steps: vec![
636                TransformOperator::Map(StepInvocation {
637                    uses: "my-function".to_string(),
638                    inputs: vec![NamedParameter {
639                        name: "input".to_string(),
640                        type_: TypeRef {
641                            name: "u8".to_string(),
642                        },
643                        optional: false,
644                        kind: ParameterKind::Value,
645                    }],
646                    output: Some(Parameter {
647                        type_: OutputType::Ref(TypeRef {
648                            name: "u16".to_string(),
649                        }),
650                        ..Default::default()
651                    }),
652                    ..Default::default()
653                }),
654                TransformOperator::Map(StepInvocation {
655                    uses: "my-other-function".to_string(),
656                    inputs: vec![NamedParameter {
657                        name: "input".to_string(),
658                        type_: TypeRef {
659                            name: "u16".to_string(),
660                        },
661                        optional: false,
662                        kind: ParameterKind::Value,
663                    }],
664                    output: Some(Parameter {
665                        type_: TypeRef {
666                            name: "u8".to_string(),
667                        }
668                        .into(),
669                        ..Default::default()
670                    }),
671                    ..Default::default()
672                }),
673            ],
674        };
675
676        io_ref
677            .validate_source(&types, &topics(), None)
678            .expect("should validate");
679    }
680
681    #[test]
682    fn test_validate_sink_rejects_sink_when_topic_not_found() {
683        let types = SdfTypesMap::default();
684        let sink = IoRef {
685            type_: IoType::Topic,
686            id: "my-sink-topic".to_string(),
687            steps: vec![],
688        };
689
690        let res = sink
691            .validate_sink(
692                &types,
693                &[],
694                &(
695                    Some(TypeRef {
696                        name: "bytes".to_string(),
697                    }),
698                    TypeRef {
699                        name: "string".to_string(),
700                    },
701                )
702                    .into(),
703            )
704            .expect_err("should error for missing sources");
705
706        assert!(res.errors.contains(&IoRefValidationError::InvalidRef(
707            "my-sink-topic".to_string()
708        )));
709
710        assert_eq!(
711            res.readable(0),
712            r#"Referenced topic `my-sink-topic` not found
713"#
714        );
715    }
716
717    #[test]
718    fn test_validate_sink_rejects_sink_when_first_step_has_no_input() {
719        let topics = topics();
720        let types = SdfTypesMap::default();
721
722        let sink = IoRef {
723            type_: IoType::Topic,
724            id: "my-other-topic".to_string(),
725            steps: vec![TransformOperator::Map(StepInvocation {
726                uses: "my-function".to_string(),
727                inputs: vec![],
728                ..Default::default()
729            })],
730        };
731
732        let res = sink
733            .validate_sink(
734                &types,
735                &topics,
736                &(
737                    Some(TypeRef {
738                        name: "bytes".to_string(),
739                    }),
740                    TypeRef {
741                        name: "string".to_string(),
742                    },
743                )
744                    .into(),
745            )
746            .expect_err("should error for invalid step");
747
748        assert!(res
749            .errors
750            .contains(&IoRefValidationError::InvalidTransformsBlock(vec![
751                ValidationError::new("The first operator in a transforms block must take an input")
752            ])));
753
754        assert!(res.readable(0).contains(
755            r#"Transforms block is invalid:
756    The first operator in a transforms block must take an input
757"#
758        ));
759    }
760
761    #[test]
762    fn test_validate_sink_validates_sink_transforms() {
763        let types = SdfTypesMap::default();
764        let sink = IoRef {
765            id: "my-topic".to_string(),
766            type_: IoType::Topic,
767            steps: vec![
768                TransformOperator::Map(StepInvocation {
769                    uses: "my-function".to_string(),
770                    inputs: vec![NamedParameter {
771                        name: "input".to_string(),
772                        type_: TypeRef {
773                            name: "string".to_string(),
774                        },
775                        optional: false,
776                        kind: ParameterKind::Value,
777                    }],
778                    output: Some(Parameter {
779                        type_: TypeRef {
780                            name: "string".to_string(),
781                        }
782                        .into(),
783                        ..Default::default()
784                    }),
785                    ..Default::default()
786                }),
787                TransformOperator::Map(StepInvocation {
788                    uses: "my-other-function".to_string(),
789                    inputs: vec![NamedParameter {
790                        name: "input".to_string(),
791                        type_: TypeRef {
792                            name: "u8".to_string(),
793                        },
794                        optional: false,
795                        kind: ParameterKind::Value,
796                    }],
797                    output: Some(Parameter {
798                        type_: TypeRef {
799                            name: "string".to_string(),
800                        }
801                        .into(),
802                        ..Default::default()
803                    }),
804                    ..Default::default()
805                }),
806            ],
807        };
808        let topics = topics();
809
810        let res = sink
811            .validate_sink(
812                &types,
813                &topics,
814                &(
815                    Some(TypeRef {
816                        name: "bytes".to_string(),
817                    }),
818                    TypeRef {
819                        name: "string".to_string(),
820                    },
821                )
822                    .into(),
823            )
824            .expect_err("should error for invalid step");
825
826        assert!(res.errors.iter().any(|e| {
827            if let IoRefValidationError::InvalidOperator(transforms_errors) = e {
828                if transforms_errors.contains(&ValidationError::new("Function `my-other-function` input type was expected to match `string` type provided by function `my-function`, but `u8` was found.")) {
829                    return true
830                }
831            }
832
833            false
834        }));
835    }
836
837    #[test]
838    fn test_validate_sink_validates_last_transforms_step_matches_topic() {
839        let types = SdfTypesMap::default();
840        let sink = IoRef {
841            id: "my-topic".to_string(),
842            type_: IoType::Topic,
843            steps: vec![TransformOperator::Map(StepInvocation {
844                uses: "my-function".to_string(),
845                inputs: vec![NamedParameter {
846                    name: "input".to_string(),
847                    type_: TypeRef {
848                        name: "string".to_string(),
849                    },
850                    optional: false,
851                    kind: ParameterKind::Value,
852                }],
853                output: Some(Parameter {
854                    type_: TypeRef {
855                        name: "string".to_string(),
856                    }
857                    .into(),
858                    ..Default::default()
859                }),
860                ..Default::default()
861            })],
862        };
863        let topics = topics();
864
865        let res = sink
866            .validate_sink(
867                &types,
868                &topics,
869                &(
870                    Some(TypeRef {
871                        name: "bytes".to_string(),
872                    }),
873                    TypeRef {
874                        name: "string".to_string(),
875                    },
876                )
877                    .into(),
878            )
879            .expect_err("should error for invalid step");
880
881        assert!(res
882            .errors
883            .contains(&IoRefValidationError::InvalidTransformsBlock(vec![
884                ValidationError::new(
885                    "transforms steps final output type `string` does not match topic type `u8`"
886                )
887            ])));
888
889        assert!(res.readable(0).contains(
890            r#"Transforms block is invalid:
891    transforms steps final output type `string` does not match topic type `u8`
892"#
893        ));
894    }
895
896    #[test]
897    fn test_validate_sink_rejects_transforms_without_output() {
898        let types = SdfTypesMap::default();
899        let sink = IoRef {
900            id: "my-topic".to_string(),
901            type_: IoType::Topic,
902            steps: vec![TransformOperator::Map(StepInvocation {
903                uses: "my-function".to_string(),
904                inputs: vec![NamedParameter {
905                    name: "input".to_string(),
906                    type_: TypeRef {
907                        name: "string".to_string(),
908                    },
909                    optional: false,
910                    kind: ParameterKind::Value,
911                }],
912                output: None,
913                ..Default::default()
914            })],
915        };
916        let topics = topics();
917
918        let res = sink
919            .validate_sink(
920                &types,
921                &topics,
922                &(
923                    Some(TypeRef {
924                        name: "bytes".to_string(),
925                    }),
926                    TypeRef {
927                        name: "string".to_string(),
928                    },
929                )
930                    .into(),
931            )
932            .expect_err("should error for invalid step");
933
934        assert!(res.errors.iter().any(|e| {
935            if let IoRefValidationError::InvalidTransformsBlock(transforms_errors) = e {
936                transforms_errors.contains(&ValidationError::new(
937                    "Last transforms step is invalid. Expected an operator with an output type",
938                ))
939            } else {
940                false
941            }
942        }));
943
944        assert!(res.readable(0).contains(
945            r#"Transforms block is invalid:
946    Last transforms step is invalid. Expected an operator with an output type
947"#
948        ));
949    }
950
951    #[test]
952    fn test_validate_sink_rejects_transforms_without_a_sink_target() {
953        let types = SdfTypesMap::default();
954        let sink = IoRef {
955            id: "my-topic".to_string(),
956            type_: IoType::NoTarget,
957            steps: vec![TransformOperator::Map(StepInvocation {
958                uses: "my-function".to_string(),
959                inputs: vec![NamedParameter {
960                    name: "input".to_string(),
961                    type_: TypeRef {
962                        name: "u8".to_string(),
963                    },
964                    optional: false,
965                    kind: ParameterKind::Value,
966                }],
967                output: Some(Parameter {
968                    type_: TypeRef {
969                        name: "u16".to_string(),
970                    }
971                    .into(),
972                    ..Default::default()
973                }),
974                ..Default::default()
975            })],
976        };
977        let topics = topics();
978
979        let res = sink
980            .validate_sink(
981                &types,
982                &topics,
983                &(
984                    Some(TypeRef {
985                        name: "bytes".to_string(),
986                    }),
987                    TypeRef {
988                        name: "string".to_string(),
989                    },
990                )
991                    .into(),
992            )
993            .expect_err("should error for invalid step");
994
995        assert!(res.errors.iter().any(|e| {
996            if let IoRefValidationError::InvalidTransformsBlock(transforms_errors) = e {
997                if transforms_errors.contains(&ValidationError::new(
998                    "sink cannot have transforms steps without a target",
999                )) {
1000                    return true;
1001                }
1002            }
1003
1004            false
1005        }));
1006    }
1007
1008    #[test]
1009    fn test_validate_sink_accepts_valid_sinks() {
1010        let types = SdfTypesMap::default();
1011        let io_ref = IoRef {
1012            id: "my-topic".to_string(),
1013            type_: IoType::Topic,
1014            steps: vec![
1015                TransformOperator::Map(StepInvocation {
1016                    uses: "my-function".to_string(),
1017                    inputs: vec![NamedParameter {
1018                        name: "input".to_string(),
1019                        type_: TypeRef {
1020                            name: "u8".to_string(),
1021                        },
1022                        optional: false,
1023                        kind: ParameterKind::Value,
1024                    }],
1025                    output: Some(Parameter {
1026                        type_: TypeRef {
1027                            name: "u16".to_string(),
1028                        }
1029                        .into(),
1030                        ..Default::default()
1031                    }),
1032                    ..Default::default()
1033                }),
1034                TransformOperator::Map(StepInvocation {
1035                    uses: "my-other-function".to_string(),
1036                    inputs: vec![NamedParameter {
1037                        name: "input".to_string(),
1038                        type_: TypeRef {
1039                            name: "u16".to_string(),
1040                        },
1041                        optional: false,
1042                        kind: ParameterKind::Value,
1043                    }],
1044                    output: Some(Parameter {
1045                        type_: TypeRef {
1046                            name: "u8".to_string(),
1047                        }
1048                        .into(),
1049                        ..Default::default()
1050                    }),
1051                    ..Default::default()
1052                }),
1053            ],
1054        };
1055
1056        io_ref
1057            .validate_sink(
1058                &types,
1059                &topics(),
1060                &(
1061                    None,
1062                    TypeRef {
1063                        name: "u8".to_string(),
1064                    },
1065                )
1066                    .into(),
1067            )
1068            .expect("should validate");
1069    }
1070
1071    #[test]
1072    fn test_get_transform_chain_output_from_last_step_if_filter() {
1073        let last_step: crate::wit::operator::TransformOperator =
1074            TransformOperator::Filter(StepInvocation {
1075                uses: "my-filter".to_string(),
1076                inputs: vec![NamedParameter {
1077                    name: "input".to_string(),
1078                    type_: TypeRef {
1079                        name: "u8".to_string(),
1080                    },
1081                    optional: false,
1082                    kind: ParameterKind::Value,
1083                }],
1084                output: Some(Parameter {
1085                    type_: TypeRef {
1086                        name: "bool".to_string(),
1087                    }
1088                    .into(),
1089                    ..Default::default()
1090                }),
1091                ..Default::default()
1092            });
1093
1094        let res = super::get_transform_chain_output_from_last_step(&last_step)
1095            .expect("should return output type");
1096
1097        assert_eq!(
1098            res,
1099            (
1100                None,
1101                TypeRef {
1102                    name: "u8".to_string()
1103                }
1104            )
1105                .into()
1106        );
1107    }
1108
1109    #[test]
1110    fn test_get_transform_chain_output_from_last_step_if_map() {
1111        let last_step: crate::wit::operator::TransformOperator =
1112            TransformOperator::Map(StepInvocation {
1113                uses: "my-map".to_string(),
1114                inputs: vec![NamedParameter {
1115                    name: "input".to_string(),
1116                    type_: TypeRef {
1117                        name: "u8".to_string(),
1118                    },
1119                    optional: false,
1120                    kind: ParameterKind::Value,
1121                }],
1122                output: Some(Parameter {
1123                    type_: TypeRef {
1124                        name: "u16".to_string(),
1125                    }
1126                    .into(),
1127                    ..Default::default()
1128                }),
1129                ..Default::default()
1130            });
1131
1132        let res = super::get_transform_chain_output_from_last_step(&last_step)
1133            .expect("should return output type");
1134
1135        assert_eq!(
1136            res,
1137            (
1138                None,
1139                TypeRef {
1140                    name: "u16".to_string()
1141                }
1142            )
1143                .into()
1144        );
1145    }
1146
1147    #[test]
1148    fn test_source_type_returns_type_when_last_step_is_filter() {
1149        let source = IoRef {
1150            id: "my-source".to_string(),
1151            type_: IoType::Topic,
1152            steps: vec![TransformOperator::Filter(StepInvocation {
1153                uses: "my-filter".to_string(),
1154                inputs: vec![NamedParameter {
1155                    name: "input".to_string(),
1156                    type_: TypeRef {
1157                        name: "u8".to_string(),
1158                    },
1159                    optional: false,
1160                    kind: ParameterKind::Value,
1161                }],
1162                output: Some(Parameter {
1163                    type_: TypeRef {
1164                        name: "bool".to_string(),
1165                    }
1166                    .into(),
1167                    ..Default::default()
1168                }),
1169                ..Default::default()
1170            })],
1171        };
1172
1173        let topics = topics();
1174
1175        let res = source
1176            .source_type(&topics)
1177            .expect("should return output type");
1178
1179        assert_eq!(
1180            res,
1181            (
1182                None,
1183                TypeRef {
1184                    name: "u8".to_string()
1185                }
1186            )
1187                .into()
1188        );
1189    }
1190
1191    #[test]
1192    fn test_source_type_returns_type_when_last_step_is_map() {
1193        let source = IoRef {
1194            id: "my-source".to_string(),
1195            type_: IoType::Topic,
1196            steps: vec![TransformOperator::Map(StepInvocation {
1197                uses: "my-map".to_string(),
1198                inputs: vec![NamedParameter {
1199                    name: "input".to_string(),
1200                    type_: TypeRef {
1201                        name: "u8".to_string(),
1202                    },
1203                    optional: false,
1204                    kind: ParameterKind::Value,
1205                }],
1206                output: Some(Parameter {
1207                    type_: TypeRef {
1208                        name: "u16".to_string(),
1209                    }
1210                    .into(),
1211                    ..Default::default()
1212                }),
1213                ..Default::default()
1214            })],
1215        };
1216
1217        let topics = topics();
1218
1219        let res = source
1220            .source_type(&topics)
1221            .expect("should return output type");
1222
1223        assert_eq!(
1224            res,
1225            (
1226                None,
1227                TypeRef {
1228                    name: "u16".to_string()
1229                }
1230            )
1231                .into()
1232        );
1233    }
1234
1235    #[test]
1236    fn test_validate_schedule_undefined() {
1237        let source = IoRef {
1238            id: "my-schedule".to_string(),
1239            type_: IoType::Schedule,
1240            steps: vec![],
1241        };
1242
1243        let error = source
1244            .validate_schedule_defined(None)
1245            .expect_err("should error for undefined schedule");
1246
1247        assert_eq!(
1248            error,
1249            IoRefValidationError::InvalidRef("my-schedule".to_string())
1250        );
1251    }
1252
1253    #[test]
1254    fn test_source_type_returns_topic_type_with_no_steps() {
1255        let source = IoRef {
1256            id: "my-topic".to_string(),
1257            type_: IoType::Topic,
1258            steps: vec![],
1259        };
1260
1261        let topics = topics();
1262
1263        let res = source
1264            .source_type(&topics)
1265            .expect("should return output type");
1266
1267        assert_eq!(
1268            res,
1269            (
1270                None,
1271                TypeRef {
1272                    name: "u8".to_string()
1273                }
1274            )
1275                .into()
1276        );
1277    }
1278}