sdf_metadata/metadata/dataflow/
operations.rs

1use std::collections::BTreeMap;
2
3use anyhow::{Result, anyhow};
4
5use crate::{
6    importer::{function::imported_operator_config, states::inject_states},
7    metadata::{io::topic::KVSchemaType, operator::transforms::validate_transforms_steps},
8    util::{
9        config_error::{ConfigError, INDENT},
10        operator_placement::OperatorPlacement,
11        sdf_types_map::SdfTypesMap,
12        validate::validate_all,
13        validation_error::ValidationError,
14        validation_failure::ValidationFailure,
15    },
16    wit::{
17        dataflow::{
18            IoType, Operations, PackageDefinition, PackageImport, PostTransforms, ScheduleConfig,
19            Topic,
20        },
21        operator::StepInvocation,
22        package_interface::OperatorType,
23    },
24};
25
26use super::io_ref::IoRefValidationFailure;
27
28#[derive(Debug, Clone, Eq, PartialEq)]
29pub struct ServiceValidationFailure {
30    pub name: String,
31    pub errors: Vec<ServiceValidationError>,
32}
33
34impl ConfigError for ServiceValidationFailure {
35    fn readable(&self, indents: usize) -> String {
36        let mut result = format!(
37            "{}Service `{}` is invalid:\n",
38            INDENT.repeat(indents),
39            self.name
40        );
41
42        for error in &self.errors {
43            result.push_str(&error.readable(indents + 1));
44        }
45
46        result
47    }
48}
49
50#[derive(Debug, Clone, Eq, PartialEq)]
51pub enum ServiceValidationError {
52    NameEmpty,
53    MissingSourceTopic(String),
54    NoSources,
55    InvalidSource(IoRefValidationFailure),
56    InvalidSink(IoRefValidationFailure),
57    SourceTypeMismatch(String),
58    SinkTypeMismatch(String),
59    InvalidState(ValidationError),
60    InvalidTransformsSteps(ValidationFailure),
61    InvalidPostTransforms(ValidationFailure),
62}
63
64impl ConfigError for ServiceValidationError {
65    fn readable(&self, indents: usize) -> String {
66        let indent = INDENT.repeat(indents);
67
68        match self {
69            Self::NameEmpty => format!("{}Service name cannot be empty\n", indent),
70            Self::MissingSourceTopic(topic) => {
71                format!("{}Source topic `{}` not found\n", indent, topic)
72            }
73            Self::NoSources => format!("{}Service must have at least one source\n", indent),
74            Self::InvalidSource(error) => format!(
75                "{}Source `{}` is invalid:\n{}",
76                indent,
77                error.name,
78                error.readable(indents + 1),
79            ),
80            Self::InvalidSink(error) => format!(
81                "{}Sink `{}` is invalid:\n{}",
82                indent,
83                error.name,
84                error.readable(indents + 1),
85            ),
86            Self::SourceTypeMismatch(types) => format!(
87                "{}Sources for service must be identical, but the sources had the following types:\n{}{}{}\n",
88                indent, indent, INDENT, types
89            ),
90            Self::SinkTypeMismatch(types) => format!(
91                "{}Sinks for service must be identical, but the sinks had the following types:\n{}{}{}\n",
92                indent, indent, INDENT, types
93            ),
94            Self::InvalidState(error) => format!(
95                "{}State is invalid:\n{}",
96                indent,
97                error.readable(indents + 1)
98            ),
99            Self::InvalidTransformsSteps(error) => format!(
100                "{}Transforms block is invalid:\n{}",
101                indent,
102                error.readable(indents + 1)
103            ),
104            Self::InvalidPostTransforms(error) => error.readable(indents),
105        }
106    }
107}
108
109impl Default for Operations {
110    fn default() -> Self {
111        Operations {
112            name: "".to_string(),
113            sources: vec![],
114            sinks: vec![],
115            transforms: Default::default(),
116            post_transforms: None,
117            states: vec![],
118        }
119    }
120}
121
122impl Operations {
123    pub fn add_operator(
124        &mut self,
125        operator_type: OperatorType,
126        operator_placement: OperatorPlacement,
127        step_invocation: StepInvocation,
128    ) -> Result<()> {
129        if operator_placement.window {
130            match self.post_transforms {
131                Some(PostTransforms::AssignTimestamp(ref mut window)) => window.add_operator(
132                    operator_placement.transforms_index,
133                    operator_placement.partition,
134                    operator_type,
135                    step_invocation,
136                ),
137                _ => Err(anyhow!(
138                    "Cannot add operator. Window was specified but service does not have a window"
139                )),
140            }
141        } else if operator_placement.partition {
142            match self.post_transforms {
143                Some(PostTransforms::Partition(ref mut partition)) => partition.add_operator(
144                    operator_placement.transforms_index,
145                    operator_type,
146                    step_invocation,
147                ),
148                Some(PostTransforms::AssignTimestamp(_)) => Err(anyhow!(
149                    "Cannot add operator. Service does not have top level partition. To delete operator from window partition, please specify window"
150                )),
151                None => Err(anyhow!(
152                    "Cannot add operator. Parition was specified but service does not have a partition"
153                )),
154            }
155        } else {
156            self.transforms.insert_operator(
157                operator_placement.transforms_index,
158                operator_type,
159                step_invocation,
160            )
161        }
162    }
163
164    pub fn delete_operator(&mut self, operator_placement: OperatorPlacement) -> Result<()> {
165        if operator_placement.window {
166            match self.post_transforms {
167                Some(PostTransforms::AssignTimestamp(ref mut window)) => window.delete_operator(
168                    operator_placement.transforms_index,
169                    operator_placement.partition,
170                ),
171                _ => Err(anyhow!(
172                    "Cannot delete operator. Window was specified but service does not have a window"
173                )),
174            }
175        } else if operator_placement.partition {
176            match self.post_transforms {
177                Some(PostTransforms::Partition(ref mut partition)) => {
178                    partition.delete_operator(operator_placement.transforms_index)
179                }
180                Some(PostTransforms::AssignTimestamp(_)) => Err(anyhow!(
181                    "Cannot delete operator. Service does not have top level partition. To delete operator from window partition, please specify window"
182                )),
183                None => Err(anyhow!(
184                    "Cannot delete operator. Parition was specified but service does not have a partition"
185                )),
186            }
187        } else {
188            match operator_placement.transforms_index {
189                Some(index) => self.transforms.delete_operator(index),
190                None => Err(anyhow!(
191                    "Transforms index required to delete operator from transforms"
192                )),
193            }
194        }
195    }
196
197    pub fn import_operator_configs(
198        &mut self,
199        imports: &[PackageImport],
200        packages: &[PackageDefinition],
201    ) -> Result<()> {
202        let mut service_states: BTreeMap<_, _> = self
203            .states
204            .iter()
205            .map(|s| (s.name().to_owned(), s.clone()))
206            .collect();
207        for source in &mut self.sources {
208            if let IoType::Topic = source.type_ {
209                for step in &mut source.steps {
210                    if step.is_imported(imports) {
211                        *step = imported_operator_config(step, imports, packages)?;
212                    }
213                }
214            }
215        }
216
217        for sink in &mut self.sinks {
218            if let IoType::Topic = sink.type_ {
219                for step in &mut sink.steps {
220                    if step.is_imported(imports) {
221                        *step = imported_operator_config(step, imports, packages)?;
222
223                        inject_states(&mut service_states, &step.inner().states)?;
224                    }
225                }
226            }
227        }
228
229        for step in &mut self.transforms.steps {
230            if step.is_imported(imports) {
231                *step = imported_operator_config(step, imports, packages)?;
232                inject_states(&mut service_states, &step.inner().states)?;
233            }
234        }
235
236        if let Some(ref mut post_transforms) = self.post_transforms {
237            match post_transforms {
238                PostTransforms::AssignTimestamp(window) => {
239                    window.import_operator_configs(imports, packages, &mut service_states)?;
240                }
241                PostTransforms::Partition(partition) => {
242                    partition.import_operator_configs(imports, packages, &mut service_states)?;
243                }
244            }
245        }
246
247        self.states = service_states.into_values().collect();
248
249        Ok(())
250    }
251
252    #[cfg(feature = "parser")]
253    /// parse each inline operator and update the operator with the correct operator type
254    pub fn update_inline_operators(&mut self) -> Result<()> {
255        for source in &mut self.sources {
256            if let IoType::Topic = source.type_ {
257                for step in &mut source.steps {
258                    step.update_signature_from_code()?;
259                }
260            }
261        }
262
263        for sink in &mut self.sinks {
264            if let IoType::Topic = sink.type_ {
265                for step in &mut sink.steps {
266                    step.update_signature_from_code()?;
267                }
268            }
269        }
270
271        for step in &mut self.transforms.steps {
272            step.update_signature_from_code()?;
273        }
274
275        if let Some(ref mut post_transforms) = self.post_transforms {
276            post_transforms.update_inline_operators()?;
277        }
278        Ok(())
279    }
280
281    pub(crate) fn validate(
282        &self,
283        types: &SdfTypesMap,
284        topics: &[(String, Topic)],
285        schedules: Option<&[ScheduleConfig]>,
286    ) -> Result<(), ServiceValidationFailure> {
287        let mut failure = ServiceValidationFailure {
288            name: self.name.clone(),
289            errors: vec![],
290        };
291
292        if self.name.is_empty() {
293            failure.errors.push(ServiceValidationError::NameEmpty);
294        }
295
296        let service_input_type = match self.service_input_type(topics) {
297            Ok(ty) => ty,
298            Err(e) => {
299                failure.errors.push(e);
300                return Err(failure);
301            }
302        };
303
304        if let Err(e) = self.validate_sources(types, topics, schedules) {
305            failure.errors = [failure.errors, e].concat();
306        }
307
308        if let Err(e) = self.validate_states() {
309            for error in e.errors {
310                failure
311                    .errors
312                    .push(ServiceValidationError::InvalidState(error));
313            }
314        }
315
316        if let Err(transforms_validation_errors) = validate_transforms_steps(
317            &self.transforms.steps,
318            types,
319            service_input_type.clone(),
320            "sources".to_string(),
321        ) {
322            failure
323                .errors
324                .push(ServiceValidationError::InvalidTransformsSteps(
325                    transforms_validation_errors,
326                ));
327        }
328
329        let transforms_output_type = match self.transforms.output_type(service_input_type) {
330            Ok(ty) => ty,
331            Err(e) => {
332                failure
333                    .errors
334                    .push(ServiceValidationError::InvalidTransformsSteps(
335                        ValidationFailure { errors: vec![e] },
336                    ));
337
338                return Err(failure);
339            }
340        };
341
342        if let Some(post_transforms) = &self.post_transforms {
343            if let Err(post_transforms_error) =
344                post_transforms.validate(types, &transforms_output_type)
345            {
346                failure
347                    .errors
348                    .push(ServiceValidationError::InvalidPostTransforms(
349                        post_transforms_error,
350                    ));
351            }
352        }
353
354        let service_output_type = match self.post_transforms_output_type(transforms_output_type) {
355            Ok(ty) => ty,
356            Err(_e) => {
357                // should be unreachable since we already validated the transforms
358                return Err(failure);
359            }
360        };
361
362        if let Err(e) = self.validate_sinks(types, topics, service_output_type) {
363            failure.errors = [failure.errors, e].concat();
364        }
365
366        if failure.errors.is_empty() {
367            Ok(())
368        } else {
369            Err(failure)
370        }
371    }
372
373    fn validate_states(&self) -> Result<(), ValidationFailure> {
374        validate_all(&self.states)
375    }
376
377    fn validate_sources(
378        &self,
379        types: &SdfTypesMap,
380        topics: &[(String, Topic)],
381        schedules: Option<&[ScheduleConfig]>,
382    ) -> Result<(), Vec<ServiceValidationError>> {
383        let mut errors = vec![];
384        let mut source_types = vec![];
385
386        for source in &self.sources {
387            if let Err(source_error) = source.validate_source(types, topics, schedules) {
388                errors.push(ServiceValidationError::InvalidSource(source_error));
389            }
390
391            if let Ok(source_type) = source.source_type(topics) {
392                source_types.push((source.id.clone(), source_type));
393            }
394        }
395
396        // test the source types match
397        if !types_are_identical(&source_types) {
398            errors.push(ServiceValidationError::SourceTypeMismatch(
399                source_types
400                    .iter()
401                    .map(|(source_name, type_)| {
402                        if let Some(key) = &type_.key {
403                            format!(
404                                "{}: {}(key) - {}(value)",
405                                source_name, key.name, type_.value.name
406                            )
407                        } else {
408                            format!("{}: {}(value)", source_name, type_.value.name)
409                        }
410                    })
411                    .collect::<Vec<_>>()
412                    .join(", "),
413            ))
414        }
415
416        if errors.is_empty() {
417            Ok(())
418        } else {
419            Err(errors)
420        }
421    }
422
423    fn validate_sinks(
424        &self,
425        types: &SdfTypesMap,
426        topics: &[(String, Topic)],
427        service_output_type: KVSchemaType,
428    ) -> Result<(), Vec<ServiceValidationError>> {
429        let mut errors = vec![];
430        let mut sink_types = vec![];
431
432        for sink in &self.sinks {
433            if let Err(sink_error) = sink.validate_sink(types, topics, &service_output_type) {
434                errors.push(ServiceValidationError::InvalidSink(sink_error));
435            }
436
437            if let Ok(Some(source_type)) = sink.sink_type(topics) {
438                sink_types.push((sink.id.clone(), source_type));
439            }
440        }
441
442        if !types_are_identical(&sink_types) {
443            errors.push(ServiceValidationError::SinkTypeMismatch(
444                sink_types
445                    .iter()
446                    .map(|(sink_name, type_)| {
447                        if let Some(key) = &type_.key {
448                            format!(
449                                "{}: {}(key) - {}(value)",
450                                sink_name, key.name, type_.value.name
451                            )
452                        } else {
453                            format!("{}: {}(value)", sink_name, type_.value.name)
454                        }
455                    })
456                    .collect::<Vec<_>>()
457                    .join(", "),
458            ));
459        }
460
461        if errors.is_empty() {
462            Ok(())
463        } else {
464            Err(errors)
465        }
466    }
467
468    fn service_input_type(
469        &self,
470        topics: &[(String, Topic)],
471    ) -> Result<KVSchemaType, ServiceValidationError> {
472        match self.sources.first() {
473            Some(source) => match source.source_type(topics) {
474                Ok(ty) => Ok(ty),
475                _ => Err(ServiceValidationError::MissingSourceTopic(
476                    source.id.clone(),
477                )),
478            },
479            None => Err(ServiceValidationError::NoSources),
480        }
481    }
482
483    fn post_transforms_output_type(
484        &self,
485        transforms_output_type: KVSchemaType,
486    ) -> Result<KVSchemaType, ValidationError> {
487        if let Some(post_transforms) = &self.post_transforms {
488            post_transforms.output_type(transforms_output_type)
489        } else {
490            Ok(transforms_output_type)
491        }
492    }
493
494    pub fn operators(&self) -> Vec<(StepInvocation, OperatorType)> {
495        let ops = self
496            .transforms
497            .steps
498            .iter()
499            .map(|op| (op.inner().to_owned(), op.clone().into()))
500            .chain(
501                self.post_transforms
502                    .iter()
503                    .flat_map(|post_transforms| post_transforms.operators()),
504            )
505            .collect();
506
507        ops
508    }
509}
510
511fn types_are_identical(types: &[(String, KVSchemaType)]) -> bool {
512    let Some((_, mut sample_ty)) = types.first().cloned() else {
513        return true;
514    };
515
516    for (_, current_ty) in types.iter().skip(1) {
517        if current_ty.value != sample_ty.value {
518            return false;
519        }
520
521        match (&current_ty.key, &sample_ty.key) {
522            (Some(k), Some(fk)) => {
523                if k != fk {
524                    return false;
525                }
526            }
527            (Some(k), None) => sample_ty.key = Some(k.clone()),
528            _ => (),
529        }
530    }
531    true
532}
533
534#[cfg(test)]
535mod test {
536
537    use sdf_common::constants::DATAFLOW_STABLE_VERSION;
538
539    use crate::{
540        metadata::dataflow::{
541            io_ref::{IoRefValidationError, IoRefValidationFailure},
542            operations::ServiceValidationError,
543        },
544        util::{
545            config_error::ConfigError, operator_placement::OperatorPlacement,
546            sdf_types_map::SdfTypesMap, validation_error::ValidationError,
547            validation_failure::ValidationFailure,
548        },
549        wit::{
550            dataflow::{
551                IoRef, IoType, Operations, PackageDefinition, PackageImport, PostTransforms, Topic,
552            },
553            io::{SchemaSerDe, TopicSchema, TypeRef},
554            metadata::{
555                NamedParameter, OutputType, Parameter, ParameterKind, SdfKeyedState,
556                SdfKeyedStateValue,
557            },
558            operator::{
559                PartitionOperator, StepInvocation, StepState, TransformOperator, Transforms,
560                TumblingWindow, Window, WindowProperties, WindowKind, WatermarkConfig,
561            },
562            package_interface::{FunctionImport, Header, OperatorType, StateTyped},
563            states::{State, StateRef},
564        },
565    };
566
567    fn packages() -> Vec<PackageDefinition> {
568        vec![PackageDefinition {
569            api_version: DATAFLOW_STABLE_VERSION.to_owned(),
570            meta: Header {
571                name: "my-pkg".to_string(),
572                namespace: "my-ns".to_string(),
573                version: "0.1.0".to_string(),
574            },
575            functions: vec![my_fn()],
576            imports: vec![],
577            types: vec![],
578            states: vec![StateTyped {
579                name: "map-state".to_string(),
580                type_: SdfKeyedState {
581                    key: TypeRef {
582                        name: "string".to_string(),
583                    },
584                    value: SdfKeyedStateValue::U32,
585                },
586            }],
587            dev: None,
588        }]
589    }
590
591    fn my_fn() -> (StepInvocation, OperatorType) {
592        (
593            StepInvocation {
594                uses: "map-fn".to_string(),
595                inputs: vec![NamedParameter {
596                    name: "map-input".to_string(),
597                    type_: TypeRef {
598                        name: "u8".to_string(),
599                    },
600                    optional: false,
601                    kind: ParameterKind::Value,
602                }],
603                output: Some(Parameter {
604                    type_: TypeRef {
605                        name: "u8".to_string(),
606                    }
607                    .into(),
608                    ..Default::default()
609                }),
610                states: vec![StepState::Resolved(StateTyped {
611                    name: "map-state".to_string(),
612                    type_: SdfKeyedState {
613                        key: TypeRef {
614                            name: "string".to_string(),
615                        },
616                        value: SdfKeyedStateValue::U32,
617                    },
618                })],
619                ..Default::default()
620            },
621            OperatorType::Map,
622        )
623    }
624
625    fn imports() -> Vec<PackageImport> {
626        vec![PackageImport {
627            metadata: Header {
628                name: "my-pkg".to_string(),
629                namespace: "my-ns".to_string(),
630                version: "0.1.0".to_string(),
631            },
632            functions: vec![FunctionImport {
633                name: "map-fn".to_string(),
634                alias: None,
635            }],
636            path: Some("path/to/my-pkg".to_string()),
637            types: vec![],
638            states: vec![],
639        }]
640    }
641
642    fn operations() -> Operations {
643        Operations {
644            name: "my-service".to_string(),
645            sources: vec![IoRef {
646                type_: IoType::Topic,
647                id: "my-source".to_string(),
648                steps: vec![TransformOperator::Map(StepInvocation {
649                    uses: "map-fn".to_string(),
650                    ..Default::default()
651                })],
652            }],
653            sinks: vec![IoRef {
654                type_: IoType::Topic,
655                id: "my-source".to_string(),
656                steps: vec![TransformOperator::Map(StepInvocation {
657                    uses: "map-fn".to_string(),
658                    ..Default::default()
659                })],
660            }],
661            transforms: Transforms {
662                steps: vec![
663                    TransformOperator::Map(StepInvocation {
664                        uses: "map-fn".to_string(),
665                        ..Default::default()
666                    }),
667                    TransformOperator::Map(StepInvocation {
668                        uses: "map-fn".to_string(),
669                        ..Default::default()
670                    }),
671                ],
672            },
673            post_transforms: None,
674            states: vec![],
675        }
676    }
677
678    fn topics() -> Vec<(String, Topic)> {
679        vec![
680            (
681                "my-topic".to_string(),
682                Topic {
683                    name: "my-topic".to_string(),
684                    schema: TopicSchema {
685                        key: None,
686                        value: SchemaSerDe {
687                            converter: None,
688                            type_: TypeRef {
689                                name: "u8".to_string(),
690                            },
691                        },
692                    },
693                    consumer: None,
694                    producer: None,
695                    profile: None,
696                },
697            ),
698            (
699                "my-other-topic".to_string(),
700                Topic {
701                    name: "my-other-topic".to_string(),
702                    schema: TopicSchema {
703                        key: None,
704                        value: SchemaSerDe {
705                            converter: None,
706                            type_: TypeRef {
707                                name: "u16".to_string(),
708                            },
709                        },
710                    },
711                    consumer: None,
712                    producer: None,
713                    profile: None,
714                },
715            ),
716            (
717                "my-third-topic".to_string(),
718                Topic {
719                    name: "my-other-topic".to_string(),
720                    schema: TopicSchema {
721                        key: None,
722                        value: SchemaSerDe {
723                            converter: None,
724                            type_: TypeRef {
725                                name: "u16".to_string(),
726                            },
727                        },
728                    },
729                    consumer: None,
730                    producer: None,
731                    profile: None,
732                },
733            ),
734            (
735                "my-topic-with-key".to_string(),
736                Topic {
737                    name: "my-topic-with-key".to_string(),
738                    schema: TopicSchema {
739                        key: Some(SchemaSerDe {
740                            converter: None,
741                            type_: TypeRef {
742                                name: "string".to_string(),
743                            },
744                        }),
745                        value: SchemaSerDe {
746                            converter: None,
747                            type_: TypeRef {
748                                name: "u8".to_string(),
749                            },
750                        },
751                    },
752                    consumer: None,
753                    producer: None,
754                    profile: None,
755                },
756            ),
757            (
758                "my-topic-with-another-key".to_string(),
759                Topic {
760                    name: "my-topic-with-another-key".to_string(),
761                    schema: TopicSchema {
762                        key: Some(SchemaSerDe {
763                            converter: None,
764                            type_: TypeRef {
765                                name: "bytes".to_string(),
766                            },
767                        }),
768                        value: SchemaSerDe {
769                            converter: None,
770                            type_: TypeRef {
771                                name: "u8".to_string(),
772                            },
773                        },
774                    },
775                    consumer: None,
776                    producer: None,
777                    profile: None,
778                },
779            ),
780        ]
781    }
782
783    fn function() -> StepInvocation {
784        StepInvocation {
785            uses: "cat_map_cat".to_string(),
786            inputs: vec![NamedParameter {
787                name: "cat".to_string(),
788                type_: TypeRef {
789                    name: "string".to_string(),
790                },
791                optional: false,
792                kind: ParameterKind::Value,
793            }],
794            output: Some(Parameter {
795                type_: OutputType::Ref(TypeRef {
796                    name: "string".to_string(),
797                }),
798                ..Default::default()
799            }),
800            ..Default::default()
801        }
802    }
803
804    fn service_with_window() -> Operations {
805        let sources = vec![IoRef {
806            type_: IoType::Topic,
807            id: "listing".to_string(),
808            steps: vec![],
809        }];
810        let sinks = vec![IoRef {
811            type_: IoType::Topic,
812            id: "prospect".to_string(),
813            steps: vec![],
814        }];
815        let transforms = Transforms {
816            steps: vec![
817                TransformOperator::FilterMap(StepInvocation {
818                    uses: "listing_map_job".to_string(),
819                    ..Default::default()
820                }),
821                TransformOperator::Map(StepInvocation {
822                    uses: "job_map_prospect".to_string(),
823                    ..Default::default()
824                }),
825            ],
826        };
827
828        let post_transforms = Some(PostTransforms::AssignTimestamp(Window {
829            assign_timestamp: StepInvocation {
830                uses: "assign_timestamp".to_string(),
831                ..Default::default()
832            },
833            transforms: Transforms {
834                steps: vec![TransformOperator::Map(StepInvocation {
835                    uses: "prospect_map_prospect".to_string(),
836                    ..Default::default()
837                })],
838            },
839            partition: Some(PartitionOperator {
840                assign_key: StepInvocation {
841                    uses: "assign_key".to_string(),
842                    ..Default::default()
843                },
844                transforms: Transforms {
845                    steps: vec![TransformOperator::Map(StepInvocation {
846                        uses: "prospect_map_prospect2".to_string(),
847                        ..Default::default()
848                    })],
849                },
850                update_state: None,
851            }),
852            flush: Some(StepInvocation {
853                uses: "job_aggregate".to_string(),
854                ..Default::default()
855            }),
856            properties: WindowProperties {
857                kind: WindowKind::Tumbling(TumblingWindow {
858                    duration: 3600000,
859                    offset: 0,
860                }),
861                watermark_config: WatermarkConfig::default(),
862            },
863        }));
864
865        Operations {
866            name: "listing-to-prospect-op".to_string(),
867            sources,
868            sinks,
869            transforms,
870            post_transforms,
871            states: vec![],
872        }
873    }
874
875    fn service_with_partition() -> Operations {
876        let sources = vec![IoRef {
877            type_: IoType::Topic,
878            id: "listing".to_string(),
879            steps: vec![],
880        }];
881        let sinks = vec![IoRef {
882            type_: IoType::Topic,
883            id: "prospect".to_string(),
884            steps: vec![],
885        }];
886        let transforms = Transforms {
887            steps: vec![
888                TransformOperator::FilterMap(StepInvocation {
889                    uses: "listing_map_job".to_string(),
890                    ..Default::default()
891                }),
892                TransformOperator::Map(StepInvocation {
893                    uses: "job_map_prospect".to_string(),
894                    ..Default::default()
895                }),
896            ],
897        };
898
899        let post_transforms = Some(PostTransforms::Partition(PartitionOperator {
900            assign_key: StepInvocation {
901                uses: "assign_key".to_string(),
902                ..Default::default()
903            },
904            transforms: Transforms {
905                steps: vec![TransformOperator::Map(StepInvocation {
906                    uses: "prospect_map_prospect2".to_string(),
907                    ..Default::default()
908                })],
909            },
910            update_state: None,
911        }));
912
913        Operations {
914            name: "listing-to-prospect-op".to_string(),
915            sources,
916            sinks,
917            transforms,
918            post_transforms,
919            states: vec![],
920        }
921    }
922
923    #[test]
924    fn test_import_operator_configs_merges_operator_signatures() {
925        let mut operations = operations();
926
927        let source_steps = &operations.sources.first().unwrap().steps;
928        assert!(source_steps.first().unwrap().inner().inputs.is_empty());
929        assert!(source_steps.first().unwrap().inner().output.is_none());
930        assert!(source_steps.first().unwrap().inner().states.is_empty());
931
932        let sink_steps = &operations.sinks.first().unwrap().steps;
933        assert!(sink_steps.first().unwrap().inner().inputs.is_empty());
934        assert!(sink_steps.first().unwrap().inner().output.is_none());
935        assert!(sink_steps.first().unwrap().inner().states.is_empty());
936
937        let steps = &operations.transforms.steps;
938        assert!(steps.first().unwrap().inner().inputs.is_empty());
939        assert!(steps.first().unwrap().inner().output.is_none());
940        assert!(steps.first().unwrap().inner().states.is_empty());
941
942        assert!(operations.states.is_empty());
943
944        operations
945            .import_operator_configs(&imports(), &packages())
946            .unwrap();
947
948        let source_steps = &operations.sources.first().unwrap().steps;
949        assert_eq!(source_steps.first().unwrap().inner().inputs.len(), 1);
950        assert!(source_steps.first().unwrap().inner().output.is_some());
951        assert_eq!(source_steps.first().unwrap().inner().states.len(), 1);
952
953        let sink_steps = &operations.sinks.first().unwrap().steps;
954        assert_eq!(sink_steps.first().unwrap().inner().inputs.len(), 1);
955        assert!(sink_steps.first().unwrap().inner().output.is_some());
956        assert_eq!(sink_steps.first().unwrap().inner().states.len(), 1);
957
958        let steps = &operations.transforms.steps;
959        assert_eq!(steps.first().unwrap().inner().inputs.len(), 1);
960        assert!(steps.first().unwrap().inner().output.is_some());
961        assert_eq!(steps.first().unwrap().inner().states.len(), 1);
962
963        assert_eq!(operations.states.len(), 1);
964    }
965
966    #[test]
967    fn test_validate_rejects_service_without_name() {
968        let types = SdfTypesMap::default();
969        let service = Operations {
970            name: "".to_string(),
971            sources: vec![],
972            sinks: vec![],
973            transforms: Transforms { steps: vec![] },
974            post_transforms: None,
975            states: vec![],
976        };
977
978        let res = service
979            .validate(&types, &[], None)
980            .expect_err("should error for missing service name");
981
982        assert!(res.errors.contains(&ServiceValidationError::NameEmpty));
983
984        assert!(res.readable(0).contains(
985            r#"Service `` is invalid:
986    Service name cannot be empty
987"#
988        ));
989    }
990
991    #[test]
992    fn test_validate_validates_states() {
993        let types = SdfTypesMap::default();
994        let service = Operations {
995            name: "my-service".to_string(),
996            sources: vec![IoRef {
997                type_: IoType::Topic,
998                id: "my-topic".to_string(),
999                steps: vec![],
1000            }],
1001            sinks: vec![],
1002            transforms: Transforms { steps: vec![] },
1003            post_transforms: None,
1004            states: vec![State::Reference(StateRef {
1005                name: "my-state".to_string(),
1006                ref_service: "".to_string(),
1007            })],
1008        };
1009
1010        let res = service
1011            .validate(&types, &topics(), None)
1012            .expect_err("should error for invalid ref state");
1013
1014        assert!(res.errors.contains(&ServiceValidationError::InvalidState(ValidationError::new(
1015            "service name missing for state reference. state reference must be of the form <service>.<state>"
1016        ))));
1017        assert_eq!(
1018            res.readable(0),
1019            r#"Service `my-service` is invalid:
1020    State is invalid:
1021        service name missing for state reference. state reference must be of the form <service>.<state>
1022"#
1023        );
1024    }
1025
1026    #[test]
1027    fn test_validate_rejects_service_without_sources() {
1028        let types = SdfTypesMap::default();
1029        let service = Operations {
1030            name: "my-service".to_string(),
1031            sources: vec![],
1032            sinks: vec![],
1033            transforms: Transforms { steps: vec![] },
1034            post_transforms: None,
1035            states: vec![],
1036        };
1037
1038        let res = service
1039            .validate(&types, &[], None)
1040            .expect_err("should error for missing sources");
1041
1042        assert!(res.errors.contains(&ServiceValidationError::NoSources));
1043        assert_eq!(
1044            res.readable(0),
1045            r#"Service `my-service` is invalid:
1046    Service must have at least one source
1047"#
1048        );
1049    }
1050
1051    #[test]
1052    fn test_validate_sources_validates_each_source() {
1053        let types = SdfTypesMap::default();
1054        let service = Operations {
1055            name: "my-service".to_string(),
1056            sources: vec![IoRef {
1057                type_: IoType::Topic,
1058                id: "my-source-topic".to_string(),
1059                steps: vec![],
1060            }],
1061            sinks: vec![],
1062            transforms: Transforms { steps: vec![] },
1063            post_transforms: None,
1064            states: vec![],
1065        };
1066
1067        let res = service
1068            .validate(&types, &[], None)
1069            .expect_err("should error for missing sources");
1070
1071        assert!(res
1072            .errors
1073            .contains(&ServiceValidationError::MissingSourceTopic(
1074                "my-source-topic".to_string()
1075            )));
1076        assert_eq!(
1077            res.readable(0),
1078            r#"Service `my-service` is invalid:
1079    Source topic `my-source-topic` not found
1080"#
1081        );
1082    }
1083
1084    #[test]
1085    fn test_validate_rejects_sources_when_key_types_differ() {
1086        let topics = topics();
1087        let types = SdfTypesMap::default();
1088
1089        let service = Operations {
1090            name: "my-service".to_string(),
1091            sources: vec![
1092                IoRef {
1093                    type_: IoType::Topic,
1094                    id: "my-topic".to_string(),
1095                    steps: vec![],
1096                },
1097                IoRef {
1098                    type_: IoType::Topic,
1099                    id: "my-topic-with-key".to_string(),
1100                    steps: vec![],
1101                },
1102                IoRef {
1103                    type_: IoType::Topic,
1104                    id: "my-topic-with-another-key".to_string(),
1105                    steps: vec![],
1106                },
1107            ],
1108            sinks: vec![],
1109            transforms: Transforms { steps: vec![] },
1110            post_transforms: None,
1111            states: vec![],
1112        };
1113
1114        let res = service
1115            .validate(&types, &topics, None)
1116            .expect_err("should error for source type mismatch");
1117
1118        assert!(res.errors.contains(&ServiceValidationError::SourceTypeMismatch(
1119            "my-topic: u8(value), my-topic-with-key: string(key) - u8(value), my-topic-with-another-key: bytes(key) - u8(value)".to_string()
1120        )));
1121        assert_eq!(
1122            res.readable(0),
1123            r#"Service `my-service` is invalid:
1124    Sources for service must be identical, but the sources had the following types:
1125        my-topic: u8(value), my-topic-with-key: string(key) - u8(value), my-topic-with-another-key: bytes(key) - u8(value)
1126"#
1127        );
1128    }
1129
1130    #[test]
1131    fn test_validate_rejects_sources_when_type_values_differ() {
1132        let topics = topics();
1133        let types = SdfTypesMap::default();
1134
1135        let service = Operations {
1136            name: "my-service".to_string(),
1137            sources: vec![
1138                IoRef {
1139                    type_: IoType::Topic,
1140                    id: "my-topic".to_string(),
1141                    steps: vec![],
1142                },
1143                IoRef {
1144                    type_: IoType::Topic,
1145                    id: "my-other-topic".to_string(),
1146                    steps: vec![],
1147                },
1148            ],
1149            sinks: vec![],
1150            transforms: Transforms { steps: vec![] },
1151            post_transforms: None,
1152            states: vec![],
1153        };
1154
1155        let res = service
1156            .validate(&types, &topics, None)
1157            .expect_err("should error for source type mismatch");
1158
1159        assert!(res
1160            .errors
1161            .contains(&ServiceValidationError::SourceTypeMismatch(
1162                "my-topic: u8(value), my-other-topic: u16(value)".to_string()
1163            )));
1164
1165        // same case but with steps
1166        let service = Operations {
1167            name: "my-service".to_string(),
1168            sources: vec![
1169                IoRef {
1170                    type_: IoType::Topic,
1171                    id: "my-topic".to_string(),
1172                    steps: vec![],
1173                },
1174                IoRef {
1175                    type_: IoType::Topic,
1176                    id: "my-other-topic".to_string(),
1177                    steps: vec![TransformOperator::Map(StepInvocation {
1178                        uses: "my-function".to_string(),
1179                        inputs: vec![NamedParameter {
1180                            name: "input".to_string(),
1181                            type_: TypeRef {
1182                                name: "u16".to_string(),
1183                            },
1184                            optional: false,
1185                            kind: ParameterKind::Value,
1186                        }],
1187                        output: Some(Parameter {
1188                            type_: TypeRef {
1189                                name: "string".to_string(),
1190                            }
1191                            .into(),
1192                            ..Default::default()
1193                        }),
1194                        ..Default::default()
1195                    })],
1196                },
1197            ],
1198            sinks: vec![],
1199            transforms: Transforms { steps: vec![] },
1200            post_transforms: None,
1201            states: vec![],
1202        };
1203
1204        let res = service
1205            .validate(&types, &topics, None)
1206            .expect_err("should error for source type mismatch");
1207
1208        assert!(res
1209            .errors
1210            .contains(&ServiceValidationError::SourceTypeMismatch(
1211                "my-topic: u8(value), my-other-topic: string(value)".to_string()
1212            )));
1213
1214        assert_eq!(
1215            res.readable(0),
1216            r#"Service `my-service` is invalid:
1217    Sources for service must be identical, but the sources had the following types:
1218        my-topic: u8(value), my-other-topic: string(value)
1219"#
1220        );
1221    }
1222
1223    #[test]
1224    fn test_validate_accepts_valid_sources() {
1225        let topics = topics();
1226        let types = SdfTypesMap::default();
1227
1228        let service = Operations {
1229            name: "my-service".to_string(),
1230            sources: vec![
1231                IoRef {
1232                    type_: IoType::Topic,
1233                    id: "my-topic".to_string(),
1234                    steps: vec![],
1235                },
1236                IoRef {
1237                    type_: IoType::Topic,
1238                    id: "my-other-topic".to_string(),
1239                    steps: vec![TransformOperator::Map(StepInvocation {
1240                        uses: "my-function".to_string(),
1241                        inputs: vec![NamedParameter {
1242                            name: "input".to_string(),
1243                            type_: TypeRef {
1244                                name: "u16".to_string(),
1245                            },
1246                            optional: false,
1247                            kind: ParameterKind::Value,
1248                        }],
1249                        output: Some(Parameter {
1250                            type_: TypeRef {
1251                                name: "u8".to_string(),
1252                            }
1253                            .into(),
1254                            ..Default::default()
1255                        }),
1256                        ..Default::default()
1257                    })],
1258                },
1259            ],
1260            sinks: vec![],
1261            transforms: Transforms { steps: vec![] },
1262            post_transforms: None,
1263            states: vec![],
1264        };
1265
1266        service
1267            .validate(&types, &topics, None)
1268            .expect("should validate")
1269    }
1270
1271    #[test]
1272    fn test_validate_sinks_validates_each_sink() {
1273        let types = SdfTypesMap::default();
1274        let service = Operations {
1275            name: "my-service".to_string(),
1276            sources: vec![IoRef {
1277                type_: IoType::Topic,
1278                id: "my-topic".to_string(),
1279                steps: vec![],
1280            }],
1281            sinks: vec![IoRef {
1282                type_: IoType::Topic,
1283                id: "my-sink-topic".to_string(),
1284                steps: vec![],
1285            }],
1286            transforms: Transforms { steps: vec![] },
1287            post_transforms: None,
1288            states: vec![],
1289        };
1290
1291        let res = service
1292            .validate(&types, &topics(), None)
1293            .expect_err("should error for missing sink topic");
1294
1295        assert!(res.errors.contains(&ServiceValidationError::InvalidSink(
1296            IoRefValidationFailure {
1297                name: "my-sink-topic".to_string(),
1298                errors: vec![IoRefValidationError::InvalidRef(
1299                    "my-sink-topic".to_string()
1300                )]
1301            }
1302        )));
1303
1304        assert_eq!(
1305            res.readable(0),
1306            r#"Service `my-service` is invalid:
1307    Sink `my-sink-topic` is invalid:
1308        Referenced topic `my-sink-topic` not found
1309"#
1310        );
1311    }
1312
1313    #[test]
1314    fn test_validate_rejects_sinks_when_key_types_differ() {
1315        let topics = topics();
1316        let types = SdfTypesMap::default();
1317        let service = Operations {
1318            name: "my-service".to_string(),
1319            sources: vec![IoRef {
1320                type_: IoType::Topic,
1321                id: "my-topic".to_string(),
1322                steps: vec![],
1323            }],
1324            sinks: vec![
1325                IoRef {
1326                    type_: IoType::Topic,
1327                    id: "my-topic-with-key".to_string(),
1328                    steps: vec![],
1329                },
1330                IoRef {
1331                    type_: IoType::Topic,
1332                    id: "my-topic-with-another-key".to_string(),
1333                    steps: vec![],
1334                },
1335            ],
1336            transforms: Transforms { steps: vec![] },
1337            post_transforms: None,
1338            states: vec![],
1339        };
1340
1341        let res = service
1342            .validate(&types, &topics, None)
1343            .expect_err("should error for sink type mismatch");
1344
1345        assert!(res.errors.contains(&ServiceValidationError::SinkTypeMismatch(
1346            "my-topic-with-key: string(key) - u8(value), my-topic-with-another-key: bytes(key) - u8(value)".to_string()
1347        )));
1348
1349        assert_eq!(
1350            res.readable(0),
1351            r#"Service `my-service` is invalid:
1352    Sinks for service must be identical, but the sinks had the following types:
1353        my-topic-with-key: string(key) - u8(value), my-topic-with-another-key: bytes(key) - u8(value)
1354"#
1355        );
1356    }
1357
1358    #[test]
1359    fn test_validate_rejects_sinks_when_value_types_differ() {
1360        let topics = topics();
1361        let types = SdfTypesMap::default();
1362
1363        let service = Operations {
1364            name: "my-service".to_string(),
1365            sources: vec![IoRef {
1366                type_: IoType::Topic,
1367                id: "my-topic".to_string(),
1368                steps: vec![],
1369            }],
1370            sinks: vec![
1371                IoRef {
1372                    type_: IoType::Topic,
1373                    id: "my-topic".to_string(),
1374                    steps: vec![],
1375                },
1376                IoRef {
1377                    type_: IoType::Topic,
1378                    id: "my-other-topic".to_string(),
1379                    steps: vec![],
1380                },
1381            ],
1382            transforms: Transforms { steps: vec![] },
1383            post_transforms: None,
1384            states: vec![],
1385        };
1386
1387        let res = service
1388            .validate(&types, &topics, None)
1389            .expect_err("should error for sink type mismatch");
1390
1391        assert!(res
1392            .errors
1393            .contains(&ServiceValidationError::SinkTypeMismatch(
1394                "my-topic: u8(value), my-other-topic: u16(value)".to_string()
1395            )));
1396
1397        assert_eq!(
1398            res.readable(0),
1399            r#"Service `my-service` is invalid:
1400    Sink `my-other-topic` is invalid:
1401        Transforms block is invalid:
1402            service output type `u8` does not match sink input type `u16`
1403    Sinks for service must be identical, but the sinks had the following types:
1404        my-topic: u8(value), my-other-topic: u16(value)
1405"#
1406        );
1407    }
1408
1409    #[test]
1410    fn test_validate_accepts_valid_sinks() {
1411        let topics = topics();
1412        let types = SdfTypesMap::default();
1413
1414        let service = Operations {
1415            name: "my-service".to_string(),
1416            sources: vec![IoRef {
1417                type_: IoType::Topic,
1418                id: "my-topic".to_string(),
1419                steps: vec![],
1420            }],
1421            sinks: vec![
1422                IoRef {
1423                    type_: IoType::Topic,
1424                    id: "my-topic".to_string(),
1425                    steps: vec![],
1426                },
1427                IoRef {
1428                    type_: IoType::Topic,
1429                    id: "my-other-topic".to_string(),
1430                    steps: vec![TransformOperator::Map(StepInvocation {
1431                        uses: "my-function".to_string(),
1432                        inputs: vec![NamedParameter {
1433                            name: "input".to_string(),
1434                            type_: TypeRef {
1435                                name: "u8".to_string(),
1436                            },
1437                            optional: false,
1438                            kind: ParameterKind::Value,
1439                        }],
1440                        output: Some(Parameter {
1441                            type_: TypeRef {
1442                                name: "u16".to_string(),
1443                            }
1444                            .into(),
1445                            ..Default::default()
1446                        }),
1447                        ..Default::default()
1448                    })],
1449                },
1450            ],
1451            transforms: Transforms { steps: vec![] },
1452            post_transforms: None,
1453            states: vec![],
1454        };
1455
1456        service
1457            .validate(&types, &topics, None)
1458            .expect("should validate")
1459    }
1460
1461    #[test]
1462    fn test_validate_validates_transforms() {
1463        let topics = topics();
1464        let types = SdfTypesMap::default();
1465
1466        let service = Operations {
1467            name: "my-service".to_string(),
1468            sources: vec![IoRef {
1469                type_: IoType::Topic,
1470                id: "my-topic".to_string(),
1471                steps: vec![],
1472            }],
1473            transforms: Transforms {
1474                steps: vec![TransformOperator::Map(StepInvocation {
1475                    uses: "my-function".to_string(),
1476                    inputs: vec![NamedParameter {
1477                        name: "input".to_string(),
1478                        type_: TypeRef {
1479                            name: "u8".to_string(),
1480                        },
1481                        optional: false,
1482                        kind: ParameterKind::Value,
1483                    }],
1484                    output: Some(Parameter {
1485                        type_: TypeRef {
1486                            name: "foobar".to_string(),
1487                        }
1488                        .into(),
1489                        ..Default::default()
1490                    }),
1491                    ..Default::default()
1492                })],
1493            },
1494            sinks: vec![],
1495            post_transforms: None,
1496            states: vec![],
1497        };
1498
1499        let res = service
1500            .validate(&types, &topics, None)
1501            .expect_err("should fail for output type not in scope");
1502
1503        assert!(res.errors.contains(&ServiceValidationError::InvalidTransformsSteps(
1504            ValidationFailure {
1505                errors: vec![ValidationError::new("function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types")]
1506            }
1507        )));
1508
1509        assert_eq!(
1510            res.readable(0),
1511            r#"Service `my-service` is invalid:
1512    Transforms block is invalid:
1513        function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types
1514"#
1515        );
1516    }
1517
1518    #[test]
1519    fn test_validate_transforms_with_different_keys() {
1520        let topics = topics();
1521        let types = SdfTypesMap::default();
1522        let service = Operations {
1523            name: "my-service".to_string(),
1524            sources: vec![IoRef {
1525                type_: IoType::Topic,
1526                id: "my-topic-with-key".to_string(),
1527                steps: vec![],
1528            }],
1529            sinks: vec![],
1530            transforms: Transforms {
1531                steps: vec![
1532                    TransformOperator::Map(StepInvocation {
1533                        uses: "my-function".to_string(),
1534                        inputs: vec![NamedParameter {
1535                            name: "input".to_string(),
1536                            type_: TypeRef {
1537                                name: "u8".to_string(),
1538                            },
1539                            optional: false,
1540                            kind: ParameterKind::Value,
1541                        }],
1542                        output: Some(Parameter {
1543                            type_: TypeRef {
1544                                name: "string".to_string(),
1545                            }
1546                            .into(),
1547                            ..Default::default()
1548                        }),
1549                        ..Default::default()
1550                    }),
1551                    TransformOperator::Filter(StepInvocation {
1552                        uses: "my-other-function".to_string(),
1553                        inputs: vec![
1554                            NamedParameter {
1555                                name: "key".to_string(),
1556                                type_: TypeRef {
1557                                    name: "bytes".to_string(),
1558                                },
1559                                optional: false,
1560                                kind: ParameterKind::Key,
1561                            },
1562                            NamedParameter {
1563                                name: "input".to_string(),
1564                                type_: TypeRef {
1565                                    name: "string".to_string(),
1566                                },
1567                                optional: false,
1568                                kind: ParameterKind::Value,
1569                            },
1570                        ],
1571                        output: Some(Parameter {
1572                            type_: TypeRef {
1573                                name: "bool".to_string(),
1574                            }
1575                            .into(),
1576                            ..Default::default()
1577                        }),
1578                        ..Default::default()
1579                    }),
1580                ],
1581            },
1582
1583            states: vec![],
1584            post_transforms: None,
1585        };
1586
1587        let res = service
1588            .validate(&types, &topics, None)
1589            .expect_err("should fail for steps with different keys");
1590
1591        assert!(res
1592            .errors
1593            .contains(&ServiceValidationError::InvalidTransformsSteps(
1594                ValidationFailure {
1595                    errors: vec![ValidationError::new(
1596                    "in `my-other-function`, key type does not match expected key type. bytes != string"
1597                )],
1598                }
1599            )));
1600
1601        assert_eq!(
1602            res.readable(0),
1603            r#"Service `my-service` is invalid:
1604    Transforms block is invalid:
1605        in `my-other-function`, key type does not match expected key type. bytes != string
1606"#
1607        );
1608    }
1609
1610    #[test]
1611    fn test_validate_validates_partition() {
1612        let topics = topics();
1613        let types = SdfTypesMap::default();
1614
1615        let service = Operations {
1616            name: "my-service".to_string(),
1617            sources: vec![IoRef {
1618                type_: IoType::Topic,
1619                id: "my-topic".to_string(),
1620                steps: vec![],
1621            }],
1622            transforms: Transforms { steps: vec![] },
1623            sinks: vec![],
1624            post_transforms: Some(PostTransforms::Partition(PartitionOperator {
1625                assign_key: StepInvocation {
1626                    uses: "my-assign-key".to_string(),
1627                    inputs: vec![NamedParameter {
1628                        name: "input".to_string(),
1629                        type_: TypeRef {
1630                            name: "u8".to_string(),
1631                        },
1632                        optional: false,
1633                        kind: ParameterKind::Value,
1634                    }],
1635                    output: Some(Parameter {
1636                        type_: TypeRef {
1637                            name: "string".to_string(),
1638                        }
1639                        .into(),
1640                        ..Default::default()
1641                    }),
1642                    ..Default::default()
1643                },
1644                transforms: Transforms {
1645                    steps: vec![TransformOperator::Map(StepInvocation {
1646                        uses: "my-function".to_string(),
1647                        inputs: vec![NamedParameter {
1648                            name: "input".to_string(),
1649                            type_: TypeRef {
1650                                name: "u8".to_string(),
1651                            },
1652                            optional: false,
1653                            kind: ParameterKind::Value,
1654                        }],
1655                        output: Some(Parameter {
1656                            type_: TypeRef {
1657                                name: "foobar".to_string(),
1658                            }
1659                            .into(),
1660                            ..Default::default()
1661                        }),
1662                        ..Default::default()
1663                    })],
1664                },
1665                update_state: None,
1666            })),
1667            states: vec![],
1668        };
1669
1670        let res = service
1671            .validate(&types, &topics, None)
1672            .expect_err("should fail for output type not in scope");
1673
1674        assert!(res.errors.contains(&ServiceValidationError::InvalidPostTransforms(
1675            ValidationFailure {
1676                errors: vec![ValidationError::new("Partition transforms block is invalid: function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types")]
1677            }
1678        )));
1679
1680        assert_eq!(
1681            res.readable(0),
1682            r#"Service `my-service` is invalid:
1683    Partition transforms block is invalid: function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types
1684"#
1685        );
1686    }
1687
1688    #[test]
1689    fn test_validate_validates_window() {
1690        let topics = topics();
1691        let types = SdfTypesMap::default();
1692
1693        let service = Operations {
1694            name: "my-service".to_string(),
1695            sources: vec![IoRef {
1696                type_: IoType::Topic,
1697                id: "my-topic".to_string(),
1698                steps: vec![],
1699            }],
1700            transforms: Transforms { steps: vec![] },
1701            sinks: vec![],
1702            post_transforms: Some(PostTransforms::AssignTimestamp(Window {
1703                transforms: Transforms {
1704                    steps: vec![TransformOperator::Map(StepInvocation {
1705                        uses: "my-function".to_string(),
1706                        inputs: vec![NamedParameter {
1707                            name: "input".to_string(),
1708                            type_: TypeRef {
1709                                name: "u8".to_string(),
1710                            },
1711                            optional: false,
1712                            kind: ParameterKind::Value,
1713                        }],
1714                        output: Some(Parameter {
1715                            type_: TypeRef {
1716                                name: "foobar".to_string(),
1717                            }
1718                            .into(),
1719                            ..Default::default()
1720                        }),
1721                        ..Default::default()
1722                    })],
1723                },
1724                ..Default::default()
1725            })),
1726            states: vec![],
1727        };
1728
1729        let res = service
1730            .validate(&types, &topics, None)
1731            .expect_err("should fail for output type not in scope");
1732        println!("{:#?}", res);
1733
1734        assert!(res.errors.iter().any(|e|{
1735            if let ServiceValidationError::InvalidPostTransforms(failure) = e {
1736                if failure.errors.contains(
1737                    &ValidationError::new("Window transforms block is invalid: function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types")
1738                ) {
1739                    return true;
1740                }
1741            }
1742
1743            false
1744        }));
1745
1746        assert_eq!(
1747            res.readable(0),
1748            r#"Service `my-service` is invalid:
1749    Window assign-timestamp type function `` should have exactly 2 input type, found 0
1750    Window assign-timestamp type function `` requires an output type
1751    Window transforms block is invalid: function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types
1752"#
1753        );
1754    }
1755
1756    fn map_fn(name: String, input_type: String, output_type: String) -> TransformOperator {
1757        TransformOperator::Map(StepInvocation {
1758            uses: name,
1759            inputs: vec![NamedParameter {
1760                name: "input".to_string(),
1761                type_: TypeRef { name: input_type },
1762                optional: false,
1763                kind: ParameterKind::Value,
1764            }],
1765            output: Some(Parameter {
1766                type_: TypeRef { name: output_type }.into(),
1767                ..Default::default()
1768            }),
1769            ..Default::default()
1770        })
1771    }
1772
1773    fn assign_timestamp_fn(input_type: String) -> StepInvocation {
1774        StepInvocation {
1775            uses: "assign-timestamp".to_string(),
1776            inputs: vec![
1777                NamedParameter {
1778                    name: "input".to_string(),
1779                    type_: TypeRef { name: input_type },
1780                    optional: false,
1781                    kind: ParameterKind::Value,
1782                },
1783                NamedParameter {
1784                    name: "timestamp".to_string(),
1785                    type_: TypeRef {
1786                        name: "i64".to_string(),
1787                    },
1788                    optional: false,
1789                    kind: ParameterKind::Value,
1790                },
1791            ],
1792            output: Some(Parameter {
1793                type_: TypeRef {
1794                    name: "i64".to_string(),
1795                }
1796                .into(),
1797                ..Default::default()
1798            }),
1799            ..Default::default()
1800        }
1801    }
1802
1803    fn assign_key_fn(input_type: String) -> StepInvocation {
1804        StepInvocation {
1805            uses: "assign-timestamp".to_string(),
1806            inputs: vec![NamedParameter {
1807                name: "input".to_string(),
1808                type_: TypeRef { name: input_type },
1809                optional: false,
1810                kind: ParameterKind::Value,
1811            }],
1812            output: Some(Parameter {
1813                type_: TypeRef {
1814                    name: "string".to_string(),
1815                }
1816                .into(),
1817                ..Default::default()
1818            }),
1819            ..Default::default()
1820        }
1821    }
1822
1823    fn window_aggregate_fn(output_type: String) -> StepInvocation {
1824        StepInvocation {
1825            uses: "window-aggregate".to_string(),
1826            inputs: vec![],
1827            output: Some(Parameter {
1828                type_: TypeRef { name: output_type }.into(),
1829                ..Default::default()
1830            }),
1831            ..Default::default()
1832        }
1833    }
1834
1835    #[test]
1836    fn test_types_validate_throughout() {
1837        let topics = topics();
1838        let types = SdfTypesMap::default();
1839
1840        let service = Operations {
1841            name: "my-service".to_string(),
1842            sources: vec![IoRef {
1843                type_: IoType::Topic,
1844                id: "my-topic".to_string(),
1845                steps: vec![map_fn("1".to_string(), "u8".to_string(), "u32".to_string())],
1846            }],
1847            transforms: Transforms {
1848                steps: vec![map_fn(
1849                    "2".to_string(),
1850                    "u32".to_string(),
1851                    "u64".to_string(),
1852                )],
1853            },
1854            post_transforms: Some(PostTransforms::AssignTimestamp(Window {
1855                assign_timestamp: assign_timestamp_fn("u64".to_string()),
1856                transforms: Transforms {
1857                    steps: vec![map_fn("3".to_string(), "u64".to_string(), "i8".to_string())],
1858                },
1859                partition: Some(PartitionOperator {
1860                    assign_key: assign_key_fn("i8".to_string()),
1861                    transforms: Transforms {
1862                        steps: vec![map_fn("4".to_string(), "i8".to_string(), "i16".to_string())],
1863                    },
1864                    update_state: None,
1865                }),
1866                flush: Some(window_aggregate_fn("u16".to_string())),
1867                ..Default::default()
1868            })),
1869            sinks: vec![IoRef {
1870                type_: IoType::Topic,
1871                id: "my-other-topic".to_string(),
1872                steps: vec![],
1873            }],
1874            states: vec![],
1875        };
1876
1877        service
1878            .validate(&types, &topics, None)
1879            .expect("should validate");
1880    }
1881
1882    #[test]
1883    fn test_operators() {
1884        let operations = Operations {
1885            name: "my-service".to_string(),
1886            sources: vec![IoRef {
1887                type_: IoType::Topic,
1888                id: "my-topic".to_string(),
1889                steps: vec![map_fn(
1890                    "my-step".to_string(),
1891                    "u8".to_string(),
1892                    "u32".to_string(),
1893                )],
1894            }],
1895            transforms: Transforms {
1896                steps: vec![map_fn(
1897                    "my-map".to_string(),
1898                    "u32".to_string(),
1899                    "u64".to_string(),
1900                )],
1901            },
1902            post_transforms: None,
1903            sinks: vec![IoRef {
1904                type_: IoType::Topic,
1905                id: "my-other-topic".to_string(),
1906                steps: vec![],
1907            }],
1908            states: vec![],
1909        };
1910
1911        let ops = operations.operators();
1912
1913        assert_eq!(ops.len(), 1);
1914        assert_eq!(ops[0].0.uses, "my-map");
1915    }
1916
1917    #[test]
1918    fn test_validate_types_when_there_are_no_transforms_steps() {
1919        let topics = topics();
1920        let types = SdfTypesMap::default();
1921
1922        let service = Operations {
1923            name: "my-service".to_string(),
1924            sources: vec![IoRef {
1925                type_: IoType::Topic,
1926                id: "my-topic".to_string(),
1927                steps: vec![],
1928            }],
1929            transforms: Transforms { steps: vec![] },
1930            post_transforms: None,
1931            sinks: vec![IoRef {
1932                type_: IoType::Topic,
1933                id: "my-other-topic".to_string(),
1934                steps: vec![],
1935            }],
1936            states: vec![],
1937        };
1938
1939        let res = service
1940            .validate(&types, &topics, None)
1941            .expect_err("should validate");
1942
1943        assert!(res.errors.contains(&ServiceValidationError::InvalidSink(
1944            IoRefValidationFailure {
1945                name: "my-other-topic".to_string(),
1946                errors: vec![IoRefValidationError::InvalidTransformsBlock(vec![
1947                    ValidationError::new(
1948                        "service output type `u8` does not match sink input type `u16`"
1949                    )
1950                ])]
1951            }
1952        )));
1953
1954        assert_eq!(
1955            res.readable(0),
1956            r#"Service `my-service` is invalid:
1957    Sink `my-other-topic` is invalid:
1958        Transforms block is invalid:
1959            service output type `u8` does not match sink input type `u16`
1960"#
1961        );
1962    }
1963
1964    #[test]
1965    fn test_validate_topic_without_key_with_function_that_requires_key() {
1966        let topics = topics();
1967        let types = SdfTypesMap::default();
1968
1969        let service = Operations {
1970            name: "my-service".to_string(),
1971            sources: vec![IoRef {
1972                type_: IoType::Topic,
1973                id: "my-topic".to_string(),
1974                steps: vec![],
1975            }],
1976            transforms: Transforms {
1977                steps: vec![TransformOperator::Map(StepInvocation {
1978                    uses: "my-function".to_string(),
1979                    inputs: vec![
1980                        NamedParameter {
1981                            name: "k".to_string(),
1982                            type_: TypeRef {
1983                                name: "string".to_string(),
1984                            },
1985                            optional: false,
1986                            kind: ParameterKind::Key,
1987                        },
1988                        NamedParameter {
1989                            name: "value".to_string(),
1990                            type_: TypeRef {
1991                                name: "u8".to_string(),
1992                            },
1993                            optional: false,
1994                            kind: ParameterKind::Value,
1995                        },
1996                    ],
1997                    output: Some(Parameter {
1998                        type_: TypeRef {
1999                            name: "u8".to_string(),
2000                        }
2001                        .into(),
2002                        ..Default::default()
2003                    }),
2004                    ..Default::default()
2005                })],
2006            },
2007            post_transforms: None,
2008            sinks: vec![IoRef {
2009                type_: IoType::Topic,
2010                id: "my-other-topic".to_string(),
2011                steps: vec![],
2012            }],
2013            states: vec![],
2014        };
2015
2016        let resp = service
2017            .validate(&types, &topics, None)
2018            .expect_err("should validate");
2019
2020        assert!(
2021            resp.errors.contains(&ServiceValidationError::InvalidTransformsSteps(
2022                ValidationFailure {
2023                    errors: vec![ValidationError::new(
2024                        "my-function function requires a key, but none was found. Make sure that you define the right key in the topic configuration"
2025                    )]
2026                }
2027            ))
2028        );
2029    }
2030
2031    #[test]
2032    fn test_add_operator() {
2033        let mut service = service_with_window();
2034
2035        let operator_placement = OperatorPlacement {
2036            service_id: "listing-to-prospect-op".to_string(),
2037            transforms_index: Some(2),
2038            ..Default::default()
2039        };
2040
2041        let function = function();
2042
2043        service
2044            .add_operator(OperatorType::Map, operator_placement, function.clone())
2045            .expect("Failed to add imported operator");
2046
2047        let result_operator = service.transforms.steps[2].clone();
2048
2049        assert_eq!(result_operator, TransformOperator::Map(function));
2050    }
2051
2052    #[test]
2053    fn test_add_partition_transforms_operator() {
2054        let mut service = service_with_partition();
2055
2056        let operator_placement = OperatorPlacement {
2057            service_id: "listing-to-prospect-op".to_string(),
2058            transforms_index: Some(1),
2059            partition: true,
2060            window: false,
2061        };
2062
2063        let function = function();
2064
2065        service
2066            .add_operator(OperatorType::Map, operator_placement, function.clone())
2067            .expect("Failed to add imported operator");
2068
2069        let result_operator = match service.post_transforms {
2070            Some(PostTransforms::Partition(partition)) => partition.transforms.steps[1].clone(),
2071            _ => panic!("expected partition"),
2072        };
2073
2074        assert_eq!(result_operator, TransformOperator::Map(function));
2075    }
2076
2077    #[test]
2078    fn test_add_partition_operator_when_window_incorrectly_specified() {
2079        let mut service = service_with_window();
2080
2081        let operator_placement = OperatorPlacement {
2082            service_id: "listing-to-prospect-op".to_string(),
2083            window: false,
2084            partition: true,
2085            transforms_index: Some(0),
2086        };
2087
2088        let function = function();
2089
2090        let res = service.add_operator(OperatorType::Map, operator_placement, function);
2091
2092        assert!(res.is_err());
2093        assert_eq!(
2094            res.unwrap_err().to_string(),
2095            "Cannot add operator. Service does not have top level partition. To delete operator from window partition, please specify window"
2096        )
2097    }
2098
2099    #[test]
2100    fn test_add_partition_operator_with_no_partition() {
2101        let mut service = Operations {
2102            name: "listing-to-prospect-op".to_string(),
2103            sources: vec![],
2104            sinks: vec![],
2105            transforms: Transforms { steps: vec![] },
2106            post_transforms: None,
2107            states: vec![],
2108        };
2109
2110        let operator_placement = OperatorPlacement {
2111            service_id: "listing-to-prospect-op".to_string(),
2112            window: false,
2113            partition: true,
2114            transforms_index: Some(0),
2115        };
2116
2117        let function = function();
2118
2119        let res = service.add_operator(OperatorType::Map, operator_placement, function);
2120
2121        assert!(res.is_err());
2122        assert_eq!(
2123            res.unwrap_err().to_string(),
2124            "Cannot add operator. Parition was specified but service does not have a partition"
2125        )
2126    }
2127
2128    #[test]
2129    fn test_add_window_operator() {
2130        let mut service = service_with_window();
2131
2132        let operator_placement = OperatorPlacement {
2133            service_id: "listing-to-prospect-op".to_string(),
2134            window: true,
2135            partition: false,
2136            transforms_index: Some(0),
2137        };
2138
2139        let function = function();
2140        let res = service.add_operator(OperatorType::Map, operator_placement, function);
2141
2142        let post_transforms = service.post_transforms.as_ref().unwrap();
2143        let window = match post_transforms {
2144            PostTransforms::AssignTimestamp(w) => w,
2145            _ => panic!("expected window"),
2146        };
2147
2148        assert!(res.is_ok());
2149        assert_eq!(window.transforms.steps.len(), 2);
2150    }
2151
2152    #[test]
2153    fn test_add_window_operator_when_partition_but_no_window() {
2154        let mut service = service_with_partition();
2155
2156        let operator_placement = OperatorPlacement {
2157            service_id: "listing-to-prospect-op".to_string(),
2158            window: true,
2159            partition: false,
2160            transforms_index: Some(0),
2161        };
2162
2163        let function = function();
2164        let res = service.add_operator(OperatorType::Map, operator_placement, function);
2165
2166        assert!(res.is_err());
2167        assert_eq!(
2168            res.unwrap_err().to_string(),
2169            "Cannot add operator. Window was specified but service does not have a window"
2170        )
2171    }
2172
2173    #[test]
2174    fn test_add_window_operator_when_no_window() {
2175        let mut service = Operations {
2176            name: "listing-to-prospect-op".to_string(),
2177            sources: vec![],
2178            sinks: vec![],
2179            transforms: Transforms { steps: vec![] },
2180            post_transforms: None,
2181            states: vec![],
2182        };
2183
2184        let operator_placement = OperatorPlacement {
2185            service_id: "listing-to-prospect-op".to_string(),
2186            window: true,
2187            partition: false,
2188            transforms_index: Some(0),
2189        };
2190
2191        let function = function();
2192        let res = service.add_operator(OperatorType::Map, operator_placement, function);
2193
2194        assert!(res.is_err());
2195        assert_eq!(
2196            res.unwrap_err().to_string(),
2197            "Cannot add operator. Window was specified but service does not have a window"
2198        )
2199    }
2200
2201    // fn test_add_assign_key() {
2202    //}
2203
2204    // fn test_add_assign_key_without_partition() {
2205    // should Fail
2206    // }
2207
2208    // fn test_add_assign_timestamp() {
2209    //}
2210
2211    // fn test_add_assign_timestamp_without_window() {
2212    // should Fail
2213    //}
2214
2215    // fn test_add_update_state() {
2216    //}
2217
2218    // fn test_add_update_state_without_state() {
2219    // should Fail
2220    //}
2221
2222    #[test]
2223    fn test_delete_operator_deletes_op_from_transforms() {
2224        let mut service = service_with_window();
2225
2226        let operator_placement = OperatorPlacement {
2227            service_id: "listing-to-prospect-op".to_string(),
2228            window: false,
2229            partition: false,
2230            transforms_index: Some(0),
2231        };
2232
2233        let res = service.delete_operator(operator_placement);
2234
2235        assert!(res.is_ok());
2236        assert_eq!(service.transforms.steps.len(), 1);
2237    }
2238
2239    #[test]
2240    fn test_delete_operator_deletes_op_from_partition() {
2241        let mut service = service_with_partition();
2242
2243        let operator_placement = OperatorPlacement {
2244            service_id: "listing-to-prospect-op".to_string(),
2245            window: false,
2246            partition: true,
2247            transforms_index: Some(0),
2248        };
2249
2250        let res = service.delete_operator(operator_placement);
2251
2252        let post_transforms = service.post_transforms.as_ref().unwrap();
2253        let partition = match post_transforms {
2254            PostTransforms::Partition(p) => p,
2255            _ => panic!("expected partition"),
2256        };
2257
2258        assert!(res.is_ok());
2259        assert_eq!(partition.transforms.steps.len(), 0);
2260
2261        // doesn't incorrectly delete something from transforms
2262        assert_eq!(service.transforms.steps.len(), 2)
2263    }
2264
2265    #[test]
2266    fn test_delete_partition_operator_when_window_incorrectly_specified() {
2267        let mut service = service_with_window();
2268
2269        let operator_placement = OperatorPlacement {
2270            service_id: "listing-to-prospect-op".to_string(),
2271            window: false,
2272            partition: true,
2273            transforms_index: Some(0),
2274        };
2275
2276        let res = service.delete_operator(operator_placement);
2277
2278        assert!(res.is_err());
2279        assert_eq!(
2280            res.unwrap_err().to_string(),
2281            "Cannot delete operator. Service does not have top level partition. To delete operator from window partition, please specify window"
2282        )
2283    }
2284
2285    #[test]
2286    fn test_delete_partition_operator_with_no_partition() {
2287        let mut service = Operations {
2288            name: "listing-to-prospect-op".to_string(),
2289            sources: vec![],
2290            sinks: vec![],
2291            transforms: Transforms { steps: vec![] },
2292            post_transforms: None,
2293            states: vec![],
2294        };
2295
2296        let operator_placement = OperatorPlacement {
2297            service_id: "listing-to-prospect-op".to_string(),
2298            window: false,
2299            partition: true,
2300            transforms_index: Some(0),
2301        };
2302
2303        let res = service.delete_operator(operator_placement);
2304
2305        assert!(res.is_err());
2306        assert_eq!(
2307            res.unwrap_err().to_string(),
2308            "Cannot delete operator. Parition was specified but service does not have a partition"
2309        )
2310    }
2311
2312    #[test]
2313    fn test_delete_operator_deletes_op_from_window() {
2314        let mut service = service_with_window();
2315
2316        let operator_placement = OperatorPlacement {
2317            service_id: "listing-to-prospect-op".to_string(),
2318            window: true,
2319            partition: false,
2320            transforms_index: Some(0),
2321        };
2322
2323        let res = service.delete_operator(operator_placement);
2324
2325        let post_transforms = service.post_transforms.as_ref().unwrap();
2326        let window = match post_transforms {
2327            PostTransforms::AssignTimestamp(w) => w,
2328            _ => panic!("expected window"),
2329        };
2330
2331        assert!(res.is_ok());
2332        assert_eq!(window.transforms.steps.len(), 0);
2333
2334        // doesn't incorrectly delete something from transforms
2335        assert_eq!(service.transforms.steps.len(), 2)
2336    }
2337
2338    #[test]
2339    fn test_delete_window_operator_when_partition_but_no_window() {
2340        let mut service = service_with_partition();
2341
2342        let operator_placement = OperatorPlacement {
2343            service_id: "listing-to-prospect-op".to_string(),
2344            window: true,
2345            partition: false,
2346            transforms_index: Some(0),
2347        };
2348
2349        let res = service.delete_operator(operator_placement);
2350
2351        assert!(res.is_err());
2352        assert_eq!(
2353            res.unwrap_err().to_string(),
2354            "Cannot delete operator. Window was specified but service does not have a window"
2355        )
2356    }
2357
2358    #[test]
2359    fn test_delete_window_operator_when_no_window() {
2360        let mut service = Operations {
2361            name: "listing-to-prospect-op".to_string(),
2362            sources: vec![],
2363            sinks: vec![],
2364            transforms: Transforms { steps: vec![] },
2365            post_transforms: None,
2366            states: vec![],
2367        };
2368
2369        let operator_placement = OperatorPlacement {
2370            service_id: "listing-to-prospect-op".to_string(),
2371            window: true,
2372            partition: false,
2373            transforms_index: Some(0),
2374        };
2375
2376        let res = service.delete_operator(operator_placement);
2377
2378        assert!(res.is_err());
2379        assert_eq!(
2380            res.unwrap_err().to_string(),
2381            "Cannot delete operator. Window was specified but service does not have a window"
2382        )
2383    }
2384}